package org.apache.beam.it.kafka;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.kafka.AutoValue_KafkaIOLT_Configuration;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/kafka/KafkaIOLT.class */
public final class KafkaIOLT extends IOLoadTestBase {
    private static KafkaResourceManager resourceManager;
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    private static final int ROW_SIZE = 1024;
    private Configuration configuration;
    private String kafkaTopic;
    private SyntheticSourceOptions sourceOptions;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private static final Map<String, Configuration> TEST_CONFIGS = ImmutableMap.of("local", Configuration.of(1000, 2, "DirectRunner"), "medium", Configuration.of(10000000, 20, "DataflowRunner"), "large", Configuration.of(100000000, 80, "DataflowRunner"));

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/it/kafka/KafkaIOLT$Configuration.class */
    public static abstract class Configuration {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/it/kafka/KafkaIOLT$Configuration$Builder.class */
        public static abstract class Builder {
            abstract Builder setNumRows(long j);

            abstract Builder setPipelineTimeout(int i);

            abstract Builder setRunner(String str);

            abstract Builder setRowSize(int i);

            abstract Configuration build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Long getNumRows();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getPipelineTimeout();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getRunner();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getRowSize();

        static Configuration of(long j, int i, String str) {
            return new AutoValue_KafkaIOLT_Configuration.Builder().setNumRows(j).setPipelineTimeout(i).setRunner(str).setRowSize(KafkaIOLT.ROW_SIZE).build();
        }

        abstract Builder toBuilder();

        String getSourceOptions() {
            return String.format("{\"numRecords\":%d,\"keySizeBytes\":4,\"valueSizeBytes\":%d}", getNumRows(), getRowSize());
        }
    }

    @BeforeClass
    public static void beforeClass() {
        resourceManager = KafkaResourceManager.builder("io-kafka-lt").build();
    }

    @Before
    public void setup() throws IOException {
        this.kafkaTopic = "io-kafka-" + DateTimeFormatter.ofPattern("MMddHHmmssSSS").withZone(ZoneId.of("UTC")).format(Instant.now()) + UUID.randomUUID().toString().substring(0, 10);
        String property = TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS.get(property);
        if (this.configuration == null) {
            throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Known configs: %s", property, TEST_CONFIGS.keySet()));
        }
        this.sourceOptions = SyntheticOptions.fromJsonString(this.configuration.getSourceOptions(), SyntheticSourceOptions.class);
        if (!Strings.isNullOrEmpty(this.tempBucketName)) {
            String format = String.format("gs://%s/temp/", this.tempBucketName);
            this.writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.writePipeline.getOptions().setTempLocation(format);
            this.readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.readPipeline.getOptions().setTempLocation(format);
        }
        this.writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        this.writePipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
        this.readPipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        this.readPipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
    }

    @AfterClass
    public static void tearDownClass() {
        ResourceManagerUtils.cleanResources(new ResourceManager[]{resourceManager});
    }

    @Test
    public void testWriteAndRead() throws IOException {
        PipelineLauncher.LaunchInfo testWrite = testWrite();
        PipelineLauncher.LaunchInfo testRead = testRead();
        try {
            Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(testRead, Duration.ofMinutes(this.configuration.getPipelineTimeout().intValue()))));
            Assert.assertEquals(PipelineLauncher.JobState.RUNNING, this.pipelineLauncher.getJobStatus(project, region, testRead.jobId()));
            Assert.assertEquals(this.configuration.getNumRows().longValue(), this.pipelineLauncher.getMetric(project, region, testRead.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue(), 10.0d);
            if (this.pipelineLauncher.getJobStatus(project, region, testWrite.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, testWrite.jobId());
            }
            if (this.pipelineLauncher.getJobStatus(project, region, testRead.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, testRead.jobId());
            }
        } catch (Throwable th) {
            if (this.pipelineLauncher.getJobStatus(project, region, testWrite.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, testWrite.jobId());
            }
            if (this.pipelineLauncher.getJobStatus(project, region, testRead.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, testRead.jobId());
            }
            throw th;
        }
    }

    private PipelineLauncher.LaunchInfo testWrite() throws IOException {
        this.writePipeline.apply("Generate records", Read.from(new SyntheticUnboundedSource(this.sourceOptions))).apply("Write to Kafka", KafkaIO.write().withBootstrapServers(resourceManager.getBootstrapServers()).withTopic(this.kafkaTopic).withKeySerializer(ByteArraySerializer.class).withValueSerializer(ByteArraySerializer.class).withTopic(this.kafkaTopic));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("write-kafka").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", this.configuration.getRunner()).build());
    }

    private PipelineLauncher.LaunchInfo testRead() throws IOException {
        this.readPipeline.apply("Read from unbounded Kafka", KafkaIO.readBytes().withBootstrapServers(resourceManager.getBootstrapServers()).withTopic(this.kafkaTopic).withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"))).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("read-kafka").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.readPipeline).addParameter("runner", this.configuration.getRunner()).build());
    }
}
