package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Function;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ComparisonChain;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO.class */
public class KafkaIO {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$KafkaValueWrite.class */
    public static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> {
        private final Write<K, V> kvWriteTransform;

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$KafkaValueWrite$1 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$KafkaValueWrite$1.class */
        public class AnonymousClass1 extends SimpleFunction<V, KV<K, V>> {
            AnonymousClass1() {
            }

            public KV<K, V> apply(V v) {
                return KV.of((Object) null, v);
            }

            /* renamed from: apply */
            public /* bridge */ /* synthetic */ Object m2apply(Object obj) {
                return apply((AnonymousClass1) obj);
            }
        }

        private KafkaValueWrite(Write<K, V> write) {
            this.kvWriteTransform = write;
        }

        public PDone expand(PCollection<V> pCollection) {
            return pCollection.apply("Kafka values with default key", MapElements.via(new SimpleFunction<V, KV<K, V>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.KafkaValueWrite.1
                AnonymousClass1() {
                }

                public KV<K, V> apply(V v) {
                    return KV.of((Object) null, v);
                }

                /* renamed from: apply */
                public /* bridge */ /* synthetic */ Object m2apply(Object obj) {
                    return apply((AnonymousClass1) obj);
                }
            })).setCoder(KvCoder.of(new NullOnlyCoder(null), pCollection.getCoder())).apply(this.kvWriteTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.kvWriteTransform.populateDisplayData(builder);
        }

        /* synthetic */ KafkaValueWrite(Write write, AnonymousClass1 anonymousClass1) {
            this(write);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$KafkaWriter.class */
    /**
     * {@link DoFn} that sends every incoming key/value pair to the topic configured on the
     * {@link Write} spec.
     *
     * <p>Sends are asynchronous: a failed send is recorded by {@link SendCallback} and surfaced
     * as an {@link IOException} the next time {@link #checkForFailures()} runs, which happens
     * before each element is sent and after the producer is flushed at the end of a bundle.
     *
     * @param <K> record key type
     * @param <V> record value type
     */
    public static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
        private final Write<K, V> spec;
        private final Map<String, Object> producerConfig;
        private transient Producer<K, V> producer = null;
        // First failure seen since the last report; read and written under this object's monitor.
        private transient Exception sendException = null;
        // Number of failed sends since the last report; read and written under this object's monitor.
        private transient long numSendFailures = 0;
        private final Counter elementsWritten = SinkMetrics.elementsWritten();

        /**
         * Copies the producer configuration from the spec and adds the key and value serializers
         * under the "key.serializer" and "value.serializer" properties.
         */
        KafkaWriter(Write<K, V> write) {
            this.spec = write;
            this.producerConfig = new HashMap<>(write.getProducerConfig());
            this.producerConfig.put("key.serializer", write.getKeySerializer());
            this.producerConfig.put("value.serializer", write.getValueSerializer());
        }

        /** Records send failures so that {@link #checkForFailures()} can report them. */
        private class SendCallback implements Callback {
            private SendCallback() {
            }

            /**
             * Does nothing on success. On failure, remembers the first exception, counts the
             * failure and logs a warning.
             */
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    return;
                }
                synchronized (KafkaWriter.this) {
                    if (KafkaWriter.this.sendException == null) {
                        KafkaWriter.this.sendException = exc;
                    }
                    // Direct increment: the generated accessor previously called here could not
                    // be decompiled and threw UnsupportedOperationException on every failure.
                    KafkaWriter.this.numSendFailures++;
                }
                KafkaIO.LOG.warn("KafkaWriter send failed : '{}'", exc.getMessage());
            }
        }

        /** Creates the producer, using the spec's factory function when one is configured. */
        @DoFn.Setup
        public void setup() {
            if (this.spec.getProducerFactoryFn() != null) {
                this.producer = (Producer) this.spec.getProducerFactoryFn().apply(this.producerConfig);
            } else {
                this.producer = new KafkaProducer<>(this.producerConfig);
            }
        }

        /**
         * Reports any failures recorded so far, then sends the element and increments the
         * elements-written counter.
         *
         * @throws Exception if earlier sends failed or the send itself throws
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, Void>.ProcessContext processContext) throws Exception {
            checkForFailures();
            KV<K, V> kv = processContext.element();
            this.producer.send(
                    new ProducerRecord<>(this.spec.getTopic(), kv.getKey(), kv.getValue()),
                    new SendCallback());
            this.elementsWritten.inc();
        }

        /**
         * Flushes the producer and then reports any send failures recorded for the bundle.
         *
         * @throws IOException if any send failed since the last report
         */
        @DoFn.FinishBundle
        public void finishBundle() throws IOException {
            this.producer.flush();
            checkForFailures();
        }

        /** Closes the producer if one was created. */
        @DoFn.Teardown
        public void teardown() {
            // Guard against teardown running when setup() did not complete.
            if (this.producer != null) {
                this.producer.close();
            }
        }

        /**
         * Throws if any send has failed since the last call, and resets the failure state so the
         * same failures are not reported twice.
         *
         * @throws IOException carrying the failure count in its message and the first recorded
         *     exception as its cause
         */
        private synchronized void checkForFailures() throws IOException {
            if (this.numSendFailures == 0) {
                return;
            }
            String format = String.format(
                    "KafkaWriter : failed to send %d records (since last report)", this.numSendFailures);
            Exception exc = this.sendException;
            this.sendException = null;
            this.numSendFailures = 0L;
            KafkaIO.LOG.warn(format);
            throw new IOException(format, exc);
        }

        /**
         * Compiler-generated accessor for a post-increment of {@code numSendFailures}, kept under
         * its original name. It adds one to the counter and returns the value held before the
         * increment. It does not synchronize; callers are expected to hold the writer's monitor.
         */
        static /* synthetic */ long access$2008(KafkaWriter<?, ?> writer) {
            long previous = writer.numSendFailures;
            writer.numSendFailures = previous + 1;
            return previous;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$NullOnlyCoder.class */
    /**
     * Coder for a position that only ever holds {@code null}: encoding writes nothing and
     * decoding always yields {@code null}.
     *
     * @param <T> nominal element type; no non-null instance can be encoded
     */
    public static class NullOnlyCoder<T> extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        /** Compiler-generated access constructor; the argument is ignored. */
        /* synthetic */ NullOnlyCoder(AnonymousClass1 anonymousClass1) {
            this();
        }

        /**
         * Accepts only {@code null} and writes no bytes to the stream.
         *
         * @throws IllegalArgumentException if {@code t} is not null
         */
        public void encode(T t, OutputStream outputStream) {
            if (t != null) {
                throw new IllegalArgumentException("Can only encode nulls");
            }
        }

        /** Returns {@code null} without reading from the stream. */
        public T decode(InputStream inputStream) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read.class */
    public static abstract class Read<K, V> extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", "Set keyDeserializer instead", "value.deserializer", "Set valueDeserializer instead");
        private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", (boolean) ByteArrayDeserializer.class.getName(), "value.deserializer", (boolean) ByteArrayDeserializer.class.getName(), "receive.buffer.bytes", true, "auto.offset.reset", (boolean) "latest", "enable.auto.commit", false);
        private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> KAFKA_CONSUMER_FACTORY_FN = new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.Read.2
            AnonymousClass2() {
            }

            public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
                return new KafkaConsumer(map);
            }

