package org.apache.beam.sdk.io.kafka;

import com.google.cloud.Timestamp;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT.class */
public class KafkaIOIT {
    private static final String READ_TIME_METRIC_NAME = "read_time";
    private static final String WRITE_TIME_METRIC_NAME = "write_time";
    private static final String RUN_TIME_METRIC_NAME = "run_time";
    private static final String READ_ELEMENT_METRIC_NAME = "kafka_read_element_count";
    private static String expectedHashcode;
    private static SyntheticSourceOptions sourceOptions;
    private static Options options;
    private static InfluxDBSettings settings;

    @Rule
    public ExpectedLogs kafkaIOITExpectedLogs = ExpectedLogs.none(KafkaIOIT.class);

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline writePipeline2 = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    @Rule
    public TestPipeline sdfReadPipeline = TestPipeline.fromOptions(sdfPipelineOptions);

    @Rule
    public TestPipeline sdfReadPipeline2 = TestPipeline.fromOptions(sdfPipelineOptions);
    private static KafkaContainer kafkaContainer;
    public static final Schema KAFKA_TOPIC_SCHEMA;
    public static final String SCHEMA_IN_JSON = "{\n  \"type\": \"object\",\n  \"properties\": {\n    \"name\": {\n      \"type\": \"string\"\n    },\n    \"userId\": {\n      \"type\": \"integer\"\n    },\n    \"age\": {\n      \"type\": \"integer\"\n    },\n    \"ageIsEven\": {\n      \"type\": \"boolean\"\n    },\n    \"temperature\": {\n      \"type\": \"number\"\n    },\n    \"childrenNames\": {\n      \"type\": \"array\",\n      \"items\": {\n        \"type\": \"string\"\n      }\n    }\n  }\n}";
    private static final int FIVE_MINUTES_IN_MS = 300000;
    private static final String NAMESPACE = KafkaIOIT.class.getName();
    private static final String TEST_ID = UUID.randomUUID().toString();
    private static final String TIMESTAMP = Timestamp.now().toString();
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
    private static ExperimentalOptions sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$CheckStopReadingFn.class */
    private static class CheckStopReadingFn implements SerializableFunction<TopicPartition, Boolean> {
        private CheckStopReadingFn() {
        }

