package org.apache.beam.sdk.io.kafka;

import com.google.auto.value.AutoValue;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_WriteRecords;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO.class */
public class KafkaIO {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$KafkaValueWrite.class */
    private static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> {
        private final Write<K, V> kvWriteTransform;

        private KafkaValueWrite(Write<K, V> write) {
            this.kvWriteTransform = write;
        }

        public PDone expand(PCollection<V> pCollection) {
            return pCollection.apply("Kafka values with default key", MapElements.via(new SimpleFunction<V, KV<K, V>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.KafkaValueWrite.1
                public KV<K, V> apply(V v) {
                    return KV.of((Object) null, v);
                }

                /* renamed from: apply, reason: collision with other method in class */
                public /* bridge */ /* synthetic */ Object m6apply(Object obj) {
                    return apply((AnonymousClass1) obj);
                }
            })).setCoder(KvCoder.of(new NullOnlyCoder(), pCollection.getCoder())).apply(this.kvWriteTransform);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.kvWriteTransform.populateDisplayData(builder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$NullOnlyCoder.class */
    /**
     * Coder for positions that only ever hold {@code null}: encoding writes nothing and
     * decoding always yields {@code null}.
     */
    public static class NullOnlyCoder<T> extends AtomicCoder<T> {
        private NullOnlyCoder() {
        }

        /**
         * Writes no bytes.
         *
         * @throws IllegalArgumentException if {@code t} is not {@code null}
         */
        public void encode(T t, OutputStream outputStream) {
            if (t != null) {
                throw new IllegalArgumentException("Can only encode nulls");
            }
        }

        /** Reads no bytes and always returns {@code null}. */
        public T decode(InputStream inputStream) {
            return null;
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read.class */
    public static abstract class Read<K, V> extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
        // Consumer property names mapped to a hint naming the setter to use instead. Passed to
        // KafkaIO.updateKafkaProperties(...) by the config-update methods, and used by
        // populateDisplayData() to leave these keys out of the display data.
        private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", "Set keyDeserializer instead", "value.deserializer", "Set valueDeserializer instead");
        // Baseline consumer settings: byte-array deserializers, a 512 KiB receive buffer,
        // "latest" offset reset and auto-commit disabled. Not referenced in the visible part
        // of this file; presumably applied where the default Read is created.
        private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES = ImmutableMap.of("key.deserializer", ByteArrayDeserializer.class.getName(), "value.deserializer", ByteArrayDeserializer.class.getName(), "receive.buffer.bytes", 524288, "auto.offset.reset", "latest", "enable.auto.commit", false);
        // Consumer factory that invokes the KafkaConsumer(Map) constructor; installed by
        // Builder.buildExternal().
        private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;

        /* JADX INFO: Access modifiers changed from: package-private */
        @Experimental
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$Builder.class */
        public static abstract class Builder<K, V> implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
            // Abstract property setters; the concrete implementation is the generated
            // AutoValue_KafkaIO_Read.Builder (see the @AutoValue.Builder annotation on this class).

            // Kafka consumer configuration map.
            abstract Builder<K, V> setConsumerConfig(Map<String, Object> map);

            // Topic names to read from.
            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<K, V> setTopics(List<String> list);

            // Explicit topic partitions to read from.
            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Builder<K, V> setTopicPartitions(List<TopicPartition> list);

            // Coder for record keys.
            abstract Builder<K, V> setKeyCoder(Coder<K> coder);

            // Coder for record values.
            abstract Builder<K, V> setValueCoder(Coder<V> coder);

            // Kafka Deserializer class for record keys.
            abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> cls);

            // Kafka Deserializer class for record values.
            abstract Builder<K, V> setValueDeserializer(Class<? extends Deserializer<V>> cls);

            // Function that creates the Kafka consumer from a configuration map.
            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction);

            // Function mapping a record to an Instant; set by the deprecated withWatermarkFn2().
            abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction);

            // Record-count limit; Long.MAX_VALUE is treated as "no limit" by Read.expand().
            abstract Builder<K, V> setMaxNumRecords(long j);

            // Read-time limit; null is treated as "no limit" by Read.expand().
            abstract Builder<K, V> setMaxReadTime(Duration duration);

            // Start-read time; when set, Read.expand() requires offsetsForTimes support.
            abstract Builder<K, V> setStartReadTime(Instant instant);

            // Flag behind commitOffsetsInFinalize().
            abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean z);

