package org.apache.flink.streaming.connectors.kafka;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.class */
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
    private static final long serialVersionUID = 1;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    protected final int[] partitions;
    protected final Properties producerConfig;
    protected final String topicId;
    protected final KeyedSerializationSchema<IN> schema;
    protected final KafkaPartitioner<IN> partitioner;
    private final String producerId;
    protected boolean logFailuresOnly;
    protected transient KafkaProducer<byte[], byte[]> producer;
    protected transient Callback callback;
    protected volatile transient Exception asyncException;

    public FlinkKafkaProducerBase(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties, KafkaPartitioner<IN> kafkaPartitioner) {
        Objects.requireNonNull(str, "TopicID not set");
        Objects.requireNonNull(keyedSerializationSchema, "serializationSchema not set");
        Objects.requireNonNull(properties, "producerConfig not set");
        ClosureCleaner.ensureSerializable(kafkaPartitioner);
        ClosureCleaner.ensureSerializable(keyedSerializationSchema);
        this.topicId = str;
        this.schema = keyedSerializationSchema;
        this.producerConfig = properties;
        if (properties.contains("key.serializer")) {
            LOG.warn("Overwriting the '{}' is not recommended", "key.serializer");
        } else {
            this.producerConfig.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
        }
        if (properties.contains("value.serializer")) {
            LOG.warn("Overwriting the '{}' is not recommended", "value.serializer");
        } else {
            this.producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());
        }
        KafkaProducer kafkaProducer = new KafkaProducer(this.producerConfig);
        Throwable th = null;
        try {
            List partitionsFor = kafkaProducer.partitionsFor(str);
            this.partitions = new int[partitionsFor.size()];
            for (int i = 0; i < this.partitions.length; i++) {
                this.partitions[i] = ((PartitionInfo) partitionsFor.get(i)).partition();
            }
            kafkaProducer.close();
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            this.partitioner = kafkaPartitioner;
            this.producerId = UUID.randomUUID().toString();
        } catch (Throwable th3) {
            if (kafkaProducer != null) {
                if (0 != 0) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th3;
        }
    }

    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    public void open(Configuration configuration) {
        this.producer = new KafkaProducer<>(this.producerConfig);
        RuntimeContext runtimeContext = getRuntimeContext();
        if (this.partitioner != null) {
            this.partitioner.open(runtimeContext.getIndexOfThisSubtask(), runtimeContext.getNumberOfParallelSubtasks(), this.partitions);
        }
        LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", new Object[]{Integer.valueOf(runtimeContext.getIndexOfThisSubtask()), Integer.valueOf(runtimeContext.getNumberOfParallelSubtasks()), this.topicId});
        if (!Boolean.valueOf(this.producerConfig.getProperty(KEY_DISABLE_METRICS, "false")).booleanValue()) {
            Map metrics = this.producer.metrics();
            if (metrics == null) {
                LOG.info("Producer implementation does not support metrics");
            } else {
                for (Map.Entry entry : metrics.entrySet()) {
                    String str = this.producerId + "-producer-" + ((MetricName) entry.getKey()).name();
                    DefaultKafkaMetricAccumulator createFor = DefaultKafkaMetricAccumulator.createFor((Metric) entry.getValue());
                    if (createFor != null) {
                        getRuntimeContext().addAccumulator(str, createFor);
                    }
                }
            }
        }
        if (this.logFailuresOnly) {
            this.callback = new Callback() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc != null) {
                        FlinkKafkaProducerBase.LOG.error("Error while sending record to Kafka: " + exc.getMessage(), exc);
                    }
                }
            };
        } else {
            this.callback = new Callback() { // from class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.2
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc == null || FlinkKafkaProducerBase.this.asyncException != null) {
                        return;
                    }
                    FlinkKafkaProducerBase.this.asyncException = exc;
                }
            };
        }
    }

    public void invoke(IN in) throws Exception {
        checkErroneous();
        byte[] serializeKey = this.schema.serializeKey(in);
        byte[] serializeValue = this.schema.serializeValue(in);
        this.producer.send(this.partitioner == null ? new ProducerRecord(this.topicId, serializeKey, serializeValue) : new ProducerRecord(this.topicId, Integer.valueOf(this.partitioner.partition(in, serializeKey, serializeValue, this.partitions.length)), serializeKey, serializeValue), this.callback);
    }

    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        checkErroneous();
    }

    protected void checkErroneous() throws Exception {
        Exception exc = this.asyncException;
        if (exc != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + exc.getMessage(), exc);
        }
    }

    public static Properties getPropertiesFromBrokerList(String str) {
        for (String str2 : str.split(",")) {
            NetUtils.getCorrectHostnamePort(str2);
        }
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str);
        return properties;
    }
}
