/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.cloud.Timestamp;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration test for {@link KafkaIO}.
 *
 * <p>The test runs two pipelines in sequence: the first generates synthetic records and writes
 * them to a Kafka topic, the second reads the records back, hashes their string form and compares
 * the result with {@link #EXPECTED_HASHCODE}. Write, read and combined run times are then
 * published through {@link IOITMetrics}.
 */
@RunWith(JUnit4.class)
public class KafkaIOIT {
    private static final String READ_TIME_METRIC_NAME = "read_time";
    private static final String WRITE_TIME_METRIC_NAME = "write_time";
    private static final String RUN_TIME_METRIC_NAME = "run_time";
    private static final String NAMESPACE = KafkaIOIT.class.getName();
    private static final String TEST_ID = UUID.randomUUID().toString();
    private static final String TIMESTAMP = Timestamp.now().toString();

    // Hash the read pipeline must produce; presumably tied to one specific synthetic source
    // configuration (record count / sizes) -- confirm against the job that launches this test.
    private static final String EXPECTED_HASHCODE = "4507649971ee7c51abbb446e65a5c660";

    private static SyntheticSourceOptions sourceOptions;
    private static Options options;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    /**
     * Reads the test pipeline options and parses the synthetic source configuration from the JSON
     * string carried in {@link Options#getSourceOptions()}.
     *
     * @throws IOException if the synthetic source options cannot be parsed
     */
    @BeforeClass
    public static void setup() throws IOException {
        options = IOITHelper.readIOTestPipelineOptions(Options.class);
        sourceOptions =
            SyntheticOptions.fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
    }

    /**
     * Writes synthetic records to Kafka, reads them back, verifies the hash of what was read and
     * publishes the timing metrics of both pipelines.
     *
     * @throws IOException if cancelling the read pipeline fails
     */
    @Test
    public void testKafkaIOReadsAndWritesCorrectly() throws IOException {
        writePipeline
            .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
            .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
            .apply("Write to Kafka", writeToKafka());

        PCollection<String> hashcode =
            readPipeline
                .apply("Read from Kafka", readFromKafka())
                .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
                .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
                .apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults());

        PAssert.thatSingleton(hashcode).isEqualTo(EXPECTED_HASHCODE);

        // The write pipeline must finish before the read pipeline starts.
        PipelineResult writeResult = writePipeline.run();
        writeResult.waitUntilFinish();

        PipelineResult readResult = readPipeline.run();
        PipelineResult.State readState =
            readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
        cancelIfNotTerminal(readResult, readState);

        Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
        IOITMetrics.publish(
            TEST_ID, TIMESTAMP, options.getBigQueryDataset(), options.getBigQueryTable(), metrics);
    }

    /**
     * Builds the set of timing results for the finished pipelines.
     *
     * @param writeResult result of the write pipeline
     * @param readResult result of the read pipeline
     * @return the read time, the write time and their sum reported as the run time
     */
    private Set<NamedTestResult> readMetrics(PipelineResult writeResult, PipelineResult readResult) {
        BiFunction<MetricsReader, String, NamedTestResult> supplier =
            (reader, metricName) -> {
                long start = reader.getStartTimeMetric(metricName);
                long end = reader.getEndTimeMetric(metricName);
                // Presumably converts a millisecond span to seconds -- confirm the unit
                // recorded by TimeMonitor.
                return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1000.0);
            };

        NamedTestResult writeTime =
            supplier.apply(new MetricsReader(writeResult, NAMESPACE), WRITE_TIME_METRIC_NAME);
        NamedTestResult readTime =
            supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
        NamedTestResult runTime =
            NamedTestResult.create(
                TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, writeTime.getValue() + readTime.getValue());

        return ImmutableSet.of(readTime, writeTime, runTime);
    }

    /**
     * Cancels the read pipeline unless it has already reached a terminal state.
     *
     * @param readResult handle of the read pipeline
     * @param readState state returned by the timed wait; may be {@code null}
     * @throws IOException if the cancellation request fails
     */
    private void cancelIfNotTerminal(PipelineResult readResult, PipelineResult.State readState)
        throws IOException {
        // PipelineResult.waitUntilFinish(Duration) is documented to return null when the timeout
        // elapses. That is exactly the case in which the pipeline has to be cancelled, so null is
        // treated as "still running" instead of being dereferenced.
        if (readState == null || !readState.isTerminal()) {
            readResult.cancel();
        }
    }

    /** Returns the Kafka sink writing raw byte keys and values to the configured topic. */
    private KafkaIO.Write<byte[], byte[]> writeToKafka() {
        return KafkaIO.<byte[], byte[]>write()
            .withBootstrapServers(options.getKafkaBootstrapServerAddress())
            .withTopic(options.getKafkaTopic())
            .withKeySerializer(ByteArraySerializer.class)
            .withValueSerializer(ByteArraySerializer.class);
    }

    /**
     * Returns the Kafka source for the configured topic. The consumer is configured with
     * {@code auto.offset.reset=earliest} and the read is capped at the number of records declared
     * by the synthetic source options.
     */
    private KafkaIO.Read<byte[], byte[]> readFromKafka() {
        return KafkaIO.readBytes()
            .withBootstrapServers(options.getKafkaBootstrapServerAddress())
            .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))
            .withTopic(options.getKafkaTopic())
            .withMaxNumRecords(sourceOptions.numRecords);
    }

    /** Renders a Kafka record as {@code "<key bytes> <value bytes>"} so it can be hashed. */
    private static class MapKafkaRecordsToStrings
        extends SimpleFunction<KafkaRecord<byte[], byte[]>, String> {

        @Override
        public String apply(KafkaRecord<byte[], byte[]> input) {
            String key = Arrays.toString(input.getKV().getKey());
            String value = Arrays.toString(input.getKV().getValue());
            return String.format("%s %s", key, value);
        }
    }

    /** Pipeline options specific to this test. */
    public interface Options extends IOTestPipelineOptions, StreamingOptions {

        @Description("Options for synthetic source.")
        @Validation.Required
        String getSourceOptions();

        void setSourceOptions(String sourceOptions);

        @Description("Kafka server address")
        @Validation.Required
        String getKafkaBootstrapServerAddress();

        void setKafkaBootstrapServerAddress(String address);

        @Description("Kafka topic")
        @Validation.Required
        String getKafkaTopic();

        void setKafkaTopic(String topic);

        @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
        @Validation.Required
        Integer getReadTimeout();

        void setReadTimeout(Integer readTimeout);
    }
}

