package org.apache.beam.sdk.io.kafka.upgrade;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.upgrade.KafkaIOTranslation;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.TransformUpgrader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for the {@link KafkaIOTranslation} translators.
 *
 * <p>The tests cover three things: round-tripping {@link KafkaIO.Read} and {@link KafkaIO.Write}
 * transforms through their config-row representation, checking that every tracked getter of the
 * transforms has a corresponding schema field, and checking that {@link TransformUpgrader}
 * resolves the expected upgrade URNs.
 */
@RunWith(JUnit4.class)
public class KafkaIOTranslationTest {
    /** Maps accessor names of {@code KafkaIO.Read} to the schema field names expected for them. */
    static final Map<String, String> READ_TRANSFORM_SCHEMA_MAPPING = new HashMap<>();

    /** Maps accessor names of {@code KafkaIO.WriteRecords} to the schema field names expected for them. */
    static final Map<String, String> WRITE_TRANSFORM_SCHEMA_MAPPING = new HashMap<>();

    /** Getters that the "includes all fields" tests deliberately skip. */
    private static final List<String> IGNORED_GETTERS = ImmutableList.of("getBadRecordRouter", "getBadRecordErrorHandler");

    static {
        READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerConfig", "consumer_config");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getTopics", "topics");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getTopicPartitions", "topic_partitions");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getTopicPattern", "topic_pattern");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getKeyCoder", "key_coder");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getValueCoder", "value_coder");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerFactoryFn", "consumer_factory_fn");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getWatermarkFn", "watermark_fn");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getMaxNumRecords", "max_num_records");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getMaxReadTime", "max_read_time");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getStartReadTime", "start_read_time");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getStopReadTime", "stop_read_time");
        READ_TRANSFORM_SCHEMA_MAPPING.put("isCommitOffsetsInFinalizeEnabled", "is_commit_offset_finalize_enabled");
        READ_TRANSFORM_SCHEMA_MAPPING.put("isDynamicRead", "is_dynamic_read");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getWatchTopicPartitionDuration", "watch_topic_partition_duration");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getTimestampPolicyFactory", "timestamp_policy_factory");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getOffsetConsumerConfig", "offset_consumer_config");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getKeyDeserializerProvider", "key_deserializer_provider");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getValueDeserializerProvider", "value_deserializer_provider");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn");
        READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout");

        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getTopic", "topic");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getProducerConfig", "producer_config");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getProducerFactoryFn", "producer_factory_fn");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getKeySerializer", "key_serializer");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getValueSerializer", "value_serializer");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getPublishTimestampFunction", "publish_timestamp_fn");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("isEOS", "eos");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSinkGroupId", "sink_group_id");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getNumShards", "num_shards");
        WRITE_TRANSFORM_SCHEMA_MAPPING.put("getConsumerFactoryFn", "consumer_factory_fn");
    }

    /** Returns a single-element list holding partition 0 of {@code "dummytopic"}. */
    private static List<TopicPartition> dummyTopicPartitions() {
        return IntStream.range(0, 1)
            .mapToObj(partition -> new TopicPartition("dummytopic", partition))
            .collect(Collectors.toList());
    }

    /**
     * Lists the names of the methods declared on {@code transformClass} that start with
     * {@code "get"} and are not in {@link #IGNORED_GETTERS}.
     *
     * <p>Only the {@code "get"} prefix is matched, so {@code is...} accessors are not returned
     * here; their mapping entries are still validated against the schema by the callers.
     */
    private static List<String> trackedGetterNames(Class<?> transformClass) {
        return Arrays.stream(transformClass.getDeclaredMethods())
            .map(method -> method.getName())
            .filter(name -> name.startsWith("get"))
            .filter(name -> !IGNORED_GETTERS.contains(name))
            .collect(Collectors.toList());
    }

    /** A read configured with explicit topic partitions survives a config-row round trip. */
    @Test
    public void testReCreateReadTransformFromRow() throws Exception {
        Map<String, Object> consumerConfigUpdates = new HashMap<>();
        consumerConfigUpdates.put("dummyconfig", "dummyvalue");
        KafkaIO.Read<String, Integer> readTransform = KafkaIO.<String, Integer>read()
            .withBootstrapServers("dummykafkaserver")
            .withTopicPartitions(dummyTopicPartitions())
            .withConsumerConfigUpdates(consumerConfigUpdates);

        KafkaIOTranslation.KafkaIOReadWithMetadataTranslator translator = new KafkaIOTranslation.KafkaIOReadWithMetadataTranslator();
        KafkaIO.Read<?, ?> readTransformFromRow = translator.fromConfigRow(translator.toConfigRow(readTransform), PipelineOptionsFactory.create());

        Assert.assertNotNull(readTransformFromRow.getConsumerConfig().get("bootstrap.servers"));
        Assert.assertEquals("dummykafkaserver", readTransformFromRow.getConsumerConfig().get("bootstrap.servers"));
        Assert.assertEquals(1L, readTransformFromRow.getTopicPartitions().size());
        Assert.assertEquals("dummytopic", readTransformFromRow.getTopicPartitions().get(0).topic());
        Assert.assertEquals(0L, readTransformFromRow.getTopicPartitions().get(0).partition());
    }

    /** A read configured with a topic name survives a config-row round trip. */
    @Test
    public void testReCreateReadTransformWithTopics() throws Exception {
        Map<String, Object> consumerConfigUpdates = new HashMap<>();
        consumerConfigUpdates.put("dummyconfig", "dummyvalue");
        KafkaIO.Read<String, Integer> readTransform = KafkaIO.<String, Integer>read()
            .withBootstrapServers("dummykafkaserver")
            .withTopic("dummytopic")
            .withConsumerConfigUpdates(consumerConfigUpdates);

        KafkaIOTranslation.KafkaIOReadWithMetadataTranslator translator = new KafkaIOTranslation.KafkaIOReadWithMetadataTranslator();
        KafkaIO.Read<?, ?> readTransformFromRow = translator.fromConfigRow(translator.toConfigRow(readTransform), PipelineOptionsFactory.create());

        Assert.assertNotNull(readTransformFromRow.getConsumerConfig().get("bootstrap.servers"));
        Assert.assertEquals("dummykafkaserver", readTransformFromRow.getConsumerConfig().get("bootstrap.servers"));
        Assert.assertEquals(1L, readTransformFromRow.getTopics().size());
        Assert.assertEquals("dummytopic", readTransformFromRow.getTopics().get(0));
    }

    /**
     * Every tracked getter declared on {@code KafkaIO.Read} must have an entry in
     * {@link #READ_TRANSFORM_SCHEMA_MAPPING}, and every mapped field name must exist in the read
     * translator's schema.
     */
    @Test
    public void testReadTransformRowIncludesAllFields() throws Exception {
        List<String> getterNames = trackedGetterNames(KafkaIO.Read.class);
        // Guard against the reflection filter silently matching nothing.
        Assert.assertFalse(getterNames.isEmpty());

        for (String getterName : getterNames) {
            Assert.assertTrue("Method " + getterName + " will not be tracked when upgrading the 'KafkaIO.Read' transform. Please update 'KafkaIOTranslation.KafkaIOReadWithMetadataTranslator' to track the new method and update this test.", READ_TRANSFORM_SCHEMA_MAPPING.containsKey(getterName));
        }

        List<String> schemaFieldNames = KafkaIOTranslation.KafkaIOReadWithMetadataTranslator.schema.getFieldNames();
        for (String fieldName : READ_TRANSFORM_SCHEMA_MAPPING.values()) {
            Assert.assertTrue("Field name " + fieldName + " was not found in the read transform schema defined in KafkaIOReadWithMetadataTranslator.", schemaFieldNames.contains(fieldName));
        }
    }

    /** A write transform survives a config-row round trip, including custom producer config. */
    @Test
    public void testReCreateWriteTransformFromRow() throws Exception {
        Map<String, Object> producerConfigUpdates = new HashMap<>();
        producerConfigUpdates.put("dummyconfig", "dummyvalue");
        KafkaIO.Write<String, Integer> writeTransform = KafkaIO.<String, Integer>write()
            .withBootstrapServers("dummybootstrapserver")
            .withTopic("dummytopic")
            .withProducerConfigUpdates(producerConfigUpdates);

        KafkaIOTranslation.KafkaIOWriteTranslator translator = new KafkaIOTranslation.KafkaIOWriteTranslator();
        KafkaIO.Write<?, ?> writeTransformFromRow = translator.fromConfigRow(translator.toConfigRow(writeTransform), PipelineOptionsFactory.create());
        KafkaIO.WriteRecords<?, ?> writeRecordsTransform = writeTransformFromRow.getWriteRecordsTransform();

        Assert.assertNotNull(writeRecordsTransform.getProducerConfig().get("bootstrap.servers"));
        Assert.assertEquals("dummybootstrapserver", writeRecordsTransform.getProducerConfig().get("bootstrap.servers"));
        Assert.assertEquals("dummytopic", writeRecordsTransform.getTopic());
        Map<String, Object> producerConfigFromRow = writeRecordsTransform.getProducerConfig();
        Assert.assertTrue(producerConfigFromRow.containsKey("dummyconfig"));
        Assert.assertEquals("dummyvalue", producerConfigFromRow.get("dummyconfig"));
    }

    /**
     * Every tracked getter declared on {@code KafkaIO.WriteRecords} must have an entry in
     * {@link #WRITE_TRANSFORM_SCHEMA_MAPPING}, and every mapped field name must exist in the write
     * translator's schema.
     */
    @Test
    public void testWriteTransformRowIncludesAllFields() throws Exception {
        List<String> getterNames = trackedGetterNames(KafkaIO.WriteRecords.class);
        // Guard against the reflection filter silently matching nothing.
        Assert.assertFalse(getterNames.isEmpty());

        for (String getterName : getterNames) {
            Assert.assertTrue("Method " + getterName + " will not be tracked when upgrading the 'KafkaIO.Write' transform. Please update 'KafkaIOTranslation.KafkaIOWriteTranslator' to track the new method and update this test.", WRITE_TRANSFORM_SCHEMA_MAPPING.containsKey(getterName));
        }

        List<String> schemaFieldNames = KafkaIOTranslation.KafkaIOWriteTranslator.schema.getFieldNames();
        for (String fieldName : WRITE_TRANSFORM_SCHEMA_MAPPING.values()) {
            // The message names the translator whose schema is actually checked on this line.
            Assert.assertTrue("Field name " + fieldName + " was not found in the write transform schema defined in KafkaIOWriteTranslator.", schemaFieldNames.contains(fieldName));
        }
    }

    /** The upgrade URN resolved for a read transform is the v2 read-with-metadata URN. */
    @Test
    public void testReadTransformURNDiscovery() {
        KafkaIO.Read<String, Integer> readTransform = KafkaIO.<String, Integer>read()
            .withBootstrapServers("dummykafkaserver")
            .withTopicPartitions(dummyTopicPartitions());

        Assert.assertEquals("beam:transform:org.apache.beam:kafka_read_with_metadata:v2", TransformUpgrader.findUpgradeURN(readTransform));
    }

    /** The upgrade URN resolved for a write transform is the v2 write URN. */
    @Test
    public void testWriteTransformURNDiscovery() {
        KafkaIO.Write<String, Integer> writeTransform = KafkaIO.<String, Integer>write()
            .withBootstrapServers("dummybootstrapserver")
            .withTopic("dummytopic");

        Assert.assertEquals("beam:transform:org.apache.beam:kafka_write:v2", TransformUpgrader.findUpgradeURN(writeTransform));
    }
}
