/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.cloud.Timestamp;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.kafka.CheckStopReadingFn;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOTest;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformConfiguration;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFnTest;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils;
import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(value=JUnit4.class)
public class KafkaIOIT {
    // Metric names passed to the TimeMonitor steps and looked up again in readMetrics().
    private static final String READ_TIME_METRIC_NAME = "read_time";
    private static final String WRITE_TIME_METRIC_NAME = "write_time";
    // Name of the derived metric that readMetrics() computes as write time + read time.
    private static final String RUN_TIME_METRIC_NAME = "run_time";
    // Metrics namespace shared by the TimeMonitor steps and the MetricsReader instances.
    private static final String NAMESPACE = KafkaIOIT.class.getName();
    // Identifier and timestamp attached to every NamedTestResult / InfluxDB publication of this run.
    private static final String TEST_ID = UUID.randomUUID().toString();
    private static final String TIMESTAMP = Timestamp.now().toString();
    // NOTE(review): not referenced in the visible part of this file.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
    // Parsed in setup() from the JSON string returned by options.getSourceOptions().
    private static SyntheticSourceOptions sourceOptions;
    // Integration-test pipeline options, loaded once in setup().
    private static Options options;
    // Only assigned in setup() when the tests do NOT run against a test container.
    private static InfluxDBSettings settings;
    // Captures log output of this class; used via verifyError(...) in the log-based tests.
    @Rule
    public ExpectedLogs kafkaIOITExpectedLogs = ExpectedLogs.none(KafkaIOIT.class);
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();
    // Second write pipeline, used by the second pass of testKafkaIOSDFResumesCorrectly.
    @Rule
    public TestPipeline writePipeline2 = TestPipeline.create();
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    // Options for the SDF read pipelines; populated in the static initializer of this class.
    private static ExperimentalOptions sdfPipelineOptions;
    @Rule
    public TestPipeline sdfReadPipeline = TestPipeline.fromOptions((PipelineOptions)sdfPipelineOptions);
    // Second SDF read pipeline, used by the second pass of testKafkaIOSDFResumesCorrectly.
    @Rule
    public TestPipeline sdfReadPipeline2 = TestPipeline.fromOptions((PipelineOptions)sdfPipelineOptions);
    // Non-null only after setupKafkaContainer() ran; stopped in afterClass().
    private static KafkaContainer kafkaContainer;
    // Beam schema of the rows used by the schema-transform tests; built in the static initializer.
    public static final Schema KAFKA_TOPIC_SCHEMA;
    // JSON-schema description with the same field names as KAFKA_TOPIC_SCHEMA; used by the JSON schema-transform test.
    public static final String SCHEMA_IN_JSON = "{\n  \"type\": \"object\",\n  \"properties\": {\n    \"name\": {\n      \"type\": \"string\"\n    },\n    \"userId\": {\n      \"type\": \"integer\"\n    },\n    \"age\": {\n      \"type\": \"integer\"\n    },\n    \"ageIsEven\": {\n      \"type\": \"boolean\"\n    },\n    \"temperature\": {\n      \"type\": \"number\"\n    },\n    \"childrenNames\": {\n      \"type\": \"array\",\n      \"items\": {\n        \"type\": \"string\"\n      }\n    }\n  }\n}";
    // 300000 ms; the same value appears as the timeout of the two schema-transform tests.
    private static final int FIVE_MINUTES_IN_MS = 300000;

    /**
     * Loads the integration-test options and the synthetic source options once for the class.
     * When test containers are requested a local Kafka container is started; otherwise the
     * InfluxDB settings used for publishing metrics are built from the options.
     *
     * @throws IOException propagated from loading/parsing the options
     */
    @BeforeClass
    public static void setup() throws IOException {
        options = IOITHelper.readIOTestPipelineOptions(Options.class);
        sourceOptions =
            SyntheticOptions.fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);

        if (options.isWithTestcontainers()) {
            setupKafkaContainer();
            return;
        }

