/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FlinkKafkaProducerBase<IN>
extends RichSinkFunction<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
    private static final long serialVersionUID = 1L;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    protected final int[] partitions;
    protected final Properties producerConfig;
    protected final String topicId;
    protected final KeyedSerializationSchema<IN> schema;
    protected final KafkaPartitioner<IN> partitioner;
    private final String producerId;
    protected boolean logFailuresOnly;
    protected transient KafkaProducer<byte[], byte[]> producer;
    protected transient Callback callback;
    protected volatile transient Exception asyncException;

    public FlinkKafkaProducerBase(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
        Objects.requireNonNull(topicId, "TopicID not set");
        Objects.requireNonNull(serializationSchema, "serializationSchema not set");
        Objects.requireNonNull(producerConfig, "producerConfig not set");
        ClosureCleaner.ensureSerializable(customPartitioner);
        ClosureCleaner.ensureSerializable(serializationSchema);
        this.topicId = topicId;
        this.schema = serializationSchema;
        this.producerConfig = producerConfig;
        if (!producerConfig.contains("key.serializer")) {
            this.producerConfig.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", (Object)"key.serializer");
        }
        if (!producerConfig.contains("value.serializer")) {
            this.producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", (Object)"value.serializer");
        }
        try (KafkaProducer getPartitionsProd = new KafkaProducer(this.producerConfig);){
            List partitionsList = getPartitionsProd.partitionsFor(topicId);
            this.partitions = new int[partitionsList.size()];
            for (int i = 0; i < this.partitions.length; ++i) {
                this.partitions[i] = ((PartitionInfo)partitionsList.get(i)).partition();
            }
            getPartitionsProd.close();
        }
        this.partitioner = customPartitioner;
        this.producerId = UUID.randomUUID().toString();
    }

    public void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    public void open(Configuration configuration) {
        this.producer = new KafkaProducer(this.producerConfig);
        RuntimeContext ctx = this.getRuntimeContext();
        if (this.partitioner != null) {
            this.partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), this.partitions);
        }
        LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", new Object[]{ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), this.topicId});
        if (!Boolean.valueOf(this.producerConfig.getProperty(KEY_DISABLE_METRICS, "false")).booleanValue()) {
            Map metrics = this.producer.metrics();
            if (metrics == null) {
                LOG.info("Producer implementation does not support metrics");
            } else {
                for (Map.Entry metric : metrics.entrySet()) {
                    String name = this.producerId + "-producer-" + ((MetricName)metric.getKey()).name();
                    DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor((Metric)metric.getValue());
                    if (kafkaAccumulator == null) continue;
                    this.getRuntimeContext().addAccumulator(name, (Accumulator)kafkaAccumulator);
                }
            }
        }
        this.callback = this.logFailuresOnly ? new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    LOG.error("Error while sending record to Kafka: " + e.getMessage(), (Throwable)e);
                }
            }
        } : new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null && FlinkKafkaProducerBase.this.asyncException == null) {
                    FlinkKafkaProducerBase.this.asyncException = exception;
                }
            }
        };
    }

    public void invoke(IN next) throws Exception {
        this.checkErroneous();
        byte[] serializedKey = this.schema.serializeKey(next);
        byte[] serializedValue = this.schema.serializeValue(next);
        ProducerRecord record = this.partitioner == null ? new ProducerRecord(this.topicId, (Object)serializedKey, (Object)serializedValue) : new ProducerRecord(this.topicId, Integer.valueOf(this.partitioner.partition(next, serializedKey, serializedValue, this.partitions.length)), (Object)serializedKey, (Object)serializedValue);
        this.producer.send(record, this.callback);
    }

    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        this.checkErroneous();
    }

    protected void checkErroneous() throws Exception {
        Exception e = this.asyncException;
        if (e != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    public static Properties getPropertiesFromBrokerList(String brokerList) {
        String[] elements;
        for (String broker : elements = brokerList.split(",")) {
            NetUtils.getCorrectHostnamePort((String)broker);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }
}

