package org.apache.beam.sdk.io.kafka.upgrade;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InvalidClassException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.core.construction.TransformUpgrader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.class */
public class KafkaIOTranslation {
    public static final String KAFKA_READ_WITH_METADATA_TRANSFORM_URN_V2 = "beam:transform:org.apache.beam:kafka_read_with_metadata:v2";
    public static final String KAFKA_WRITE_TRANSFORM_URN_V2 = "beam:transform:org.apache.beam:kafka_write:v2";

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation$KafkaIOReadWithMetadataTranslator.class */
    static class KafkaIOReadWithMetadataTranslator implements PTransformTranslation.TransformPayloadTranslator<KafkaIO.Read<?, ?>> {
        static Schema topicPartitionSchema = Schema.builder().addStringField(ConsumerProtocol.TOPIC_KEY_NAME).addInt32Field("partition").build();
        static Schema schema = Schema.builder().addMapField("consumer_config", Schema.FieldType.STRING, Schema.FieldType.BYTES).addNullableArrayField(ConsumerProtocol.TOPICS_KEY_NAME, Schema.FieldType.STRING).addNullableArrayField(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME, Schema.FieldType.row(topicPartitionSchema)).addNullableStringField("topic_pattern").addNullableByteArrayField("key_coder").addNullableByteArrayField("value_coder").addByteArrayField("consumer_factory_fn").addNullableByteArrayField("watermark_fn").addInt64Field("max_num_records").addNullableLogicalTypeField("max_read_time", new NanosDuration()).addNullableLogicalTypeField("start_read_time", new NanosInstant()).addNullableLogicalTypeField("stop_read_time", new NanosInstant()).addBooleanField("is_commit_offset_finalize_enabled").addBooleanField("is_dynamic_read").addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration()).addByteArrayField("timestamp_policy_factory").addNullableMapField("offset_consumer_config", Schema.FieldType.STRING, Schema.FieldType.BYTES).addNullableByteArrayField("key_deserializer_provider").addNullableByteArrayField("value_deserializer_provider").addNullableByteArrayField("check_stop_reading_fn").build();

