/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.cloud.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(value=JUnit4.class)
public class KafkaIOIT {
    private static final String READ_TIME_METRIC_NAME = "read_time";
    private static final String WRITE_TIME_METRIC_NAME = "write_time";
    private static final String RUN_TIME_METRIC_NAME = "run_time";
    private static final String READ_ELEMENT_METRIC_NAME = "kafka_read_element_count";
    private static final String NAMESPACE = KafkaIOIT.class.getName();
    private static final String TEST_ID = UUID.randomUUID().toString();
    private static final String TIMESTAMP = Timestamp.now().toString();
    private static String expectedHashcode;
    private static SyntheticSourceOptions sourceOptions;
    private static Options options;
    private static InfluxDBSettings settings;
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private static KafkaContainer kafkaContainer;

    @BeforeClass
    public static void setup() throws IOException {
        options = (Options)IOITHelper.readIOTestPipelineOptions(Options.class);
        sourceOptions = (SyntheticSourceOptions)SyntheticOptions.fromJsonString((String)options.getSourceOptions(), SyntheticSourceOptions.class);
        if (options.isWithTestcontainers().booleanValue()) {
            KafkaIOIT.setupKafkaContainer();
        } else {
            settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
        }
    }

    @AfterClass
    public static void afterClass() {
        if (kafkaContainer != null) {
            kafkaContainer.stop();
        }
    }