            // Factory for the timestamp policy.
            abstract Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory);

            // Configuration overrides for the offset consumer.
            abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> map);

            // Builds the Read from the values set so far.
            /* JADX INFO: Access modifiers changed from: package-private */
            public abstract Read<K, V> build();

            /**
             * Configures this builder from an external (byte-encoded) configuration and
             * returns the resulting read transform with Kafka metadata stripped.
             *
             * <p>Topics, deserializer class names and consumer properties are decoded with
             * {@code KafkaIO.utf8String}; coders are derived from the deserializer classes
             * via {@link #resolveCoder}. The remaining properties are set to fixed defaults:
             * no topic partitions, the default consumer factory, no record limit, no offset
             * commit on finalize, and the processing-time timestamp policy.
             *
             * @param configuration the external configuration to read from
             * @return a transform producing key/value pairs
             */
            public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(External.Configuration configuration) {
                ImmutableList.Builder<String> topicNames = ImmutableList.builder();
                for (byte[] encodedTopic : configuration.topics) {
                    topicNames.add(KafkaIO.utf8String(encodedTopic));
                }
                setTopics(topicNames.build());

                Class<? extends Deserializer<K>> keyDeserializerClass = KafkaIO.resolveClass(KafkaIO.utf8String(configuration.keyDeserializer));
                setKeyDeserializer(keyDeserializerClass);
                setKeyCoder(resolveCoder(keyDeserializerClass));

                Class<? extends Deserializer<V>> valueDeserializerClass = KafkaIO.resolveClass(KafkaIO.utf8String(configuration.valueDeserializer));
                setValueDeserializer(valueDeserializerClass);
                setValueCoder(resolveCoder(valueDeserializerClass));

                Map<String, Object> consumerSettings = new HashMap<>();
                for (KV<byte[], byte[]> encodedEntry : configuration.consumerConfig) {
                    String property = KafkaIO.utf8String(encodedEntry.getKey());
                    String setting = KafkaIO.utf8String(encodedEntry.getValue());
                    consumerSettings.put(property, setting);
                }
                // The resolved deserializers always win over anything supplied in the config.
                consumerSettings.put("key.deserializer", keyDeserializerClass.getName());
                consumerSettings.put("value.deserializer", valueDeserializerClass.getName());
                setConsumerConfig(consumerSettings);

                setTopicPartitions(Collections.emptyList());
                setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);
                setMaxNumRecords(Long.MAX_VALUE);
                setCommitOffsetsInFinalizeEnabled(false);
                setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());

                Read<K, V> configuredRead = build();
                return configuredRead.withoutMetadata();
            }

            /**
             * Picks a coder from the return type of the class's own {@code deserialize}
             * method(s). Only methods declared directly on {@code cls} are inspected, and
             * those returning {@code Object} are skipped.
             *
             * @param cls the deserializer class to inspect
             * @return a byte-array, var-int or var-long coder matching the return type
             * @throws RuntimeException if a {@code deserialize} method returns an unsupported
             *     type, or if no usable {@code deserialize} method is declared
             */
            private static Coder resolveCoder(Class cls) {
                for (Method candidate : cls.getDeclaredMethods()) {
                    if (!"deserialize".equals(candidate.getName())) {
                        continue;
                    }
                    Class<?> produced = candidate.getReturnType();
                    if (Object.class.equals(produced)) {
                        continue;
                    }
                    if (byte[].class.equals(produced)) {
                        return ByteArrayCoder.of();
                    }
                    if (Integer.class.equals(produced)) {
                        return VarIntCoder.of();
                    }
                    if (Long.class.equals(produced)) {
                        return VarLongCoder.of();
                    }
                    throw new RuntimeException("Couldn't infer Coder from " + cls);
                }
                throw new RuntimeException("Couldn't resolve coder for Deserializer: " + cls);
            }
        }

        /**
         * Registers the Kafka read transform under {@link #URN} and defines the
         * byte-encoded configuration that {@link Builder#buildExternal} consumes.
         */
        @Experimental
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$External.class */
        public static class External implements ExternalTransformRegistrar {
            public static final String URN = "beam:external:java:kafka:read:v1";

            /** Mutable holder for the externally supplied read parameters; all values are raw bytes. */
            /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Read$External$Configuration.class */
            public static class Configuration {
                private Iterable<KV<byte[], byte[]>> consumerConfig;
                private Iterable<byte[]> topics;
                private byte[] keyDeserializer;
                private byte[] valueDeserializer;

                /** Stores the encoded consumer property/value pairs. */
                public void setConsumerConfig(Iterable<KV<byte[], byte[]>> iterable) {
                    consumerConfig = iterable;
                }

                /** Stores the encoded topic names. */
                public void setTopics(Iterable<byte[]> iterable) {
                    topics = iterable;
                }

                /** Stores the encoded class name of the key deserializer. */
                public void setKeyDeserializer(byte[] bArr) {
                    keyDeserializer = bArr;
                }

                /** Stores the encoded class name of the value deserializer. */
                public void setValueDeserializer(byte[] bArr) {
                    valueDeserializer = bArr;
                }
            }

            /** Returns the single-entry mapping from {@link #URN} to the generated read builder class. */
            public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
                Map<String, Class<? extends ExternalTransformBuilder>> registry = ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
                return registry;
            }
        }

        // Abstract property accessors of the read specification. Those marked @Nullable may
        // be unset.

        // Kafka consumer configuration; expand() requires a "bootstrap.servers" entry.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Map<String, Object> getConsumerConfig();

        // Topic names to read; withTopicPartitions() requires this to be empty.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<String> getTopics();

        // Explicit topic partitions to read; withTopics() requires this to be empty.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract List<TopicPartition> getTopicPartitions();

        // Explicit key coder; when null, expand() infers one from the key deserializer.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Coder<K> getKeyCoder();

        // Explicit value coder; when null, expand() infers one from the value deserializer.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Coder<V> getValueCoder();

        // Key deserializer class; expand() rejects null.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Class<? extends Deserializer<K>> getKeyDeserializer();

        // Value deserializer class; expand() rejects null.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Class<? extends Deserializer<V>> getValueDeserializer();

        // Function that creates the Kafka consumer from a configuration map.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> getConsumerFactoryFn();

        // Function set by the deprecated withWatermarkFn2(); may be unset.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();

        // Record-count limit; expand() bounds the read when this is below Long.MAX_VALUE.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long getMaxNumRecords();

        // Read-time limit; expand() bounds the read when this is non-null.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Duration getMaxReadTime();

        // Start-read time; when non-null, expand() requires offsetsForTimes support.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Instant getStartReadTime();

        // Whether commitOffsetsInFinalize() was requested; expand() then requires "group.id".
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract boolean isCommitOffsetsInFinalizeEnabled();

        // Factory for the timestamp policy.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();

        // Configuration overrides set by withOffsetConsumerConfigOverrides(); may be unset.
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Map<String, Object> getOffsetConsumerConfig();

        // Returns a builder used by the with*() methods to derive modified copies.
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Builder<K, V> toBuilder();

        /**
         * Returns a copy of this transform whose consumer config has {@code bootstrap.servers}
         * set to the given value. {@link #expand} requires this property.
         */
        public Read<K, V> withBootstrapServers(String str) {
            Map<String, Object> serverSetting = ImmutableMap.of("bootstrap.servers", str);
            return withConsumerConfigUpdates(serverSetting);
        }

        /** Single-topic convenience form of {@link #withTopics(List)}. */
        public Read<K, V> withTopic(String str) {
            List<String> singleTopic = ImmutableList.of(str);
            return withTopics(singleTopic);
        }

        /**
         * Returns a copy of this transform that reads the given topics (stored as an
         * immutable copy of the list).
         *
         * @throws IllegalStateException if topic partitions have already been set
         */
        public Read<K, V> withTopics(List<String> list) {
            boolean noPartitionsConfigured = getTopicPartitions().isEmpty();
            Preconditions.checkState(noPartitionsConfigured, "Only topics or topicPartitions can be set, not both");
            Builder<K, V> builder = toBuilder();
            return builder.setTopics(ImmutableList.copyOf(list)).build();
        }

        /**
         * Returns a copy of this transform that reads the given topic partitions (stored as
         * an immutable copy of the list).
         *
         * @throws IllegalStateException if topics have already been set
         */
        public Read<K, V> withTopicPartitions(List<TopicPartition> list) {
            boolean noTopicsConfigured = getTopics().isEmpty();
            Preconditions.checkState(noTopicsConfigured, "Only topics or topicPartitions can be set, not both");
            Builder<K, V> builder = toBuilder();
            return builder.setTopicPartitions(ImmutableList.copyOf(list)).build();
        }

        /**
         * Returns a copy of this transform with the given key deserializer class.
         * {@link #expand} requires a key deserializer to be set.
         */
        public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> cls) {
            Builder<K, V> updated = toBuilder().setKeyDeserializer(cls);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given key deserializer class and an
         * explicit key coder, which {@link #expand} uses instead of inferring one.
         */
        public Read<K, V> withKeyDeserializerAndCoder(Class<? extends Deserializer<K>> cls, Coder<K> coder) {
            Builder<K, V> updated = toBuilder().setKeyDeserializer(cls);
            updated = updated.setKeyCoder(coder);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given value deserializer class.
         * {@link #expand} requires a value deserializer to be set.
         */
        public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> cls) {
            Builder<K, V> updated = toBuilder().setValueDeserializer(cls);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given value deserializer class and an
         * explicit value coder, which {@link #expand} uses instead of inferring one.
         */
        public Read<K, V> withValueDeserializerAndCoder(Class<? extends Deserializer<V>> cls, Coder<V> coder) {
            Builder<K, V> updated = toBuilder().setValueDeserializer(cls);
            updated = updated.setValueCoder(coder);
            return updated.build();
        }

        /** Returns a copy of this transform that creates its Kafka consumer with the given function. */
        public Read<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> serializableFunction) {
            Builder<K, V> updated = toBuilder().setConsumerFactoryFn(serializableFunction);
            return updated.build();
        }

        /**
         * Returns a copy of this transform whose consumer config is the result of
         * {@code KafkaIO.updateKafkaProperties} applied to the current config and the given
         * updates.
         *
         * @deprecated has the same body as {@link #withConsumerConfigUpdates(Map)}.
         */
        @Deprecated
        public Read<K, V> updateConsumerProperties(Map<String, Object> map) {
            Builder<K, V> builder = toBuilder();
            Map<String, Object> mergedConfig = KafkaIO.updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, map);
            return builder.setConsumerConfig(mergedConfig).build();
        }

        /**
         * Returns a copy of this transform with the given record limit. {@link #expand}
         * bounds the read when the limit is below {@code Long.MAX_VALUE}.
         */
        public Read<K, V> withMaxNumRecords(long j) {
            Builder<K, V> updated = toBuilder().setMaxNumRecords(j);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given start-read time. When one is set,
         * {@link #expand} requires a Kafka client that supports {@code offsetsForTimes}.
         */
        public Read<K, V> withStartReadTime(Instant instant) {
            Builder<K, V> updated = toBuilder().setStartReadTime(instant);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given read-time limit. {@link #expand}
         * bounds the read when the limit is non-null.
         */
        public Read<K, V> withMaxReadTime(Duration duration) {
            Builder<K, V> updated = toBuilder().setMaxReadTime(duration);
            return updated.build();
        }

        /** Returns a copy of this transform using {@code TimestampPolicyFactory.withLogAppendTime()}. */
        public Read<K, V> withLogAppendTime() {
            TimestampPolicyFactory<K, V> logAppendPolicy = TimestampPolicyFactory.withLogAppendTime();
            return withTimestampPolicyFactory(logAppendPolicy);
        }

        /** Returns a copy of this transform using {@code TimestampPolicyFactory.withProcessingTime()}. */
        public Read<K, V> withProcessingTime() {
            TimestampPolicyFactory<K, V> processingTimePolicy = TimestampPolicyFactory.withProcessingTime();
            return withTimestampPolicyFactory(processingTimePolicy);
        }

        /**
         * Returns a copy of this transform using
         * {@code TimestampPolicyFactory.withCreateTime(duration)}.
         */
        public Read<K, V> withCreateTime(Duration duration) {
            TimestampPolicyFactory<K, V> createTimePolicy = TimestampPolicyFactory.withCreateTime(duration);
            return withTimestampPolicyFactory(createTimePolicy);
        }

        /** Returns a copy of this transform that uses the given timestamp policy factory. */
        public Read<K, V> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory) {
            Builder<K, V> updated = toBuilder().setTimestampPolicyFactory(timestampPolicyFactory);
            return updated.build();
        }

        /**
         * Returns a copy of this transform whose timestamp policy is
         * {@code TimestampPolicyFactory.withTimestampFn(serializableFunction)}.
         *
         * @throws IllegalArgumentException if the function is {@code null}
         * @deprecated marked deprecated in this class; see
         *     {@link #withTimestampPolicyFactory(TimestampPolicyFactory)}.
         */
        @Deprecated
        public Read<K, V> withTimestampFn2(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction) {
            if (serializableFunction == null) {
                throw new IllegalArgumentException("timestampFn can not be null");
            }
            Builder<K, V> builder = toBuilder();
            TimestampPolicyFactory<K, V> fnBackedPolicy = TimestampPolicyFactory.withTimestampFn(serializableFunction);
            return builder.setTimestampPolicyFactory(fnBackedPolicy).build();
        }

        /**
         * Returns a copy of this transform with the given record-level watermark function.
         *
         * @throws IllegalArgumentException if the function is {@code null}
         * @deprecated marked deprecated in this class.
         */
        @Deprecated
        public Read<K, V> withWatermarkFn2(SerializableFunction<KafkaRecord<K, V>, Instant> serializableFunction) {
            if (serializableFunction == null) {
                throw new IllegalArgumentException("watermarkFn can not be null");
            }
            Builder<K, V> updated = toBuilder().setWatermarkFn(serializableFunction);
            return updated.build();
        }

        /**
         * Key/value flavour of {@link #withTimestampFn2}: the function receives the record's
         * {@code KV} rather than the full {@code KafkaRecord}.
         *
         * @throws IllegalArgumentException if the function is {@code null}
         * @deprecated marked deprecated in this class.
         */
        @Deprecated
        public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> serializableFunction) {
            if (serializableFunction == null) {
                throw new IllegalArgumentException("timestampFn can not be null");
            }
            SerializableFunction<KafkaRecord<K, V>, Instant> recordLevelFn = unwrapKafkaAndThen(serializableFunction);
            return withTimestampFn2(recordLevelFn);
        }

        /**
         * Key/value flavour of {@link #withWatermarkFn2}: the function receives the record's
         * {@code KV} rather than the full {@code KafkaRecord}.
         *
         * @throws IllegalArgumentException if the function is {@code null}
         * @deprecated marked deprecated in this class.
         */
        @Deprecated
        public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> serializableFunction) {
            if (serializableFunction == null) {
                throw new IllegalArgumentException("watermarkFn can not be null");
            }
            SerializableFunction<KafkaRecord<K, V>, Instant> recordLevelFn = unwrapKafkaAndThen(serializableFunction);
            return withWatermarkFn2(recordLevelFn);
        }

        /**
         * Returns a copy of this transform whose consumer config has {@code isolation.level}
         * set to {@code read_committed}.
         */
        public Read<K, V> withReadCommitted() {
            Map<String, Object> isolationSetting = ImmutableMap.of("isolation.level", "read_committed");
            return withConsumerConfigUpdates(isolationSetting);
        }

        /**
         * Returns a copy of this transform with the commit-offsets-in-finalize flag enabled.
         * {@link #expand} then requires {@code group.id} in the consumer config.
         */
        public Read<K, V> commitOffsetsInFinalize() {
            Builder<K, V> updated = toBuilder().setCommitOffsetsInFinalizeEnabled(true);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given offset-consumer configuration
         * overrides. The map is stored as passed, without copying.
         */
        public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> map) {
            Builder<K, V> updated = toBuilder().setOffsetConsumerConfig(map);
            return updated.build();
        }

        /**
         * Returns a copy of this transform whose consumer config is the result of
         * {@code KafkaIO.updateKafkaProperties} applied to the current config and the given
         * updates, with the deserializer keys passed as ignored properties.
         */
        public Read<K, V> withConsumerConfigUpdates(Map<String, Object> map) {
            Builder<K, V> builder = toBuilder();
            Map<String, Object> mergedConfig = KafkaIO.updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, map);
            return builder.setConsumerConfig(mergedConfig).build();
        }

        /**
         * Wraps this transform in a {@link TypedWithoutMetadata}, which emits plain
         * key/value pairs instead of {@code KafkaRecord}s.
         */
        public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
            PTransform<PBegin, PCollection<KV<K, V>>> kvOnly = new TypedWithoutMetadata<>(this);
            return kvOnly;
        }

        /**
         * Validates the configuration, resolves key and value coders, and applies the
         * resulting Kafka source to the pipeline.
         *
         * <p>Required settings: {@code bootstrap.servers}, at least one topic or topic
         * partition, and both deserializers. A start-read time additionally requires a Kafka
         * client that supports {@code offsetsForTimes}; commit-on-finalize requires
         * {@code group.id}. Coders that were not set explicitly are inferred from the
         * deserializer classes via the pipeline's coder registry. The read is bounded when a
         * record limit below {@code Long.MAX_VALUE} or a maximum read time is configured.
         *
         * @param pBegin the pipeline start this read is applied to
         * @return the collection of records read from Kafka
         * @throws IllegalArgumentException if a required setting is missing or a coder
         *     cannot be inferred
         */
        @Override
        public PCollection<KafkaRecord<K, V>> expand(PBegin pBegin) {
            Preconditions.checkArgument(getConsumerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            Preconditions.checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0, "Either withTopic(), withTopics() or withTopicPartitions() is required");
            Preconditions.checkArgument(getKeyDeserializer() != null, "withKeyDeserializer() is required");
            Preconditions.checkArgument(getValueDeserializer() != null, "withValueDeserializer() is required");

            // Old clients only get a warning unless a feature that needs offsetsForTimes is used.
            ConsumerSpEL consumerSpEL = new ConsumerSpEL();
            if (!consumerSpEL.hasOffsetsForTimes()) {
                KafkaIO.LOG.warn("Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and may not be supported in next release of Apache Beam. Please upgrade your Kafka client version.", AppInfoParser.getVersion());
            }
            if (getStartReadTime() != null) {
                Preconditions.checkArgument(consumerSpEL.hasOffsetsForTimes(), "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, current version of Kafka Client is " + AppInfoParser.getVersion() + ". If you are building with maven, set \"kafka.clients.version\" maven property to 0.10.1.0 or newer.");
            }
            if (isCommitOffsetsInFinalizeEnabled()) {
                Preconditions.checkArgument(getConsumerConfig().get("group.id") != null, "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config is not set. Offset management requires group.id.");
                if (Boolean.TRUE.equals(getConsumerConfig().get("enable.auto.commit"))) {
                    KafkaIO.LOG.warn("'{}' in consumer config is enabled even though commitOffsetsInFinalize() is set. You need only one of them.", "enable.auto.commit");
                }
            }

            // Explicit coders win; otherwise infer from the deserializer class.
            CoderRegistry coderRegistry = pBegin.getPipeline().getCoderRegistry();
            Coder<K> keyCoder = getKeyCoder() != null ? getKeyCoder() : KafkaIO.inferCoder(coderRegistry, getKeyDeserializer());
            Preconditions.checkArgument(keyCoder != null, "Key coder could not be inferred from key deserializer. Please provide key coder explicitly using withKeyDeserializerAndCoder()");
            Coder<V> valueCoder = getValueCoder() != null ? getValueCoder() : KafkaIO.inferCoder(coderRegistry, getValueDeserializer());
            Preconditions.checkArgument(valueCoder != null, "Value coder could not be inferred from value deserializer. Please provide value coder explicitly using withValueDeserializerAndCoder()");

            // Fully qualified because the simple name Read refers to this class.
            org.apache.beam.sdk.io.Read.Unbounded<KafkaRecord<K, V>> unbounded = org.apache.beam.sdk.io.Read.from(toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
            PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
            if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
                transform = unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
            }
            return pBegin.getPipeline().apply(transform);
        }

        /**
         * Creates the unbounded source backing this read, passing {@code -1} as the second
         * constructor argument.
         */
        @VisibleForTesting
        UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
            final int initialId = -1;
            return new KafkaUnboundedSource(this, initialId);
        }

        /**
         * Adapts a function over key/value pairs into a function over {@code KafkaRecord}s by
         * applying it to {@code record.getKV()}.
         */
        private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(SerializableFunction<KV<KeyT, ValueT>, OutT> serializableFunction) {
            return kafkaRecord -> serializableFunction.apply(kafkaRecord.getKV());
        }

        /**
         * Adds the configured topics (or, if there are none, the topic partitions) and every
         * consumer property except the deserializer keys to the display data. Property values
         * whose display type cannot be inferred are shown as strings.
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);

            List<String> topics = getTopics();
            List<TopicPartition> topicPartitions = getTopicPartitions();
            if (!topics.isEmpty()) {
                String joinedTopics = Joiner.on(",").join(topics);
                builder.add(DisplayData.item("topics", joinedTopics).withLabel("Topic/s"));
            } else if (!topicPartitions.isEmpty()) {
                String joinedPartitions = Joiner.on(",").join(topicPartitions);
                builder.add(DisplayData.item("topicPartitions", joinedPartitions).withLabel("Topic Partition/s"));
            }

            Set<String> hiddenKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> property : getConsumerConfig().entrySet()) {
                String key = property.getKey();
                if (hiddenKeys.contains(key)) {
                    continue;
                }
                Object displayValue;
                if (DisplayData.inferType(property.getValue()) != null) {
                    displayValue = property.getValue();
                } else {
                    displayValue = String.valueOf(property.getValue());
                }
                builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(displayValue)));
            }
        }

        /**
         * Synthetic lambda-deserialization hook, shown here as the decompiler rendered it.
         * It recreates one of the two serializable lambdas of this class from a
         * {@link SerializedLambda}: the adapter built by {@code unwrapKafkaAndThen}, or the
         * {@code KafkaConsumer::new} constructor reference.
         *
         * NOTE(review): the {@code boolean z = -1} / {@code switch (z)} constructs are
         * decompiler output and are not valid Java source; a hook like this is normally
         * emitted by the compiler rather than written by hand.
         *
         * @throws IllegalArgumentException if the serialized form matches neither lambda
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            // Stage 1: map the implementation method name to a selector
            // (hash switch followed by an equals() confirmation).
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1979494231:
                    if (implMethodName.equals("lambda$unwrapKafkaAndThen$d0142e15$1")) {
                        z = false;
                        break;
                    }
                    break;
                case 1818100338:
                    if (implMethodName.equals("<init>")) {
                        z = true;
                        break;
                    }
                    break;
            }
            // Stage 2: check the full lambda shape before re-creating it.
            switch (z) {
                case false:
                    // unwrapKafkaAndThen adapter: captured arg 0 is the wrapped KV function.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIO$Read") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/SerializableFunction;Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Ljava/lang/Object;")) {
                        SerializableFunction serializableFunction = (SerializableFunction) serializedLambda.getCapturedArg(0);
                        return kafkaRecord -> {
                            return serializableFunction.apply(kafkaRecord.getKV());
                        };
                    }
                    break;
                case true:
                    // KafkaConsumer(Map) constructor reference.
                    if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/kafka/clients/consumer/KafkaConsumer") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;)V")) {
                        return KafkaConsumer::new;
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$TypedWithoutMetadata.class */
    public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
        private final Read<K, V> read;

        TypedWithoutMetadata(Read<K, V> read) {
            super("KafkaIO.Read");
            this.read = read;
        }

        public PCollection<KV<K, V>> expand(PBegin pBegin) {
            return pBegin.apply(this.read).apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() { // from class: org.apache.beam.sdk.io.kafka.KafkaIO.TypedWithoutMetadata.1
                @DoFn.ProcessElement
                public void processElement(DoFn<KafkaRecord<K, V>, KV<K, V>>.ProcessContext processContext) {
                    processContext.output(((KafkaRecord) processContext.element()).getKV());
                }
            }));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            this.read.populateDisplayData(builder);
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write.class */
    public static abstract class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * AutoValue builder for {@link Write}; also the entry point for building the
         * transform from an external, byte-encoded configuration.
         */
        @Experimental
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$Builder.class */
        public static abstract class Builder<K, V> implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
            /** Sets the destination topic. */
            abstract Builder<K, V> setTopic(String str);

            /** Sets the underlying record-level write transform. */
            abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> writeRecords);

            /** Builds the {@link Write} from the values set so far. */
            abstract Write<K, V> build();

            /**
             * Decodes topic, producer properties and serializer class names with
             * {@code KafkaIO.utf8String}, configures a {@code WriteRecords} transform from
             * them and returns the built {@link Write}.
             *
             * @param configuration the external configuration to read from
             * @return the configured write transform
             */
            public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(External.Configuration configuration) {
                String topicName = KafkaIO.utf8String(configuration.topic);
                setTopic(topicName);

                Map<String, Object> producerSettings = new HashMap<>();
                for (KV<byte[], byte[]> encodedEntry : configuration.producerConfig) {
                    String property = KafkaIO.utf8String(encodedEntry.getKey());
                    String setting = KafkaIO.utf8String(encodedEntry.getValue());
                    producerSettings.put(property, setting);
                }

                Class<? extends Serializer<K>> keySerializerClass = KafkaIO.resolveClass(KafkaIO.utf8String(configuration.keySerializer));
                WriteRecords<K, V> records = KafkaIO.<K, V>writeRecords().withProducerConfigUpdates(producerSettings).withKeySerializer(keySerializerClass);
                Class<? extends Serializer<V>> valueSerializerClass = KafkaIO.resolveClass(KafkaIO.utf8String(configuration.valueSerializer));
                setWriteRecordsTransform(records.withValueSerializer(valueSerializerClass).withTopic(topicName));
                return build();
            }
        }

        /**
         * Registers the Kafka write transform under {@link #URN} and defines the
         * byte-encoded configuration that {@link Builder#buildExternal} consumes.
         */
        @Experimental
        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$External.class */
        public static class External implements ExternalTransformRegistrar {
            public static final String URN = "beam:external:java:kafka:write:v1";

            /** Mutable holder for the externally supplied write parameters; all values are raw bytes. */
            /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$External$Configuration.class */
            public static class Configuration {
                private Iterable<KV<byte[], byte[]>> producerConfig;
                private byte[] topic;
                private byte[] keySerializer;
                private byte[] valueSerializer;

                /** Stores the encoded producer property/value pairs. */
                public void setProducerConfig(Iterable<KV<byte[], byte[]>> iterable) {
                    producerConfig = iterable;
                }

                /** Stores the encoded destination topic name. */
                public void setTopic(byte[] bArr) {
                    topic = bArr;
                }

                /** Stores the encoded class name of the key serializer. */
                public void setKeySerializer(byte[] bArr) {
                    keySerializer = bArr;
                }

                /** Stores the encoded class name of the value serializer. */
                public void setValueSerializer(byte[] bArr) {
                    valueSerializer = bArr;
                }
            }

            /** Returns the single-entry mapping from {@link #URN} to the generated write builder class. */
            public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
                Map<String, Class<? extends ExternalTransformBuilder>> registry = ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
                return registry;
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIO$Write$PublishTimestampFunctionKV.class */
        /**
         * Adapts a publish-timestamp function over key/value pairs so it can be used on
         * {@code ProducerRecord}s.
         */
        private static class PublishTimestampFunctionKV<K, V> implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
            private KafkaPublishTimestampFunction<KV<K, V>> fn;

            public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> kafkaPublishTimestampFunction) {
                this.fn = kafkaPublishTimestampFunction;
            }

            /** Delegates to the wrapped function with a {@code KV} built from the record's key and value. */
            @Override // org.apache.beam.sdk.io.kafka.KafkaPublishTimestampFunction
            public Instant getTimestamp(ProducerRecord<K, V> producerRecord, Instant instant) {
                KV<K, V> keyAndValue = KV.of(producerRecord.key(), producerRecord.value());
                return this.fn.getTimestamp(keyAndValue, instant);
            }
        }

        /**
         * Returns the topic set on this {@code Write}, or {@code null} when none has been set.
         * (The decompiler reports this accessor as originally package-private.)
         */
        @Nullable
        public abstract String getTopic();

        /**
         * Returns the wrapped {@code WriteRecords} transform that the configuration methods of this
         * class delegate to. (The decompiler reports this accessor as originally package-private.)
         */
        public abstract WriteRecords<K, V> getWriteRecordsTransform();

        /** Returns a builder used by the {@code with*} methods to derive a modified copy. */
        abstract Builder<K, V> toBuilder();

        /** Returns a copy of this {@code Write} whose wrapped transform is replaced by the given one. */
        private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> replacement) {
            Builder<K, V> copy = toBuilder();
            copy = copy.setWriteRecordsTransform(replacement);
            return copy.build();
        }

        /** Returns a copy whose wrapped {@code WriteRecords} has the given bootstrap servers applied. */
        public Write<K, V> withBootstrapServers(String bootstrapServers) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withBootstrapServers(bootstrapServers);
            return withWriteRecordsTransform(updated);
        }

        /**
         * Returns a copy with the topic recorded both on this {@code Write} and on the wrapped
         * {@code WriteRecords} transform.
         */
        public Write<K, V> withTopic(String topic) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withTopic(topic);
            return toBuilder()
                    .setTopic(topic)
                    .setWriteRecordsTransform(updated)
                    .build();
        }

        /** Returns a copy whose wrapped {@code WriteRecords} uses the given key serializer class. */
        public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withKeySerializer(keySerializer);
            return withWriteRecordsTransform(updated);
        }

        /** Returns a copy whose wrapped {@code WriteRecords} uses the given value serializer class. */
        public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withValueSerializer(valueSerializer);
            return withWriteRecordsTransform(updated);
        }

        /** Returns a copy whose wrapped {@code WriteRecords} uses the given producer factory function. */
        public Write<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn);
            return withWriteRecordsTransform(updated);
        }

        /** Returns a copy whose wrapped {@code WriteRecords} has {@code withInputTimestamp()} applied. */
        public Write<K, V> withInputTimestamp() {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withInputTimestamp();
            return withWriteRecordsTransform(updated);
        }

        /**
         * Returns a copy whose wrapped {@code WriteRecords} uses the given KV-based timestamp function,
         * adapted to {@code ProducerRecord} input through {@code PublishTimestampFunctionKV}.
         *
         * @param timestampFunction function computing the publish timestamp from a {@code KV} element
         */
        @Deprecated
        public Write<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
            // Diamond instead of the raw type so the adapter is checked as PublishTimestampFunctionKV<K, V>.
            KafkaPublishTimestampFunction<ProducerRecord<K, V>> adapted = new PublishTimestampFunctionKV<>(timestampFunction);
            return withWriteRecordsTransform(getWriteRecordsTransform().withPublishTimestampFunction(adapted));
        }

        /**
         * Returns a copy whose wrapped {@code WriteRecords} has {@code withEOS} applied with the given
         * shard count and sink group id.
         */
        public Write<K, V> withEOS(int numShards, String sinkGroupId) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withEOS(numShards, sinkGroupId);
            return withWriteRecordsTransform(updated);
        }

        /** Returns a copy whose wrapped {@code WriteRecords} uses the given consumer factory function. */
        public Write<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn);
            return withWriteRecordsTransform(updated);
        }

        /**
         * Returns a copy whose wrapped {@code WriteRecords} has the given producer properties applied
         * through its own {@code updateProducerProperties}.
         */
        @Deprecated
        public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().updateProducerProperties(configUpdates);
            return withWriteRecordsTransform(updated);
        }

        /** Returns a copy whose wrapped {@code WriteRecords} has the given producer config updates applied. */
        public Write<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
            WriteRecords<K, V> updated = getWriteRecordsTransform().withProducerConfigUpdates(configUpdates);
            return withWriteRecordsTransform(updated);
        }

        /**
         * Converts each {@code KV} element into a {@code ProducerRecord} addressed to {@link #getTopic()}
         * and applies the wrapped {@code WriteRecords} transform to the result.
         *
         * @param input the key/value elements to write; its coder must be a {@code KvCoder}
         * @return the {@code PDone} produced by the wrapped transform
         * @throws IllegalArgumentException if no topic has been set
         * @throws ClassCastException if the input coder is not a {@code KvCoder}
         */
        @Override
        public PDone expand(PCollection<KV<K, V>> input) {
            Preconditions.checkArgument(getTopic() != null, "withTopic() is required");
            // getCoder() is declared to return a general coder; the key and value coders are only
            // reachable through KvCoder, hence the explicit, typed downcast.
            KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
            return input
                    .apply("Kafka ProducerRecord", MapElements.via(new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() {
                        @Override
                        public ProducerRecord<K, V> apply(KV<K, V> kv) {
                            return new ProducerRecord<>(Write.this.getTopic(), kv.getKey(), kv.getValue());
                        }
                    }))
                    .setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()))
                    .apply(getWriteRecordsTransform());
        }

        /**
         * Delegates validation to the wrapped {@code WriteRecords} transform.
         *
         * @param options the pipeline options passed through to the wrapped transform
         */
        @Override
        public void validate(PipelineOptions options) {
            getWriteRecordsTransform().validate(options);
        }

        /**
         * Adds the superclass display data first, then the display data of the wrapped
         * {@code WriteRecords} transform, to the same builder.
         *
         * @param builder the display data builder to populate
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            getWriteRecordsTransform().populateDisplayData(builder);
        }

        /**
         * Returns a transform over {@code PCollection<V>} built by wrapping a copy of this {@code Write},
         * configured with {@code StringSerializer} as its key serializer, in a {@code KafkaValueWrite}.
         */
        // NOTE(review): KafkaValueWrite is instantiated as a raw type and StringSerializer.class is passed
        // without a cast; presumably decompilation artifacts - confirm against the declarations before
        // adding type arguments.
        public PTransform<PCollection<V>, PDone> values() {
            return new KafkaValueWrite(withKeySerializer(StringSerializer.class));
        }
    }

    /**
     * {@link PTransform} that takes a {@link PCollection} of {@code ProducerRecord}s and returns
     * {@link PDone}.
     *
     * <p>Every {@code with*} method derives a new instance through {@link #toBuilder()}; the receiver
     * is not modified. On expansion the input is applied either to a {@code KafkaExactlyOnceSink}
     * (when {@link #isEOS()} is true) or to a {@link ParDo} of {@code KafkaWriter}.
     *
     * <p>The decompiler reports that the builder class and the abstract accessors below were
     * originally package-private.
     */
    @AutoValue
    public static abstract class WriteRecords<K, V> extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
        /** Producer properties used as the starting configuration by the factory methods. */
        private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of("retries", 3);

        /**
         * Producer properties that may not be set through config updates, mapped to the hint that is
         * included in the rejection message.
         */
        private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of("key.serializer", "Use withKeySerializer instead", "value.serializer", "Use withValueSerializer instead");

        /** AutoValue builder for {@link WriteRecords}. */
        @AutoValue.Builder
        public static abstract class Builder<K, V> {
            abstract Builder<K, V> setTopic(String str);

            abstract Builder<K, V> setProducerConfig(Map<String, Object> map);

            abstract Builder<K, V> setProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> serializableFunction);

            abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> cls);

            abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> cls);

            abstract Builder<K, V> setPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> kafkaPublishTimestampFunction);

            abstract Builder<K, V> setEOS(boolean z);

            abstract Builder<K, V> setSinkGroupId(String str);

            abstract Builder<K, V> setNumShards(int i);

            abstract Builder<K, V> setConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> serializableFunction);

            abstract WriteRecords<K, V> build();
        }

        /** Returns the configured topic, or {@code null} when none has been set. */
        @Nullable
        public abstract String getTopic();

        /** Returns the current producer configuration map. */
        public abstract Map<String, Object> getProducerConfig();

        /** Returns the producer factory function, or {@code null} when none has been set. */
        @Nullable
        public abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();

        /** Returns the key serializer class, or {@code null} when none has been set. */
        @Nullable
        public abstract Class<? extends Serializer<K>> getKeySerializer();

        /** Returns the value serializer class, or {@code null} when none has been set. */
        @Nullable
        public abstract Class<? extends Serializer<V>> getValueSerializer();

        /** Returns the publish timestamp function, or {@code null} when none has been set. */
        @Nullable
        public abstract KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction();

        /** Returns whether the exactly-once sink was enabled through {@link #withEOS(int, String)}. */
        public abstract boolean isEOS();

        /** Returns the sink group id given to {@link #withEOS(int, String)}, or {@code null}. */
        @Nullable
        public abstract String getSinkGroupId();

        /** Returns the shard count given to {@link #withEOS(int, String)}. */
        public abstract int getNumShards();

        /** Returns the consumer factory function, or {@code null} when none has been set. */
        @Nullable
        public abstract SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn();

        /** Returns a builder pre-populated with this instance's settings. */
        abstract Builder<K, V> toBuilder();

        /** Returns a copy with {@code bootstrap.servers} set in the producer configuration. */
        public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {
            return withProducerConfigUpdates(ImmutableMap.of("bootstrap.servers", bootstrapServers));
        }

        /** Returns a copy with the given topic. */
        public WriteRecords<K, V> withTopic(String topic) {
            return toBuilder().setTopic(topic).build();
        }

        /** Returns a copy with the given key serializer class. */
        public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
            return toBuilder().setKeySerializer(keySerializer).build();
        }

        /** Returns a copy with the given value serializer class. */
        public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            return toBuilder().setValueSerializer(valueSerializer).build();
        }

        /**
         * Returns a copy with the given properties merged into the producer configuration.
         *
         * @deprecated behaves exactly like {@link #withProducerConfigUpdates(Map)}, to which it delegates.
         */
        @Deprecated
        public WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
            return withProducerConfigUpdates(configUpdates);
        }

        /**
         * Returns a copy with the given properties merged over the current producer configuration.
         *
         * @throws IllegalArgumentException if an update names a key listed in the ignored properties
         */
        public WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
            Map<String, Object> merged = KafkaIO.updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, configUpdates);
            return toBuilder().setProducerConfig(merged).build();
        }

        /** Returns a copy with the given producer factory function. */
        public WriteRecords<K, V> withProducerFactoryFn(SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
            return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
        }

        /**
         * Returns a copy whose publish timestamp function is
         * {@code KafkaPublishTimestampFunction.withElementTimestamp()}.
         */
        public WriteRecords<K, V> withInputTimestamp() {
            return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
        }

        /** Returns a copy with the given publish timestamp function. */
        @Deprecated
        public WriteRecords<K, V> withPublishTimestampFunction(KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) {
            return toBuilder().setPublishTimestampFunction(timestampFunction).build();
        }

        /**
         * Returns a copy with the exactly-once sink enabled.
         *
         * @param numShards number of shards; must be at least 1
         * @param sinkGroupId sink group id; must not be {@code null}
         * @throws IllegalArgumentException if either argument violates its constraint
         */
        public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
            KafkaExactlyOnceSink.ensureEOSSupport();
            Preconditions.checkArgument(numShards >= 1, "numShards should be >= 1");
            Preconditions.checkArgument(sinkGroupId != null, "sinkGroupId is required for exactly-once sink");
            return toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();
        }

        /** Returns a copy with the given consumer factory function. */
        public WriteRecords<K, V> withConsumerFactoryFn(SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
            return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
        }

        /**
         * Checks the required settings and applies the writing step to the input.
         *
         * @param input the records to write
         * @return {@code PDone} in the input's pipeline
         * @throws IllegalArgumentException if bootstrap servers or either serializer is missing, or if
         *     the exactly-once sink is enabled without a topic
         */
        @Override
        public PDone expand(PCollection<ProducerRecord<K, V>> input) {
            Preconditions.checkArgument(getProducerConfig().get("bootstrap.servers") != null, "withBootstrapServers() is required");
            Preconditions.checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
            Preconditions.checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
            if (isEOS()) {
                Preconditions.checkArgument(getTopic() != null, "withTopic() is required when isEOS() is true");
                KafkaExactlyOnceSink.ensureEOSSupport();
                // Diamond instead of the raw type so the sink is checked against <K, V>.
                input.apply(new KafkaExactlyOnceSink<>(this));
            } else {
                input.apply(ParDo.of(new KafkaWriter<>(this)));
            }
            return PDone.in(input.getPipeline());
        }

        /**
         * When the exactly-once sink is enabled, rejects runners whose class name is not in the
         * hard-coded list of accepted runners; does nothing otherwise.
         *
         * @throws UnsupportedOperationException if the exactly-once sink is enabled and the runner is
         *     not accepted
         */
        @Override
        public void validate(PipelineOptions options) {
            if (!isEOS()) {
                return;
            }
            String runnerName = options.getRunner().getName();
            boolean accepted = "org.apache.beam.runners.direct.DirectRunner".equals(runnerName)
                    || runnerName.startsWith("org.apache.beam.runners.dataflow.")
                    || runnerName.startsWith("org.apache.beam.runners.spark.")
                    || runnerName.startsWith("org.apache.beam.runners.flink.");
            if (!accepted) {
                throw new UnsupportedOperationException(runnerName + " is not whitelisted among runners compatible with Kafka exactly-once sink. This implementation of exactly-once sink relies on specific checkpoint guarantees. Only the runners with known to have compatible checkpoint semantics are whitelisted.");
            }
        }

        /**
         * Adds the topic (when set) and every producer configuration entry except the ignored
         * properties to the display data.
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
            Set<String> ignoredKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
            for (Map.Entry<String, Object> entry : getProducerConfig().entrySet()) {
                String key = entry.getKey();
                if (ignoredKeys.contains(key)) {
                    continue;
                }
                Object rawValue = entry.getValue();
                // Values whose display type cannot be inferred are shown through their string form.
                Object displayValue = DisplayData.inferType(rawValue) != null ? rawValue : String.valueOf(rawValue);
                builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(displayValue)));
            }
        }
    }

    /**
     * Returns a {@code Read} configured with {@code ByteArrayDeserializer} for both keys and values.
     */
    public static Read<byte[], byte[]> readBytes() {
        // The explicit type witness is required: a generic call used as a method receiver is not
        // inferred from the return type and would otherwise be typed as Read<Object, Object>.
        return KafkaIO.<byte[], byte[]>read()
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class);
    }

    /**
     * Returns a {@code Read} with the defaults set below: empty topic and partition lists, the default
     * consumer factory and consumer properties, no effective record limit, offset commit on finalize
     * disabled, and a processing-time timestamp policy.
     */
    public static <K, V> Read<K, V> read() {
        // Typed builder and diamond lists instead of raw types, so the chain is checked against <K, V>.
        return new AutoValue_KafkaIO_Read.Builder<K, V>()
                .setTopics(new ArrayList<>())
                .setTopicPartitions(new ArrayList<>())
                .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
                .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
                .setMaxNumRecords(Long.MAX_VALUE)
                .setCommitOffsetsInFinalizeEnabled(false)
                .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
                .build();
    }

    /**
     * Returns a {@code Write} that wraps the default {@code WriteRecords} transform produced by
     * {@link #writeRecords()}.
     */
    public static <K, V> Write<K, V> write() {
        // Reuse writeRecords() rather than repeating its builder chain, and type the builder
        // explicitly instead of using the raw class.
        return new AutoValue_KafkaIO_Write.Builder<K, V>()
                .setWriteRecordsTransform(KafkaIO.<K, V>writeRecords())
                .build();
    }

    /**
     * Returns a {@code WriteRecords} with the default producer properties, the exactly-once sink
     * disabled, zero shards, and the default consumer factory function.
     */
    public static <K, V> WriteRecords<K, V> writeRecords() {
        // Typed builder instead of the raw class so the chain is checked against <K, V>.
        return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
                .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
                .setEOS(false)
                .setNumShards(0)
                .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
                .build();
    }

    /**
     * Returns a new map containing {@code currentConfig} overlaid with {@code updates}; neither
     * argument is modified. (The decompiler reports this method as originally private.)
     *
     * @param currentConfig the existing configuration
     * @param ignoredProperties keys that may not be updated, mapped to a hint for the error message
     * @param updates the entries to apply on top of {@code currentConfig}
     * @return a fresh {@code HashMap} holding the merged configuration
     * @throws IllegalArgumentException if {@code updates} contains a key of {@code ignoredProperties}
     */
    public static Map<String, Object> updateKafkaProperties(Map<String, Object> currentConfig, Map<String, String> ignoredProperties, Map<String, Object> updates) {
        for (String key : updates.keySet()) {
            Preconditions.checkArgument(!ignoredProperties.containsKey(key), "No need to configure '%s'. %s", key, ignoredProperties.get(key));
        }
        // Parameterized instead of raw HashMap so the result is checked as Map<String, Object>.
        Map<String, Object> merged = new HashMap<>(currentConfig);
        merged.putAll(updates);
        return merged;
    }

    /** Private constructor; this class cannot be instantiated from outside. */
    private KafkaIO() {
    }

    /**
     * Looks up a coder for the type argument with which the given class directly implements
     * {@code Deserializer}, and wraps it in a {@link NullableCoder}.
     *
     * @param coderRegistry registry queried for the coder of the deserialized type
     * @param cls the deserializer class to inspect; must not be {@code null}
     * @return a nullable coder for the deserialized type
     * @throws RuntimeException if the class does not directly implement a parameterized
     *     {@code Deserializer} whose type argument is a plain class, or if the registry cannot
     *     provide a coder for it (the registry's exception is attached as the cause)
     */
    @VisibleForTesting
    static <T> NullableCoder<T> inferCoder(CoderRegistry coderRegistry, Class<? extends Deserializer<T>> cls) {
        Preconditions.checkNotNull(cls);
        for (Type implemented : cls.getGenericInterfaces()) {
            if (!(implemented instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType parameterized = (ParameterizedType) implemented;
            if (parameterized.getRawType() != Deserializer.class) {
                continue;
            }
            Type typeArgument = parameterized.getActualTypeArguments()[0];
            if (!(typeArgument instanceof Class)) {
                // e.g. Deserializer<List<String>>: not a plain class, so fall through to the
                // descriptive failure below instead of dying with a ClassCastException.
                continue;
            }
            // Safe: cls is bounded by Deserializer<T>, so the argument of that interface is T.
            @SuppressWarnings("unchecked")
            Class<T> deserializedType = (Class<T>) typeArgument;
            try {
                return NullableCoder.of(coderRegistry.getCoder(deserializedType));
            } catch (CannotProvideCoderException e) {
                // Keep the registry's exception as the cause so the reason for the failure is not lost.
                throw new RuntimeException(String.format("Unable to automatically infer a Coder for the Kafka Deserializer %s: no coder registered for type %s", cls, deserializedType), e);
            }
        }
        throw new RuntimeException(String.format("Could not extract the Kafka Deserializer type from %s", cls));
    }

    /**
     * Decodes the given bytes as UTF-8 text. (The decompiler reports this method as originally private.)
     */
    public static String utf8String(byte[] bytes) {
        final String decoded = new String(bytes, Charsets.UTF_8);
        return decoded;
    }

    /**
     * Loads the class with the given fully-qualified name via {@link Class#forName(String)}.
     * (The decompiler reports this method as originally private.)
     *
     * @param className the fully-qualified name of the class to load
     * @return the loaded class
     * @throws RuntimeException if the class cannot be found; the {@link ClassNotFoundException} is
     *     attached as the cause
     */
    public static Class resolveClass(String className) {
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            // Chain the original exception so class-loading details are not discarded.
            throw new RuntimeException("Could not find class: " + className, e);
        }
    }
}