        // Metrics are only published when running against an external Kafka cluster.
        settings =
            InfluxDBSettings.builder()
                .withHost(options.getInfluxHost())
                .withDatabase(options.getInfluxDatabase())
                .withMeasurement(options.getInfluxMeasurement())
                .get();
    }

    /** Stops the Kafka test container, if one was started for this class. */
    @AfterClass
    public static void afterClass() {
        if (kafkaContainer == null) {
            return;
        }
        kafkaContainer.stop();
    }

    /**
     * Writes the synthetic records to Kafka, reads them back with a pipeline that has streaming
     * enabled, and asserts that the number of records counted equals
     * {@code sourceOptions.numRecords}. Read/write metrics are published to InfluxDB unless the
     * test runs against a test container.
     */
    @Test
    public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
        final String topic = options.getKafkaTopic();

        // Write side: synthetic bounded source -> Kafka.
        writePipeline
            .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
            .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
            .apply("Write to Kafka", writeToKafka().withTopic(topic));

        // Read side: unbounded Kafka read, counted inside a one-year calendar window.
        readPipeline.getOptions().as(Options.class).setStreaming(true);
        PCollection<Long> count =
            readPipeline
                .apply("Read from unbounded Kafka", readFromKafka().withTopic(topic))
                .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
                .apply("Window", Window.into(CalendarWindows.years(1)))
                .apply(
                    "Counting element",
                    Combine.globally(Count.<KafkaRecord<byte[], byte[]>>combineFn()).withoutDefaults());

        PipelineResult writeResult = writePipeline.run();
        PipelineResult.State writeState = writeResult.waitUntilFinish();
        Assert.assertNotEquals(PipelineResult.State.FAILED, writeState);

        PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);

        PipelineResult readResult = readPipeline.run();
        PipelineResult.State readState =
            readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));

        // The topic deletion is requested before the (possibly needed) cancellation of the read.
        tearDownTopic(topic);
        cancelIfTimeouted(readResult, readState);

        boolean publishMetrics = !options.isWithTestcontainers();
        if (publishMetrics) {
            Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
            IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
        }
        Assert.assertNotEquals(PipelineResult.State.FAILED, readState);
    }

    /**
     * Writes the synthetic records to Kafka and reads them back with a read that is bounded to
     * {@code sourceOptions.numRecords} records, asserting that exactly that many are counted.
     * Read/write metrics are published to InfluxDB unless the test runs against a test container.
     */
    @Test
    public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
        final String topic = options.getKafkaTopic();

        // Write side: synthetic bounded source -> Kafka.
        writePipeline
            .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
            .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
            .apply("Write to Kafka", writeToKafka().withTopic(topic));

        // Read side: record-count-bounded Kafka read, counted globally.
        PCollection<Long> count =
            readPipeline
                .apply("Read from bounded Kafka", readFromBoundedKafka().withTopic(topic))
                .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
                .apply("Counting element", Count.globally());

        PipelineResult writeResult = writePipeline.run();
        writeResult.waitUntilFinish();

        PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);

        PipelineResult readResult = readPipeline.run();
        PipelineResult.State readState =
            readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));

        // The topic deletion is requested before the (possibly needed) cancellation of the read.
        tearDownTopic(topic);
        cancelIfTimeouted(readResult, readState);
        Assert.assertNotEquals(PipelineResult.State.FAILED, readState);

        boolean publishMetrics = !options.isWithTestcontainers();
        if (publishMetrics) {
            Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
            IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
        }
    }

    /**
     * Runs two write/read round trips against the same "-resuming" topic, each with its own pair
     * of pipelines: first with four records, then with three.
     */
    @Test
    public void testKafkaIOSDFResumesCorrectly() throws IOException {
        final int firstPassRecords = 4;
        final int secondPassRecords = 3;

        roundtripElements("first-pass", firstPassRecords, writePipeline, sdfReadPipeline);
        roundtripElements("second-pass", secondPassRecords, writePipeline2, sdfReadPipeline2);
    }

    /**
     * Writes {@code recordCount} records of the form {@code <recordPrefix>-<i>} to the
     * "-resuming" topic with {@code wPipeline}, reads the topic with {@code rPipeline} using the
     * consumer group "resuming-group" (auto-commit enabled), and verifies through the expected-logs
     * rule that every written value was logged.
     *
     * @param recordPrefix prefix of every generated record value
     * @param recordCount number of records to generate; keys are {@code 0..recordCount-1}
     * @param wPipeline pipeline used for the write
     * @param rPipeline pipeline used for the read
     * @throws IOException declared for callers; see the pipeline/Kafka calls below
     */
    private void roundtripElements(String recordPrefix, Integer recordCount, TestPipeline wPipeline, TestPipeline rPipeline) throws IOException {
        // The admin client was previously never closed, leaking its network thread and sockets on
        // every call. It is only needed for this one request, so it is closed right away.
        try (AdminClient client =
            AdminClient.create(
                ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()))) {
            client.listTopics();
        }

        Map<Integer, String> records = new HashMap<>();
        for (int i = 0; i < recordCount; i++) {
            records.put(i, recordPrefix + "-" + i);
        }

        final String topic = options.getKafkaTopic() + "-resuming";

        wPipeline
            .apply("Generate Write Elements", Create.of(records))
            .apply(
                "Write to Kafka",
                KafkaIO.<Integer, String>write()
                    .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                    .withTopic(topic)
                    .withKeySerializer(IntegerSerializer.class)
                    .withValueSerializer(StringSerializer.class));
        wPipeline.run().waitUntilFinish(Duration.standardSeconds(10L));

        rPipeline
            .apply(
                "Read from Kafka",
                KafkaIO.<Integer, String>read()
                    .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                    .withConsumerConfigUpdates(
                        ImmutableMap.of(
                            "group.id", "resuming-group",
                            "auto.offset.reset", "earliest",
                            "enable.auto.commit", "true"))
                    .withTopic(topic)
                    .withKeyDeserializer(IntegerDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply("Get Values", Values.create())
            .apply(ParDo.of(new CrashOnExtra(records.values())))
            .apply(ParDo.of(new LogFn()));
        rPipeline.run().waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));

        // Every written value is expected to show up in the captured log output.
        for (String value : records.values()) {
            kafkaIOITExpectedLogs.verifyError(value);
        }
    }

    /**
     * Writes a single string record, reads it back with failing key/value deserializers and a
     * registered bad-record error handler, and asserts that the handler's output is exactly 1.
     */
    @Test
    public void testKafkaIOSDFReadWithErrorHandler() throws IOException {
        final String topic = options.getKafkaTopic() + "-failingDeserialization";

        writePipeline
            .apply(Create.of(KV.of("key", "val")))
            .apply(
                "Write to Kafka",
                KafkaIO.<String, String>write()
                    .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(StringSerializer.class)
                    .withTopic(topic));

        PipelineResult writeResult = writePipeline.run();
        PipelineResult.State writeState = writeResult.waitUntilFinish();
        Assert.assertNotEquals(PipelineResult.State.FAILED, writeState);

        ErrorHandler.BadRecordErrorHandler<PCollection<Long>> errorHandler =
            sdfReadPipeline.registerBadRecordErrorHandler(new ErrorHandlingTestUtils.ErrorSinkTransform());
        sdfReadPipeline.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                .withTopic(topic)
                .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
                .withKeyDeserializer(ReadFromKafkaDoFnTest.FailingDeserializer.class)
                .withValueDeserializer(ReadFromKafkaDoFnTest.FailingDeserializer.class)
                .withBadRecordErrorHandler(errorHandler));
        // The handler is closed before its output is wired into the assertion.
        errorHandler.close();

        PCollection<Long> badRecordCount = Objects.requireNonNull(errorHandler.getOutput());
        PAssert.thatSingleton(badRecordCount).isEqualTo(1L);

        PipelineResult readResult = sdfReadPipeline.run();
        PipelineResult.State readState =
            readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
        cancelIfTimeouted(readResult, readState);
        Assert.assertNotEquals(PipelineResult.State.FAILED, readState);
    }

    /**
     * Writes a single record with a failing value serializer and a registered bad-record error
     * handler, and asserts that the handler's output is exactly 1 and the pipeline did not fail.
     */
    @Test
    public void testKafkaIOWriteWithErrorHandler() throws IOException {
        ErrorHandler.BadRecordErrorHandler<PCollection<Long>> errorHandler =
            writePipeline.registerBadRecordErrorHandler(new ErrorHandlingTestUtils.ErrorSinkTransform());

        writePipeline
            .apply("Create single KV", Create.of(KV.of("key", 4L)))
            .apply(
                "Write to Kafka",
                KafkaIO.<String, Long>write()
                    .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                    .withKeySerializer(StringSerializer.class)
                    .withValueSerializer(KafkaIOTest.FailingLongSerializer.class)
                    .withTopic(options.getKafkaTopic() + "-failingSerialization")
                    .withBadRecordErrorHandler(errorHandler));
        // The handler is closed before its output is wired into the assertion.
        errorHandler.close();

        PCollection<Long> badRecordCount = Objects.requireNonNull(errorHandler.getOutput());
        PAssert.thatSingleton(badRecordCount).isEqualTo(1L);

        PipelineResult writeResult = writePipeline.run();
        PipelineResult.State writeState = writeResult.waitUntilFinish();
        Assert.assertNotEquals(PipelineResult.State.FAILED, writeState);
    }

    /**
     * Writes one record whose key and value are both {@code null}, reads a single record back via
     * {@code externalWithMetadata()} and asserts that the resulting row has {@code null} in its
     * "key" and "value" fields.
     */
    @Test
    public void testKafkaIOExternalRoundtripWithMetadataAndNullKeysAndValues() throws IOException {
        final String topic = options.getKafkaTopic() + "-nullRoundTrip";

        writePipeline
            .apply(Create.of(KV.<byte[], byte[]>of(null, null)))
            .apply("Write to Kafka", writeToKafka().withTopic(topic));

        PCollection<Row> rows =
            readPipeline.apply(
                KafkaIO.<byte[], byte[]>read()
                    .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                    .withTopic(topic)
                    .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
                    .withKeyDeserializerAndCoder(
                        ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
                    .withValueDeserializerAndCoder(
                        ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
                    .withMaxNumRecords(1L)
                    .externalWithMetadata());

        PAssert.thatSingleton(rows)
            .satisfies(
                row -> {
                    Assert.assertNull(row.getString("key"));
                    Assert.assertNull(row.getString("value"));
                    return null;
                });

        PipelineResult writeResult = writePipeline.run();
        writeResult.waitUntilFinish();

        PipelineResult readResult = readPipeline.run();
        PipelineResult.State readState =
            readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
        cancelIfTimeouted(readResult, readState);
    }

    /**
     * Creates a fresh single-partition topic, writes records 0..99, and starts an SDF read with
     * dynamic read enabled (5 second duration). A background thread waits 20 seconds, raises the
     * partition count to 2 and writes records 100..199 through the same write pipeline. The test
     * asserts that the read produced records keyed by partitions 0 and 1, and finally requests
     * deletion of the topic.
     */
    @Test
    public void testKafkaWithDynamicPartitions() throws IOException {
        // NOTE(review): this client is intentionally not closed here. The delayed-write thread
        // below may still be using it when the finally block runs; closing it safely would require
        // joining that thread first, which changes the timing of the test.
        AdminClient client =
            AdminClient.create(
                ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()));
        String topicName = "DynamicTopicPartition-" + UUID.randomUUID();

        Map<Integer, String> records = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            records.put(i, String.valueOf(i));
        }
        Map<Integer, String> moreRecords = new HashMap<>();
        for (int i = 100; i < 200; i++) {
            moreRecords.put(i, String.valueOf(i));
        }

        try {
            // The replication factor parameter of NewTopic is a short; an int literal does not
            // match any constructor.
            client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));
            client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(1)));

            writePipeline
                .apply("Generate Write Elements", Create.of(records))
                .apply(
                    "Write to Kafka",
                    KafkaIO.<Integer, String>write()
                        .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                        .withTopic(topicName)
                        .withKeySerializer(IntegerSerializer.class)
                        .withValueSerializer(StringSerializer.class));
            writePipeline.run().waitUntilFinish(Duration.standardSeconds(15L));

            Thread delayedWriteThread =
                new Thread(
                    () -> {
                        try {
                            Thread.sleep(20000L);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to whoever inspects this thread.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                        // Add the second partition while the read pipeline is already running.
                        client.createPartitions(
                            ImmutableMap.of(topicName, NewPartitions.increaseTo(2)));

                        writePipeline
                            .apply("Second Pass generate Write Elements", Create.of(moreRecords))
                            .apply(
                                "Write more to Kafka",
                                KafkaIO.<Integer, String>write()
                                    .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                                    .withTopic(topicName)
                                    .withKeySerializer(IntegerSerializer.class)
                                    .withValueSerializer(StringSerializer.class));
                        writePipeline.run().waitUntilFinish(Duration.standardSeconds(15L));
                    });
            delayedWriteThread.start();

            PCollection<Integer> values =
                sdfReadPipeline
                    .apply(
                        "Read from Kafka",
                        KafkaIO.<Integer, String>read()
                            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                            .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
                            .withTopic(topicName)
                            .withDynamicRead(Duration.standardSeconds(5L))
                            .withKeyDeserializer(IntegerDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class))
                    .apply("Key by Partition", ParDo.of(new KeyByPartition()))
                    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
                    .apply("Group by Partition", GroupByKey.create())
                    .apply("Get Partitions", Keys.create());

            // Both the original and the dynamically added partition must have been read.
            PAssert.that(values).containsInAnyOrder(0, 1);

            PipelineResult readResult = sdfReadPipeline.run();
            PipelineResult.State readState =
                readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));
            cancelIfTimeouted(readResult, readState);
            Assert.assertNotEquals(PipelineResult.State.FAILED, readState);
        } finally {
            client.deleteTopics(ImmutableSet.of(topicName));
        }
    }

    /**
     * Runs the stop-reading scenario with {@code AlwaysStopCheckStopReadingFn} on the
     * "-stop-reading" topic and expects no count to be produced.
     */
    @Test
    public void testKafkaWithStopReadingFunction() {
        final long expectedCount = 0L;
        runWithStopReadingFn(new AlwaysStopCheckStopReadingFn(), "stop-reading", expectedCount);
    }

    /**
     * Runs the stop-reading scenario with {@code DelayedCheckStopReadingFn} on the
     * "-delayed-stop-reading" topic and expects {@code sourceOptions.numRecords} records.
     */
    @Test
    public void testKafkaWithDelayedStopReadingFunction() {
        final long expectedCount = sourceOptions.numRecords;
        runWithStopReadingFn(new DelayedCheckStopReadingFn(), "delayed-stop-reading", expectedCount);
    }

    /** Runs the schema-transform write/read round trip with the "JSON" format and the JSON schema. */
    @Test(timeout = FIVE_MINUTES_IN_MS)
    public void testKafkaViaSchemaTransformJson() {
        Schema beamSchema = JsonUtils.beamSchemaFromJsonSchema(SCHEMA_IN_JSON);
        runReadWriteKafkaViaSchemaTransforms("JSON", SCHEMA_IN_JSON, beamSchema);
    }

    /**
     * Runs the schema-transform write/read round trip with the "AVRO" format, using the Avro
     * schema derived from {@code KAFKA_TOPIC_SCHEMA} as the schema definition.
     */
    @Test(timeout = FIVE_MINUTES_IN_MS)
    public void testKafkaViaSchemaTransformAvro() {
        String avroSchemaDefinition = AvroUtils.toAvroSchema(KAFKA_TOPIC_SCHEMA).toString();
        runReadWriteKafkaViaSchemaTransforms("AVRO", avroSchemaDefinition, KAFKA_TOPIC_SCHEMA);
    }

    /**
     * Writes 1000 generated rows to a fresh topic through the Kafka write schema transform, reads
     * them back through the Kafka read schema transform, and asserts that the rows read match the
     * rows written (in any order) and that the read pipeline ends in state {@code DONE}.
     *
     * <p>Row values are built to match the field types declared by {@code KAFKA_TOPIC_SCHEMA}:
     * {@code userId} and {@code age} are {@code Long} (INT64) and {@code ageIsEven} is a
     * {@code Boolean}. Previously these were populated with {@code Integer} values (including
     * {@code 1}/{@code 0} for the boolean), which do not match the schema.
     * NOTE(review): the JSON-derived schema is presumed to map "integer" to INT64 as well - confirm.
     *
     * @param format format name handed to both schema transforms (the tests pass "JSON" or "AVRO")
     * @param schemaDefinition schema text handed to the read transform
     * @param beamSchema Beam schema used to build the written and the expected rows
     */
    public void runReadWriteKafkaViaSchemaTransforms(String format, String schemaDefinition, Schema beamSchema) {
        String topicName = options.getKafkaTopic() + "-schema-transform" + UUID.randomUUID();

        // Write side: 0..999 -> rows -> Kafka write schema transform.
        PCollection<Row> inputRows =
            writePipeline
                .apply("Generate records", GenerateSequence.from(0L).to(1000L))
                .apply(
                    "Transform to Beam Rows",
                    MapElements.into(TypeDescriptors.rows())
                        .via(
                            (SerializableFunction<Long, Row>)
                                numb ->
                                    Row.withSchema(beamSchema)
                                        .withFieldValue("name", numb.toString())
                                        .withFieldValue("userId", Long.valueOf(numb.hashCode()))
                                        .withFieldValue("age", Long.valueOf(numb.intValue()))
                                        .withFieldValue("ageIsEven", numb % 2L == 0L)
                                        .withFieldValue("temperature", new Random(numb).nextDouble())
                                        .withFieldValue(
                                            "childrenNames",
                                            Lists.newArrayList(
                                                Long.toString(numb + 1L), Long.toString(numb + 2L)))
                                        .build()))
                .setRowSchema(beamSchema);

        PCollectionRowTuple.of("input", inputRows)
            .apply(
                "Write to Kafka",
                new KafkaWriteSchemaTransformProvider()
                    .from(
                        KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration.builder()
                            .setTopic(topicName)
                            .setBootstrapServers(options.getKafkaBootstrapServerAddresses())
                            .setFormat(format)
                            .build()));

        // Read side: the provider is built in test mode with a 30s (test container) or 120s timeout.
        PCollection<Row> readRows =
            PCollectionRowTuple.empty(readPipeline)
                .apply(
                    "Read from unbounded Kafka",
                    new KafkaReadSchemaTransformProvider(
                            true, options.isWithTestcontainers() ? 30 : 120)
                        .from(
                            KafkaReadSchemaTransformConfiguration.builder()
                                .setFormat(format)
                                .setAutoOffsetResetConfig("earliest")
                                .setSchema(schemaDefinition)
                                .setTopic(topicName)
                                .setBootstrapServers(options.getKafkaBootstrapServerAddresses())
                                .build()))
                .get("output");

        // Expected rows are built with the same formulas as the written rows.
        Iterable<Row> expectedRows =
            LongStream.range(0L, 1000L)
                .mapToObj(
                    numb ->
                        Row.withSchema(beamSchema)
                            .withFieldValue("name", Long.toString(numb))
                            .withFieldValue("userId", Long.valueOf(Long.hashCode(numb)))
                            .withFieldValue("age", Long.valueOf(numb))
                            .withFieldValue("ageIsEven", numb % 2L == 0L)
                            .withFieldValue("temperature", new Random(numb).nextDouble())
                            .withFieldValue(
                                "childrenNames",
                                Lists.newArrayList(Long.toString(numb + 1L), Long.toString(numb + 2L)))
                            .build())
                .collect(Collectors.toList());
        PAssert.that(readRows).containsInAnyOrder(expectedRows);

        PipelineResult writeResult = writePipeline.run();
        writeResult.waitUntilFinish();

        PipelineResult readResult = readPipeline.run();
        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
        Assert.assertEquals(PipelineResult.State.DONE, readResult.getState());
    }

    /**
     * Writes the synthetic records to {@code <kafkaTopic>-<topicSuffix>} and reads them back with
     * streaming enabled and the given stop-reading function installed on the read.
     *
     * @param function stop-reading function passed to the Kafka read
     * @param topicSuffix suffix appended (after a dash) to the configured Kafka topic
     * @param expectedCount expected record count; {@code 0} means the count collection must be empty
     */
    private void runWithStopReadingFn(CheckStopReadingFn function, String topicSuffix, Long expectedCount) {
        final String topic = options.getKafkaTopic() + "-" + topicSuffix;

        writePipeline
            .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
            .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
            .apply("Write to Kafka", writeToKafka().withTopic(topic));

        readPipeline.getOptions().as(Options.class).setStreaming(true);
        PCollection<Long> count =
            readPipeline
                .apply(
                    "Read from unbounded Kafka",
                    readFromKafka().withTopic(topic).withCheckStopReadingFn(function))
                .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
                .apply("Window", Window.into(CalendarWindows.years(1)))
                .apply(
                    "Counting element",
                    Combine.globally(Count.<KafkaRecord<byte[], byte[]>>combineFn()).withoutDefaults());

        // With no defaults on the combine, "nothing read" shows up as an empty collection.
        boolean expectNothing = expectedCount == 0L;
        if (!expectNothing) {
            PAssert.thatSingleton(count).isEqualTo(expectedCount);
        } else {
            PAssert.that(count).empty();
        }

        PipelineResult writeResult = writePipeline.run();
        writeResult.waitUntilFinish();

        PipelineResult readResult = readPipeline.run();
        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
    }

    /**
     * Writes five records to a fresh single-partition topic, raises the partition count to 3, and
     * runs an SDF read that windows (1 minute fixed windows), groups and logs the values. After
     * sleeping for twice the configured read timeout, the test verifies through the expected-logs
     * rule that every written value was logged, and that the read did not fail. The topic deletion
     * is requested and the admin client closed in all cases.
     */
    @Test
    public void testWatermarkUpdateWithSparseMessages() throws IOException, InterruptedException {
        String topicName = "SparseDataTopicPartition-" + UUID.randomUUID();

        Map<Integer, String> records = new HashMap<>();
        for (int i = 1; i <= 5; i++) {
            records.put(i, String.valueOf(i));
        }

        // try-with-resources: the admin client was previously never closed. It is used on this
        // thread only, so closing it after the topic deletion request is safe.
        try (AdminClient client =
            AdminClient.create(
                ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()))) {
            try {
                // The replication factor parameter of NewTopic is a short; an int literal does not
                // match any constructor.
                client.createTopics(ImmutableSet.of(new NewTopic(topicName, 1, (short) 1)));

                writePipeline
                    .apply("Generate Write Elements", Create.of(records))
                    .apply(
                        "Write to Kafka",
                        KafkaIO.<Integer, String>write()
                            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                            .withTopic(topicName)
                            .withKeySerializer(IntegerSerializer.class)
                            .withValueSerializer(StringSerializer.class));
                writePipeline.run().waitUntilFinish(Duration.standardSeconds(15L));

                // Partitions added after the write; nothing is written to the topic after this point.
                client.createPartitions(ImmutableMap.of(topicName, NewPartitions.increaseTo(3)));

                sdfReadPipeline
                    .apply(
                        "Read from Kafka",
                        KafkaIO.<Integer, String>read()
                            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
                            .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
                            .withTopic(topicName)
                            .withKeyDeserializer(IntegerDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withoutMetadata())
                    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
                    .apply("GroupKey", GroupByKey.create())
                    .apply("GetValues", Values.create())
                    .apply("Flatten", Flatten.iterables())
                    .apply("LogValues", ParDo.of(new LogFn()));

                PipelineResult readResult = sdfReadPipeline.run();

                // Twice the read timeout, computed in long arithmetic so large timeouts cannot
                // overflow an int.
                Thread.sleep(options.getReadTimeout() * 2000L);

                for (String value : records.values()) {
                    kafkaIOITExpectedLogs.verifyError(value);
                }

                PipelineResult.State readState = readResult.waitUntilFinish(Duration.standardSeconds(5L));
                cancelIfTimeouted(readResult, readState);
                // Argument order (unexpected, actual) matches the other tests in this class.
                Assert.assertNotEquals(PipelineResult.State.FAILED, readState);
            } finally {
                client.deleteTopics(ImmutableSet.of(topicName));
            }
        }
    }

    /**
     * Builds the named results for a write/read pair: the write time and read time (each the
     * difference between the end and start time metric, divided by 1000) and a run time that is
     * the sum of both.
     *
     * @param writeResult result of the write pipeline
     * @param readResult result of the read pipeline
     * @return set containing the read-time, write-time and run-time results
     */
    private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
        MetricsReader writeReader = new MetricsReader(writeResult, NAMESPACE);
        long writeStart = writeReader.getStartTimeMetric(WRITE_TIME_METRIC_NAME);
        long writeEnd = writeReader.getEndTimeMetric(WRITE_TIME_METRIC_NAME);
        NamedTestResult writeTime =
            NamedTestResult.create(
                TEST_ID, TIMESTAMP, WRITE_TIME_METRIC_NAME, (double) (writeEnd - writeStart) / 1000.0);

        MetricsReader readReader = new MetricsReader(readResult, NAMESPACE);
        long readStart = readReader.getStartTimeMetric(READ_TIME_METRIC_NAME);
        long readEnd = readReader.getEndTimeMetric(READ_TIME_METRIC_NAME);
        NamedTestResult readTime =
            NamedTestResult.create(
                TEST_ID, TIMESTAMP, READ_TIME_METRIC_NAME, (double) (readEnd - readStart) / 1000.0);

        double totalSeconds = writeTime.getValue() + readTime.getValue();
        NamedTestResult runTime =
            NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, totalSeconds);

        return ImmutableSet.of(readTime, writeTime, runTime);
    }

    /**
     * Cancels the read pipeline when no state was obtained for it, i.e. {@code readState} is
     * {@code null}; does nothing otherwise.
     *
     * @param readResult result handle of the read pipeline
     * @param readState state returned by the preceding wait, possibly {@code null}
     * @throws IOException propagated from {@code cancel()}
     */
    private void cancelIfTimeouted(PipelineResult readResult, PipelineResult.State readState) throws IOException {
        if (readState != null) {
            return;
        }
        readResult.cancel();
    }

    /**
     * Requests deletion of the given topic on the configured Kafka cluster.
     *
     * <p>The admin client is closed before returning; it was previously leaked on every call.
     * NOTE(review): {@code close()} is expected to let the pending delete request complete before
     * releasing the client, so this call may now block briefly - confirm against the Kafka
     * AdminClient documentation.
     *
     * @param topicName name of the topic to delete
     */
    private void tearDownTopic(String topicName) {
        try (AdminClient client =
            AdminClient.create(
                ImmutableMap.of("bootstrap.servers", options.getKafkaBootstrapServerAddresses()))) {
            client.deleteTopics(Collections.singleton(topicName));
        }
    }

    /**
     * Returns a byte-array Kafka write targeting the configured bootstrap servers; the caller
     * still has to set the topic.
     */
    private KafkaIO.Write<byte[], byte[]> writeToKafka() {
        final String bootstrapServers = options.getKafkaBootstrapServerAddresses();
        return KafkaIO.<byte[], byte[]>write()
            .withBootstrapServers(bootstrapServers)
            .withKeySerializer(ByteArraySerializer.class)
            .withValueSerializer(ByteArraySerializer.class);
    }

    /**
     * Returns the read from {@link #readFromKafka()} limited to {@code sourceOptions.numRecords}
     * records.
     */
    private KafkaIO.Read<byte[], byte[]> readFromBoundedKafka() {
        KafkaIO.Read<byte[], byte[]> unboundedRead = readFromKafka();
        return unboundedRead.withMaxNumRecords(sourceOptions.numRecords);
    }

    /**
     * Returns a byte-array Kafka read targeting the configured bootstrap servers with
     * {@code auto.offset.reset=earliest}; the caller still has to set the topic.
     */
    private KafkaIO.Read<byte[], byte[]> readFromKafka() {
        Map<String, Object> consumerOverrides = ImmutableMap.of("auto.offset.reset", "earliest");
        return KafkaIO.readBytes()
            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
            .withConsumerConfigUpdates(consumerOverrides);
    }

    /**
     * Starts a {@code confluentinc/cp-kafka} container tagged with the version from the test
     * options and stores its bootstrap servers back into the options.
     *
     * <p>The container is allowed up to three startup attempts.
     */
    private static void setupKafkaContainer() {
        DockerImageName image = DockerImageName.parse("confluentinc/cp-kafka").withTag(options.getKafkaContainerVersion());

        // Publish the container in the static field before starting it, as the rest of the
        // class refers to it through that field.
        kafkaContainer = new KafkaContainer(image);
        kafkaContainer.withStartupAttempts(3);
        kafkaContainer.start();

        String bootstrapServers = kafkaContainer.getBootstrapServers();
        options.setKafkaBootstrapServerAddresses(bootstrapServers);
    }

    static {
        // Pipeline options for the SDF-based read path: both experiments are enabled and
        // run() is configured not to block.
        sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);
        for (String experiment : new String[] {"use_sdf_read", "beam_fn_api"}) {
            ExperimentalOptions.addExperiment(sdfPipelineOptions, experiment);
        }
        sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);

        // Row schema used for records on the Kafka topic.
        KAFKA_TOPIC_SCHEMA = Schema.builder()
            .addStringField("name")
            .addInt64Field("userId")
            .addInt64Field("age")
            .addBooleanField("ageIsEven")
            .addDoubleField("temperature")
            .addArrayField("childrenNames", Schema.FieldType.STRING)
            .build();
    }

    /**
     * Pipeline options accepted by the Kafka IO integration test, in addition to those
     * inherited from {@link IOTestPipelineOptions} and {@link StreamingOptions}.
     */
    public static interface Options
    extends IOTestPipelineOptions,
    StreamingOptions {
        /** Returns the options for the synthetic source, as a string (required). */
        @Description(value="Options for synthetic source.")
        @Validation.Required
        public String getSourceOptions();

        public void setSourceOptions(String var1);

        /** Returns the Kafka bootstrap server addresses; defaults to {@code localhost:9092}. */
        @Description(value="Kafka bootstrap server addresses")
        @Default.String(value="localhost:9092")
        public String getKafkaBootstrapServerAddresses();

        public void setKafkaBootstrapServerAddresses(String var1);

        /** Returns the Kafka topic to use (required). */
        @Description(value="Kafka topic")
        @Validation.Required
        public String getKafkaTopic();

        public void setKafkaTopic(String var1);

        /** Returns how long to wait for the read pipeline, in seconds (required). */
        @Description(value="Time to wait for the events to be processed by the read pipeline (in seconds)")
        @Validation.Required
        public Integer getReadTimeout();

        public void setReadTimeout(Integer var1);

        /** Returns whether testcontainers should be used; defaults to {@code false}. */
        @Description(value="Whether to use testcontainers")
        @Default.Boolean(value=false)
        public Boolean isWithTestcontainers();

        public void setWithTestcontainers(Boolean var1);

        /**
         * Returns the Kafka container version, or {@code null} when it is not set. The
         * description refers to the {@code withTestcontainers} option declared above.
         */
        @Description(value="Kafka container version in format 'X.Y.Z'. Use when withTestcontainers is true")
        public @Nullable String getKafkaContainerVersion();

        public void setKafkaContainerVersion(String var1);
    }

    /** Keys each incoming Kafka record by the partition it was read from. */
    private static class KeyByPartition
    extends DoFn<KafkaRecord<Integer, String>, KV<Integer, KafkaRecord<Integer, String>>> {
        private KeyByPartition() {
        }

        /**
         * Emits the record paired with its partition number as the key.
         *
         * @param record the Kafka record being processed
         * @param receiver receiver for the keyed output
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KafkaRecord<Integer, String> record, DoFn.OutputReceiver<KV<Integer, KafkaRecord<Integer, String>>> receiver) {
            Integer partition = record.getPartition();
            KV<Integer, KafkaRecord<Integer, String>> keyed = KV.of(partition, record);
            receiver.output(keyed);
        }
    }

    private static class DelayedCheckStopReadingFn
    implements CheckStopReadingFn {
        int checkCount = 0;

        private DelayedCheckStopReadingFn() {
        }

        public Boolean apply(TopicPartition input) {
            if (this.checkCount >= 10) {
                return true;
            }
            ++this.checkCount;
            return false;
        }
    }

    private static class AlwaysStopCheckStopReadingFn
    implements CheckStopReadingFn {
        private AlwaysStopCheckStopReadingFn() {
        }

        public Boolean apply(TopicPartition input) {
            return true;
        }
    }

    /** Pass-through transform that logs every element at error level before re-emitting it. */
    public static class LogFn
    extends DoFn<String, String> {
        /**
         * Logs the element, then forwards it unchanged.
         *
         * @param element the element being processed
         * @param outputReceiver receiver for the forwarded element
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element final String element, final DoFn.OutputReceiver<String> outputReceiver) {
            LOG.error(element);
            outputReceiver.output(element);
        }
    }

    /**
     * Pass-through transform that fails on any element not in the expected set. Each expected
     * element is removed once seen, so a repeated element also fails.
     */
    public static class CrashOnExtra
    extends DoFn<String, String> {
        // Elements that have not been seen yet; shrinks as elements are processed.
        final Set<String> expected;

        /**
         * @param records the elements this transform is allowed to see; copied into a new set
         */
        public CrashOnExtra(Collection<String> records) {
            this.expected = new HashSet<String>(records);
        }

        /**
         * Forwards the element if it is still expected, otherwise throws.
         *
         * @param element the element being processed
         * @param outputReceiver receiver for the forwarded element
         * @throws RuntimeException if the element is not (or no longer) in the expected set
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String element, DoFn.OutputReceiver<String> outputReceiver) {
            // remove() reports whether the element was present, so it doubles as the check.
            boolean wasExpected = this.expected.remove(element);
            if (!wasExpected) {
                throw new RuntimeException("Received unexpected element: " + element);
            }
            outputReceiver.output(element);
        }
    }
}