        public Boolean apply(TopicPartition topicPartition) {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$CountingFn.class */
    public static class CountingFn extends DoFn<String, Void> {
        private final Counter elementCounter;

        CountingFn(String str, String str2) {
            this.elementCounter = Metrics.counter(str, str2);
        }

        @DoFn.ProcessElement
        public void processElement() {
            this.elementCounter.inc(1L);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$CrashOnExtra.class */
    public static class CrashOnExtra extends DoFn<String, String> {
        final Set<String> expected;

        public CrashOnExtra(Collection<String> collection) {
            this.expected = new HashSet(collection);
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String str, DoFn.OutputReceiver<String> outputReceiver) {
            if (!this.expected.contains(str)) {
                throw new RuntimeException("Received unexpected element: " + str);
            }
            this.expected.remove(str);
            outputReceiver.output(str);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$DelayedCheckStopReadingFn.class */
    private static class DelayedCheckStopReadingFn implements SerializableFunction<TopicPartition, Boolean> {
        int checkCount;

        private DelayedCheckStopReadingFn() {
            this.checkCount = 0;
        }

        public Boolean apply(TopicPartition topicPartition) {
            if (this.checkCount >= 5) {
                return true;
            }
            this.checkCount++;
            return false;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$KeyByPartition.class */
    private static class KeyByPartition extends DoFn<KafkaRecord<Integer, String>, KV<Integer, KafkaRecord<Integer, String>>> {
        private KeyByPartition() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KafkaRecord<Integer, String> kafkaRecord, DoFn.OutputReceiver<KV<Integer, KafkaRecord<Integer, String>>> outputReceiver) {
            outputReceiver.output(KV.of(Integer.valueOf(kafkaRecord.getPartition()), kafkaRecord));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$LogFn.class */
    public static class LogFn extends DoFn<String, String> {
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String str, DoFn.OutputReceiver<String> outputReceiver) {
            KafkaIOIT.LOG.error(str);
            outputReceiver.output(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$MapKafkaRecordsToStrings.class */
    public static class MapKafkaRecordsToStrings extends SimpleFunction<KafkaRecord<byte[], byte[]>, String> {
        private MapKafkaRecordsToStrings() {
        }

        public String apply(KafkaRecord<byte[], byte[]> kafkaRecord) {
            return String.format("%s %s", Arrays.toString((byte[]) kafkaRecord.getKV().getKey()), Arrays.toString((byte[]) kafkaRecord.getKV().getValue()));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaIOIT$Options.class */
    public interface Options extends IOTestPipelineOptions, StreamingOptions {
        @Description("Options for synthetic source.")
        @Validation.Required
        String getSourceOptions();

        void setSourceOptions(String str);

        @Default.String("localhost:9092")
        @Description("Kafka bootstrap server addresses")
        String getKafkaBootstrapServerAddresses();

        void setKafkaBootstrapServerAddresses(String str);

        @Description("Kafka topic")
        @Validation.Required
        String getKafkaTopic();

        void setKafkaTopic(String str);

        @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
        @Validation.Required
        Integer getReadTimeout();

        void setReadTimeout(Integer num);

        @Description("Whether to use testcontainers")
        @Default.Boolean(false)
        Boolean isWithTestcontainers();

        void setWithTestcontainers(Boolean bool);

        @Description("Kafka container version in format 'X.Y.Z'. Use when useTestcontainers is true")
        String getKafkaContainerVersion();

        void setKafkaContainerVersion(String str);
    }

    @BeforeClass
    public static void setup() throws IOException {
        options = IOITHelper.readIOTestPipelineOptions(Options.class);
        sourceOptions = SyntheticOptions.fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
        if (options.isWithTestcontainers().booleanValue()) {
            setupKafkaContainer();
        } else {
            settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
        }
    }

    @AfterClass
    public static void afterClass() {
        if (kafkaContainer != null) {
            kafkaContainer.stop();
        }
    }

    @Test
    public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
        this.writePipeline.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions))).apply("Measure write time", ParDo.of(new TimeMonitor(NAMESPACE, WRITE_TIME_METRIC_NAME))).apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
        this.readPipeline.getOptions().as(Options.class).setStreaming(true);
        this.readPipeline.apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic())).apply("Measure read time", ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC_NAME))).apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())).apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
        PipelineResult run = this.writePipeline.run();
        Assert.assertNotEquals(PipelineResult.State.FAILED, run.waitUntilFinish());
        PipelineResult run2 = this.readPipeline.run();
        PipelineResult.State waitUntilFinish = run2.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue()));
        tearDownTopic(options.getKafkaTopic());
        cancelIfTimeouted(run2, waitUntilFinish);
        long readElementMetric = readElementMetric(run2, NAMESPACE, READ_ELEMENT_METRIC_NAME);
        Assert.assertTrue(String.format("actual number of records %d smaller than expected: %d.", Long.valueOf(readElementMetric), Long.valueOf(sourceOptions.numRecords)), sourceOptions.numRecords <= readElementMetric);
        if (!options.isWithTestcontainers().booleanValue()) {
            IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, readMetrics(run, run2), settings);
        }
        Assert.assertNotEquals(PipelineResult.State.FAILED, waitUntilFinish);
    }

