package org.apache.beam.sdk.io.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerSpEL;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.grpc.v1p43p2.io.netty.handler.codec.rtsp.RtspHeaders;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalCause;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.class */
public class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<ProducerRecord<K, V>>, PCollection<Void>> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaExactlyOnceSink.class);
    private static final String METRIC_NAMESPACE = "KafkaExactlyOnceSink";
    private final KafkaIO.WriteRecords<K, V> spec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink$ExactlyOnceWriter.class */
    public static class ExactlyOnceWriter<K, V> extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>, Void> {
        private static final String NEXT_ID = "nextId";
        private static final String MIN_BUFFERED_ID = "minBufferedId";
        private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
        private static final String WRITER_ID = "writerId";
        private static final int MAX_RECORDS_PER_TXN = 1000;

        @DoFn.StateId(OUT_OF_ORDER_BUFFER)
        private final StateSpec<BagState<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>> outOfOrderBufferSpec;
        private final KafkaIO.WriteRecords<K, V> spec;
        private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
        private static final LoadingCache<String, ShardWriterCache<?, ?>> CACHE_BY_GROUP_ID = CacheBuilder.newBuilder().build(new CacheLoader<String, ShardWriterCache<?, ?>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter.1
            @Override // org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader
            public ShardWriterCache<?, ?> load(String str) throws Exception {
                return new ShardWriterCache<>();
            }
        });

        @DoFn.StateId(NEXT_ID)
        private final StateSpec<ValueState<Long>> sequenceIdSpec = StateSpecs.value();

        @DoFn.StateId(MIN_BUFFERED_ID)
        private final StateSpec<ValueState<Long>> minBufferedIdSpec = StateSpecs.value();

        @DoFn.StateId(WRITER_ID)
        private final StateSpec<ValueState<String>> writerIdSpec = StateSpecs.value();
        private final Counter elementsWritten = SinkMetrics.elementsWritten();
        private final Counter elementsBuffered = Metrics.counter(KafkaExactlyOnceSink.METRIC_NAMESPACE, "elementsBuffered");
        private final Counter numTransactions = Metrics.counter(KafkaExactlyOnceSink.METRIC_NAMESPACE, "numTransactions");

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink$ExactlyOnceWriter$ShardMetadata.class */
        public static class ShardMetadata {

            @JsonProperty(RtspHeaders.Values.SEQ)
            public final long sequenceId;

            @JsonProperty("id")
            public final String writerId;

            private ShardMetadata() {
                this.sequenceId = -1L;
                this.writerId = null;
            }

            ShardMetadata(long j, String str) {
                this.sequenceId = j;
                this.writerId = str;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.class */
        public static class ShardWriter<K, V> {
            private final int shard;
            private final String writerId;
            private final Producer<K, V> producer;
            private final String producerName;
            private final KafkaIO.WriteRecords<K, V> spec;
            private long committedId;

            ShardWriter(int i, String str, Producer<K, V> producer, String str2, KafkaIO.WriteRecords<K, V> writeRecords, long j) {
                this.shard = i;
                this.writerId = str;
                this.producer = producer;
                this.producerName = str2;
                this.spec = writeRecords;
                this.committedId = j;
            }

            void beginTxn() {
                ProducerSpEL.beginTransaction(this.producer);
            }

            Future<RecordMetadata> sendRecord(TimestampedValue<ProducerRecord<K, V>> timestampedValue, Counter counter) {
                try {
                    Future<RecordMetadata> send = this.producer.send(new ProducerRecord<>(this.spec.getTopic(), (Integer) null, this.spec.getPublishTimestampFunction() != null ? Long.valueOf(this.spec.getPublishTimestampFunction().getTimestamp(timestampedValue.getValue(), timestampedValue.getTimestamp()).getMillis()) : null, timestampedValue.getValue().key(), timestampedValue.getValue().value()));
                    counter.inc();
                    return send;
                } catch (KafkaException e) {
                    ProducerSpEL.abortTransaction(this.producer);
                    throw e;
                }
            }

            void commitTxn(long j, Counter counter) throws IOException {
                try {
                    ProducerSpEL.sendOffsetsToTransaction(this.producer, ImmutableMap.of(new TopicPartition(this.spec.getTopic(), this.shard), new OffsetAndMetadata(0L, ExactlyOnceWriter.JSON_MAPPER.writeValueAsString(new ShardMetadata(j, this.writerId)))), this.spec.getSinkGroupId());
                    ProducerSpEL.commitTransaction(this.producer);
                    counter.inc();
                    KafkaExactlyOnceSink.LOG.debug("{} : committed {} records", Integer.valueOf(this.shard), Long.valueOf(j - this.committedId));
                    this.committedId = j;
                } catch (KafkaException e) {
                    ProducerSpEL.abortTransaction(this.producer);
                    throw e;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriterCache.class */
        public static class ShardWriterCache<K, V> {
            static final ScheduledExecutorService SCHEDULED_CLEAN_UP_THREAD = Executors.newSingleThreadScheduledExecutor();
            static final int CLEAN_UP_CHECK_INTERVAL_MS = 10000;
            static final int IDLE_TIMEOUT_MS = 60000;
            private final Cache<Integer, ShardWriter<K, V>> cache = CacheBuilder.newBuilder().expireAfterWrite(60000, TimeUnit.MILLISECONDS).removalListener(removalNotification -> {
                if (removalNotification.getCause() != RemovalCause.EXPLICIT) {
                    ShardWriter shardWriter = (ShardWriter) removalNotification.getValue();
                    KafkaExactlyOnceSink.LOG.info("{} : Closing idle shard writer {} after 1 minute of idle time.", Integer.valueOf(shardWriter.shard), shardWriter.producerName);
                    shardWriter.producer.close();
                }
            }).build();

            ShardWriterCache() {
                ScheduledExecutorService scheduledExecutorService = SCHEDULED_CLEAN_UP_THREAD;
                Cache<Integer, ShardWriter<K, V>> cache = this.cache;
                Objects.requireNonNull(cache);
                scheduledExecutorService.scheduleAtFixedRate(cache::cleanUp, 10000L, 10000L, TimeUnit.MILLISECONDS);
            }

            ShardWriter<K, V> removeIfPresent(int i) {
                return this.cache.asMap().remove(Integer.valueOf(i));
            }

            void insert(int i, ShardWriter<K, V> shardWriter) {
                Preconditions.checkState(this.cache.asMap().putIfAbsent(Integer.valueOf(i), shardWriter) == null, "Unexpected multiple instances of writers for shard %s", i);
            }
        }

        ExactlyOnceWriter(KafkaIO.WriteRecords<K, V> writeRecords, Coder<ProducerRecord<K, V>> coder) {
            this.spec = writeRecords;
            this.outOfOrderBufferSpec = StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), TimestampedValue.TimestampedValueCoder.of(coder)));
        }

        @DoFn.Setup
        public void setup() {
            KafkaExactlyOnceSink.ensureEOSSupport();
        }

        @DoFn.ProcessElement
        @DoFn.RequiresStableInput
        public void processElement(@DoFn.StateId("nextId") ValueState<Long> valueState, @DoFn.StateId("minBufferedId") ValueState<Long> valueState2, @DoFn.StateId("outOfOrderBuffer") BagState<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> bagState, @DoFn.StateId("writerId") ValueState<String> valueState3, DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>, Void>.ProcessContext processContext) throws IOException {
            int intValue = processContext.element().getKey().intValue();
            valueState2.readLater();
            long longValue = ((Long) MoreObjects.firstNonNull(valueState.read(), 0L)).longValue();
            long longValue2 = ((Long) MoreObjects.firstNonNull(valueState2.read(), Long.MAX_VALUE)).longValue();
            ShardWriterCache<?, ?> unchecked = CACHE_BY_GROUP_ID.getUnchecked(this.spec.getSinkGroupId());
            ShardWriter<K, V> removeIfPresent = unchecked.removeIfPresent(intValue);
            if (removeIfPresent == null) {
                removeIfPresent = initShardWriter(intValue, valueState3, longValue);
            }
            long j = ((ShardWriter) removeIfPresent).committedId;
            if (j >= longValue) {
                KafkaExactlyOnceSink.LOG.info("{}: committed id {} is ahead of expected {}. {} records will be dropped (these are already written).", Integer.valueOf(intValue), Long.valueOf(j), Long.valueOf(longValue - 1), Long.valueOf((j - longValue) + 1));
                longValue = j + 1;
            }
            try {
                try {
                    removeIfPresent.beginTxn();
                    int i = 0;
                    Iterator<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> it = processContext.element().getValue().iterator();
                    while (it.hasNext()) {
                        KV<Long, TimestampedValue<ProducerRecord<K, V>>> next = it.next();
                        long longValue3 = next.getKey().longValue();
                        if (longValue3 < longValue) {
                            KafkaExactlyOnceSink.LOG.info("{}: dropping older record {}. Already committed till {}", Integer.valueOf(intValue), Long.valueOf(longValue3), Long.valueOf(j));
                        } else if (longValue3 > longValue) {
                            KafkaExactlyOnceSink.LOG.info("{}: Saving out of order record {}, next record id to be written is {}", Integer.valueOf(intValue), Long.valueOf(longValue3), Long.valueOf(longValue));
                            bagState.add(next);
                            longValue2 = Math.min(longValue2, longValue3);
                            valueState2.write(Long.valueOf(longValue2));
                            this.elementsBuffered.inc();
                        } else {
                            removeIfPresent.sendRecord(next.getValue(), this.elementsWritten);
                            longValue++;
                            i++;
                            if (i >= 1000) {
                                removeIfPresent.commitTxn(longValue3, this.numTransactions);
                                i = 0;
                                removeIfPresent.beginTxn();
                            }
                            if (longValue2 == longValue) {
                                ArrayList newArrayList = Lists.newArrayList(bagState.read());
                                newArrayList.sort(new KV.OrderByKey());
                                KafkaExactlyOnceSink.LOG.info("{} : merging {} buffered records (min buffered id is {}).", Integer.valueOf(intValue), Integer.valueOf(newArrayList.size()), Long.valueOf(longValue2));
                                bagState.clear();
                                valueState2.clear();
                                longValue2 = Long.MAX_VALUE;
                                it = Iterators.mergeSorted(ImmutableList.of((Iterator) it, newArrayList.iterator()), new KV.OrderByKey());
                            }
                        }
                    }
                    removeIfPresent.commitTxn(longValue - 1, this.numTransactions);
                    valueState.write(Long.valueOf(longValue));
                    if (removeIfPresent != null) {
                        unchecked.insert(intValue, removeIfPresent);
                    }
                } catch (ProducerSpEL.UnrecoverableProducerException e) {
                    KafkaExactlyOnceSink.LOG.warn("{} : closing producer {} after unrecoverable error. The work might have migrated. Committed id {}, current id {}.", Integer.valueOf(((ShardWriter) removeIfPresent).shard), ((ShardWriter) removeIfPresent).producerName, Long.valueOf(((ShardWriter) removeIfPresent).committedId), Long.valueOf(longValue - 1), e);
                    ((ShardWriter) removeIfPresent).producer.close();
                    removeIfPresent = null;
                    throw e;
                }
            } catch (Throwable th) {
                if (removeIfPresent != null) {
                    unchecked.insert(intValue, removeIfPresent);
                }
                throw th;
            }
        }

        private ShardWriter<K, V> initShardWriter(int i, ValueState<String> valueState, long j) throws IOException {
            String format = String.format("producer_%d_for_%s", Integer.valueOf(i), this.spec.getSinkGroupId());
            Producer initializeExactlyOnceProducer = KafkaExactlyOnceSink.initializeExactlyOnceProducer(this.spec, format);
            try {
                String read = valueState.read();
                Consumer openConsumer = KafkaExactlyOnceSink.openConsumer(this.spec);
                Throwable th = null;
                try {
                    try {
                        OffsetAndMetadata committed = openConsumer.committed(new TopicPartition(this.spec.getTopic(), i));
                        if (openConsumer != null) {
                            if (0 != 0) {
                                try {
                                    openConsumer.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                openConsumer.close();
                            }
                        }
                        long j2 = -1;
                        if (committed == null || committed.metadata() == null || committed.metadata().isEmpty()) {
                            Preconditions.checkState(j == 0 && read == null, "State exists for shard %s (nextId %s, writerId '%s'), but there is no state stored with Kafka topic '%s' group id '%s'", Integer.valueOf(i), Long.valueOf(j), read, this.spec.getTopic(), this.spec.getSinkGroupId());
                            read = String.format("%X - %s", Integer.valueOf(new Random().nextInt(Integer.MAX_VALUE)), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZone(DateTimeZone.UTC).print(DateTimeUtils.currentTimeMillis()));
                            valueState.write(read);
                            KafkaExactlyOnceSink.LOG.info("Assigned writer id '{}' to shard {}", read, Integer.valueOf(i));
                        } else {
                            ShardMetadata shardMetadata = (ShardMetadata) JSON_MAPPER.readValue(committed.metadata(), ShardMetadata.class);
                            Preconditions.checkNotNull(shardMetadata.writerId);
                            if (read == null) {
                                throw new IllegalStateException(String.format("Kafka metadata exists for shard %s, but there is no stored state for it. This mostly indicates groupId '%s' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '%s'", Integer.valueOf(i), this.spec.getSinkGroupId(), committed.metadata()));
                            }
                            Preconditions.checkState(read.equals(shardMetadata.writerId), "Writer ids don't match. This is mostly a unintended misuse of groupId('%s').Beam '%s', Kafka '%s'", this.spec.getSinkGroupId(), read, shardMetadata.writerId);
                            j2 = shardMetadata.sequenceId;
                            Preconditions.checkState(j2 >= j - 1, "Committed sequence id can not be lower than %s, partition metadata : %s", j - 1, committed.metadata());
                        }
                        KafkaExactlyOnceSink.LOG.info("{} : initialized producer {} with committed sequence id {}", Integer.valueOf(i), format, Long.valueOf(j2));
                        return new ShardWriter<>(i, read, initializeExactlyOnceProducer, format, this.spec, j2);
                    } finally {
                    }
                } finally {
                }
            } catch (IOException e) {
                initializeExactlyOnceProducer.close();
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink$Reshard.class */
    public static class Reshard<K, V> extends DoFn<ProducerRecord<K, V>, KV<Integer, TimestampedValue<ProducerRecord<K, V>>>> {
        private final int numShards;
        private transient int shardId;

        Reshard(int i) {
            this.numShards = i;
        }

        @DoFn.Setup
        public void setup() {
            this.shardId = ThreadLocalRandom.current().nextInt(this.numShards);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<ProducerRecord<K, V>, KV<Integer, TimestampedValue<ProducerRecord<K, V>>>>.ProcessContext processContext) {
            this.shardId = (this.shardId + 1) % this.numShards;
            processContext.output(KV.of(Integer.valueOf(this.shardId), TimestampedValue.of(processContext.element(), processContext.timestamp())));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink$Sequencer.class */
    public static class Sequencer<K, V> extends DoFn<KV<Integer, Iterable<TimestampedValue<ProducerRecord<K, V>>>>, KV<Integer, KV<Long, TimestampedValue<ProducerRecord<K, V>>>>> {
        private static final String NEXT_ID = "nextId";

        @DoFn.StateId(NEXT_ID)
        private final StateSpec<ValueState<Long>> nextIdSpec;

        private Sequencer() {
            this.nextIdSpec = StateSpecs.value();
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.StateId("nextId") ValueState<Long> valueState, DoFn<KV<Integer, Iterable<TimestampedValue<ProducerRecord<K, V>>>>, KV<Integer, KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>.ProcessContext processContext) {
            long longValue = ((Long) MoreObjects.firstNonNull(valueState.read(), 0L)).longValue();
            int intValue = processContext.element().getKey().intValue();
            Iterator<TimestampedValue<ProducerRecord<K, V>>> it = processContext.element().getValue().iterator();
            while (it.hasNext()) {
                processContext.output(KV.of(Integer.valueOf(intValue), KV.of(Long.valueOf(longValue), it.next())));
                longValue++;
            }
            valueState.write(Long.valueOf(longValue));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void ensureEOSSupport() {
        Preconditions.checkArgument(ProducerSpEL.supportsTransactions(), "%s %s", "This version of Kafka client does not support transactions required to support", "exactly-once semantics. Please use Kafka client version 0.11 or newer.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaExactlyOnceSink(KafkaIO.WriteRecords<K, V> writeRecords) {
        this.spec = writeRecords;
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public PCollection<Void> expand(PCollection<ProducerRecord<K, V>> pCollection) {
        int numShards = this.spec.getNumShards();
        if (numShards <= 0) {
            Consumer<?, ?> openConsumer = openConsumer(this.spec);
            Throwable th = null;
            try {
                try {
                    numShards = openConsumer.partitionsFor(this.spec.getTopic()).size();
                    LOG.info("Using {} shards for exactly-once writer, matching number of partitions for topic '{}'", Integer.valueOf(numShards), this.spec.getTopic());
                    if (openConsumer != null) {
                        $closeResource(null, openConsumer);
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (openConsumer != null) {
                    $closeResource(th, openConsumer);
                }
                throw th2;
            }
        }
        Preconditions.checkState(numShards > 0, "Could not set number of shards");
        return (PCollection) ((PCollection) ((PCollection) ((PCollection) ((PCollection) ((PCollection) pCollection.apply(Window.into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())).apply(String.format("Shuffle across %d shards", Integer.valueOf(numShards)), ParDo.of(new Reshard(numShards)))).apply("Persist sharding", GroupByKey.create())).apply("Assign sequential ids", ParDo.of(new Sequencer()))).apply("Persist ids", GroupByKey.create())).apply(String.format("Write to Kafka topic '%s'", this.spec.getTopic()), ParDo.of(new ExactlyOnceWriter(this.spec, pCollection.getCoder())));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Consumer<?, ?> openConsumer(KafkaIO.WriteRecords<?, ?> writeRecords) {
        return writeRecords.getConsumerFactoryFn().apply(ImmutableMap.of("bootstrap.servers", (Class) writeRecords.getProducerConfig().get("bootstrap.servers"), "group.id", (Class) writeRecords.getSinkGroupId(), ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K, V> Producer<K, V> initializeExactlyOnceProducer(KafkaIO.WriteRecords<K, V> writeRecords, String str) {
        HashMap hashMap = new HashMap(writeRecords.getProducerConfig());
        hashMap.putAll(ImmutableMap.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, (String) writeRecords.getKeySerializer(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, (String) writeRecords.getValueSerializer(), ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, (String) true, ProducerConfig.TRANSACTIONAL_ID_CONFIG, str));
        Producer<K, V> apply = writeRecords.getProducerFactoryFn() != null ? writeRecords.getProducerFactoryFn().apply(hashMap) : new KafkaProducer<>(hashMap);
        ProducerSpEL.initTransactions(apply);
        return apply;
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
