package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.class */
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    /** Logger shared by all instances of this producer. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
    private static final long serialVersionUID = 1;
    /**
     * Producer-config key. {@link #open} reads the property with this name and skips registering
     * the Kafka producer metrics when its value parses as {@code true}.
     */
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    /** Properties handed to {@link #getKafkaProducer} when the producer is created in {@link #open}. */
    protected final Properties producerConfig;
    /** Topic used by {@link #invoke} when the schema returns {@code null} as target topic. */
    protected final String defaultTopicId;
    /** Produces the serialized key, the serialized value and the target topic of every record. */
    protected final KeyedSerializationSchema<IN> schema;
    /** Optional partitioner; when {@code null}, records are created without an explicit partition. */
    protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    /** Cache of topic name to sorted partition ids, filled lazily by {@link #invoke}. */
    protected final Map<String, int[]> topicPartitionsMap;
    /** Selects which send callback {@link #open} installs: log-only, or remember the first failure. */
    protected boolean logFailuresOnly;
    /** Kafka client, created in {@link #open}; transient, so not part of the serialized function. */
    protected transient KafkaProducer<byte[], byte[]> producer;
    /** Completion callback passed to every {@code send}; created in {@link #open}. */
    protected transient Callback callback;
    /** First failure recorded by the send callback; rethrown and cleared by {@link #checkErroneous}. */
    protected volatile transient Exception asyncException;
    /**
     * Number of sent-but-unacknowledged records. Only maintained while {@link #flushOnCheckpoint}
     * is set, and only accessed while holding {@link #pendingRecordsLock}.
     */
    protected long pendingRecords;
    /** Whether {@link #snapshotState} flushes the producer; {@link #open} may reset it to false. */
    protected boolean flushOnCheckpoint = true;
    /** Monitor guarding {@link #pendingRecords}; notified when the count reaches zero. */
    protected final SerializableObject pendingRecordsLock = new SerializableObject();

    /**
     * Stores the topic, schema, producer properties and optional partitioner, and fills in the
     * byte-array key/value serializers when the caller has not configured them.
     *
     * <p>The passed {@code properties} object is kept by reference and may be modified here
     * (serializer defaults are written into it).
     *
     * @param str default topic id; must not be null
     * @param keyedSerializationSchema schema used to serialize records; must not be null and must
     *     be serializable
     * @param properties Kafka producer configuration; must not be null and must contain
     *     {@code bootstrap.servers}
     * @param flinkKafkaPartitioner custom partitioner, or null to send records without an explicit
     *     partition
     * @throws NullPointerException if the topic, schema or properties are null
     * @throws IllegalArgumentException if {@code bootstrap.servers} is missing
     */
    public FlinkKafkaProducerBase(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties, FlinkKafkaPartitioner<IN> flinkKafkaPartitioner) {
        Objects.requireNonNull(str, "TopicID not set");
        Objects.requireNonNull(keyedSerializationSchema, "serializationSchema not set");
        Objects.requireNonNull(properties, "producerConfig not set");
        ClosureCleaner.clean(flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        ClosureCleaner.ensureSerializable(keyedSerializationSchema);
        this.defaultTopicId = str;
        this.schema = keyedSerializationSchema;
        this.producerConfig = properties;
        this.flinkKafkaPartitioner = flinkKafkaPartitioner;
        applyDefaultSerializer(properties, "key.serializer");
        applyDefaultSerializer(properties, "value.serializer");
        // Checked after the serializer defaults, matching the original order of side effects.
        if (!properties.containsKey("bootstrap.servers")) {
            throw new IllegalArgumentException("bootstrap.servers must be supplied in the producer config properties.");
        }
        this.topicPartitionsMap = new HashMap<>();
    }

    /** Warns when {@code key} is already configured; otherwise sets it to the byte-array serializer. */
    private static void applyDefaultSerializer(Properties config, String key) {
        if (config.containsKey(key)) {
            LOG.warn("Overwriting the '{}' is not recommended", key);
        } else {
            config.setProperty(key, ByteArraySerializer.class.getName());
        }
    }

    /**
     * Sets how send failures are handled. The flag is read in {@link #open}: when true, the
     * installed callback only logs failures; when false, the first failure is stored and later
     * rethrown by {@link #checkErroneous}.
     *
     * @param z true to only log send failures
     */
    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    /**
     * Enables or disables flushing in {@link #snapshotState}. When enabled, sent records are
     * counted as pending until their callback fires. Note that {@link #open} switches the flag
     * off again if checkpointing is not enabled.
     *
     * @param z true to flush the producer on every checkpoint
     */
    public void setFlushOnCheckpoint(boolean z) {
        this.flushOnCheckpoint = z;
    }

    /**
     * Creates the Kafka client from the given properties. {@link #open} calls this with
     * {@link #producerConfig}; being protected and marked {@code @VisibleForTesting}, it can be
     * overridden to supply a different producer instance.
     *
     * @param properties configuration passed to the {@link KafkaProducer} constructor
     * @return a new producer
     */
    @VisibleForTesting
    protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties properties) {
        return new KafkaProducer<>(properties);
    }

    /**
     * Initializes the sink: opens the wrapped serialization schema (if any), creates the Kafka
     * producer, opens the partitioner, registers the producer's metrics and installs the send
     * callback.
     *
     * <p>If flushing on checkpoint is requested but checkpointing is not enabled, flushing is
     * switched off with a warning.
     *
     * @param configuration not used by this method
     * @throws Exception if opening the serialization schema fails
     */
    public void open(Configuration configuration) throws Exception {
        if (this.schema instanceof KeyedSerializationSchemaWrapper) {
            ((KeyedSerializationSchemaWrapper<IN>) this.schema)
                    .getSerializationSchema()
                    .open(RuntimeContextInitializationContextAdapters.serializationAdapter(
                            getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        }

        this.producer = getKafkaProducer(this.producerConfig);

        RuntimeContext runtimeContext = getRuntimeContext();
        if (this.flinkKafkaPartitioner != null) {
            this.flinkKafkaPartitioner.open(
                    runtimeContext.getIndexOfThisSubtask(), runtimeContext.getNumberOfParallelSubtasks());
        }
        // Subtask index is zero-based; log it one-based.
        LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
                runtimeContext.getIndexOfThisSubtask() + 1,
                runtimeContext.getNumberOfParallelSubtasks(),
                this.defaultTopicId);

        if (!Boolean.parseBoolean(this.producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
            Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
            if (metrics == null) {
                LOG.info("Producer implementation does not support metrics");
            } else {
                // Expose every Kafka producer metric as a gauge under the "KafkaProducer" group.
                MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
                for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
                    kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
                }
            }
        }

        if (this.flushOnCheckpoint && !getRuntimeContext().isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }

        if (this.logFailuresOnly) {
            // Failures are logged and otherwise ignored; the record still counts as acknowledged.
            this.callback = (metadata, exception) -> {
                if (exception != null) {
                    LOG.error("Error while sending record to Kafka: {}", exception.getMessage(), exception);
                }
                acknowledgeMessage();
            };
        } else {
            // Remember only the first failure; checkErroneous() rethrows it on the task thread.
            this.callback = (metadata, exception) -> {
                if (exception != null && this.asyncException == null) {
                    this.asyncException = exception;
                }
                acknowledgeMessage();
            };
        }
    }

    /**
     * Serializes one element and hands it to the Kafka producer.
     *
     * <p>The target topic comes from the schema, falling back to the default topic. The topic's
     * partition ids are looked up once and cached. When a partitioner is configured it chooses
     * the partition; otherwise the record is sent without an explicit partition.
     *
     * @param in element to send
     * @param context sink context; not used by this method
     * @throws Exception if an earlier asynchronous send failure is pending (see
     *     {@link #checkErroneous})
     */
    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        // Surface failures reported by earlier sends before producing more data.
        checkErroneous();

        byte[] serializedKey = this.schema.serializeKey(in);
        byte[] serializedValue = this.schema.serializeValue(in);
        String targetTopic = this.schema.getTargetTopic(in);
        if (targetTopic == null) {
            targetTopic = this.defaultTopicId;
        }

        int[] partitions = this.topicPartitionsMap.get(targetTopic);
        if (partitions == null) {
            partitions = getPartitionsByTopic(targetTopic, this.producer);
            this.topicPartitionsMap.put(targetTopic, partitions);
        }

        ProducerRecord<byte[], byte[]> producerRecord;
        if (this.flinkKafkaPartitioner == null) {
            producerRecord = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
        } else {
            int partition = this.flinkKafkaPartitioner.partition(
                    in, serializedKey, serializedValue, targetTopic, partitions);
            producerRecord = new ProducerRecord<>(targetTopic, partition, serializedKey, serializedValue);
        }

        if (this.flushOnCheckpoint) {
            // Count the record before sending so the callback can never decrement first.
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords++;
            }
        }
        this.producer.send(producerRecord, this.callback);
    }

    /**
     * Closes the Kafka producer, if one was created, and afterwards rethrows any send failure
     * that is still recorded.
     *
     * @throws Exception if a recorded asynchronous send failure exists (see
     *     {@link #checkErroneous})
     */
    public void close() throws Exception {
        final KafkaProducer<byte[], byte[]> activeProducer = this.producer;
        if (activeProducer != null) {
            activeProducer.close();
        }
        // Propagate failures that were reported but not yet thrown.
        checkErroneous();
    }

    /**
     * Marks one sent record as completed. Invoked by the send callbacks installed in
     * {@link #open}, for successful and failed sends alike.
     *
     * <p>Does nothing unless flushing on checkpoint is enabled. Otherwise decrements the pending
     * count under {@link #pendingRecordsLock} and wakes all threads waiting on that lock once the
     * count reaches zero.
     */
    public void acknowledgeMessage() {
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords--;
                if (this.pendingRecords == 0) {
                    this.pendingRecordsLock.notifyAll();
                }
            }
        }
    }

    /**
     * Subclass hook called by {@link #snapshotState} when flushing on checkpoint is enabled.
     * {@code snapshotState} fails if any record is still pending after this returns, so
     * implementations presumably must block until all sent records are acknowledged — confirm
     * against the concrete subclasses.
     */
    protected abstract void flush();

    /**
     * Intentionally empty: this class does nothing when state is initialized.
     *
     * @param functionInitializationContext not used
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Checkpoint hook. Always rethrows a recorded send failure first. When flushing on
     * checkpoint is enabled it then flushes, verifies that no record is left pending, and checks
     * once more for failures reported during the flush.
     *
     * @param functionSnapshotContext not used by this method
     * @throws IllegalStateException if records are still pending after {@link #flush} returned
     * @throws Exception if an asynchronous send failure was recorded
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkErroneous();
        if (!this.flushOnCheckpoint) {
            return;
        }

        flush();
        synchronized (this.pendingRecordsLock) {
            final long stillPending = this.pendingRecords;
            if (stillPending != 0) {
                throw new IllegalStateException("Pending record count must be zero at this point: " + stillPending);
            }
            // Failures reported while flushing must fail the checkpoint too.
            checkErroneous();
        }
    }

    /**
     * Rethrows the failure recorded by the send callback, if there is one. The stored failure is
     * cleared before throwing, so it is reported only once.
     *
     * @throws Exception wrapping the recorded failure as its cause
     */
    protected void checkErroneous() throws Exception {
        final Exception failure = this.asyncException;
        if (failure == null) {
            return;
        }
        this.asyncException = null;
        throw new Exception("Failed to send data to Kafka: " + failure.getMessage(), failure);
    }

    /**
     * Builds producer properties containing only {@code bootstrap.servers}, set to the given
     * broker list unchanged.
     *
     * <p>Each comma-separated entry is first passed to {@code NetUtils.getCorrectHostnamePort};
     * the result is discarded, so the call presumably serves only to reject malformed addresses.
     *
     * @param str comma-separated list of broker addresses
     * @return new properties holding the broker list
     */
    public static Properties getPropertiesFromBrokerList(String str) {
        final String[] brokerAddresses = str.split(",");
        for (int i = 0; i < brokerAddresses.length; i++) {
            NetUtils.getCorrectHostnamePort(brokerAddresses[i]);
        }

        final Properties brokerProperties = new Properties();
        brokerProperties.setProperty("bootstrap.servers", str);
        return brokerProperties;
    }

    /**
     * Fetches the partitions of a topic from the producer and returns their ids in ascending
     * order.
     *
     * @param str topic name
     * @param kafkaProducer producer used to look up the topic's partitions
     * @return partition ids sorted ascending
     */
    protected static int[] getPartitionsByTopic(String str, KafkaProducer<byte[], byte[]> kafkaProducer) {
        // Copy before sorting so the list returned by the producer is never modified.
        ArrayList<PartitionInfo> partitions = new ArrayList<>(kafkaProducer.partitionsFor(str));
        partitions.sort(Comparator.comparingInt(PartitionInfo::partition));

        int[] partitionIds = new int[partitions.size()];
        for (int i = 0; i < partitionIds.length; i++) {
            partitionIds[i] = partitions.get(i).partition();
        }
        return partitionIds;
    }

    /**
     * Returns the current number of pending records, read while holding
     * {@link #pendingRecordsLock}.
     *
     * @return count of sent records whose callback has not yet fired
     */
    @VisibleForTesting
    protected long numPendingRecords() {
        synchronized (this.pendingRecordsLock) {
            return this.pendingRecords;
        }
    }
}