    @Test
    public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
        expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, ImmutableMap.of(1000L, "4507649971ee7c51abbb446e65a5c660", 100000000L, "0f12c27c9a7672e14775594be66cad9a"));
        this.writePipeline.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions))).apply("Measure write time", ParDo.of(new TimeMonitor(NAMESPACE, WRITE_TIME_METRIC_NAME))).apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
        PAssert.thatSingleton(this.readPipeline.apply("Read from bounded Kafka", readFromBoundedKafka().withTopic(options.getKafkaTopic())).apply("Measure read time", ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC_NAME))).apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())).apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults())).isEqualTo(expectedHashcode);
        PipelineResult run = this.writePipeline.run();
        run.waitUntilFinish();
        PipelineResult run2 = this.readPipeline.run();
        PipelineResult.State waitUntilFinish = run2.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue()));
        tearDownTopic(options.getKafkaTopic());
        cancelIfTimeouted(run2, waitUntilFinish);
        Assert.assertEquals(PipelineResult.State.DONE, waitUntilFinish);
        if (options.isWithTestcontainers().booleanValue()) {
            return;
        }
        IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, readMetrics(run, run2), settings);
    }

    @Test
    public void testKafkaIOSDFResumesCorrectly() throws IOException {
        roundtripElements("first-pass", 4, this.writePipeline, this.sdfReadPipeline);
        roundtripElements("second-pass", 3, this.writePipeline2, this.sdfReadPipeline2);
    }

    private void roundtripElements(String str, Integer num, TestPipeline testPipeline, TestPipeline testPipeline2) throws IOException {
        AdminClient.create(ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses())).listTopics();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < num.intValue(); i++) {
            hashMap.put(Integer.valueOf(i), str + "-" + i);
        }
        testPipeline.apply("Generate Write Elements", Create.of(hashMap)).apply("Write to Kafka", KafkaIO.write().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withTopic(options.getKafkaTopic() + "-resuming").withKeySerializer(IntegerSerializer.class).withValueSerializer(StringSerializer.class));
        testPipeline.run().waitUntilFinish(Duration.standardSeconds(10L));
        testPipeline2.apply("Read from Kafka", KafkaIO.read().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withConsumerConfigUpdates(ImmutableMap.of("group.id", "resuming-group", "auto.offset.reset", "earliest", "enable.auto.commit", "true")).withTopic(options.getKafkaTopic() + "-resuming").withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(StringDeserializer.class).withoutMetadata()).apply("Get Values", Values.create()).apply(ParDo.of(new CrashOnExtra(hashMap.values()))).apply(ParDo.of(new LogFn()));
        testPipeline2.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue()));
        Iterator it = hashMap.values().iterator();
        while (it.hasNext()) {
            this.kafkaIOITExpectedLogs.verifyError((String) it.next());
        }
    }

    @Test
    public void testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues() throws IOException {
        this.writePipeline.apply(Create.of(KV.of((Object) null, (Object) null), new KV[0])).apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic() + "-nullRoundTrip"));
        PAssert.thatSingleton(this.readPipeline.apply(KafkaIO.read().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withTopic(options.getKafkaTopic() + "-nullRoundTrip").withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")).withKeyDeserializerAndCoder(ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of())).withValueDeserializerAndCoder(ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of())).withMaxNumRecords(1L).externalWithMetadata())).satisfies(row -> {
            Assert.assertNull(row.getString("key"));
            Assert.assertNull(row.getString("value"));
            return null;
        });
        this.writePipeline.run().waitUntilFinish();
        PipelineResult run = this.readPipeline.run();
        cancelIfTimeouted(run, run.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue())));
    }

    @Test
    public void testKafkaWithDynamicPartitions() throws IOException {
        AdminClient create = AdminClient.create(ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
        String str = "DynamicTopicPartition-" + UUID.randomUUID();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 100; i++) {
            hashMap.put(Integer.valueOf(i), String.valueOf(i));
        }
        HashMap hashMap2 = new HashMap();
        for (int i2 = 100; i2 < 200; i2++) {
            hashMap2.put(Integer.valueOf(i2), String.valueOf(i2));
        }
        try {
            create.createTopics(ImmutableSet.of(new NewTopic(str, 1, (short) 1)));
            create.createPartitions(ImmutableMap.of(str, NewPartitions.increaseTo(1)));
            this.writePipeline.apply("Generate Write Elements", Create.of(hashMap)).apply("Write to Kafka", KafkaIO.write().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withTopic(str).withKeySerializer(IntegerSerializer.class).withValueSerializer(StringSerializer.class));
            this.writePipeline.run().waitUntilFinish(Duration.standardSeconds(15L));
            new Thread(() -> {
                try {
                    Thread.sleep(20000L);
                    create.createPartitions(ImmutableMap.of(str, NewPartitions.increaseTo(2)));
                    this.writePipeline.apply("Second Pass generate Write Elements", Create.of(hashMap2)).apply("Write more to Kafka", KafkaIO.write().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withTopic(str).withKeySerializer(IntegerSerializer.class).withValueSerializer(StringSerializer.class));
                    this.writePipeline.run().waitUntilFinish(Duration.standardSeconds(15L));
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).start();
            PAssert.that(this.sdfReadPipeline.apply("Read from Kafka", KafkaIO.read().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")).withTopic(str).withDynamicRead(Duration.standardSeconds(5L)).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(StringDeserializer.class)).apply("Key by Partition", ParDo.of(new KeyByPartition())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))).apply("Group by Partition", GroupByKey.create()).apply("Get Partitions", Keys.create())).containsInAnyOrder(new Integer[]{0, 1});
            PipelineResult run = this.sdfReadPipeline.run();
            PipelineResult.State waitUntilFinish = run.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue() / 2));
            cancelIfTimeouted(run, waitUntilFinish);
            Assert.assertNotEquals(PipelineResult.State.FAILED, waitUntilFinish);
            create.deleteTopics(ImmutableSet.of(str));
        } catch (Throwable th) {
            create.deleteTopics(ImmutableSet.of(str));
            throw th;
        }
    }

    @Test
    public void testKafkaWithStopReadingFunction() {
        Assert.assertEquals(-1L, readElementMetric(runWithStopReadingFn(new CheckStopReadingFn(), "stop-reading"), NAMESPACE, READ_ELEMENT_METRIC_NAME));
    }

    @Test
    public void testKafkaWithDelayedStopReadingFunction() {
        Assert.assertEquals(sourceOptions.numRecords, readElementMetric(runWithStopReadingFn(new DelayedCheckStopReadingFn(), "delayed-stop-reading"), NAMESPACE, READ_ELEMENT_METRIC_NAME));
    }

    @Test(timeout = 300000)
    public void testKafkaViaSchemaTransformJson() {
        runReadWriteKafkaViaSchemaTransforms("JSON", SCHEMA_IN_JSON, JsonUtils.beamSchemaFromJsonSchema(SCHEMA_IN_JSON));
    }

    @Test(timeout = 300000)
    public void testKafkaViaSchemaTransformAvro() {
        runReadWriteKafkaViaSchemaTransforms("AVRO", AvroUtils.toAvroSchema(KAFKA_TOPIC_SCHEMA).toString(), KAFKA_TOPIC_SCHEMA);
    }

    public void runReadWriteKafkaViaSchemaTransforms(String str, String str2, Schema schema) {
        String str3 = options.getKafkaTopic() + "-schema-transform" + UUID.randomUUID();
        PCollectionRowTuple.of("input", this.writePipeline.apply("Generate records", GenerateSequence.from(0L).to(1000L)).apply("Transform to Beam Rows", MapElements.into(TypeDescriptors.rows()).via(l -> {
            return Row.withSchema(schema).withFieldValue("name", l.toString()).withFieldValue("userId", Long.valueOf(l.hashCode())).withFieldValue("age", Long.valueOf(l.intValue())).withFieldValue("ageIsEven", Boolean.valueOf(l.longValue() % 2 == 0)).withFieldValue("temperature", Double.valueOf(new Random(l.longValue()).nextDouble())).withFieldValue("childrenNames", Lists.newArrayList(new String[]{Long.toString(l.longValue() + 1), Long.toString(l.longValue() + 2)})).build();
        })).setRowSchema(schema)).apply("Write to Kafka", new KafkaWriteSchemaTransformProvider().from(KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration.builder().setTopic(str3).setBootstrapServers(options.getKafkaBootstrapServerAddresses()).setFormat(str).build()).buildTransform());
        PAssert.that(PCollectionRowTuple.empty(this.readPipeline).apply("Read from unbounded Kafka", new KafkaReadSchemaTransformProvider(true, Integer.valueOf(options.isWithTestcontainers().booleanValue() ? 30 : 120)).from(KafkaReadSchemaTransformConfiguration.builder().setFormat(str).setAutoOffsetResetConfig("earliest").setSchema(str2).setTopic(str3).setBootstrapServers(options.getKafkaBootstrapServerAddresses()).build()).buildTransform()).get("output")).containsInAnyOrder((Iterable) LongStream.range(0L, 1000L).mapToObj(j -> {
            return Row.withSchema(schema).withFieldValue("name", Long.toString(j)).withFieldValue("userId", Long.valueOf(Long.hashCode(j))).withFieldValue("age", Long.valueOf(j)).withFieldValue("ageIsEven", Boolean.valueOf(j % 2 == 0)).withFieldValue("temperature", Double.valueOf(new Random(j).nextDouble())).withFieldValue("childrenNames", Lists.newArrayList(new String[]{Long.toString(j + 1), Long.toString(j + 2)})).build();
        }).collect(Collectors.toList()));
        this.writePipeline.run().waitUntilFinish();
        PipelineResult run = this.readPipeline.run();
        run.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue()));
        Assert.assertEquals(PipelineResult.State.DONE, run.getState());
    }

    private PipelineResult runWithStopReadingFn(SerializableFunction<TopicPartition, Boolean> serializableFunction, String str) {
        this.writePipeline.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions))).apply("Measure write time", ParDo.of(new TimeMonitor(NAMESPACE, WRITE_TIME_METRIC_NAME))).apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic() + "-" + str));
        this.readPipeline.getOptions().as(Options.class).setStreaming(true);
        this.readPipeline.apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic() + "-" + str).withCheckStopReadingFn(serializableFunction)).apply("Measure read time", ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC_NAME))).apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings())).apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
        this.writePipeline.run().waitUntilFinish();
        PipelineResult run = this.readPipeline.run();
        run.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout().intValue()));
        return run;
    }

    @Test
    public void testWatermarkUpdateWithSparseMessages() throws IOException, InterruptedException {
        AdminClient create = AdminClient.create(ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
        String str = "SparseDataTopicPartition-" + UUID.randomUUID();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 5; i++) {
            hashMap.put(Integer.valueOf(i), String.valueOf(i));
        }
        try {
            create.createTopics(ImmutableSet.of(new NewTopic(str, 1, (short) 1)));
            this.writePipeline.apply("Generate Write Elements", Create.of(hashMap)).apply("Write to Kafka", KafkaIO.write().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withTopic(str).withKeySerializer(IntegerSerializer.class).withValueSerializer(StringSerializer.class));
            this.writePipeline.run().waitUntilFinish(Duration.standardSeconds(15L));
            create.createPartitions(ImmutableMap.of(str, NewPartitions.increaseTo(3)));
            this.sdfReadPipeline.apply("Read from Kafka", KafkaIO.read().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")).withTopic(str).withKeyDeserializer(IntegerDeserializer.class).withValueDeserializer(StringDeserializer.class).withoutMetadata()).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))).apply("GroupKey", GroupByKey.create()).apply("GetValues", Values.create()).apply("Flatten", Flatten.iterables()).apply("LogValues", ParDo.of(new LogFn()));
            PipelineResult run = this.sdfReadPipeline.run();
            Thread.sleep(options.getReadTimeout().intValue() * 1000);
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                this.kafkaIOITExpectedLogs.verifyError((String) it.next());
            }
            PipelineResult.State waitUntilFinish = run.waitUntilFinish(Duration.standardSeconds(5L));
            cancelIfTimeouted(run, waitUntilFinish);
            Assert.assertNotEquals(waitUntilFinish, PipelineResult.State.FAILED);
            create.deleteTopics(ImmutableSet.of(str));
        } catch (Throwable th) {
            create.deleteTopics(ImmutableSet.of(str));
            throw th;
        }
    }

    private long readElementMetric(PipelineResult pipelineResult, String str, String str2) {
        return new MetricsReader(pipelineResult, str).getCounterMetric(str2);
    }

    private Set<NamedTestResult> readMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        BiFunction biFunction = (metricsReader, str) -> {
            long startTimeMetric = metricsReader.getStartTimeMetric(str);
            return NamedTestResult.create(TEST_ID, TIMESTAMP, str, (metricsReader.getEndTimeMetric(str) - startTimeMetric) / 1000.0d);
        };
        NamedTestResult namedTestResult = (NamedTestResult) biFunction.apply(new MetricsReader(pipelineResult, NAMESPACE), WRITE_TIME_METRIC_NAME);
        NamedTestResult namedTestResult2 = (NamedTestResult) biFunction.apply(new MetricsReader(pipelineResult2, NAMESPACE), READ_TIME_METRIC_NAME);
        return ImmutableSet.of(namedTestResult2, namedTestResult, NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, namedTestResult.getValue() + namedTestResult2.getValue()));
    }

    private void cancelIfTimeouted(PipelineResult pipelineResult, PipelineResult.State state) throws IOException {
        if (state == null) {
            pipelineResult.cancel();
        }
    }

    private void tearDownTopic(String str) {
        AdminClient.create(ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses())).deleteTopics(Collections.singleton(str));
    }

    private KafkaIO.Write<byte[], byte[]> writeToKafka() {
        return KafkaIO.write().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withKeySerializer(ByteArraySerializer.class).withValueSerializer(ByteArraySerializer.class);
    }

    private KafkaIO.Read<byte[], byte[]> readFromBoundedKafka() {
        return readFromKafka().withMaxNumRecords(sourceOptions.numRecords);
    }

    private KafkaIO.Read<byte[], byte[]> readFromKafka() {
        return KafkaIO.readBytes().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"));
    }

    public static String getHashForRecordCount(long j, Map<Long, String> map) {
        String str = map.get(Long.valueOf(j));
        if (str == null) {
            throw new UnsupportedOperationException(String.format("No hash for that record count: %s", Long.valueOf(j)));
        }
        return str;
    }

    private static void setupKafkaContainer() {
        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(options.getKafkaContainerVersion()));
        kafkaContainer.withStartupAttempts(3);
        kafkaContainer.start();
        options.setKafkaBootstrapServerAddresses(kafkaContainer.getBootstrapServers());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1317242147:
                if (implMethodName.equals("lambda$runReadWriteKafkaViaSchemaTransforms$9b56176c$1")) {
                    z = false;
                    break;
                }
                break;
            case 1919702153:
                if (implMethodName.equals("lambda$testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/schemas/Schema;Ljava/lang/Long;)Lorg/apache/beam/sdk/values/Row;")) {
                    Schema schema = (Schema) serializedLambda.getCapturedArg(0);
                    return l -> {
                        return Row.withSchema(schema).withFieldValue("name", l.toString()).withFieldValue("userId", Long.valueOf(l.hashCode())).withFieldValue("age", Long.valueOf(l.intValue())).withFieldValue("ageIsEven", Boolean.valueOf(l.longValue() % 2 == 0)).withFieldValue("temperature", Double.valueOf(new Random(l.longValue()).nextDouble())).withFieldValue("childrenNames", Lists.newArrayList(new String[]{Long.toString(l.longValue() + 1), Long.toString(l.longValue() + 2)})).build();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/kafka/KafkaIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Ljava/lang/Void;")) {
                    return row -> {
                        Assert.assertNull(row.getString("key"));
                        Assert.assertNull(row.getString("value"));
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        ExperimentalOptions.addExperiment(sdfPipelineOptions, "use_sdf_read");
        ExperimentalOptions.addExperiment(sdfPipelineOptions, "beam_fn_api");
        sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
        KAFKA_TOPIC_SCHEMA = Schema.builder().addStringField("name").addInt64Field("userId").addInt64Field("age").addBooleanField("ageIsEven").addDoubleField("temperature").addArrayField("childrenNames", Schema.FieldType.STRING).build();
    }
}
