package org.apache.flink.table.store.shaded.streaming.connectors.kafka;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.Callback;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.Producer;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.Metric;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.MetricName;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.internals.TransactionalIdsGenerator;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.store.shaded.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
@Deprecated
/* loaded from: input_file:org/apache/flink/table/store/shaded/streaming/connectors/kafka/FlinkKafkaProducer.class */
public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, KafkaTransactionState, KafkaTransactionContext> {
    private static final long serialVersionUID = 1;
    // NOTE(review): not referenced in the visible part of this file; presumably an upper bound on the task-name part of generated transactional ids — confirm.
    private static final short maxTaskNameSize = 1000;
    // NOTE(review): not referenced in the visible part of this file; the name suggests a bound on how far parallelism may be scaled down safely — confirm.
    public static final int SAFE_SCALE_DOWN_FACTOR = 5;
    // Pool size used by the constructors that do not take an explicit pool-size argument (they pass 5).
    public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
    // Property key "flink.disable-metrics"; NOTE(review): its consumer is outside the visible part of this file.
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    // Transient (not Java-serialized) runtime state around transactional-id generation; initialised outside the visible part of this file.
    private transient ListState<NextTransactionalIdHint> nextTransactionalIdHintState;
    private transient TransactionalIdsGenerator transactionalIdsGenerator;
    private transient NextTransactionalIdHint nextTransactionalIdHint;
    // The Properties instance handed to the constructor (not copied); the constructor adds default
    // key/value serializers and a default transaction timeout when those keys are absent.
    protected final Properties producerConfig;
    // Topic used by invoke() whenever the schema returns null as the target topic.
    protected final String defaultTopicId;

    // Exactly one of keyedSchema / kafkaSchema is non-null after construction.
    @Nullable
    private final KeyedSerializationSchema<IN> keyedSchema;

    @Nullable
    private final KafkaSerializationSchema<IN> kafkaSchema;

    // Only ever set together with keyedSchema; always null when kafkaSchema is used.
    @Nullable
    private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    // Cache of topic -> partition ids, filled lazily by invoke().
    protected final Map<String, int[]> topicPartitionsMap;
    // Validated by the constructor to be greater than zero.
    private final int kafkaProducersPoolSize;
    // NOTE(review): populated and drained outside the visible part of this file.
    private final BlockingDeque<String> availableTransactionalIds;
    // When true, invoke() attaches context.timestamp() to records built from keyedSchema.
    protected boolean writeTimestampToKafka;

    @Nullable
    private String transactionalIdPrefix;
    // Selects which send callback open() installs: log-only, or record the first failure in asyncException.
    private boolean logFailuresOnly;
    protected Semantic semantic;

    // Created in open() and passed to every producer.send() call in invoke().
    @Nullable
    protected transient Callback callback;

    // First exception reported to the non-logging callback; stays null while no send has failed.
    // NOTE(review): volatile presumably because the callback runs on a Kafka client thread — confirm.
    @Nullable
    protected volatile transient Exception asyncException;
    // Incremented by invoke() right before each send; NOTE(review): presumably decremented by acknowledgeMessage(), which is not visible here.
    protected final AtomicLong pendingRecords;
    // NOTE(review): used outside the visible part of this file.
    private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    // Written into producerConfig (in milliseconds) by the constructor when no transaction timeout is configured.
    public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);

    // Deprecated state descriptor ("next-transactional-id-hint") built from generic TypeInformation.
    @Deprecated
    private static final ListStateDescriptor<NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = new ListStateDescriptor<>("next-transactional-id-hint", TypeInformation.of(NextTransactionalIdHint.class));
    // Current state descriptor ("next-transactional-id-hint-v2") backed by NextTransactionalIdHintSerializer.
    private static final ListStateDescriptor<NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 = new ListStateDescriptor<>("next-transactional-id-hint-v2", new NextTransactionalIdHintSerializer());

    /**
     * Serializer for {@link KafkaTransactionContext}.
     *
     * <p>Binary layout: one int holding the number of transactional ids, followed by each id
     * written with {@code writeUTF}.
     */
    @VisibleForTesting
    @Internal
    public static class ContextStateSerializer extends TypeSerializerSingleton<KafkaTransactionContext> {
        private static final long serialVersionUID = 1;

        /** Snapshot that re-creates the serializer through its no-arg constructor. */
        public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
            public ContextStateSerializerSnapshot() {
                super(ContextStateSerializer::new);
            }
        }

        /** Reports the handled type as immutable; both object-level copy methods return their input. */
        public boolean isImmutableType() {
            return true;
        }

        /**
         * Decompiler-renamed {@code createInstance} (merged with its bridge method).
         *
         * @return always {@code null}; no default context instance is provided
         */
        public KafkaTransactionContext m1412createInstance() {
            return null;
        }

        /** Returns the very same instance that was passed in. */
        public KafkaTransactionContext copy(KafkaTransactionContext kafkaTransactionContext) {
            return kafkaTransactionContext;
        }

        /** Returns the first argument unchanged; the reuse instance is ignored. */
        public KafkaTransactionContext copy(KafkaTransactionContext kafkaTransactionContext, KafkaTransactionContext kafkaTransactionContext2) {
            return kafkaTransactionContext;
        }

        /** @return {@code -1}, i.e. records are of variable length */
        public int getLength() {
            return -1;
        }

        /** Writes the id count and then every transactional id as a UTF string. */
        public void serialize(KafkaTransactionContext kafkaTransactionContext, DataOutputView dataOutputView) throws IOException {
            final Set<String> ids = kafkaTransactionContext.transactionalIds;
            dataOutputView.writeInt(ids.size());
            for (String id : ids) {
                dataOutputView.writeUTF(id);
            }
        }

        /**
         * Decompiler-renamed {@code deserialize} (merged with its bridge method).
         *
         * <p>Reads the id count and then that many UTF strings into a new set.
         */
        public KafkaTransactionContext m1411deserialize(DataInputView dataInputView) throws IOException {
            final int count = dataInputView.readInt();
            final Set<String> ids = new HashSet<>(count);
            for (int read = 0; read < count; read++) {
                ids.add(dataInputView.readUTF());
            }
            return new KafkaTransactionContext(ids);
        }

        /** Ignores the reuse instance and always builds a fresh context. */
        public KafkaTransactionContext deserialize(KafkaTransactionContext kafkaTransactionContext, DataInputView dataInputView) throws IOException {
            return m1411deserialize(dataInputView);
        }

        /** Streams one serialized record from the input to the output without materialising it. */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            final int count = dataInputView.readInt();
            dataOutputView.writeInt(count);
            int remaining = count;
            while (remaining > 0) {
                dataOutputView.writeUTF(dataInputView.readUTF());
                remaining--;
            }
        }

        /** @return a new {@link ContextStateSerializerSnapshot} */
        public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
            return new ContextStateSerializerSnapshot();
        }
    }

    /**
     * Holder for a set of transactional ids. Equality and hash code are defined purely by that set.
     */
    @VisibleForTesting
    @Internal
    public static class KafkaTransactionContext {
        final Set<String> transactionalIds;

        /**
         * @param set the transactional ids; must not be {@code null}. The set is stored as-is (no copy).
         */
        @VisibleForTesting
        public KafkaTransactionContext(Set<String> set) {
            Preconditions.checkNotNull(set);
            this.transactionalIds = set;
        }

        /** Two contexts are equal when they are of the same class and hold equal id sets. */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            final KafkaTransactionContext other = (KafkaTransactionContext) obj;
            return this.transactionalIds.equals(other.transactionalIds);
        }

        /** @return the hash code of the id set */
        public int hashCode() {
            return this.transactionalIds.hashCode();
        }
    }

    /**
     * State of one transaction: the (optional) transactional id, the producer id and epoch, and a
     * transient handle to the producer itself. The producer handle takes no part in
     * {@code equals}/{@code hashCode}.
     */
    @VisibleForTesting
    @Internal
    public static class KafkaTransactionState {
        private final transient FlinkKafkaInternalProducer<byte[], byte[]> producer;

        @Nullable
        final String transactionalId;
        final long producerId;
        final short epoch;

        /** Builds a state for the given transactional id, taking producer id and epoch from the producer. */
        @VisibleForTesting
        public KafkaTransactionState(String str, FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
            this(str, flinkKafkaInternalProducer.getProducerId(), flinkKafkaInternalProducer.getEpoch(), flinkKafkaInternalProducer);
        }

        /** Builds a non-transactional state: no transactional id, producer id and epoch both -1. */
        @VisibleForTesting
        public KafkaTransactionState(FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
            this(null, -1L, (short) -1, flinkKafkaInternalProducer);
        }

        /**
         * @param str transactional id, or {@code null} for a non-transactional state
         * @param j producer id
         * @param s producer epoch
         * @param flinkKafkaInternalProducer producer handle (stored as-is)
         */
        @VisibleForTesting
        public KafkaTransactionState(@Nullable String str, long j, short s, FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
            this.transactionalId = str;
            this.producerId = j;
            this.epoch = s;
            this.producer = flinkKafkaInternalProducer;
        }

        /** @return {@code true} when a transactional id is present */
        boolean isTransactional() {
            return this.transactionalId != null;
        }

        public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
            return this.producer;
        }

        public String toString() {
            final String className = getClass().getSimpleName();
            return String.format("%s [transactionalId=%s, producerId=%s, epoch=%s]", className, this.transactionalId, this.producerId, this.epoch);
        }

        /** Compares class, producer id, epoch and transactional id; the producer handle is ignored. */
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            final KafkaTransactionState other = (KafkaTransactionState) obj;
            if (other.producerId != this.producerId || other.epoch != this.epoch) {
                return false;
            }
            if (this.transactionalId == null) {
                return other.transactionalId == null;
            }
            return this.transactionalId.equals(other.transactionalId);
        }

        /** Combines transactional id, producer id and epoch with the usual 31-multiplier scheme. */
        public int hashCode() {
            int hash = this.transactionalId == null ? 0 : this.transactionalId.hashCode();
            hash = 31 * hash + Long.hashCode(this.producerId);
            hash = 31 * hash + this.epoch;
            return hash;
        }
    }

    /**
     * Mutable pair of a parallelism value and the next free transactional id.
     * Equality and hash code cover both fields.
     */
    public static class NextTransactionalIdHint {
        public int lastParallelism;
        public long nextFreeTransactionalId;

        /** Creates a hint with both values set to zero. */
        public NextTransactionalIdHint() {
            this(0, 0L);
        }

        /**
         * @param i value for {@link #lastParallelism}
         * @param j value for {@link #nextFreeTransactionalId}
         */
        public NextTransactionalIdHint(int i, long j) {
            this.lastParallelism = i;
            this.nextFreeTransactionalId = j;
        }

        public String toString() {
            final StringBuilder text = new StringBuilder("NextTransactionalIdHint[lastParallelism=");
            text.append(this.lastParallelism);
            text.append(", nextFreeTransactionalId=");
            text.append(this.nextFreeTransactionalId);
            text.append(']');
            return text.toString();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            final NextTransactionalIdHint other = (NextTransactionalIdHint) obj;
            if (other.lastParallelism != this.lastParallelism) {
                return false;
            }
            return other.nextFreeTransactionalId == this.nextFreeTransactionalId;
        }

        public int hashCode() {
            return 31 * this.lastParallelism + Long.hashCode(this.nextFreeTransactionalId);
        }
    }

    /**
     * Serializer for {@link NextTransactionalIdHint}.
     *
     * <p>Binary layout (12 bytes): first the long {@code nextFreeTransactionalId}, then the int
     * {@code lastParallelism}. {@code serialize}, {@code deserialize} and the stream-to-stream
     * {@code copy} must all follow this order.
     */
    @VisibleForTesting
    @Internal
    public static class NextTransactionalIdHintSerializer extends TypeSerializerSingleton<NextTransactionalIdHint> {
        private static final long serialVersionUID = 1;

        /** Snapshot that re-creates the serializer through its no-arg constructor. */
        public static final class NextTransactionalIdHintSerializerSnapshot extends SimpleTypeSerializerSnapshot<NextTransactionalIdHint> {
            public NextTransactionalIdHintSerializerSnapshot() {
                super(NextTransactionalIdHintSerializer::new);
            }
        }

        /** Reports the handled type as immutable; both object-level copy methods return their input. */
        public boolean isImmutableType() {
            return true;
        }

        /**
         * Decompiler-renamed {@code createInstance} (merged with its bridge method).
         *
         * @return a new hint with both fields set to zero
         */
        public NextTransactionalIdHint m1414createInstance() {
            return new NextTransactionalIdHint();
        }

        /** Returns the very same instance that was passed in. */
        public NextTransactionalIdHint copy(NextTransactionalIdHint nextTransactionalIdHint) {
            return nextTransactionalIdHint;
        }

        /** Returns the first argument unchanged; the reuse instance is ignored. */
        public NextTransactionalIdHint copy(NextTransactionalIdHint nextTransactionalIdHint, NextTransactionalIdHint nextTransactionalIdHint2) {
            return nextTransactionalIdHint;
        }

        /** @return 12, the size of one long plus one int */
        public int getLength() {
            return Long.BYTES + Integer.BYTES;
        }

        /** Writes {@code nextFreeTransactionalId} (long) followed by {@code lastParallelism} (int). */
        public void serialize(NextTransactionalIdHint nextTransactionalIdHint, DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeLong(nextTransactionalIdHint.nextFreeTransactionalId);
            dataOutputView.writeInt(nextTransactionalIdHint.lastParallelism);
        }

        /**
         * Decompiler-renamed {@code deserialize} (merged with its bridge method).
         *
         * <p>Reads the fields in the order {@code serialize} wrote them. The reads are kept as
         * separate statements on purpose: passing them inline as constructor arguments would
         * evaluate the int read before the long read and corrupt both values.
         */
        public NextTransactionalIdHint m1413deserialize(DataInputView dataInputView) throws IOException {
            final long nextFreeTransactionalId = dataInputView.readLong();
            final int lastParallelism = dataInputView.readInt();
            return new NextTransactionalIdHint(lastParallelism, nextFreeTransactionalId);
        }

        /** Ignores the reuse instance and always builds a fresh hint. */
        public NextTransactionalIdHint deserialize(NextTransactionalIdHint nextTransactionalIdHint, DataInputView dataInputView) throws IOException {
            return m1413deserialize(dataInputView);
        }

        /** Streams one serialized record (long, then int) from the input to the output. */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeLong(dataInputView.readLong());
            dataOutputView.writeInt(dataInputView.readInt());
        }

        /** @return a new {@link NextTransactionalIdHintSerializerSnapshot} */
        public TypeSerializerSnapshot<NextTransactionalIdHint> snapshotConfiguration() {
            return new NextTransactionalIdHintSerializerSnapshot();
        }
    }

    /**
     * Delivery mode selected for the producer. The declaration order of the constants is part of
     * the enum's identity (ordinals) and is left untouched.
     */
    /* loaded from: input_file:org/apache/flink/table/store/shaded/streaming/connectors/kafka/FlinkKafkaProducer$Semantic.class */
    public enum Semantic {
        // When selected, the private constructor sets the transaction timeout and enables
        // transaction-timeout warnings (ratio 0.8).
        EXACTLY_ONCE,
        // Mode used by the constructors that do not take a Semantic argument.
        // In close(), the current producer is flushed and then closed for this mode.
        AT_LEAST_ONCE,
        // Handled in close() exactly like AT_LEAST_ONCE (flush, then close the producer).
        // NOTE(review): any difference between the two lies outside the visible part of this file.
        NONE
    }

    /**
     * Serializer for {@link KafkaTransactionState}.
     *
     * <p>Binary layout: a boolean flag telling whether a transactional id follows, the id as a UTF
     * string when the flag is set, then the producer id (long) and the epoch (short). The producer
     * handle itself is never written; deserialized states carry a {@code null} producer.
     */
    @VisibleForTesting
    @Internal
    public static class TransactionStateSerializer extends TypeSerializerSingleton<KafkaTransactionState> {
        private static final long serialVersionUID = 1;

        /** Snapshot that re-creates the serializer through its no-arg constructor. */
        public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
            public TransactionStateSerializerSnapshot() {
                super(TransactionStateSerializer::new);
            }
        }

        /** Reports the handled type as immutable; both object-level copy methods return their input. */
        public boolean isImmutableType() {
            return true;
        }

        /**
         * Decompiler-renamed {@code createInstance} (merged with its bridge method).
         *
         * @return always {@code null}; no default state instance is provided
         */
        public KafkaTransactionState m1417createInstance() {
            return null;
        }

        /** Returns the very same instance that was passed in. */
        public KafkaTransactionState copy(KafkaTransactionState kafkaTransactionState) {
            return kafkaTransactionState;
        }

        /** Returns the first argument unchanged; the reuse instance is ignored. */
        public KafkaTransactionState copy(KafkaTransactionState kafkaTransactionState, KafkaTransactionState kafkaTransactionState2) {
            return kafkaTransactionState;
        }

        /** @return {@code -1}, i.e. records are of variable length */
        public int getLength() {
            return -1;
        }

        /** Writes the presence flag, the optional transactional id, the producer id and the epoch. */
        public void serialize(KafkaTransactionState kafkaTransactionState, DataOutputView dataOutputView) throws IOException {
            final String id = kafkaTransactionState.transactionalId;
            final boolean hasId = id != null;
            dataOutputView.writeBoolean(hasId);
            if (hasId) {
                dataOutputView.writeUTF(id);
            }
            dataOutputView.writeLong(kafkaTransactionState.producerId);
            dataOutputView.writeShort(kafkaTransactionState.epoch);
        }

        /**
         * Decompiler-renamed {@code deserialize} (merged with its bridge method).
         *
         * <p>Reads the fields in the order {@code serialize} wrote them and returns a state without
         * a producer handle.
         */
        public KafkaTransactionState m1416deserialize(DataInputView dataInputView) throws IOException {
            final String id = dataInputView.readBoolean() ? dataInputView.readUTF() : null;
            final long producerId = dataInputView.readLong();
            final short epoch = dataInputView.readShort();
            return new KafkaTransactionState(id, producerId, epoch, null);
        }

        /** Ignores the reuse instance and always builds a fresh state. */
        public KafkaTransactionState deserialize(KafkaTransactionState kafkaTransactionState, DataInputView dataInputView) throws IOException {
            return m1416deserialize(dataInputView);
        }

        /** Streams one serialized record from the input to the output without materialising it. */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            final boolean hasId = dataInputView.readBoolean();
            dataOutputView.writeBoolean(hasId);
            if (hasId) {
                final String id = dataInputView.readUTF();
                dataOutputView.writeUTF(id);
            }
            final long producerId = dataInputView.readLong();
            dataOutputView.writeLong(producerId);
            final short epoch = dataInputView.readShort();
            dataOutputView.writeShort(epoch);
        }

        /** @return a new {@link TransactionStateSerializerSnapshot} */
        public TypeSerializerSnapshot<KafkaTransactionState> snapshotConfiguration() {
            return new TransactionStateSerializerSnapshot();
        }
    }

    /**
     * Creates a producer from a broker list and a topic.
     *
     * @param str broker list; turned into producer properties by {@code getPropertiesFromBrokerList}
     * @param str2 topic, forwarded as the default topic
     * @param serializationSchema schema used to serialize records
     */
    public FlinkKafkaProducer(String str, String str2, SerializationSchema<IN> serializationSchema) {
        this(str2, serializationSchema, getPropertiesFromBrokerList(str));
    }

    /**
     * Creates a producer that partitions with a {@link FlinkFixedPartitioner}.
     *
     * @param str default topic
     * @param serializationSchema schema used to serialize records
     * @param properties Kafka producer properties
     */
    public FlinkKafkaProducer(String str, SerializationSchema<IN> serializationSchema, Properties properties) {
        this(str, serializationSchema, properties, Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates an {@link Semantic#AT_LEAST_ONCE} producer with the default producer pool size.
     *
     * @param str default topic
     * @param serializationSchema schema used to serialize records
     * @param properties Kafka producer properties
     * @param optional custom partitioner; an empty Optional is forwarded as {@code null}
     */
    public FlinkKafkaProducer(String str, SerializationSchema<IN> serializationSchema, Properties properties, Optional<FlinkKafkaPartitioner<IN>> optional) {
        this(str, serializationSchema, properties, optional.orElse(null), Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates a producer for a plain {@link SerializationSchema} by wrapping it, together with the
     * optional partitioner, in a {@link KafkaSerializationSchemaWrapper} (timestamps disabled).
     *
     * @param str default topic, also handed to the wrapper
     * @param serializationSchema schema used to serialize record values
     * @param properties Kafka producer properties
     * @param flinkKafkaPartitioner custom partitioner, may be {@code null}
     * @param semantic delivery mode
     * @param i producer pool size
     */
    public FlinkKafkaProducer(String str, SerializationSchema<IN> serializationSchema, Properties properties, @Nullable FlinkKafkaPartitioner<IN> flinkKafkaPartitioner, Semantic semantic, int i) {
        this(str, null, null, new KafkaSerializationSchemaWrapper<>(str, flinkKafkaPartitioner, false, serializationSchema), properties, semantic, i);
    }

    /**
     * Creates a producer from a broker list and a topic, partitioning with a
     * {@link FlinkFixedPartitioner}.
     *
     * @param str broker list; turned into producer properties by {@code getPropertiesFromBrokerList}
     * @param str2 topic, forwarded as the default topic
     * @param keyedSerializationSchema schema used to serialize keys and values
     */
    @Deprecated
    public FlinkKafkaProducer(String str, String str2, KeyedSerializationSchema<IN> keyedSerializationSchema) {
        this(str2, keyedSerializationSchema, getPropertiesFromBrokerList(str), Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates a producer that partitions with a {@link FlinkFixedPartitioner}.
     *
     * @param str default topic
     * @param keyedSerializationSchema schema used to serialize keys and values
     * @param properties Kafka producer properties
     */
    @Deprecated
    public FlinkKafkaProducer(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties) {
        this(str, keyedSerializationSchema, properties, Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates a producer with the given delivery mode, a {@link FlinkFixedPartitioner} and the
     * default producer pool size.
     *
     * @param str default topic
     * @param keyedSerializationSchema schema used to serialize keys and values
     * @param properties Kafka producer properties
     * @param semantic delivery mode
     */
    @Deprecated
    public FlinkKafkaProducer(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties, Semantic semantic) {
        this(str, keyedSerializationSchema, properties, Optional.of(new FlinkFixedPartitioner<IN>()), semantic, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates an {@link Semantic#AT_LEAST_ONCE} producer with the default producer pool size.
     *
     * @param str default topic
     * @param keyedSerializationSchema schema used to serialize keys and values
     * @param properties Kafka producer properties
     * @param optional custom partitioner, possibly empty
     */
    @Deprecated
    public FlinkKafkaProducer(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties, Optional<FlinkKafkaPartitioner<IN>> optional) {
        this(str, keyedSerializationSchema, properties, optional, Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates a producer for a {@link KeyedSerializationSchema} with every option spelled out.
     *
     * @param str default topic
     * @param keyedSerializationSchema schema used to serialize keys and values
     * @param properties Kafka producer properties
     * @param optional custom partitioner; an empty Optional is forwarded as {@code null}
     * @param semantic delivery mode
     * @param i producer pool size
     */
    @Deprecated
    public FlinkKafkaProducer(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, Properties properties, Optional<FlinkKafkaPartitioner<IN>> optional, Semantic semantic, int i) {
        this(str, keyedSerializationSchema, optional.orElse(null), null, properties, semantic, i);
    }

    /**
     * Creates a producer for a {@link KafkaSerializationSchema} with the default producer pool size.
     *
     * @param str default topic
     * @param kafkaSerializationSchema schema that turns records into producer records
     * @param properties Kafka producer properties
     * @param semantic delivery mode
     */
    public FlinkKafkaProducer(String str, KafkaSerializationSchema<IN> kafkaSerializationSchema, Properties properties, Semantic semantic) {
        this(str, kafkaSerializationSchema, properties, semantic, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates a producer for a {@link KafkaSerializationSchema} with an explicit producer pool size.
     * No keyed schema and no custom partitioner are passed on.
     *
     * @param str default topic
     * @param kafkaSerializationSchema schema that turns records into producer records
     * @param properties Kafka producer properties
     * @param semantic delivery mode
     * @param i producer pool size
     */
    public FlinkKafkaProducer(String str, KafkaSerializationSchema<IN> kafkaSerializationSchema, Properties properties, Semantic semantic, int i) {
        this(str, null, null, kafkaSerializationSchema, properties, semantic, i);
    }

    private FlinkKafkaProducer(String str, KeyedSerializationSchema<IN> keyedSerializationSchema, FlinkKafkaPartitioner<IN> flinkKafkaPartitioner, KafkaSerializationSchema<IN> kafkaSerializationSchema, Properties properties, Semantic semantic, int i) {
        super(new TransactionStateSerializer(), new ContextStateSerializer());
        this.availableTransactionalIds = new LinkedBlockingDeque();
        this.writeTimestampToKafka = false;
        this.transactionalIdPrefix = null;
        this.pendingRecords = new AtomicLong();
        this.previouslyCreatedMetrics = new HashMap();
        this.defaultTopicId = (String) Preconditions.checkNotNull(str, "defaultTopic is null");
        if (kafkaSerializationSchema != null) {
            this.keyedSchema = null;
            this.kafkaSchema = kafkaSerializationSchema;
            this.flinkKafkaPartitioner = null;
            ClosureCleaner.clean(this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            if (flinkKafkaPartitioner != null) {
                throw new IllegalArgumentException("Customer partitioner can only be used whenusing a KeyedSerializationSchema or SerializationSchema.");
            }
        } else {
            if (keyedSerializationSchema == null) {
                throw new IllegalArgumentException("You must provide either a KafkaSerializationSchema or aKeyedSerializationSchema.");
            }
            this.kafkaSchema = null;
            this.keyedSchema = keyedSerializationSchema;
            this.flinkKafkaPartitioner = flinkKafkaPartitioner;
            ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            ClosureCleaner.clean(this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        }
        this.producerConfig = (Properties) Preconditions.checkNotNull(properties, "producerConfig is null");
        this.semantic = (Semantic) Preconditions.checkNotNull(semantic, "semantic is null");
        this.kafkaProducersPoolSize = i;
        Preconditions.checkState(i > 0, "kafkaProducersPoolSize must be non empty");
        if (properties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        } else {
            this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        }
        if (properties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
            LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        } else {
            this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        }
        if (!this.producerConfig.containsKey("bootstrap.servers")) {
            throw new IllegalArgumentException("bootstrap.servers must be supplied in the producer config properties.");
        }
        if (!properties.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
            long milliseconds = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
            Preconditions.checkState(milliseconds < 2147483647L && milliseconds > 0, "timeout does not fit into 32 bit integer");
            this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Integer.valueOf((int) milliseconds));
            LOG.warn("Property [{}] not specified. Setting it to {}", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
        }
        if (semantic == Semantic.EXACTLY_ONCE) {
            super.setTransactionTimeout(getTransactionTimeout(properties));
            super.enableTransactionTimeoutWarnings(0.8d);
        }
        this.topicPartitionsMap = new HashMap();
    }

    /**
     * Switches timestamp writing on or off. The flag is stored on this producer and, when the
     * Kafka schema is a {@link KafkaSerializationSchemaWrapper}, also pushed into that wrapper.
     *
     * @param z {@code true} to write record timestamps to Kafka
     */
    public void setWriteTimestampToKafka(boolean z) {
        this.writeTimestampToKafka = z;
        if (!(this.kafkaSchema instanceof KafkaSerializationSchemaWrapper)) {
            return;
        }
        final KafkaSerializationSchemaWrapper<?> wrapper = (KafkaSerializationSchemaWrapper<?>) this.kafkaSchema;
        wrapper.setWriteTimestamp(z);
    }

    /**
     * Chooses how send failures are handled. The flag is read by {@code open}: when {@code true}
     * the installed callback only logs failures, otherwise it records the first failure in
     * {@code asyncException}.
     *
     * @param z {@code true} to only log send failures
     */
    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    /**
     * Sets the prefix stored in {@code transactionalIdPrefix}.
     *
     * @param str the prefix; must not be {@code null}
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public void setTransactionalIdPrefix(String str) {
        final String prefix = Preconditions.checkNotNull(str);
        this.transactionalIdPrefix = prefix;
    }

    /**
     * Decompiler-renamed {@code ignoreFailuresAfterTransactionTimeout} (merged with its bridge
     * method). Delegates to the superclass implementation and returns this producer so that calls
     * can be chained.
     *
     * @return this instance
     */
    /* renamed from: ignoreFailuresAfterTransactionTimeout, reason: merged with bridge method [inline-methods] */
    public FlinkKafkaProducer<IN> m1408ignoreFailuresAfterTransactionTimeout() {
        super.ignoreFailuresAfterTransactionTimeout();
        return this;
    }

    /**
     * Prepares the sink for use: installs the send callback, opens the custom partitioner (if
     * any), tells a context-aware Kafka schema its subtask index and parallelism, opens the Kafka
     * schema, and finally delegates to the superclass.
     *
     * @param configuration forwarded unchanged to {@code super.open}
     * @throws Exception whatever the partitioner, the schema or the superclass throws
     */
    public void open(Configuration configuration) throws Exception {
        if (this.logFailuresOnly) {
            // Failures are logged and otherwise ignored.
            this.callback = (metadata, failure) -> {
                if (failure != null) {
                    LOG.error("Error while sending record to Kafka: " + failure.getMessage(), failure);
                }
                acknowledgeMessage();
            };
        } else {
            // Only the first failure is kept in asyncException.
            this.callback = (metadata, failure) -> {
                if (failure != null && this.asyncException == null) {
                    this.asyncException = failure;
                }
                acknowledgeMessage();
            };
        }

        final RuntimeContext ctx = getRuntimeContext();
        if (this.flinkKafkaPartitioner != null) {
            this.flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
        if (this.kafkaSchema instanceof KafkaContextAware) {
            final KafkaContextAware<?> contextAwareSchema = (KafkaContextAware<?>) this.kafkaSchema;
            contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
            contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
        }
        if (this.kafkaSchema != null) {
            this.kafkaSchema.open(RuntimeContextInitializationContextAdapters.serializationAdapter(getRuntimeContext(), group -> group.addGroup(ClientQuotaEntity.USER)));
        }
        super.open(configuration);
    }

    /**
     * Serializes one element into a Kafka record and hands it to the producer of the given
     * transaction.
     *
     * <p>With a keyed schema, key, value and target topic come from the schema; the partition is
     * chosen by the custom partitioner when one is set and left unset otherwise; the timestamp is
     * only attached when {@code writeTimestampToKafka} is enabled. With a Kafka schema, the schema
     * builds the whole record; a context-aware schema is told the partitions of its target topic
     * first. In both cases a {@code null} target topic falls back to {@code defaultTopicId}, and
     * partition lists are cached per topic in {@code topicPartitionsMap}.
     *
     * @param kafkaTransactionState transaction whose producer sends the record
     * @param in element to write
     * @param context sink context supplying the element timestamp
     * @throws FlinkKafkaException as declared; raised by {@code checkErroneous} (not visible here)
     * @throws RuntimeException if neither schema is set, which the constructor rules out
     */
    public void invoke(KafkaTransactionState kafkaTransactionState, IN in, SinkFunction.Context context) throws FlinkKafkaException {
        checkErroneous();

        final ProducerRecord<byte[], byte[]> record;
        if (this.keyedSchema != null) {
            final byte[] serializedKey = this.keyedSchema.serializeKey(in);
            final byte[] serializedValue = this.keyedSchema.serializeValue(in);
            String targetTopic = this.keyedSchema.getTargetTopic(in);
            if (targetTopic == null) {
                targetTopic = this.defaultTopicId;
            }

            Long timestamp = null;
            if (this.writeTimestampToKafka) {
                timestamp = context.timestamp();
            }

            int[] partitions = this.topicPartitionsMap.get(targetTopic);
            if (partitions == null) {
                partitions = getPartitionsByTopic(targetTopic, kafkaTransactionState.producer);
                this.topicPartitionsMap.put(targetTopic, partitions);
            }

            // A null partition leaves the choice to the Kafka producer.
            Integer partition = null;
            if (this.flinkKafkaPartitioner != null) {
                partition = this.flinkKafkaPartitioner.partition(in, serializedKey, serializedValue, targetTopic, partitions);
            }
            record = new ProducerRecord<>(targetTopic, partition, timestamp, serializedKey, serializedValue);
        } else if (this.kafkaSchema != null) {
            if (this.kafkaSchema instanceof KafkaContextAware) {
                @SuppressWarnings("unchecked") // kafkaSchema handles IN, so its context-aware view does too
                final KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) this.kafkaSchema;
                String targetTopic = contextAwareSchema.getTargetTopic(in);
                if (targetTopic == null) {
                    targetTopic = this.defaultTopicId;
                }
                int[] partitions = this.topicPartitionsMap.get(targetTopic);
                if (partitions == null) {
                    partitions = getPartitionsByTopic(targetTopic, kafkaTransactionState.producer);
                    this.topicPartitionsMap.put(targetTopic, partitions);
                }
                contextAwareSchema.setPartitions(partitions);
            }
            record = this.kafkaSchema.serialize(in, context.timestamp());
        } else {
            throw new RuntimeException("We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this is a bug.");
        }

        // Count the record as pending before the send so the callback can never acknowledge first.
        this.pendingRecords.incrementAndGet();
        kafkaTransactionState.producer.send(record, this.callback);
    }

    /**
     * Closes this sink and every producer it still references.
     *
     * <p>The current transaction is flushed first; for {@code AT_LEAST_ONCE} and {@code NONE}
     * its producer is additionally flushed and closed here. Any exception raised up to and
     * including {@code super.close()} is merged into {@code asyncException}. Regardless of the
     * outcome, the producers of the current and of all pending transactions are closed
     * best-effort (failures are only logged) and {@link #checkErroneous()} is called last so a
     * recorded error is reported exactly once.
     *
     * @throws FlinkKafkaException if an error was recorded before or during closing
     */
    public void close() throws FlinkKafkaException {
        try {
            KafkaTransactionState current = (KafkaTransactionState) currentTransaction();
            if (current != null) {
                flush(current);
                switch (this.semantic) {
                    case AT_LEAST_ONCE:
                    case NONE:
                        current.producer.flush();
                        current.producer.close(Duration.ofSeconds(0L));
                        break;
                    default:
                        break;
                }
            }
            super.close();
        } catch (Exception e) {
            this.asyncException = (Exception) ExceptionUtils.firstOrSuppressed(e, this.asyncException);
        } finally {
            // Cleanup lives in a single finally block so it runs once on every path and an
            // exception from checkErroneous() is not caught and wrapped a second time.
            if (currentTransaction() != null) {
                try {
                    ((KafkaTransactionState) currentTransaction()).producer.close(Duration.ofSeconds(0L));
                } catch (Throwable t) {
                    LOG.warn("Error closing producer.", t);
                }
            }
            pendingTransactions().forEach(entry -> {
                try {
                    ((KafkaTransactionState) entry.getValue()).producer.close(Duration.ofSeconds(0L));
                } catch (Throwable t) {
                    LOG.warn("Error closing producer.", t);
                }
            });
            checkErroneous();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: beginTransaction, reason: merged with bridge method [inline-methods] */
    /**
     * Starts a new transaction according to the configured semantic.
     *
     * <p>{@code EXACTLY_ONCE} creates a transactional producer and begins a Kafka transaction
     * on it. {@code AT_LEAST_ONCE} and {@code NONE} reuse the producer of the current
     * transaction when there is one, and otherwise create a non-transactional producer.
     *
     * @return the state object wrapping the producer to use
     * @throws FlinkKafkaException if a transactional producer cannot be created
     * @throws UnsupportedOperationException for any other semantic
     */
    public KafkaTransactionState m1409beginTransaction() throws FlinkKafkaException {
        switch (this.semantic) {
            case EXACTLY_ONCE: {
                FlinkKafkaInternalProducer<byte[], byte[]> transactionalProducer = createTransactionalProducer();
                transactionalProducer.beginTransaction();
                return new KafkaTransactionState(transactionalProducer.getTransactionalId(), transactionalProducer);
            }
            case AT_LEAST_ONCE:
            case NONE: {
                KafkaTransactionState current = (KafkaTransactionState) currentTransaction();
                if (current != null && current.producer != null) {
                    return new KafkaTransactionState(current.producer);
                }
                return new KafkaTransactionState(initNonTransactionalProducer(true));
            }
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the transaction for commit: flushes it for {@code EXACTLY_ONCE} and
     * {@code AT_LEAST_ONCE}, does nothing for {@code NONE}, then reports any recorded
     * asynchronous error.
     *
     * @param kafkaTransactionState transaction about to be committed
     * @throws FlinkKafkaException if an asynchronous send error was recorded
     * @throws UnsupportedOperationException for any other semantic
     */
    public void preCommit(KafkaTransactionState kafkaTransactionState) throws FlinkKafkaException {
        switch (this.semantic) {
            case NONE:
                // Nothing to flush for this semantic.
                break;
            case AT_LEAST_ONCE:
            case EXACTLY_ONCE:
                flush(kafkaTransactionState);
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        checkErroneous();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Commits the Kafka transaction of a transactional state; non-transactional states are
     * ignored. The producer is recycled afterwards even when the commit throws.
     *
     * @param kafkaTransactionState transaction to commit
     */
    public void commit(KafkaTransactionState kafkaTransactionState) {
        if (!kafkaTransactionState.isTransactional()) {
            return;
        }
        try {
            kafkaTransactionState.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(kafkaTransactionState.producer);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Commits a transaction that was restored from state.
     *
     * <p>A fresh producer is created for the stored transactional id, resumed with the stored
     * producer id and epoch, and committed. {@code InvalidTxnStateException} and
     * {@code ProducerFencedException} are logged as warnings and not rethrown; any other
     * failure propagates. The temporary producer is closed exactly once in all cases.
     *
     * @param kafkaTransactionState restored transaction; ignored unless transactional
     */
    public void recoverAndCommit(KafkaTransactionState kafkaTransactionState) {
        if (!kafkaTransactionState.isTransactional()) {
            return;
        }
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        try {
            producer = initTransactionalProducer(kafkaTransactionState.transactionalId, false);
            producer.resumeTransaction(kafkaTransactionState.producerId, kafkaTransactionState.epoch);
            producer.commitTransaction();
        } catch (InvalidTxnStateException e) {
            LOG.warn("Unable to commit recovered transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", kafkaTransactionState, e);
        } catch (ProducerFencedException e) {
            LOG.warn("Unable to commit recovered transaction ({}) because its producer is already fenced. This means that you either have a different producer with the same '{}' or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{kafkaTransactionState, ProducerConfig.TRANSACTIONAL_ID_CONFIG, ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, Long.valueOf(getTransactionTimeout(this.producerConfig)), e});
        } finally {
            // Single cleanup point: a failing close() is no longer followed by a second close().
            if (producer != null) {
                producer.close(Duration.ofSeconds(0L));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts the Kafka transaction of a transactional state and then recycles its producer;
     * non-transactional states are ignored.
     *
     * @param kafkaTransactionState transaction to abort
     */
    public void abort(KafkaTransactionState kafkaTransactionState) {
        if (!kafkaTransactionState.isTransactional()) {
            return;
        }
        kafkaTransactionState.producer.abortTransaction();
        recycleTransactionalProducer(kafkaTransactionState.producer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts a transaction that was restored from state.
     *
     * <p>A fresh producer is created for the stored transactional id and
     * {@code initTransactions()} is called on it. Presumably this is what aborts the lingering
     * transaction on the broker side; confirm against the Kafka producer documentation. The
     * temporary producer is closed exactly once, whether or not initialization succeeds.
     *
     * @param kafkaTransactionState restored transaction; ignored unless transactional
     */
    public void recoverAndAbort(KafkaTransactionState kafkaTransactionState) {
        if (!kafkaTransactionState.isTransactional()) {
            return;
        }
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        try {
            producer = initTransactionalProducer(kafkaTransactionState.transactionalId, false);
            producer.initTransactions();
        } finally {
            if (producer != null) {
                producer.close(Duration.ofSeconds(0L));
            }
        }
    }

    /** Decrements the pending-record counter by one. */
    protected void acknowledgeMessage() {
        pendingRecords.decrementAndGet();
    }

    /**
     * Flushes the transaction's producer (if it has one) and verifies that no records are
     * still pending afterwards.
     *
     * @param kafkaTransactionState transaction whose producer is flushed
     * @throws IllegalStateException if the pending-record counter is not zero after the flush
     * @throws FlinkKafkaException if an asynchronous send error was recorded
     */
    private void flush(KafkaTransactionState kafkaTransactionState) throws FlinkKafkaException {
        if (kafkaTransactionState.producer != null) {
            kafkaTransactionState.producer.flush();
        }
        final long stillPending = this.pendingRecords.get();
        if (stillPending == 0) {
            checkErroneous();
            return;
        }
        throw new IllegalStateException("Pending record count must be zero at this point: " + stillPending);
    }

    /**
     * Snapshots the sink state and rewrites the transactional-id hint state.
     *
     * <p>The hint state is always cleared. Only subtask 0 under {@code EXACTLY_ONCE} writes a
     * new hint: the current next free id, advanced by {@code parallelism * kafkaProducersPoolSize}
     * when the parallelism is larger than the one recorded in the current hint.
     *
     * @param functionSnapshotContext snapshot context passed through to the superclass
     * @throws Exception if the superclass snapshot or a state operation fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        super.snapshotState(functionSnapshotContext);
        this.nextTransactionalIdHintState.clear();

        boolean writesHint = getRuntimeContext().getIndexOfThisSubtask() == 0 && this.semantic == Semantic.EXACTLY_ONCE;
        if (!writesHint) {
            return;
        }

        Preconditions.checkState(this.nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
        long nextFreeId = this.nextTransactionalIdHint.nextFreeTransactionalId;
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        if (parallelism > this.nextTransactionalIdHint.lastParallelism) {
            nextFreeId += parallelism * this.kafkaProducersPoolSize;
        }
        this.nextTransactionalIdHintState.add(new NextTransactionalIdHint(parallelism, nextFreeId));
    }

    /**
     * Restores the transactional-id bookkeeping before delegating to the superclass.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Downgrades the semantic to {@code NONE} (with a warning) when checkpointing is
     *       disabled.</li>
     *   <li>Obtains the V2 hint state and migrates hints stored under the older descriptor
     *       when a state with that descriptor's name is registered.</li>
     *   <li>Builds the transactional-id generator from the configured prefix, or from the
     *       (possibly truncated) task name plus the operator's unique id.</li>
     *   <li>For {@code EXACTLY_ONCE}, loads the single stored hint; if there is none, starts
     *       from a zero hint and aborts the ids returned by {@code generateIdsToAbort()}.</li>
     * </ol>
     *
     * @param functionInitializationContext context giving access to the operator state store
     * @throws IllegalStateException if more than one hint is found in state
     * @throws Exception if state access or the superclass initialization fails
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (this.semantic != Semantic.NONE && !getRuntimeContext().isCheckpointingEnabled()) {
            LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", this.semantic, Semantic.NONE);
            this.semantic = Semantic.NONE;
        }
        this.nextTransactionalIdHintState = functionInitializationContext.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
        // The registered names are strings, so the lookup must use the descriptor's name;
        // testing the descriptor object itself can never match and would skip the migration.
        if (functionInitializationContext.getOperatorStateStore().getRegisteredStateNames().contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
            migrateNextTransactionalIdHindState(functionInitializationContext);
        }
        String idPrefix;
        if (this.transactionalIdPrefix != null) {
            idPrefix = this.transactionalIdPrefix;
        } else {
            String taskName = getRuntimeContext().getTaskName();
            // NOTE(review): 1000 looks like a cap keeping the generated id within a Kafka
            // length limit; confirm the exact limit.
            if (taskName.length() > 1000) {
                taskName = taskName.substring(0, 1000);
                LOG.warn("Truncated task name for Kafka TransactionalId from {} to {}.", getRuntimeContext().getTaskName(), taskName);
            }
            idPrefix = taskName + "-" + getRuntimeContext().getOperatorUniqueID();
        }
        // NOTE(review): the meaning of the trailing constant 5 is defined by
        // TransactionalIdsGenerator; verify there.
        this.transactionalIdsGenerator = new TransactionalIdsGenerator(idPrefix, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), this.kafkaProducersPoolSize, 5);
        if (this.semantic != Semantic.EXACTLY_ONCE) {
            this.nextTransactionalIdHint = null;
        } else {
            ArrayList storedHints = Lists.newArrayList((Iterable) this.nextTransactionalIdHintState.get());
            if (storedHints.size() > 1) {
                throw new IllegalStateException("There should be at most one next transactional id hint written by the first subtask");
            }
            if (storedHints.isEmpty()) {
                this.nextTransactionalIdHint = new NextTransactionalIdHint(0, 0L);
                abortTransactions(this.transactionalIdsGenerator.generateIdsToAbort());
            } else {
                this.nextTransactionalIdHint = (NextTransactionalIdHint) storedHints.get(0);
            }
        }
        super.initializeState(functionInitializationContext);
    }

    /**
     * Creates the initial user context.
     *
     * @return for {@code EXACTLY_ONCE}, a context holding freshly generated transactional ids
     *     (which also become the available-id pool); otherwise an empty optional
     */
    protected Optional<KafkaTransactionContext> initializeUserContext() {
        if (this.semantic == Semantic.EXACTLY_ONCE) {
            Set<String> freshIds = generateNewTransactionalIds();
            resetAvailableTransactionalIdsPool(freshIds);
            return Optional.of(new KafkaTransactionContext(freshIds));
        }
        return Optional.empty();
    }

    /**
     * Asks the id generator for the ids to use, starting from the next free id of the current
     * hint, and logs them.
     *
     * @return the generated transactional ids
     * @throws IllegalStateException if no hint is set
     */
    private Set<String> generateNewTransactionalIds() {
        Preconditions.checkState(this.nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE");
        long firstFreeId = this.nextTransactionalIdHint.nextFreeTransactionalId;
        Set<String> newIds = this.transactionalIdsGenerator.generateIdsToUse(firstFreeId);
        LOG.info("Generated new transactionalIds {}", newIds);
        return newIds;
    }

    /**
     * Finishes recovery: cleans up the user context for the handled transactions, then refills
     * the available-id pool from the recovered user context and logs those ids.
     *
     * @param collection transactions that were handled during recovery
     */
    protected void finishRecoveringContext(Collection<KafkaTransactionState> collection) {
        cleanUpUserContext(collection);
        KafkaTransactionContext recoveredContext = (KafkaTransactionContext) getUserContext().get();
        resetAvailableTransactionalIdsPool(recoveredContext.transactionalIds);
        LOG.info("Recovered transactionalIds {}", ((KafkaTransactionContext) getUserContext().get()).transactionalIds);
    }

    /**
     * Instantiates a producer from the current {@code producerConfig}.
     *
     * @return a new internal Kafka producer
     */
    protected FlinkKafkaInternalProducer<byte[], byte[]> createProducer() {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = new FlinkKafkaInternalProducer<>(this.producerConfig);
        return producer;
    }

    /**
     * Aborts every transactional id of the user context that does not belong to one of the
     * given transactions. Does nothing when there is no user context.
     *
     * @param collection transactions whose ids must not be aborted
     */
    private void cleanUpUserContext(Collection<KafkaTransactionState> collection) {
        if (!getUserContext().isPresent()) {
            return;
        }
        Set<String> idsToAbort = new HashSet<>(((KafkaTransactionContext) getUserContext().get()).transactionalIds);
        for (KafkaTransactionState handled : collection) {
            idsToAbort.remove(handled.transactionalId);
        }
        abortTransactions(idsToAbort);
    }

    /**
     * Replaces the content of the available-id pool with the given ids.
     *
     * @param collection ids that become available
     */
    private void resetAvailableTransactionalIdsPool(Collection<String> collection) {
        availableTransactionalIds.clear();
        availableTransactionalIds.addAll(collection);
    }

    /**
     * Initializes a short-lived transactional producer for each given id, in parallel.
     *
     * <p>Only {@code initTransactions()} is called on each producer. Presumably this is what
     * aborts lingering transactions of that id; confirm against the Kafka producer
     * documentation. Each producer is closed afterwards, whether or not initialization
     * succeeded.
     *
     * @param set transactional ids to process
     */
    private void abortTransactions(Set<String> set) {
        // Captured on the calling thread; the parallel stream may run the lambda elsewhere.
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        set.parallelStream().forEach(transactionalId -> {
            // NOTE(review): TemporaryClassLoaderContext presumably installs the captured class
            // loader for the duration of the block and restores the previous one on close.
            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(contextClassLoader)) {
                // Work on a copy so the shared producerConfig is not modified.
                Properties properties = new Properties();
                properties.putAll(this.producerConfig);
                initTransactionalProducerConfig(properties, transactionalId);
                FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
                try {
                    producer = new FlinkKafkaInternalProducer<>(properties);
                    producer.initTransactions();
                } finally {
                    if (producer != null) {
                        producer.close(Duration.ofSeconds(0L));
                    }
                }
            }
        });
    }

    /**
     * Returns the transaction coordinator id reported by the current transaction's producer.
     *
     * @return the coordinator id
     * @throws IllegalArgumentException if there is no current transaction or it has no producer
     */
    int getTransactionCoordinatorId() {
        KafkaTransactionState current = (KafkaTransactionState) currentTransaction();
        if (current != null && current.producer != null) {
            return current.producer.getTransactionCoordinatorId();
        }
        throw new IllegalArgumentException();
    }

    /**
     * Returns the transactional id reported by the current transaction's producer.
     *
     * @return the transactional id
     * @throws IllegalArgumentException if there is no current transaction or it has no producer
     */
    @VisibleForTesting
    String getTransactionalId() {
        KafkaTransactionState current = (KafkaTransactionState) currentTransaction();
        if (current != null && current.producer != null) {
            return current.producer.getTransactionalId();
        }
        throw new IllegalArgumentException();
    }

    /**
     * Takes a transactional id from the available pool and returns a producer initialized for
     * transactions under that id.
     *
     * @return the initialized transactional producer
     * @throws FlinkKafkaException with code {@code PRODUCERS_POOL_EMPTY} if no id is available
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafkaException {
        String transactionalId = this.availableTransactionalIds.poll();
        if (transactionalId != null) {
            FlinkKafkaInternalProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
            producer.initTransactions();
            return producer;
        }
        throw new FlinkKafkaException(FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY, "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
    }

    /**
     * Returns the producer's transactional id to the available pool, then flushes and closes
     * the producer.
     *
     * @param flinkKafkaInternalProducer producer that is no longer needed
     */
    private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> flinkKafkaInternalProducer) {
        String releasedId = flinkKafkaInternalProducer.getTransactionalId();
        this.availableTransactionalIds.add(releasedId);
        flinkKafkaInternalProducer.flush();
        flinkKafkaInternalProducer.close(Duration.ofSeconds(0L));
    }

    /**
     * Writes the transactional id into the shared {@code producerConfig} and creates a
     * producer from it.
     *
     * @param str transactional id to configure
     * @param z forwarded to {@link #initProducer(boolean)} (whether to register metrics)
     * @return the new producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(String str, boolean z) {
        initTransactionalProducerConfig(this.producerConfig, str);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = initProducer(z);
        return producer;
    }

    /**
     * Stores the transactional id in the given properties under
     * {@code ProducerConfig.TRANSACTIONAL_ID_CONFIG}.
     *
     * @param properties producer properties to modify
     * @param str transactional id to set
     */
    private static void initTransactionalProducerConfig(Properties properties, String str) {
        final String key = ProducerConfig.TRANSACTIONAL_ID_CONFIG;
        properties.put(key, str);
    }

    /**
     * Removes any transactional id from the shared {@code producerConfig} and creates a
     * producer from it.
     *
     * @param z forwarded to {@link #initProducer(boolean)} (whether to register metrics)
     * @return the new producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer(boolean z) {
        this.producerConfig.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = initProducer(z);
        return producer;
    }

    /**
     * Creates a producer and, if requested and not disabled through the
     * {@code flink.disable-metrics} property, exposes its Kafka metrics as gauges in the
     * {@code KafkaProducer} metric group.
     *
     * <p>A metric name that was registered before keeps its existing wrapper, which is merely
     * re-pointed at the new producer's metric; only unseen names register a new gauge.
     *
     * @param z whether metrics should be registered for this producer
     * @return the new producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean z) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = createProducer();
        LOG.info("Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}", new Object[]{Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask() + 1), Integer.valueOf(getRuntimeContext().getNumberOfParallelSubtasks()), this.defaultTopicId});

        if (!z || Boolean.parseBoolean(this.producerConfig.getProperty("flink.disable-metrics", "false"))) {
            return producer;
        }

        Map<MetricName, ? extends Metric> kafkaMetrics = producer.metrics();
        if (kafkaMetrics == null) {
            LOG.info("Producer implementation does not support metrics");
            return producer;
        }

        MetricGroup kafkaGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
        for (Map.Entry<MetricName, ? extends Metric> metricEntry : kafkaMetrics.entrySet()) {
            String metricName = metricEntry.getKey().name();
            Metric metric = metricEntry.getValue();
            KafkaMetricMutableWrapper known = this.previouslyCreatedMetrics.get(metricName);
            if (known == null) {
                KafkaMetricMutableWrapper created = new KafkaMetricMutableWrapper(metric);
                this.previouslyCreatedMetrics.put(metricName, created);
                kafkaGroup.gauge(metricName, created);
            } else {
                known.setKafkaMetric(metric);
            }
        }
        return producer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Rethrows a recorded asynchronous error, if any, and clears it so that it is reported
     * only once.
     *
     * @throws FlinkKafkaException with code {@code EXTERNAL_ERROR} wrapping the recorded error
     */
    public void checkErroneous() throws FlinkKafkaException {
        Exception recorded = this.asyncException;
        if (recorded == null) {
            return;
        }
        this.asyncException = null;
        throw new FlinkKafkaException(FlinkKafkaErrorCode.EXTERNAL_ERROR, "Failed to send data to Kafka: " + recorded.getMessage(), recorded);
    }

    /**
     * Java serialization hook; performs only the default field deserialization.
     *
     * @param objectInputStream stream to read this object from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        // No custom state to restore beyond the default fields.
        objectInputStream.defaultReadObject();
    }

    /**
     * Copies all hints stored under the older state descriptor into the V2 hint state and
     * clears the old state. The V2 state field is (re)assigned in either case; nothing else
     * happens when the old state is empty.
     *
     * @param functionInitializationContext context giving access to the operator state store
     * @throws Exception if a state operation fails
     */
    private void migrateNextTransactionalIdHindState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListState legacyState = functionInitializationContext.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
        this.nextTransactionalIdHintState = functionInitializationContext.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
        ArrayList legacyHints = Lists.newArrayList((Iterable) legacyState.get());
        if (!legacyHints.isEmpty()) {
            this.nextTransactionalIdHintState.addAll(legacyHints);
            legacyState.clear();
        }
    }

    /**
     * Builds producer properties containing only {@code bootstrap.servers}.
     *
     * <p>Each comma-separated entry is first passed to {@code NetUtils.getCorrectHostnamePort};
     * the result is discarded, so the call presumably serves as validation only.
     *
     * @param str comma-separated broker list
     * @return properties with {@code bootstrap.servers} set to the unmodified list
     */
    private static Properties getPropertiesFromBrokerList(String str) {
        String[] brokers = str.split(",");
        for (int i = 0; i < brokers.length; i++) {
            NetUtils.getCorrectHostnamePort(brokers[i]);
        }
        Properties brokerProperties = new Properties();
        brokerProperties.setProperty("bootstrap.servers", str);
        return brokerProperties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the partition ids of a topic in ascending order.
     *
     * @param str topic name
     * @param producer producer used to look up the topic's partitions
     * @return the sorted partition ids
     */
    public static int[] getPartitionsByTopic(String str, Producer<byte[], byte[]> producer) {
        return producer.partitionsFor(str).stream()
                .mapToInt(PartitionInfo::partition)
                .sorted()
                .toArray();
    }

    /**
     * Reads the transaction timeout from the given producer properties.
     *
     * @param properties producer properties
     * @return the value stored under {@code ProducerConfig.TRANSACTION_TIMEOUT_CONFIG}, taken
     *     from a {@link Number} or parsed from a numeric string
     * @throws IllegalArgumentException if the value is missing or neither a number nor a
     *     numeric string
     */
    public static long getTransactionTimeout(Properties properties) {
        Object configured = properties.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
        if (configured instanceof Number) {
            return ((Number) configured).longValue();
        }
        if (configured instanceof String) {
            String text = (String) configured;
            if (StringUtils.isNumeric(text)) {
                return Long.parseLong(text);
            }
        }
        throw new IllegalArgumentException("transaction.timeout.ms must be numeric, was " + configured);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Erased bridge for {@code invoke(KafkaTransactionState, IN, SinkFunction.Context)}.
     *
     * @param obj the transaction state
     * @param obj2 the record to write
     * @param context the sink context
     * @throws Exception whatever the typed {@code invoke} throws
     */
    @SuppressWarnings("unchecked")
    public /* bridge */ /* synthetic */ void invoke(Object obj, Object obj2, SinkFunction.Context context) throws Exception {
        // The second argument is the record, not a transaction: cast it to the element type
        // rather than to KafkaTransactionState, which would fail for every real record.
        invoke((KafkaTransactionState) obj, (IN) obj2, context);
    }
}