            public /* bridge */ /* synthetic */ Object apply(Object obj) {
                return apply((Map<String, Object>) obj);
            }
        };

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$Read$1 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$1.class */
        public static class AnonymousClass1<KeyT, OutT, ValueT> implements SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> {
            final /* synthetic */ SerializableFunction val$fn;

            AnonymousClass1(SerializableFunction serializableFunction) {
                r4 = serializableFunction;
            }

            public OutT apply(KafkaRecord<KeyT, ValueT> kafkaRecord) {
                return (OutT) r4.apply(kafkaRecord.getKV());
            }

            public /* bridge */ /* synthetic */ Object apply(Object obj) {
                return apply((KafkaRecord) obj);
            }
        }

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$Read$2 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$2.class */
        static class AnonymousClass2 implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
            AnonymousClass2() {
            }

            public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
                return new KafkaConsumer(map);
            }

            public /* bridge */ /* synthetic */ Object apply(Object obj) {
                return apply((Map<String, Object>) obj);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$Builder.class */
        public static abstract class Builder<K, V> {
            public Builder() {
            }

            abstract Builder<K, V> setConsumerConfig(Map<String, Object> map);

            abstract Builder<K, V> setTopics(List<String> list);

            abstract Builder<K, V> setTopicPartitions(List<TopicPartition> list);

            abstract Builder<K, V> setKeyCoder(Coder<K> coder);

            abstract Builder<K, V> setValueCoder(Coder<V> coder);

            abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> cls);

            abstract Builder<K, V> setValueDeserializer(Class<? extends Deserializer<V>> cls);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction);

            abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction);

            abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction);

            abstract Builder<K, V> setMaxNumRecords(long j);

            abstract Builder<K, V> setMaxReadTime(Duration duration);

            abstract Builder<K, V> setStartReadTime(Instant instant);

            abstract Read<K, V> build();
        }

        public Read() {
        }

        public abstract Map<String, Object> getConsumerConfig();

        public abstract List<String> getTopics();

        public abstract List<TopicPartition> getTopicPartitions();

        @Nullable
        public abstract Coder<K> getKeyCoder();

        @Nullable
        public abstract Coder<V> getValueCoder();

        @Nullable
        public abstract Class<? extends Deserializer<K>> getKeyDeserializer();

        @Nullable
        public abstract Class<? extends Deserializer<V>> getValueDeserializer();

        public abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        @Nullable
        public abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn();

        @Nullable
        public abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();

        public abstract long getMaxNumRecords();

        @Nullable
        public abstract Duration getMaxReadTime();

        @Nullable
        public abstract Instant getStartReadTime();

        abstract Builder<K, V> toBuilder();

        public Read<K, V> withBootstrapServers(String str) {
            return updateConsumerProperties(ImmutableMap.of("bootstrap.servers", str));
        }

        public Read<K, V> withTopic(String str) {
            return withTopics(ImmutableList.of(str));
        }

        public Read<K, V> withTopics(List<String> list) {
            Preconditions.checkState(getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
            return toBuilder().setTopics(ImmutableList.copyOf((Collection) list)).build();
        }

        public Read<K, V> withTopicPartitions(List<TopicPartition> list) {
            Preconditions.checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
            return toBuilder().setTopicPartitions(ImmutableList.copyOf((Collection) list)).build();
        }

        public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> cls) {
            return toBuilder().setKeyDeserializer(cls).build();
        }

        public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> cls, Coder<K> coder) {
            return toBuilder().setKeyDeserializer(cls).setKeyCoder(coder).build();
        }

        public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> cls) {
            return toBuilder().setValueDeserializer(cls).build();
        }

        public Read<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> cls, Coder<V> coder) {
            return toBuilder().setValueDeserializer(cls).setValueCoder(coder).build();
        }

        public Read<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction) {
            return toBuilder().setConsumerFactoryFn(serializableFunction).build();
        }

        public Read<K, V> updateConsumerProperties(Map<String, Object> map) {
            return toBuilder().setConsumerConfig(KafkaIO.updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, map)).build();
        }

        public Read<K, V> withMaxNumRecords(long j) {
            return toBuilder().setMaxNumRecords(j).setMaxReadTime(null).build();
        }

        public Read<K, V> withStartReadTime(Instant instant) {
            return toBuilder().setStartReadTime(instant).build();
        }

        public Read<K, V> withMaxReadTime(Duration duration) {
            return toBuilder().setMaxNumRecords(Long.MAX_VALUE).setMaxReadTime(duration).build();
        }

        public Read<K, V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction) {
            Preconditions.checkNotNull(serializableFunction);
            return toBuilder().setTimestampFn(serializableFunction).build();
        }

        public Read<K, V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction) {
            Preconditions.checkNotNull(serializableFunction);
            return toBuilder().setWatermarkFn(serializableFunction).build();
        }

        public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> serializableFunction) {
            Preconditions.checkNotNull(serializableFunction);
            return withTimestampFn2(unwrapKafkaAndThen(serializableFunction));
        }

        public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> serializableFunction) {
            Preconditions.checkNotNull(serializableFunction);
            return withWatermarkFn2(unwrapKafkaAndThen(serializableFunction));
        }

        public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
            return new TypedWithoutMetadata(this);
        }

        public void validate(PipelineOptions pipelineOptions) {
            Preconditions.checkNotNull(getConsumerConfig().get("bootstrap.servers"), "Kafka bootstrap servers should be set");
            Preconditions.checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0, "Kafka topics or topic_partitions are required");
            Preconditions.checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
            Preconditions.checkNotNull(getValueDeserializer(), "Value deserializer must be set");
            if (getStartReadTime() != null) {
                Preconditions.checkArgument(new ConsumerSpEL().hasOffsetsForTimes(), "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer.");
            }
        }

        public PCollection<KafkaRecord<K, V>> expand(PBegin pBegin) {
            CoderRegistry coderRegistry = pBegin.getPipeline().getCoderRegistry();
            BoundedReadFromUnboundedSource from = org.apache.beam.sdk.io.Read.from(toBuilder().setKeyCoder((Coder) Preconditions.checkNotNull(getKeyCoder() != null ? getKeyCoder() : KafkaIO.inferCoder(coderRegistry, getKeyDeserializer()), "Key coder could not be inferred from key deserializer. Please providekey coder explicitly using withKeyDeserializerAndCoder()")).setValueCoder((Coder) Preconditions.checkNotNull(getValueCoder() != null ? getValueCoder() : KafkaIO.inferCoder(coderRegistry, getValueDeserializer()), "Value coder could not be inferred from value deserializer. Please providevalue coder explicitly using withValueDeserializerAndCoder()")).build().makeSource());
            BoundedReadFromUnboundedSource boundedReadFromUnboundedSource = from;
            if (getMaxNumRecords() < Long.MAX_VALUE) {
                boundedReadFromUnboundedSource = from.withMaxNumRecords(getMaxNumRecords());
            } else if (getMaxReadTime() != null) {
                boundedReadFromUnboundedSource = from.withMaxReadTime(getMaxReadTime());
            }
            return pBegin.getPipeline().apply(boundedReadFromUnboundedSource);
        }

        @VisibleForTesting
        UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
            return new UnboundedKafkaSource(this, -1);
        }

        private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(SerializableFunction<KV<KeyT, ValueT>, OutT> serializableFunction) {
            return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.Read.1
                final /* synthetic */ SerializableFunction val$fn;

                AnonymousClass1(SerializableFunction serializableFunction2) {
                    r4 = serializableFunction2;
                }

                public OutT apply(KafkaRecord<KeyT, ValueT> kafkaRecord) {
                    return (OutT) r4.apply(kafkaRecord.getKV());
                }

                public /* bridge */ /* synthetic */ Object apply(Object obj) {
                    return apply((KafkaRecord) obj);
                }
            };
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            List<String> topics = getTopics();
            List<TopicPartition> topicPartitions = getTopicPartitions();
            if (topics.size() > 0) {
                builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
            } else if (topicPartitions.size() > 0) {
                builder.add(DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions)).withLabel("Topic Partition/s"));
            }
            Set<String> keySet = IGNORED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> entry : getConsumerConfig().entrySet()) {
                String key = entry.getKey();
                if (!keySet.contains(key)) {
                    builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(DisplayData.inferType(entry.getValue()) != null ? entry.getValue() : String.valueOf(entry.getValue()))));
                }
            }
        }

        public /* bridge */ /* synthetic */ POutput expand(PInput pInput) {
            return expand((PBegin) pInput);
        }

        static {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$TypedWithoutMetadata.class */
    public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
        private final Read<K, V> read;

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$TypedWithoutMetadata$1.class */
        public class AnonymousClass1 extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
            final /* synthetic */ TypedWithoutMetadata this$0;

            AnonymousClass1(TypedWithoutMetadata typedWithoutMetadata) {
                this.this$0 = typedWithoutMetadata;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext processContext) {
                processContext.output(((KafkaRecord) processContext.element()).getKV());
            }
        }

        TypedWithoutMetadata(Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        public PCollection<KV<K, V>> expand(PBegin pBegin) {
            return pBegin.apply(this.read).apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>(this) { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.TypedWithoutMetadata.1
                final /* synthetic */ TypedWithoutMetadata this$0;

                AnonymousClass1(TypedWithoutMetadata this) {
                    this.this$0 = this;
                }

                @DoFn.ProcessElement
                public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext processContext) {
                    processContext.output(((KafkaRecord) processContext.element()).getKV());
                }
            }));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }

        public /* bridge */ /* synthetic */ POutput expand(PInput pInput) {
            return expand((PBegin) pInput);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader.class */
    public static class UnboundedKafkaReader<K, V> extends UnboundedSource.UnboundedReader<KafkaRecord<K, V>> {
        private final UnboundedKafkaSource<K, V> source;
        private final String name;
        private Consumer<byte[], byte[]> consumer;
        private final List<PartitionState> partitionStates;
        private KafkaRecord<K, V> curRecord;
        private Instant curTimestamp;
        private final Counter elementsReadBySplit;
        private final Counter bytesReadBySplit;
        private final Gauge backlogBytesOfSplit;
        private final Gauge backlogElementsOfSplit;
        private Consumer<byte[], byte[]> offsetConsumer;
        private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
        private static final long UNINITIALIZED_OFFSET = -1;
        private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
        private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
        private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
        private Iterator<PartitionState> curBatch = Collections.emptyIterator();
        private Deserializer<K> keyDeserializerInstance = null;
        private Deserializer<V> valueDeserializerInstance = null;
        private final Counter elementsRead = SourceMetrics.elementsRead();
        private final Counter bytesRead = SourceMetrics.bytesRead();
        private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
        private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue = new SynchronousQueue<>();
        private AtomicBoolean closed = new AtomicBoolean(false);
        private final ScheduledExecutorService offsetFetcherThread = Executors.newSingleThreadScheduledExecutor();
        private transient ConsumerSpEL consumerSpEL = new ConsumerSpEL();

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader$1 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$1.class */
        /**
         * Maps a {@code TopicPartition} to a fresh {@code PartitionState} whose next offset is
         * {@code -1}.
         *
         * <p>NOTE(review): this looks like a decompiler-generated named copy of the anonymous function
         * used in the reader constructor; confirm before using it directly.
         */
        public class AnonymousClass1 implements Function<TopicPartition, PartitionState> {
            /** The enclosing reader this instance was created for. */
            final /* synthetic */ UnboundedKafkaReader this$0;

            AnonymousClass1(UnboundedKafkaReader unboundedKafkaReader) {
                this.this$0 = unboundedKafkaReader;
            }

            /* renamed from: apply */
            /** Creates the state for {@code topicPartition} with an unset ({@code -1}) next offset. */
            public PartitionState apply2(TopicPartition topicPartition) {
                return new PartitionState(topicPartition, -1L);
            }

            /** Interface entry point; delegates to {@link #apply2}. */
            @Override // org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Function
            public /* bridge */ /* synthetic */ PartitionState apply(TopicPartition topicPartition) {
                return apply2(topicPartition);
            }
        }

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader$2 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$2.class */
        /**
         * Task that seeks one partition to its initial offset by calling
         * {@code setupInitialOffset} on the enclosing reader.
         *
         * <p>The decompiled form referred to the captured partition through an undeclared register
         * name ({@code r5}) and never assigned the final field; both now go through
         * {@code val$pState}.
         */
        class AnonymousClass2 implements Runnable {
            /** The partition whose initial offset this task sets up. */
            final /* synthetic */ PartitionState val$pState;
            /** The enclosing reader. */
            final /* synthetic */ UnboundedKafkaReader this$0;

            AnonymousClass2(UnboundedKafkaReader unboundedKafkaReader, PartitionState partitionState) {
                this.this$0 = unboundedKafkaReader;
                this.val$pState = partitionState;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.this$0.setupInitialOffset(this.val$pState);
            }
        }

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader$3 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$3.class */
        /**
         * Task that runs {@code consumerPollLoop()} on the enclosing reader.
         *
         * <p>NOTE(review): appears to be a decompiler-generated named copy of the anonymous
         * {@code Runnable} submitted in {@code start()}; confirm before using it directly.
         */
        class AnonymousClass3 implements Runnable {
            /** The enclosing reader. */
            final /* synthetic */ UnboundedKafkaReader this$0;

            AnonymousClass3(UnboundedKafkaReader unboundedKafkaReader) {
                this.this$0 = unboundedKafkaReader;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.this$0.consumerPollLoop();
            }
        }

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader$4 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$4.class */
        /**
         * Task that runs {@code updateLatestOffsets()} on the enclosing reader.
         *
         * <p>NOTE(review): appears to be a decompiler-generated named copy of the anonymous
         * {@code Runnable} scheduled in {@code start()}; confirm before using it directly.
         */
        class AnonymousClass4 implements Runnable {
            /** The enclosing reader. */
            final /* synthetic */ UnboundedKafkaReader this$0;

            AnonymousClass4(UnboundedKafkaReader unboundedKafkaReader) {
                this.this$0 = unboundedKafkaReader;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.this$0.updateLatestOffsets();
            }
        }

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader$5 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$5.class */
        /**
         * Converts a {@code PartitionState} into the {@code KafkaCheckpointMark.PartitionMark}
         * (topic, partition, next offset) recorded in a checkpoint.
         *
         * <p>NOTE(review): appears to be a decompiler-generated named copy of the anonymous function
         * used in {@code getCheckpointMark()}; confirm before using it directly.
         */
        class AnonymousClass5 implements Function<PartitionState, KafkaCheckpointMark.PartitionMark> {
            /** The enclosing reader. */
            final /* synthetic */ UnboundedKafkaReader this$0;

            AnonymousClass5(UnboundedKafkaReader unboundedKafkaReader) {
                this.this$0 = unboundedKafkaReader;
            }

            /* renamed from: apply */
            /** Builds the mark from the partition's topic, partition number and next offset. */
            public KafkaCheckpointMark.PartitionMark apply2(PartitionState partitionState) {
                return new KafkaCheckpointMark.PartitionMark(partitionState.topicPartition.topic(), partitionState.topicPartition.partition(), partitionState.nextOffset);
            }

            /** Interface entry point; delegates to {@link #apply2}. */
            @Override // org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Function
            public /* bridge */ /* synthetic */ KafkaCheckpointMark.PartitionMark apply(PartitionState partitionState) {
                return apply2(partitionState);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$MovingAvg.class */
        /**
         * Running average that weights each new sample by {@code 1 / min(window, samplesSeen)}, so
         * it is an exact mean for the first {@code MOVING_AVG_WINDOW} samples and an exponentially
         * decaying average afterwards.
         */
        public static class MovingAvg {
            /** Upper bound on the divisor applied to each new sample. */
            private static final int MOVING_AVG_WINDOW = 1000;
            private double avg = 0.0d;
            private long numUpdates = 0L;

            private MovingAvg() {
            }

            /** Synthetic access constructor; the argument is ignored. */
            /* synthetic */ MovingAvg(AnonymousClass1 anonymousClass1) {
                this();
            }

            /** Folds one sample into the average. */
            void update(double d) {
                numUpdates++;
                long divisor = Math.min((long) MOVING_AVG_WINDOW, numUpdates);
                avg += (d - avg) / divisor;
            }

            /** Returns the current average ({@code 0.0} before the first update). */
            double get() {
                return avg;
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaReader$PartitionState.class */
        /**
         * Per-partition reader state: the next offset to consume, the iterator over the records of
         * the current batch, moving averages used for backlog estimation, and the latest offset
         * reported by the offset fetcher.
         *
         * <p>The backlog methods and {@code setLatestOffset} are synchronized because the offset
         * fetcher thread calls them while the reading thread updates the state through
         * {@link #recordConsumed}.
         */
        public static class PartitionState {
            private final TopicPartition topicPartition;
            /** Offset of the next record to consume; negative while not yet initialized. */
            private long nextOffset;
            /** Records of this partition in the current batch. */
            private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
            private MovingAvg avgRecordSize = new MovingAvg(null);
            private MovingAvg avgOffsetGap = new MovingAvg(null);
            /** Latest offset seen by the offset fetcher; negative when unknown. */
            private long latestOffset = -1;

            PartitionState(TopicPartition topicPartition, long j) {
                this.topicPartition = topicPartition;
                this.nextOffset = j;
            }

            /**
             * Records that the record at offset {@code j} was consumed.
             *
             * <p>Synchronized so that the backlog methods, which run on another thread, never observe
             * a half-updated state.
             *
             * @param j offset of the consumed record
             * @param i size of the record (key plus value) in bytes
             * @param j2 gap between the expected offset and the actual offset
             */
            synchronized void recordConsumed(long j, int i, long j2) {
                this.nextOffset = j + 1;
                this.avgRecordSize.update(i);
                this.avgOffsetGap.update(j2);
            }

            /** Stores the latest known end offset of the partition. */
            synchronized void setLatestOffset(long j) {
                this.latestOffset = j;
            }

            /**
             * Estimates the backlog in bytes as message count times average record size.
             *
             * @return the estimate, or {@code -1} when the message count is unknown
             */
            synchronized long approxBacklogInBytes() {
                long backlogMessageCount = backlogMessageCount();
                if (backlogMessageCount == -1) {
                    return -1L;
                }
                return (long) (backlogMessageCount * this.avgRecordSize.get());
            }

            /**
             * Estimates the number of unread messages, correcting the raw offset distance by the
             * average gap observed between consecutive offsets.
             *
             * @return the estimate (never negative), or {@code -1} when either offset is unknown
             */
            synchronized long backlogMessageCount() {
                if (this.latestOffset < 0 || this.nextOffset < 0) {
                    return -1L;
                }
                return Math.max(0L, (long) Math.ceil((this.latestOffset - this.nextOffset) / (1.0d + this.avgOffsetGap.get())));
            }

            /**
             * Synthetic accessor that assigns {@code nextOffset} on behalf of the enclosing class.
             *
             * <p>The decompiler could not decode this method and replaced it with an unconditional
             * {@code UnsupportedOperationException}; the residual pseudo-code ({@code r0.nextOffset = r1})
             * shows it is a plain field store, which is what is implemented here.
             *
             * @param r6 the state to update
             * @param r7 the new next offset
             * @return the value that was assigned
             */
            static /* synthetic */ long access$802(PartitionState r6, long r7) {
                r6.nextOffset = r7;
                return r7;
            }
        }

        /** Returns the reader's name, which is what log messages use to identify this reader. */
        @Override
        public String toString() {
            return name;
        }

        /**
         * Creates a reader for the partitions assigned to {@code unboundedKafkaSource}.
         *
         * <p>Every assigned partition starts with an unset next offset ({@code -1}). When a
         * checkpoint is supplied, its partition marks must line up one-to-one, in order, with the
         * assigned partitions, and each partition resumes from the checkpointed next offset.
         *
         * @param unboundedKafkaSource the split this reader reads from
         * @param kafkaCheckpointMark checkpoint to resume from, or {@code null} to start fresh
         * @throws IllegalStateException if the checkpoint does not match the assigned partitions
         */
        public UnboundedKafkaReader(UnboundedKafkaSource<K, V> unboundedKafkaSource, @Nullable KafkaCheckpointMark kafkaCheckpointMark) {
            this.source = unboundedKafkaSource;
            this.name = "Reader-" + unboundedKafkaSource.id;
            List<TopicPartition> topicPartitions = unboundedKafkaSource.spec.getTopicPartitions();
            // copyOf materializes the lazy transform view so that each partition has exactly one state.
            this.partitionStates = ImmutableList.copyOf(Lists.transform(topicPartitions, new Function<TopicPartition, PartitionState>() {
                @Override
                public PartitionState apply(TopicPartition topicPartition) {
                    return new PartitionState(topicPartition, -1L);
                }
            }));
            if (kafkaCheckpointMark != null) {
                Preconditions.checkState(kafkaCheckpointMark.getPartitions().size() == topicPartitions.size(), "checkPointMark and assignedPartitions should match");
                for (int i = 0; i < topicPartitions.size(); i++) {
                    KafkaCheckpointMark.PartitionMark partitionMark = kafkaCheckpointMark.getPartitions().get(i);
                    TopicPartition assigned = topicPartitions.get(i);
                    TopicPartition checkpointed = new TopicPartition(partitionMark.getTopic(), partitionMark.getPartition());
                    Preconditions.checkState(checkpointed.equals(assigned), "checkpointed partition %s and assigned partition %s don't match", checkpointed, assigned);
                    // Assign the field directly; the synthetic accessor is not needed inside the outer class.
                    this.partitionStates.get(i).nextOffset = partitionMark.getNextOffset();
                }
            }
            String splitId = String.valueOf(unboundedKafkaSource.id);
            this.elementsReadBySplit = SourceMetrics.elementsReadBySplit(splitId);
            this.bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
            this.backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
            this.backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
        }

        /**
         * Polls the Kafka consumer until the reader is closed and hands each non-empty batch to
         * {@code availableRecordsQueue}.
         *
         * <p>{@code put} blocks until {@code nextBatch()} takes the batch. If the thread is
         * interrupted while blocked, the interrupt flag is restored and the loop exits instead of
         * clearing the interrupt and polling again.
         */
        public void consumerPollLoop() {
            while (!this.closed.get()) {
                try {
                    ConsumerRecords<byte[], byte[]> records = this.consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
                    if (!records.isEmpty() && !this.closed.get()) {
                        this.availableRecordsQueue.put(records);
                    }
                } catch (InterruptedException e) {
                    KafkaIO.LOG.warn("{}: consumer thread is interrupted", this, e);
                    // Preserve the interrupt for the executor and stop polling.
                    Thread.currentThread().interrupt();
                    break;
                } catch (WakeupException ignored) {
                    // Deliberately ignored: close() calls wakeup() after setting 'closed', so the loop
                    // condition ends the loop on the next check.
                }
            }
            KafkaIO.LOG.info("{}: Returning from consumer pool loop", this);
        }

        /**
         * Replaces {@code curBatch} with the next batch handed over by the poll thread.
         *
         * <p>Waits up to {@code NEW_RECORDS_POLL_TIMEOUT} for a batch. When none arrives (or the wait
         * is interrupted) {@code curBatch} is left empty. Otherwise each partition gets the iterator
         * over its records, and {@code curBatch} cycles over the partitions that have at least one.
         */
        private void nextBatch() {
            this.curBatch = Collections.emptyIterator();

            ConsumerRecords<byte[], byte[]> records;
            try {
                records = this.availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                KafkaIO.LOG.warn("{}: Unexpected", this, e);
                return;
            }
            if (records == null) {
                return;
            }

            // A mutable list is required: advance() removes exhausted partitions through the iterator.
            List<PartitionState> withRecords = new LinkedList<>();
            for (PartitionState state : this.partitionStates) {
                state.recordIter = records.records(state.topicPartition).iterator();
                if (state.recordIter.hasNext()) {
                    withRecords.add(state);
                }
            }
            this.curBatch = Iterators.cycle(withRecords);
        }

        /**
         * Positions the consumer for one partition and records the resulting next offset.
         *
         * <ul>
         *   <li>If the partition already has a next offset (restored from a checkpoint), seek to it.
         *   <li>Otherwise, if no start read time is configured, adopt the consumer's current position.
         *   <li>Otherwise look up the offset for the configured start read time and seek to it.
         * </ul>
         *
         * <p>The offset is assigned directly rather than through the synthetic accessor, which the
         * decompiler left as a stub that always throws.
         *
         * @param partitionState the partition to initialize
         */
        public void setupInitialOffset(PartitionState partitionState) {
            Read<K, V> spec = this.source.spec;
            TopicPartition partition = partitionState.topicPartition;
            if (partitionState.nextOffset != -1) {
                this.consumer.seek(partition, partitionState.nextOffset);
            } else if (spec.getStartReadTime() == null) {
                partitionState.nextOffset = this.consumer.position(partition);
            } else {
                partitionState.nextOffset = this.consumerSpEL.offsetForTime(this.consumer, partition, spec.getStartReadTime());
                this.consumer.seek(partition, partitionState.nextOffset);
            }
        }

        public boolean start() throws IOException {
            Read read = ((UnboundedKafkaSource) this.source).spec;
            this.consumer = (Consumer) read.getConsumerFactoryFn().apply(read.getConsumerConfig());
            this.consumerSpEL.evaluateAssign(this.consumer, read.getTopicPartitions());
            try {
                this.keyDeserializerInstance = ((UnboundedKafkaSource) this.source).spec.getKeyDeserializer().newInstance();
                this.valueDeserializerInstance = ((UnboundedKafkaSource) this.source).spec.getValueDeserializer().newInstance();
                this.keyDeserializerInstance.configure(read.getConsumerConfig(), true);
                this.valueDeserializerInstance.configure(read.getConsumerConfig(), false);
                for (PartitionState partitionState : this.partitionStates) {
                    try {
                        this.consumerPollThread.submit(new Runnable(this) { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaReader.2
                            final /* synthetic */ PartitionState val$pState;
                            final /* synthetic */ UnboundedKafkaReader this$0;

                            AnonymousClass2(UnboundedKafkaReader this, PartitionState partitionState2) {
                                this.this$0 = this;
                                r5 = partitionState2;
                            }

                            @Override // java.lang.Runnable
                            public void run() {
                                this.this$0.setupInitialOffset(r5);
                            }
                        }).get(((Integer) ((UnboundedKafkaSource) this.source).spec.getConsumerConfig().get("request.timeout.ms")) != null ? 2 * r0.intValue() : 60000L, TimeUnit.MILLISECONDS);
                        KafkaIO.LOG.info("{}: reading from {} starting at offset {}", new Object[]{this.name, partitionState2.topicPartition, Long.valueOf(partitionState2.nextOffset)});
                    } catch (TimeoutException e) {
                        this.consumer.wakeup();
                        String format = String.format("%s: Timeout while initializing partition '%s'. Kafka client may not be able to connect to servers.", this, partitionState2.topicPartition);
                        KafkaIO.LOG.error("{}", format);
                        throw new IOException(format);
                    } catch (Exception e2) {
                        throw new IOException(e2);
                    }
                }
                this.consumerPollThread.submit(new Runnable(this) { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaReader.3
                    final /* synthetic */ UnboundedKafkaReader this$0;

                    AnonymousClass3(UnboundedKafkaReader this) {
                        this.this$0 = this;
                    }

                    @Override // java.lang.Runnable
                    public void run() {
                        this.this$0.consumerPollLoop();
                    }
                });
                Object obj = read.getConsumerConfig().get("group.id");
                Object[] objArr = new Object[3];
                objArr[0] = this.name;
                objArr[1] = Integer.valueOf(new Random().nextInt(Integer.MAX_VALUE));
                objArr[2] = obj == null ? "none" : obj;
                String format2 = String.format("%s_offset_consumer_%d_%s", objArr);
                HashMap hashMap = new HashMap(read.getConsumerConfig());
                hashMap.put("group.id", format2);
                hashMap.put("enable.auto.commit", false);
                this.offsetConsumer = (Consumer) read.getConsumerFactoryFn().apply(hashMap);
                this.consumerSpEL.evaluateAssign(this.offsetConsumer, read.getTopicPartitions());
                this.offsetFetcherThread.scheduleAtFixedRate(new Runnable(this) { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaReader.4
                    final /* synthetic */ UnboundedKafkaReader this$0;

                    AnonymousClass4(UnboundedKafkaReader this) {
                        this.this$0 = this;
                    }

                    @Override // java.lang.Runnable
                    public void run() {
                        this.this$0.updateLatestOffsets();
                    }
                }, 0L, 5L, TimeUnit.SECONDS);
                nextBatch();
                return advance();
            } catch (IllegalAccessException | InstantiationException e3) {
                throw new IOException("Could not instantiate deserializers", e3);
            }
        }

        /**
         * Moves to the next record, if one is available.
         *
         * <p>Loops because (a) records with an offset below the partition's next offset were already
         * consumed and are skipped, (b) an exhausted batch triggers a fetch of the next one, and (c)
         * {@code curBatch} cycles over partitions, some of which may have run out of records and must
         * be dropped from the cycle.
         *
         * <p>The elements-read counters are incremented only when a record is actually produced;
         * previously they were also bumped for exhausted partitions and skipped records.
         *
         * @return {@code true} if {@code getCurrent()} now returns a new record, {@code false} if no
         *     record is available right now
         * @throws IOException declared by the reader contract
         */
        public boolean advance() throws IOException {
            while (true) {
                if (!this.curBatch.hasNext()) {
                    nextBatch();
                    if (!this.curBatch.hasNext()) {
                        return false;
                    }
                    continue;
                }

                PartitionState pState = this.curBatch.next();
                if (!pState.recordIter.hasNext()) {
                    // Partition is exhausted for this batch: drop the reference and leave the cycle.
                    pState.recordIter = Collections.emptyIterator();
                    this.curBatch.remove();
                    continue;
                }

                ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
                long expected = pState.nextOffset;
                long offset = rawRecord.offset();
                if (offset < expected) {
                    KafkaIO.LOG.warn("{}: ignoring already consumed offset {} for {}", new Object[]{this, Long.valueOf(offset), pState.topicPartition});
                    continue;
                }

                long offsetGap = offset - expected;
                if (this.curRecord == null) {
                    KafkaIO.LOG.info("{}: first record offset {}", this.name, Long.valueOf(offset));
                    offsetGap = 0;
                }

                // Clear first: deserialization or the timestamp function below may throw.
                this.curRecord = null;
                KafkaRecord<K, V> record = new KafkaRecord<>(
                        rawRecord.topic(),
                        rawRecord.partition(),
                        rawRecord.offset(),
                        this.consumerSpEL.getRecordTimestamp(rawRecord),
                        this.keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
                        this.valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));
                this.curTimestamp = this.source.spec.getTimestampFn() == null
                        ? Instant.now()
                        : this.source.spec.getTimestampFn().apply(record);
                this.curRecord = record;

                int keySize = rawRecord.key() == null ? 0 : rawRecord.key().length;
                int valueSize = rawRecord.value() == null ? 0 : rawRecord.value().length;
                int recordSize = keySize + valueSize;
                pState.recordConsumed(offset, recordSize, offsetGap);

                this.elementsRead.inc();
                this.elementsReadBySplit.inc();
                this.bytesRead.inc(recordSize);
                this.bytesReadBySplit.inc(recordSize);
                return true;
            }
        }

        /**
         * Refreshes the latest offset of every partition using the offset consumer.
         *
         * <p>For each partition the offset consumer seeks to the end and its position is stored as
         * the latest offset. On failure the latest offset is reset to {@code -1} and a warning is
         * logged, unless the reader is already closed, in which case the remaining partitions are
         * skipped.
         *
         * <p>The debug line logs the numeric average record size; it previously passed the
         * {@code MovingAvg} object itself, which has no {@code toString()}.
         */
        public void updateLatestOffsets() {
            for (PartitionState partitionState : this.partitionStates) {
                try {
                    this.consumerSpEL.evaluateSeek2End(this.offsetConsumer, partitionState.topicPartition);
                    partitionState.setLatestOffset(this.offsetConsumer.position(partitionState.topicPartition));
                } catch (Exception e) {
                    if (this.closed.get()) {
                        // Failures are expected while closing; stop without logging.
                        break;
                    }
                    KafkaIO.LOG.warn("{}: exception while fetching latest offset for partition {}. will be retried.", new Object[]{this, partitionState.topicPartition, e});
                    partitionState.setLatestOffset(-1L);
                }
                KafkaIO.LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})", new Object[]{this, partitionState.topicPartition, Long.valueOf(partitionState.latestOffset), Long.valueOf(partitionState.nextOffset), Double.valueOf(partitionState.avgRecordSize.get())});
            }
            KafkaIO.LOG.debug("{}:  backlog {}", this, Long.valueOf(getSplitBacklogBytes()));
        }

        /**
         * Publishes the split's backlog, in bytes and in messages, to the per-split gauges. Any
         * negative value is reported as {@code -1}.
         */
        private void reportBacklog() {
            long backlogBytes = getSplitBacklogBytes();
            this.backlogBytesOfSplit.set(backlogBytes < 0 ? -1L : backlogBytes);

            long backlogMessages = getSplitBacklogMessageCount();
            this.backlogElementsOfSplit.set(backlogMessages < 0 ? -1L : backlogMessages);
        }

        /**
         * Returns the current watermark.
         *
         * <p>Before any record has been read this is {@code initialWatermark}. Afterwards it is the
         * result of the configured watermark function applied to the current record, or the current
         * record's timestamp when no watermark function is configured.
         */
        public Instant getWatermark() {
            if (this.curRecord == null) {
                KafkaIO.LOG.debug("{}: getWatermark() : no records have been read yet.", this.name);
                return initialWatermark;
            }
            if (this.source.spec.getWatermarkFn() == null) {
                return this.curTimestamp;
            }
            return this.source.spec.getWatermarkFn().apply(this.curRecord);
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            reportBacklog();
            return new KafkaCheckpointMark(ImmutableList.copyOf((Collection) Lists.transform(this.partitionStates, new Function<PartitionState, KafkaCheckpointMark.PartitionMark>(this) { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaReader.5
                final /* synthetic */ UnboundedKafkaReader this$0;

                AnonymousClass5(UnboundedKafkaReader this) {
                    this.this$0 = this;
                }

                /* renamed from: apply */
                public KafkaCheckpointMark.PartitionMark apply2(PartitionState partitionState) {
                    return new KafkaCheckpointMark.PartitionMark(partitionState.topicPartition.topic(), partitionState.topicPartition.partition(), partitionState.nextOffset);
                }

                @Override // org.apache.beam.sdks.java.io.kafka.repackaged.com.google.common.base.Function
                public /* bridge */ /* synthetic */ KafkaCheckpointMark.PartitionMark apply(PartitionState partitionState) {
                    return apply2(partitionState);
                }
            })));
        }

        /** Returns the source (split) this reader was created for. */
        public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
            return source;
        }

        /**
         * Returns the record produced by the last successful {@code start()} or {@code advance()}.
         *
         * <p>This implementation returns the stored record as is and does not itself throw; the
         * value is {@code null} while no record has been read.
         */
        public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
            return curRecord;
        }

        /**
         * Returns the timestamp computed for the current record by {@code advance()}.
         *
         * <p>This implementation returns the stored timestamp as is and does not itself throw.
         */
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return curTimestamp;
        }

        /**
         * Sums the approximate backlog in bytes over all partitions of this split.
         *
         * @return the total, or {@code -1} as soon as any partition reports an unknown backlog
         */
        public long getSplitBacklogBytes() {
            long total = 0;
            for (PartitionState state : this.partitionStates) {
                long partitionBacklog = state.approxBacklogInBytes();
                if (partitionBacklog == -1) {
                    return -1L;
                }
                total += partitionBacklog;
            }
            return total;
        }

        /**
         * Sums the estimated number of unread messages over all partitions of this split.
         *
         * @return the total, or {@code -1} as soon as any partition reports an unknown count
         */
        private long getSplitBacklogMessageCount() {
            long total = 0;
            for (PartitionState state : this.partitionStates) {
                long partitionCount = state.backlogMessageCount();
                if (partitionCount == -1) {
                    return -1L;
                }
                total += partitionCount;
            }
            return total;
        }

        /**
         * Stops the background threads and releases the consumers and deserializers.
         *
         * <p>After marking the reader closed and shutting down both executors, it repeatedly wakes up
         * the consumers, drains one pending batch from the hand-off queue (which releases a poll
         * thread blocked in {@code put}), and waits up to 10 seconds per executor until both have
         * terminated. The deserializers and consumers are then closed; {@code IOException}s from
         * closing are swallowed by {@code Closeables.close(..., true)}.
         *
         * @throws RuntimeException wrapping {@code InterruptedException} if interrupted while waiting
         */
        public void close() throws IOException {
            this.closed.set(true);
            this.consumerPollThread.shutdown();
            this.offsetFetcherThread.shutdown();

            boolean terminated;
            do {
                if (this.consumer != null) {
                    this.consumer.wakeup();
                }
                if (this.offsetConsumer != null) {
                    this.offsetConsumer.wakeup();
                }
                this.availableRecordsQueue.poll();
                try {
                    terminated = this.consumerPollThread.awaitTermination(10L, TimeUnit.SECONDS)
                            && this.offsetFetcherThread.awaitTermination(10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (!terminated) {
                    KafkaIO.LOG.warn("An internal thread is taking a long time to shutdown. will retry.");
                }
            } while (!terminated);

            Closeables.close(this.keyDeserializerInstance, true);
            Closeables.close(this.valueDeserializerInstance, true);
            Closeables.close(this.offsetConsumer, true);
            Closeables.close(this.consumer, true);
        }

        /* renamed from: getCurrentSource */
        /**
         * Bridge variant of {@link #getCurrentSource()} that returns the result typed as
         * {@code Source}; it only delegates.
         */
        public /* bridge */ /* synthetic */ Source m5getCurrentSource() {
            return getCurrentSource();
        }

        /* renamed from: getCurrent */
        /**
         * Bridge variant of {@link #getCurrent()} that returns the result typed as {@code Object};
         * it only delegates.
         */
        public /* bridge */ /* synthetic */ Object m6getCurrent() throws NoSuchElementException {
            return getCurrent();
        }

        // Empty static initializer: performs no work.
        // NOTE(review): presumably a decompiler leftover — confirm it can be dropped.
        static {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaSource.class */
    public static class UnboundedKafkaSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
        private Read<K, V> spec;
        private final int id;

        /* renamed from: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource$1 */
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$UnboundedKafkaSource$1.class */
        public class AnonymousClass1 implements Comparator<TopicPartition> {
            final /* synthetic */ UnboundedKafkaSource this$0;

            AnonymousClass1(UnboundedKafkaSource unboundedKafkaSource) {
                this.this$0 = unboundedKafkaSource;
            }

            /* renamed from: compare */
            public int compare2(TopicPartition topicPartition, TopicPartition topicPartition2) {
                return ComparisonChain.start().compare(topicPartition.topic(), topicPartition2.topic()).compare(topicPartition.partition(), topicPartition2.partition()).result();
            }

            @Override // java.util.Comparator
            public /* bridge */ /* synthetic */ int compare(TopicPartition topicPartition, TopicPartition topicPartition2) {
                return compare2(topicPartition, topicPartition2);
            }
        }

        public UnboundedKafkaSource(Read<K, V> read, int i) {
            this.spec = read;
            this.id = i;
        }

        public List<UnboundedKafkaSource<K, V>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList(this.spec.getTopicPartitions());
            if (arrayList.isEmpty()) {
                Consumer consumer = (Consumer) this.spec.getConsumerFactoryFn().apply(this.spec.getConsumerConfig());
                Throwable th = null;
                try {
                    try {
                        Iterator<String> it = this.spec.getTopics().iterator();
                        while (it.hasNext()) {
                            for (PartitionInfo partitionInfo : consumer.partitionsFor(it.next())) {
                                arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                            }
                        }
                        if (consumer != null) {
                            if (0 != 0) {
                                try {
                                    consumer.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                consumer.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (consumer != null) {
                        if (th != null) {
                            try {
                                consumer.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            consumer.close();
                        }
                    }
                    throw th3;
                }
            }
            Collections.sort(arrayList, new Comparator<TopicPartition>(this) { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaSource.1
                final /* synthetic */ UnboundedKafkaSource this$0;

                AnonymousClass1(UnboundedKafkaSource this) {
                    this.this$0 = this;
                }

                /* renamed from: compare */
                public int compare2(TopicPartition topicPartition, TopicPartition topicPartition2) {
                    return ComparisonChain.start().compare(topicPartition.topic(), topicPartition2.topic()).compare(topicPartition.partition(), topicPartition2.partition()).result();
                }

                @Override // java.util.Comparator
                public /* bridge */ /* synthetic */ int compare(TopicPartition topicPartition, TopicPartition topicPartition2) {
                    return compare2(topicPartition, topicPartition2);
                }
            });
            Preconditions.checkArgument(i > 0);
            Preconditions.checkState(arrayList.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names");
            int min = Math.min(i, arrayList.size());
            ArrayList arrayList2 = new ArrayList(min);
            for (int i2 = 0; i2 < min; i2++) {
                arrayList2.add(new ArrayList());
            }
            for (int i3 = 0; i3 < arrayList.size(); i3++) {
                ((List) arrayList2.get(i3 % min)).add(arrayList.get(i3));
            }
            ArrayList arrayList3 = new ArrayList(min);
            for (int i4 = 0; i4 < min; i4++) {
                List<TopicPartition> list = (List) arrayList2.get(i4);
                KafkaIO.LOG.info("Partitions assigned to split {} (total {}): {}", new Object[]{Integer.valueOf(i4), Integer.valueOf(list.size()), Joiner.on(",").join(list)});
                arrayList3.add(new UnboundedKafkaSource(this.spec.toBuilder().setTopics(Collections.emptyList()).setTopicPartitions(list).build(), i4));
            }
            return arrayList3;
        }

        /**
         * Creates a reader for this source, resuming from the given checkpoint mark.
         *
         * <p>When the spec carries no topic partitions, a warning is logged, {@code split(1, ...)} is
         * invoked and the reader is built on the first split it returns. Otherwise the reader is
         * built directly on this source.
         *
         * @param pipelineOptions options forwarded to {@code split} when the fallback path is taken
         * @param kafkaCheckpointMark checkpoint mark handed to the new reader
         * @return a new {@code UnboundedKafkaReader}
         * @throws RuntimeException wrapping any exception raised while creating the fallback split
         *     or the reader built on it
         */
        public UnboundedKafkaReader<K, V> createReader(PipelineOptions pipelineOptions, KafkaCheckpointMark kafkaCheckpointMark) {
            if (this.spec.getTopicPartitions().isEmpty()) {
                // No partitions recorded on the spec: fall back to a single split of this source.
                KafkaIO.LOG.warn("Looks like generateSplits() is not called. Generate single split.");
                try {
                    return new UnboundedKafkaReader<>(split(1, pipelineOptions).get(0), kafkaCheckpointMark);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return new UnboundedKafkaReader<>(this, kafkaCheckpointMark);
        }

        /**
         * Returns the coder used for {@link KafkaCheckpointMark} instances.
         *
         * @return an {@link AvroCoder} created for {@code KafkaCheckpointMark.class}
         */
        public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
            return AvroCoder.of(KafkaCheckpointMark.class);
        }

        /**
         * Reports whether this source requires deduping.
         *
         * @return always {@code false}
         */
        public boolean requiresDeduping() {
            return false;
        }

        /**
         * Validates this source by delegating to the spec's {@code validate} method.
         *
         * <p>{@code null} is passed as the argument of the delegated call.
         */
        public void validate() {
            this.spec.validate(null);
        }

        /**
         * Returns the coder for the {@code KafkaRecord}s this source produces.
         *
         * @return a {@code KafkaRecordCoder} built from the spec's key coder and value coder
         */
        public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
            return KafkaRecordCoder.of(this.spec.getKeyCoder(), this.spec.getValueCoder());
        }

        /**
         * Bridge overload accepting the generic {@code UnboundedSource.CheckpointMark}.
         *
         * <p>Casts {@code checkpointMark} to {@link KafkaCheckpointMark} and delegates to the typed
         * {@code createReader(PipelineOptions, KafkaCheckpointMark)} overload.
         *
         * @param pipelineOptions options forwarded unchanged to the typed overload
         * @param checkpointMark checkpoint mark; the cast fails with {@link ClassCastException} if it
         *     is a non-null value of any other type
         * @return the reader created by the typed overload
         */
        public /* bridge */ /* synthetic */ UnboundedSource.UnboundedReader createReader(PipelineOptions pipelineOptions, UnboundedSource.CheckpointMark checkpointMark) throws IOException {
            return createReader(pipelineOptions, (KafkaCheckpointMark) checkpointMark);
        }
    }

    /**
     * A {@link PTransform} over a {@code PCollection} of key/value pairs that, when expanded,
     * applies a {@code KafkaWriter} to its input via {@code ParDo}.
     *
     * <p>Instances are immutable from the caller's point of view: every {@code with...} and
     * {@code update...} method goes through {@link #toBuilder()} and returns a newly built
     * {@code Write}.
     *
     * @param <K> key type of the written records
     * @param <V> value type of the written records
     */
    public static abstract class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
        /** Producer properties used as the initial configuration by {@code KafkaIO.write()}. */
        private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of("retries", 3);

        /**
         * Producer property names that {@link #updateProducerProperties(Map)} refuses to accept and
         * that {@link #populateDisplayData(DisplayData.Builder)} leaves out, mapped to the hint shown
         * when a caller tries to set them.
         */
        private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of("key.serializer", "Use withKeySerializer instead", "value.serializer", "Use withValueSerializer instead");

        /**
         * Builder for {@link Write}; all setters and {@link #build()} are abstract here.
         *
         * @param <K> key type of the written records
         * @param <V> value type of the written records
         */
        public static abstract class Builder<K, V> {
            public Builder() {
            }

            abstract Builder<K, V> setTopic(String str);

            abstract Builder<K, V> setProducerConfig(Map<String, Object> map);

            abstract Builder<K, V> setProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> serializableFunction);

            abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> cls);

            abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> cls);

            abstract Write<K, V> build();
        }

        public Write() {
        }

        /** Returns the configured topic, or {@code null} when none has been set. */
        @Nullable
        public abstract String getTopic();

        /** Returns the producer configuration properties. */
        public abstract Map<String, Object> getProducerConfig();

        /** Returns the producer factory function, or {@code null} when none has been set. */
        @Nullable
        public abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();

        /** Returns the key serializer class, or {@code null} when none has been set. */
        @Nullable
        public abstract Class<? extends Serializer<K>> getKeySerializer();

        /** Returns the value serializer class, or {@code null} when none has been set. */
        @Nullable
        public abstract Class<? extends Serializer<V>> getValueSerializer();

        /** Returns a builder from which modified copies of this transform are built. */
        abstract Builder<K, V> toBuilder();

        /**
         * Returns a copy whose producer property {@code bootstrap.servers} is set to {@code str}.
         *
         * @param str value stored under {@code bootstrap.servers}
         */
        public Write<K, V> withBootstrapServers(String str) {
            Map<String, Object> serversProperty = ImmutableMap.of("bootstrap.servers", str);
            return updateProducerProperties(serversProperty);
        }

        /**
         * Returns a copy with the topic set to {@code str}.
         *
         * @param str the topic
         */
        public Write<K, V> withTopic(String str) {
            Builder<K, V> withTopic = toBuilder().setTopic(str);
            return withTopic.build();
        }

        /**
         * Returns a copy with the key serializer class set to {@code cls}.
         *
         * @param cls serializer class for record keys
         */
        public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> cls) {
            Builder<K, V> withSerializer = toBuilder().setKeySerializer(cls);
            return withSerializer.build();
        }

        /**
         * Returns a copy with the value serializer class set to {@code cls}.
         *
         * @param cls serializer class for record values
         */
        public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> cls) {
            Builder<K, V> withSerializer = toBuilder().setValueSerializer(cls);
            return withSerializer.build();
        }

        /**
         * Returns a copy whose producer configuration is the current one overlaid with {@code map}.
         *
         * <p>The merge is performed by {@code KafkaIO.updateKafkaProperties}, which is given
         * {@code IGNORED_PRODUCER_PROPERTIES} as the set of keys that must not be configured here.
         *
         * @param map producer properties to add or override
         */
        public Write<K, V> updateProducerProperties(Map<String, Object> map) {
            Map<String, Object> mergedConfig = KafkaIO.updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, map);
            return toBuilder().setProducerConfig(mergedConfig).build();
        }

        /**
         * Returns a copy with the producer factory function set to {@code serializableFunction}.
         *
         * @param serializableFunction function from a producer configuration map to a producer
         */
        public Write<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> serializableFunction) {
            Builder<K, V> withFactory = toBuilder().setProducerFactoryFn(serializableFunction);
            return withFactory.build();
        }

        /**
         * Returns a transform over a {@code PCollection} of values only, created by wrapping a
         * rebuilt copy of this transform in a {@code KafkaValueWrite}.
         */
        public PTransform<PCollection<V>, PDone> values() {
            Write<K, V> rebuilt = toBuilder().build();
            return new KafkaValueWrite(rebuilt);
        }

        /**
         * Applies a {@code ParDo} of a {@code KafkaWriter} constructed from this transform to the
         * input collection.
         *
         * @param pCollection the key/value pairs to process
         * @return {@code PDone} in the pipeline of {@code pCollection}
         */
        public PDone expand(PCollection<KV<K, V>> pCollection) {
            pCollection.apply(ParDo.of(new KafkaWriter(this)));
            return PDone.in(pCollection.getPipeline());
        }

        /**
         * Checks that both the {@code bootstrap.servers} producer property and the topic are set.
         *
         * @param pipelineOptions not used by this check
         * @throws NullPointerException if either value is missing
         */
        public void validate(PipelineOptions pipelineOptions) {
            Object bootstrapServers = getProducerConfig().get("bootstrap.servers");
            Preconditions.checkNotNull(bootstrapServers, "Kafka bootstrap servers should be set");
            Preconditions.checkNotNull(getTopic(), "Kafka topic should be set");
        }

        /**
         * Adds the topic and every producer property that is not listed in
         * {@code IGNORED_PRODUCER_PROPERTIES} to the display data.
         *
         * @param builder display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
            Set<String> hiddenKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> property : getProducerConfig().entrySet()) {
                String propertyName = property.getKey();
                if (hiddenKeys.contains(propertyName)) {
                    continue;
                }
                // Values whose display type cannot be inferred are shown via their string form.
                Object rawValue = property.getValue();
                Object displayed = DisplayData.inferType(rawValue) != null ? rawValue : String.valueOf(rawValue);
                builder.add(DisplayData.item(propertyName, ValueProvider.StaticValueProvider.of(displayed)));
            }
        }

        public /* bridge */ /* synthetic */ POutput expand(PInput pInput) {
            return expand((PCollection) pInput);
        }
    }

    /**
     * Creates a {@code Read} whose keys and values are raw byte arrays.
     *
     * <p>The returned transform starts with empty topic and topic-partition lists, uses
     * {@code ByteArrayDeserializer} for both key and value, the default consumer factory and
     * consumer configuration from {@code Read}, and a maximum record count of
     * {@code Long.MAX_VALUE}.
     *
     * @return a new {@code Read<byte[], byte[]>}
     */
    public static Read<byte[], byte[]> readBytes() {
        return new AutoValue_KafkaIO_Read.Builder()
                .setTopics(new ArrayList())
                .setTopicPartitions(new ArrayList())
                .setKeyDeserializer(ByteArrayDeserializer.class)
                .setValueDeserializer(ByteArrayDeserializer.class)
                .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
                .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
                .setMaxNumRecords(Long.MAX_VALUE)
                .build();
    }

    /**
     * Creates a {@code Read} with no deserializers configured.
     *
     * <p>The returned transform starts with empty topic and topic-partition lists, the default
     * consumer factory and consumer configuration from {@code Read}, and a maximum record count
     * of {@code Long.MAX_VALUE}.
     *
     * @param <K> key type of the records to read
     * @param <V> value type of the records to read
     * @return a new {@code Read<K, V>}
     */
    public static <K, V> Read<K, V> read() {
        return new AutoValue_KafkaIO_Read.Builder()
                .setTopics(new ArrayList())
                .setTopicPartitions(new ArrayList())
                .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
                .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
                .setMaxNumRecords(Long.MAX_VALUE)
                .build();
    }

    /**
     * Creates a {@code Write} whose producer configuration is
     * {@code Write.DEFAULT_PRODUCER_PROPERTIES}; nothing else is set on the builder.
     *
     * @param <K> key type of the records to write
     * @param <V> value type of the records to write
     * @return a new {@code Write<K, V>}
     */
    public static <K, V> Write<K, V> write() {
        return new AutoValue_KafkaIO_Write.Builder()
                .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
                .build();
    }

    /**
     * Returns a new map holding the current properties overlaid with the requested updates.
     *
     * <p>Every key of the update map is first checked against the ignored-properties map; a
     * match aborts the update with a message that includes the hint stored for that key. Neither
     * input map is modified.
     *
     * @param map current properties
     * @param map2 properties that must not be set through this method, keyed by property name and
     *     mapped to an explanatory hint
     * @param map3 properties to add or override
     * @return a fresh {@code HashMap} containing {@code map} with {@code map3} applied on top
     * @throws IllegalArgumentException if {@code map3} contains a key present in {@code map2}
     */
    public static Map<String, Object> updateKafkaProperties(Map<String, Object> map, Map<String, String> map2, Map<String, Object> map3) {
        for (String updatedKey : map3.keySet()) {
            boolean isIgnored = map2.containsKey(updatedKey);
            Preconditions.checkArgument(!isIgnored, "No need to configure '%s'. %s", updatedKey, map2.get(updatedKey));
        }
        Map<String, Object> merged = new HashMap<>(map);
        merged.putAll(map3);
        return merged;
    }

    /** Private no-op constructor: code outside this class cannot instantiate {@code KafkaIO}. */
    private KafkaIO() {
    }

    /**
     * Infers a {@link NullableCoder} for the type produced by a Kafka {@code Deserializer} class.
     *
     * <p>Only the generic interfaces declared directly on {@code cls} are inspected. The first
     * one whose raw type is {@code Deserializer} supplies the type argument, which is then looked
     * up in the coder registry.
     *
     * @param coderRegistry registry queried for a coder of the deserialized type
     * @param cls deserializer class to inspect; must not be {@code null}
     * @param <T> type produced by the deserializer
     * @return the registry's coder for the deserialized type, wrapped in a {@link NullableCoder}
     * @throws NullPointerException if {@code cls} is {@code null}
     * @throws RuntimeException if the registry cannot provide a coder (the original
     *     {@link CannotProvideCoderException} is attached as the cause), or if no directly
     *     declared {@code Deserializer} interface is found on {@code cls}
     */
    @VisibleForTesting
    static <T> NullableCoder<T> inferCoder(CoderRegistry coderRegistry, Class<? extends Deserializer<T>> cls) {
        Preconditions.checkNotNull(cls);
        for (Type type : cls.getGenericInterfaces()) {
            if (!(type instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType parameterizedType = (ParameterizedType) type;
            if (parameterizedType.getRawType() != Deserializer.class) {
                continue;
            }
            // The cast assumes the type argument is a plain class; any other kind of Type
            // (e.g. a nested parameterized type) makes it fail with ClassCastException.
            @SuppressWarnings("unchecked")
            Class<T> deserializedType = (Class<T>) parameterizedType.getActualTypeArguments()[0];
            try {
                return NullableCoder.of(coderRegistry.getCoder(deserializedType));
            } catch (CannotProvideCoderException e) {
                // Keep the registry's exception as the cause so its explanation is not lost.
                throw new RuntimeException(String.format("Unable to automatically infer a Coder for the Kafka Deserializer %s: no coder registered for type %s", cls, deserializedType), e);
            }
        }
        throw new RuntimeException(String.format("Could not extract the Kafka Deserializer type from %s", cls));
    }

    // Empty static initializer: it executes no statements.
    // NOTE(review): looks like decompiler residue — confirm before removing.
    static {
    }
}