        KafkaIOReadWithMetadataTranslator() {
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public String getUrn() {
            return KafkaIOTranslation.KAFKA_READ_WITH_METADATA_TRANSFORM_URN_V2;
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public RunnerApi.FunctionSpec translate(AppliedPTransform<?, ?, KafkaIO.Read<?, ?>> appliedPTransform, SdkComponents sdkComponents) throws IOException {
            return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public Row toConfigRow(KafkaIO.Read<?, ?> read) {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            read.getConsumerConfig().forEach((str, obj) -> {
                hashMap2.put(str, TransformUpgrader.toByteArray(obj));
            });
            hashMap.put("consumer_config", hashMap2);
            if (read.getTopics() != null) {
                hashMap.put(ConsumerProtocol.TOPICS_KEY_NAME, read.getTopics());
            }
            if (read.getTopicPartitions() != null) {
                ArrayList arrayList = new ArrayList();
                for (TopicPartition topicPartition : read.getTopicPartitions()) {
                    arrayList.add(Row.withSchema(topicPartitionSchema).addValue(topicPartition.topic()).addValue(Integer.valueOf(topicPartition.partition())).build());
                }
                hashMap.put(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME, arrayList);
            }
            if (read.getTopicPattern() != null) {
                hashMap.put("topic_pattern", read.getTopicPattern().pattern());
            }
            if (read.getKeyCoder() != null) {
                hashMap.put("key_coder", TransformUpgrader.toByteArray(read.getKeyCoder()));
            }
            if (read.getValueCoder() != null) {
                hashMap.put("value_coder", TransformUpgrader.toByteArray(read.getValueCoder()));
            }
            if (read.getConsumerFactoryFn() != null) {
                hashMap.put("consumer_factory_fn", TransformUpgrader.toByteArray(read.getConsumerFactoryFn()));
            }
            if (read.getWatermarkFn() != null) {
                hashMap.put("watermark_fn", TransformUpgrader.toByteArray(read.getWatermarkFn()));
            }
            hashMap.put("max_num_records", Long.valueOf(read.getMaxNumRecords()));
            if (read.getMaxReadTime() != null) {
                hashMap.put("max_read_time", read.getMaxReadTime());
            }
            if (read.getStartReadTime() != null) {
                hashMap.put("start_read_time", read.getStartReadTime());
            }
            if (read.getStopReadTime() != null) {
                hashMap.put("stop_read_time", read.getStopReadTime());
            }
            hashMap.put("is_commit_offset_finalize_enabled", Boolean.valueOf(read.isCommitOffsetsInFinalizeEnabled()));
            hashMap.put("is_dynamic_read", Boolean.valueOf(read.isDynamicRead()));
            if (read.getWatchTopicPartitionDuration() != null) {
                hashMap.put("watch_topic_partition_duration", read.getWatchTopicPartitionDuration());
            }
            hashMap.put("timestamp_policy_factory", TransformUpgrader.toByteArray(read.getTimestampPolicyFactory()));
            if (read.getOffsetConsumerConfig() != null) {
                HashMap hashMap3 = new HashMap();
                read.getOffsetConsumerConfig().forEach((str2, obj2) -> {
                    hashMap3.put(str2, TransformUpgrader.toByteArray(obj2));
                });
                hashMap.put("offset_consumer_config", hashMap3);
            }
            if (read.getKeyDeserializerProvider() != null) {
                hashMap.put("key_deserializer_provider", TransformUpgrader.toByteArray(read.getKeyDeserializerProvider()));
            }
            if (read.getValueDeserializerProvider() != null) {
                hashMap.put("value_deserializer_provider", TransformUpgrader.toByteArray(read.getValueDeserializerProvider()));
            }
            if (read.getCheckStopReadingFn() != null) {
                hashMap.put("check_stop_reading_fn", TransformUpgrader.toByteArray(read.getCheckStopReadingFn()));
            }
            if (read.getBadRecordErrorHandler() != null) {
                throw new RuntimeException("Upgrading KafkaIO read transforms that have `withBadRecordErrorHandler` property setis not supported yet.");
            }
            return Row.withSchema(schema).withFieldValues(hashMap).build();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public KafkaIO.Read<?, ?> fromConfigRow(Row row) {
            try {
                KafkaIO.Read<?, ?> read = KafkaIO.read();
                Map map = row.getMap("consumer_config");
                if (map != null) {
                    HashMap hashMap = new HashMap();
                    map.forEach((str, bArr) -> {
                        if (KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.containsKey(str)) {
                            return;
                        }
                        if (map.get(str) == null) {
                            throw new IllegalArgumentException("Encoded value of the consumer config property " + str + " was null");
                        }
                        try {
                            hashMap.put(str, TransformUpgrader.fromByteArray((byte[]) map.get(str)));
                        } catch (InvalidClassException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    read = read.withConsumerConfigUpdates(hashMap);
                }
                Collection array = row.getArray(ConsumerProtocol.TOPICS_KEY_NAME);
                if (array != null) {
                    read = read.withTopics(new ArrayList(array));
                }
                Collection array2 = row.getArray(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME);
                if (array2 != null) {
                    read = read.withTopicPartitions(Lists.newArrayList((Collection) array2.stream().map(row2 -> {
                        String string = row2.getString(ConsumerProtocol.TOPIC_KEY_NAME);
                        if (string == null) {
                            throw new IllegalArgumentException("Expected the topic to be not null");
                        }
                        Integer int32 = row2.getInt32("partition");
                        if (int32 == null) {
                            throw new IllegalArgumentException("Expected the partition to be not null");
                        }
                        return new TopicPartition(string, int32.intValue());
                    }).collect(Collectors.toList())));
                }
                String string = row.getString("topic_pattern");
                if (string != null) {
                    read = read.withTopicPattern(string);
                }
                byte[] bytes = row.getBytes("key_deserializer_provider");
                if (bytes != null) {
                    byte[] bytes2 = row.getBytes("key_coder");
                    read = bytes2 != null ? read.withKeyDeserializerProviderAndCoder((DeserializerProvider) TransformUpgrader.fromByteArray(bytes), (Coder) TransformUpgrader.fromByteArray(bytes2)) : read.withKeyDeserializer((DeserializerProvider<?>) TransformUpgrader.fromByteArray(bytes));
                }
                byte[] bytes3 = row.getBytes("value_deserializer_provider");
                if (bytes3 != null) {
                    byte[] bytes4 = row.getBytes("value_coder");
                    read = bytes4 != null ? read.withValueDeserializerProviderAndCoder((DeserializerProvider) TransformUpgrader.fromByteArray(bytes3), (Coder) TransformUpgrader.fromByteArray(bytes4)) : read.withValueDeserializer((DeserializerProvider<?>) TransformUpgrader.fromByteArray(bytes3));
                }
                byte[] bytes5 = row.getBytes("consumer_factory_fn");
                if (bytes5 != null) {
                    read = read.withConsumerFactoryFn((SerializableFunction) TransformUpgrader.fromByteArray(bytes5));
                }
                byte[] bytes6 = row.getBytes("watermark_fn");
                if (bytes6 != null) {
                    read = read.withWatermarkFn2((SerializableFunction) TransformUpgrader.fromByteArray(bytes6));
                }
                Long int64 = row.getInt64("max_num_records");
                if (int64 != null) {
                    read = read.withMaxNumRecords(int64.longValue());
                }
                Duration duration = (Duration) row.getValue("max_read_time");
                if (duration != null) {
                    read = read.withMaxReadTime(org.joda.time.Duration.millis(duration.toMillis()));
                }
                Instant instant = (Instant) row.getValue("start_read_time");
                if (instant != null) {
                    read = read.withStartReadTime(instant);
                }
                Instant instant2 = (Instant) row.getValue("stop_read_time");
                if (instant2 != null) {
                    read = read.withStopReadTime(instant2);
                }
                Boolean bool = row.getBoolean("is_commit_offset_finalize_enabled");
                if (bool != null && bool.booleanValue()) {
                    read = read.commitOffsetsInFinalize();
                }
                Boolean bool2 = row.getBoolean("is_dynamic_read");
                if (bool2 != null && bool2.booleanValue()) {
                    Duration duration2 = (Duration) row.getValue("watch_topic_partition_duration");
                    if (duration2 == null) {
                        throw new IllegalArgumentException("Expected watchTopicPartitionDuration to be available when isDynamicRead is set to true");
                    }
                    read = read.withDynamicRead(org.joda.time.Duration.millis(duration2.toMillis()));
                }
                byte[] bytes7 = row.getBytes("timestamp_policy_factory");
                if (bytes7 != null) {
                    read = read.withTimestampPolicyFactory((TimestampPolicyFactory) TransformUpgrader.fromByteArray(bytes7));
                }
                Map map2 = row.getMap("offset_consumer_config");
                if (map2 != null) {
                    HashMap hashMap2 = new HashMap();
                    map2.forEach((str2, bArr2) -> {
                        if (map2.get(str2) == null) {
                            throw new IllegalArgumentException("Encoded value for the offset consumer config key " + str2 + " was null.");
                        }
                        try {
                            hashMap2.put(str2, TransformUpgrader.fromByteArray((byte[]) map2.get(str2)));
                        } catch (InvalidClassException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    read = read.withOffsetConsumerConfigOverrides(hashMap2);
                }
                byte[] bytes8 = row.getBytes("check_stop_reading_fn");
                if (bytes8 != null) {
                    read = read.withCheckStopReadingFn((SerializableFunction<TopicPartition, Boolean>) TransformUpgrader.fromByteArray(bytes8));
                }
                return read;
            } catch (InvalidClassException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation$KafkaIOWriteTranslator.class */
    static class KafkaIOWriteTranslator implements PTransformTranslation.TransformPayloadTranslator<KafkaIO.Write<?, ?>> {
        static Schema schema = Schema.builder().addStringField("bootstrap_servers").addNullableStringField(ConsumerProtocol.TOPIC_KEY_NAME).addNullableByteArrayField("key_serializer").addNullableByteArrayField("value_serializer").addNullableByteArrayField("producer_factory_fn").addNullableByteArrayField("publish_timestamp_fn").addBooleanField("eos").addInt32Field("num_shards").addNullableStringField("sink_group_id").addNullableByteArrayField("consumer_factory_fn").addNullableMapField("producer_config", Schema.FieldType.STRING, Schema.FieldType.BYTES).build();

        KafkaIOWriteTranslator() {
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public String getUrn() {
            return KafkaIOTranslation.KAFKA_WRITE_TRANSFORM_URN_V2;
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public String getUrn(KafkaIO.Write<?, ?> write) {
            return super.getUrn((KafkaIOWriteTranslator) write);
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public RunnerApi.FunctionSpec translate(AppliedPTransform<?, ?, KafkaIO.Write<?, ?>> appliedPTransform, SdkComponents sdkComponents) throws IOException {
            return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
        }

        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public Row toConfigRow(KafkaIO.Write<?, ?> write) {
            HashMap hashMap = new HashMap();
            KafkaIO.WriteRecords<?, ?> writeRecordsTransform = write.getWriteRecordsTransform();
            if (!writeRecordsTransform.getProducerConfig().containsKey("bootstrap.servers")) {
                throw new IllegalArgumentException("Expected the producer config to have 'ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG' set. Found: " + writeRecordsTransform.getProducerConfig());
            }
            hashMap.put("bootstrap_servers", writeRecordsTransform.getProducerConfig().get("bootstrap.servers"));
            if (writeRecordsTransform.getTopic() != null) {
                hashMap.put(ConsumerProtocol.TOPIC_KEY_NAME, writeRecordsTransform.getTopic());
            }
            if (writeRecordsTransform.getKeySerializer() != null) {
                hashMap.put("key_serializer", TransformUpgrader.toByteArray(writeRecordsTransform.getKeySerializer()));
            }
            if (writeRecordsTransform.getValueSerializer() != null) {
                hashMap.put("value_serializer", TransformUpgrader.toByteArray(writeRecordsTransform.getValueSerializer()));
            }
            if (writeRecordsTransform.getProducerFactoryFn() != null) {
                hashMap.put("producer_factory_fn", TransformUpgrader.toByteArray(writeRecordsTransform.getProducerFactoryFn()));
            }
            if (writeRecordsTransform.getPublishTimestampFunction() != null) {
                hashMap.put("publish_timestamp_fn", TransformUpgrader.toByteArray(writeRecordsTransform.getPublishTimestampFunction()));
            }
            hashMap.put("eos", Boolean.valueOf(writeRecordsTransform.isEOS()));
            hashMap.put("num_shards", Integer.valueOf(writeRecordsTransform.getNumShards()));
            if (writeRecordsTransform.getSinkGroupId() != null) {
                hashMap.put("sink_group_id", writeRecordsTransform.getSinkGroupId());
            }
            if (writeRecordsTransform.getConsumerFactoryFn() != null) {
                hashMap.put("consumer_factory_fn", TransformUpgrader.toByteArray(writeRecordsTransform.getConsumerFactoryFn()));
            }
            if (writeRecordsTransform.getProducerConfig().size() > 0) {
                HashMap hashMap2 = new HashMap();
                writeRecordsTransform.getProducerConfig().forEach((str, obj) -> {
                    hashMap2.put(str, TransformUpgrader.toByteArray(obj));
                });
                hashMap.put("producer_config", hashMap2);
            }
            if (writeRecordsTransform.getBadRecordErrorHandler() == null || (writeRecordsTransform.getBadRecordErrorHandler() instanceof ErrorHandler.DefaultErrorHandler)) {
                return Row.withSchema(schema).withFieldValues(hashMap).build();
            }
            throw new RuntimeException("Upgrading KafkaIO write transforms that have `withBadRecordErrorHandler` property setis not supported yet.");
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator
        public KafkaIO.Write<?, ?> fromConfigRow(Row row) {
            try {
                KafkaIO.Write<?, ?> write = KafkaIO.write();
                String string = row.getString("bootstrap_servers");
                if (string != null) {
                    write = write.withBootstrapServers(string);
                }
                String str = (String) row.getValue(ConsumerProtocol.TOPIC_KEY_NAME);
                if (str != null) {
                    write = write.withTopic(str);
                }
                byte[] bytes = row.getBytes("key_serializer");
                if (bytes != null) {
                    write = write.withKeySerializer((Class) TransformUpgrader.fromByteArray(bytes));
                }
                byte[] bytes2 = row.getBytes("value_serializer");
                if (bytes2 != null) {
                    write = write.withValueSerializer((Class) TransformUpgrader.fromByteArray(bytes2));
                }
                byte[] bytes3 = row.getBytes("producer_factory_fn");
                if (bytes3 != null) {
                    write = write.withProducerFactoryFn((SerializableFunction) TransformUpgrader.fromByteArray(bytes3));
                }
                Boolean bool = row.getBoolean("eos");
                if (bool != null && bool.booleanValue()) {
                    Integer int32 = row.getInt32("num_shards");
                    String string2 = row.getString("sink_group_id");
                    if (int32 == null) {
                        throw new IllegalArgumentException("Expected numShards to be provided when EOS is set to true");
                    }
                    if (string2 == null) {
                        throw new IllegalArgumentException("Expected sinkGroupId to be provided when EOS is set to true");
                    }
                    write = write.withEOS(int32.intValue(), string2);
                }
                byte[] bytes4 = row.getBytes("consumer_factory_fn");
                if (bytes4 != null) {
                    write = write.withConsumerFactoryFn((SerializableFunction) TransformUpgrader.fromByteArray(bytes4));
                }
                Map map = row.getMap("producer_config");
                if (map != null && !map.isEmpty()) {
                    HashMap hashMap = new HashMap();
                    map.forEach((str2, bArr) -> {
                        try {
                            hashMap.put(str2, TransformUpgrader.fromByteArray(bArr));
                        } catch (InvalidClassException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    write = write.withProducerConfigUpdates(hashMap);
                }
                return write;
            } catch (InvalidClassException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @AutoService({TransformPayloadTranslatorRegistrar.class})
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation$ReadRegistrar.class */
    public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
        @Override // org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar
        public Map<? extends Class<? extends PTransform>, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
            return ImmutableMap.builder().put(KafkaIO.Read.AUTOVALUE_CLASS, new KafkaIOReadWithMetadataTranslator()).build();
        }
    }

    @AutoService({TransformPayloadTranslatorRegistrar.class})
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation$WriteRegistrar.class */
    public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
        @Override // org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar
        public Map<? extends Class<? extends PTransform>, ? extends PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
            return ImmutableMap.builder().put(KafkaIO.Write.AUTOVALUE_CLASS, new KafkaIOWriteTranslator()).build();
        }
    }
}