    @Test
    public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
        ((PCollection)((PCollection)this.writePipeline.apply("Generate records", (PTransform)Read.from((BoundedSource)new SyntheticBoundedSource(sourceOptions)))).apply("Measure write time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, WRITE_TIME_METRIC_NAME)))).apply("Write to Kafka", this.writeToKafka());
        ((Options)this.readPipeline.getOptions().as(Options.class)).setStreaming(true);
        ((PCollection)((PCollection)((PCollection)this.readPipeline.apply("Read from unbounded Kafka", this.readFromKafka())).apply("Measure read time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, READ_TIME_METRIC_NAME)))).apply("Map records to strings", (PTransform)MapElements.via((SimpleFunction)new MapKafkaRecordsToStrings()))).apply("Counting element", (PTransform)ParDo.of((DoFn)new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
        PipelineResult writeResult = this.writePipeline.run();
        writeResult.waitUntilFinish();
        PipelineResult readResult = this.readPipeline.run();
        PipelineResult.State readState = readResult.waitUntilFinish(Duration.standardSeconds((long)options.getReadTimeout().intValue()));
        this.cancelIfTimeouted(readResult, readState);
        Assert.assertEquals((long)KafkaIOIT.sourceOptions.numRecords, (long)this.readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME));
        if (!options.isWithTestcontainers().booleanValue()) {
            Set<NamedTestResult> metrics = this.readMetrics(writeResult, readResult);
            IOITMetrics.publishToInflux((String)TEST_ID, (String)TIMESTAMP, metrics, (InfluxDBSettings)settings);
        }
    }

    @Test
    public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
        ImmutableMap expectedHashes = ImmutableMap.of((Object)1000L, (Object)"4507649971ee7c51abbb446e65a5c660", (Object)100000000L, (Object)"0f12c27c9a7672e14775594be66cad9a");
        expectedHashcode = KafkaIOIT.getHashForRecordCount(KafkaIOIT.sourceOptions.numRecords, (Map<Long, String>)expectedHashes);
        ((PCollection)((PCollection)this.writePipeline.apply("Generate records", (PTransform)Read.from((BoundedSource)new SyntheticBoundedSource(sourceOptions)))).apply("Measure write time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, WRITE_TIME_METRIC_NAME)))).apply("Write to Kafka", this.writeToKafka());
        PCollection hashcode = (PCollection)((PCollection)((PCollection)((PCollection)this.readPipeline.apply("Read from bounded Kafka", this.readFromBoundedKafka())).apply("Measure read time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, READ_TIME_METRIC_NAME)))).apply("Map records to strings", (PTransform)MapElements.via((SimpleFunction)new MapKafkaRecordsToStrings()))).apply("Calculate hashcode", (PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.thatSingleton((PCollection)hashcode).isEqualTo((Object)expectedHashcode);
        PipelineResult writeResult = this.writePipeline.run();
        writeResult.waitUntilFinish();
        PipelineResult readResult = this.readPipeline.run();
        PipelineResult.State readState = readResult.waitUntilFinish(Duration.standardSeconds((long)options.getReadTimeout().intValue()));
        this.cancelIfTimeouted(readResult, readState);
        if (!options.isWithTestcontainers().booleanValue()) {
            Set<NamedTestResult> metrics = this.readMetrics(writeResult, readResult);
            IOITMetrics.publishToInflux((String)TEST_ID, (String)TIMESTAMP, metrics, (InfluxDBSettings)settings);
        }
    }

    private long readElementMetric(PipelineResult result, String namespace, String name) {
        MetricsReader metricsReader = new MetricsReader(result, namespace);
        return metricsReader.getCounterMetric(name);
    }

    private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
        BiFunction<MetricsReader, String, NamedTestResult> supplier = (reader, metricName) -> {
            long start = reader.getStartTimeMetric(metricName);
            long end = reader.getEndTimeMetric(metricName);
            return NamedTestResult.create((String)TEST_ID, (String)TIMESTAMP, (String)metricName, (double)((double)(end - start) / 1000.0));
        };
        NamedTestResult writeTime = supplier.apply(new MetricsReader(writeResult, NAMESPACE), WRITE_TIME_METRIC_NAME);
        NamedTestResult readTime = supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
        NamedTestResult runTime = NamedTestResult.create((String)TEST_ID, (String)TIMESTAMP, (String)RUN_TIME_METRIC_NAME, (double)(writeTime.getValue() + readTime.getValue()));
        return ImmutableSet.of((Object)readTime, (Object)writeTime, (Object)runTime);
    }

    private void cancelIfTimeouted(PipelineResult readResult, PipelineResult.State readState) throws IOException {
        if (readState == null) {
            readResult.cancel();
        }
    }

    private KafkaIO.Write<byte[], byte[]> writeToKafka() {
        return KafkaIO.write().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withTopic(options.getKafkaTopic()).withKeySerializer(ByteArraySerializer.class).withValueSerializer(ByteArraySerializer.class);
    }

    private KafkaIO.Read<byte[], byte[]> readFromBoundedKafka() {
        return this.readFromKafka().withMaxNumRecords(KafkaIOIT.sourceOptions.numRecords);
    }

    private KafkaIO.Read<byte[], byte[]> readFromKafka() {
        return KafkaIO.readBytes().withBootstrapServers(options.getKafkaBootstrapServerAddresses()).withConsumerConfigUpdates((Map)ImmutableMap.of((Object)"auto.offset.reset", (Object)"earliest")).withTopic(options.getKafkaTopic());
    }

    public static String getHashForRecordCount(long recordCount, Map<Long, String> hashes) {
        String hash = hashes.get(recordCount);
        if (hash == null) {
            throw new UnsupportedOperationException(String.format("No hash for that record count: %s", recordCount));
        }
        return hash;
    }

    private static void setupKafkaContainer() {
        kafkaContainer = new KafkaContainer(DockerImageName.parse((String)"confluentinc/cp-kafka").withTag(options.getKafkaContainerVersion()));
        kafkaContainer.start();
        options.setKafkaBootstrapServerAddresses(kafkaContainer.getBootstrapServers());
    }

    private static class MapKafkaRecordsToStrings
    extends SimpleFunction<KafkaRecord<byte[], byte[]>, String> {
        private MapKafkaRecordsToStrings() {
        }

        public String apply(KafkaRecord<byte[], byte[]> input) {
            String key = Arrays.toString((byte[])input.getKV().getKey());
            String value = Arrays.toString((byte[])input.getKV().getValue());
            return String.format("%s %s", key, value);
        }
    }

    public static interface Options
    extends IOTestPipelineOptions,
    StreamingOptions {
        @Description(value="Options for synthetic source.")
        @Validation.Required
        public String getSourceOptions();

        public void setSourceOptions(String var1);

        @Description(value="Kafka bootstrap server addresses")
        @Default.String(value="localhost:9092")
        public String getKafkaBootstrapServerAddresses();

        public void setKafkaBootstrapServerAddresses(String var1);

        @Description(value="Kafka topic")
        @Validation.Required
        public String getKafkaTopic();

        public void setKafkaTopic(String var1);

        @Description(value="Time to wait for the events to be processed by the read pipeline (in seconds)")
        @Validation.Required
        public Integer getReadTimeout();

        public void setReadTimeout(Integer var1);

        @Description(value="Whether to use testcontainers")
        @Default.Boolean(value=false)
        public Boolean isWithTestcontainers();

        public void setWithTestcontainers(Boolean var1);

        @Description(value="Kafka container version in format 'X.Y.Z'. Use when useTestcontainers is true")
        public @Nullable String getKafkaContainerVersion();

        public void setKafkaContainerVersion(String var1);
    }

    private static class CountingFn
    extends DoFn<String, Void> {
        private final Counter elementCounter;

        CountingFn(String namespace, String name) {
            this.elementCounter = Metrics.counter((String)namespace, (String)name);
        }

        @DoFn.ProcessElement
        public void processElement() {
            this.elementCounter.inc(1L);
        }
    }
}

