package org.apache.beam.it.gcp.pubsub;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST.class */
public class PubSubIOST extends IOStressTestBase {
    private static final int NUMBER_OF_BUNDLES_FOR_MEDIUM = 20;
    private static final int NUMBER_OF_BUNDLES_FOR_LARGE = 200;
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
    private static final String MAP_RECORDS_STEP_NAME = "Map records";
    private static final String WRITE_TO_PUBSUB_STEP_NAME = "Write to PubSub";
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;
    private static TopicName topicName;
    private static String testConfigName;
    private static Configuration configuration;
    private static SubscriptionName subscription;
    private static InfluxDBSettings influxDBSettings;
    private static PubsubResourceManager resourceManager;

    @Rule
    public transient TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$Configuration.class */
    public static class Configuration extends SyntheticSourceOptions {

        @JsonProperty
        public int pipelineTimeout = PubSubIOST.NUMBER_OF_BUNDLES_FOR_MEDIUM;

        @JsonProperty
        public String runner = "DirectRunner";

        @JsonProperty
        public String writeAndReadFormat = "STRING";

        @JsonProperty
        public int numWorkers = PubSubIOST.NUMBER_OF_BUNDLES_FOR_MEDIUM;

        @JsonProperty
        public int maxNumWorkers = 100;

        @JsonProperty
        public int rowsPerSecond = 1000;

        @JsonProperty
        public int minutes = 15;

        @JsonProperty
        public boolean exportMetricsToInfluxDB = true;

        @JsonProperty
        public String influxMeasurement;

        @JsonProperty
        public String influxHost;

        @JsonProperty
        public String influxDatabase;

        Configuration() {
        }
    }

    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$GenericClass.class */
    static class GenericClass implements Serializable {
        byte[] byteField;

        public GenericClass() {
        }

        public GenericClass(byte[] bArr) {
            this.byteField = bArr;
        }

        public String toString() {
            return MoreObjects.toStringHelper(getClass()).add("byteField", this.byteField).toString();
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(Arrays.hashCode(this.byteField)));
        }

        public boolean equals(Object obj) {
            if (obj == null || !(obj instanceof GenericClass)) {
                return false;
            }
            return Arrays.equals(this.byteField, ((GenericClass) obj).byteField);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$MapKVtoGenericClass.class */
    public static class MapKVtoGenericClass extends DoFn<KV<byte[], byte[]>, GenericClass> {
        private MapKVtoGenericClass() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<byte[], byte[]>, GenericClass>.ProcessContext processContext) {
            processContext.output(new GenericClass((byte[]) ((KV) Objects.requireNonNull((KV) processContext.element())).getValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$MapKVtoPrimitiveProto.class */
    public static class MapKVtoPrimitiveProto extends DoFn<KV<byte[], byte[]>, Proto3SchemaMessages.Primitive> {
        private MapKVtoPrimitiveProto() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<byte[], byte[]>, Proto3SchemaMessages.Primitive>.ProcessContext processContext) {
            byte[] bArr = (byte[]) ((KV) Objects.requireNonNull((KV) processContext.element())).getValue();
            processContext.output(Proto3SchemaMessages.Primitive.newBuilder().setPrimitiveBytes(ByteString.copyFrom(bArr)).setPrimitiveInt32(ByteBuffer.wrap(bArr).getInt()).build());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$MapKVtoPubSubMessage.class */
    public static class MapKVtoPubSubMessage extends DoFn<KV<byte[], byte[]>, PubsubMessage> {
        private MapKVtoPubSubMessage() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<byte[], byte[]>, PubsubMessage>.ProcessContext processContext) {
            processContext.output(new PubsubMessage((byte[]) ((KV) Objects.requireNonNull((KV) processContext.element())).getValue(), Collections.emptyMap()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$MapKVtoString.class */
    public static class MapKVtoString extends DoFn<KV<byte[], byte[]>, String> {
        private MapKVtoString() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<byte[], byte[]>, String>.ProcessContext processContext) {
            processContext.output(ByteString.copyFrom((byte[]) ((KV) Objects.requireNonNull((KV) processContext.element())).getValue()).toString(StandardCharsets.UTF_8));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/pubsub/PubSubIOST$WriteAndReadFormat.class */
    public enum WriteAndReadFormat {
        STRING,
        AVRO,
        PROTO,
        PUBSUB_MESSAGE
    }

    @Before
    public void setup() throws IOException {
        resourceManager = PubsubResourceManager.builder("io-pubsub-st", project, CREDENTIALS_PROVIDER).build();
        topicName = resourceManager.createTopic("topic");
        subscription = resourceManager.createSubscription(topicName, "subscription");
        PipelineOptionsFactory.register(TestPipelineOptions.class);
        testConfigName = TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
        configuration = TEST_CONFIGS_PRESET.get(testConfigName);
        if (configuration == null) {
            try {
                configuration = Configuration.fromJsonString(testConfigName, Configuration.class);
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", testConfigName, TEST_CONFIGS_PRESET.keySet()));
            }
        }
        configuration.forceNumInitialBundles = Integer.valueOf(testConfigName.equals("medium") ? NUMBER_OF_BUNDLES_FOR_MEDIUM : NUMBER_OF_BUNDLES_FOR_LARGE);
        if (!Strings.isNullOrEmpty(this.tempBucketName)) {
            String format = String.format("gs://%s/temp/", this.tempBucketName);
            this.writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.writePipeline.getOptions().setTempLocation(format);
            this.readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.readPipeline.getOptions().setTempLocation(format);
        }
        this.writePipeline.getOptions().as(PubsubOptions.class).setProject(project);
        this.readPipeline.getOptions().as(PubsubOptions.class).setProject(project);
        if (configuration.exportMetricsToInfluxDB) {
            configuration.influxHost = TestProperties.getProperty("influxHost", "", TestProperties.Type.PROPERTY);
            configuration.influxDatabase = TestProperties.getProperty("influxDatabase", "", TestProperties.Type.PROPERTY);
            configuration.influxMeasurement = TestProperties.getProperty("influxMeasurement", "", TestProperties.Type.PROPERTY);
        }
    }

    @After
    public void tearDownClass() {
        ResourceManagerUtils.cleanResources(new ResourceManager[]{resourceManager});
    }

    @Test
    public void testStringWriteAndRead() throws IOException {
        configuration.writeAndReadFormat = WriteAndReadFormat.STRING.toString();
        testWriteAndRead();
    }

    @Test
    public void testAvroGenericClassWriteAndRead() throws IOException {
        configuration.writeAndReadFormat = WriteAndReadFormat.AVRO.toString();
        testWriteAndRead();
    }

    @Test
    public void testProtoPrimitiveWriteAndRead() throws IOException {
        configuration.writeAndReadFormat = WriteAndReadFormat.PROTO.toString();
        testWriteAndRead();
    }

    @Test
    public void testPubsubMessageWriteAndRead() throws IOException {
        configuration.writeAndReadFormat = WriteAndReadFormat.PUBSUB_MESSAGE.toString();
        testWriteAndRead();
    }

    public void testWriteAndRead() throws IOException {
        if (configuration.exportMetricsToInfluxDB) {
            influxDBSettings = InfluxDBSettings.builder().withHost(configuration.influxHost).withDatabase(configuration.influxDatabase).withMeasurement(configuration.influxMeasurement + "_" + testConfigName + "_" + configuration.writeAndReadFormat).get();
        }
        WriteAndReadFormat valueOf = WriteAndReadFormat.valueOf(configuration.writeAndReadFormat);
        PipelineLauncher.LaunchInfo generateDataAndWrite = generateDataAndWrite(valueOf);
        PipelineLauncher.LaunchInfo testRead = testRead(valueOf);
        try {
            Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(testRead, Duration.ofMinutes(configuration.pipelineTimeout))));
            Assert.assertTrue(this.pipelineLauncher.getMetric(project, region, generateDataAndWrite.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME)).doubleValue() >= this.pipelineLauncher.getMetric(project, region, testRead.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue());
            LoadTestBase.MetricsConfiguration build = LoadTestBase.MetricsConfiguration.builder().setInputPCollection("Map records.out0").setInputPCollectionV2("Map records/ParMultiDo(MapKVToPubSubType).out0").setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
            LoadTestBase.MetricsConfiguration build2 = LoadTestBase.MetricsConfiguration.builder().setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
            exportMetrics(generateDataAndWrite, build, configuration.exportMetricsToInfluxDB, influxDBSettings);
            exportMetrics(testRead, build2, configuration.exportMetricsToInfluxDB, influxDBSettings);
            cancelJobIfRunning(generateDataAndWrite);
            cancelJobIfRunning(testRead);
        } catch (Throwable th) {
            cancelJobIfRunning(generateDataAndWrite);
            cancelJobIfRunning(testRead);
            throw th;
        }
    }

    private PipelineLauncher.LaunchInfo generateDataAndWrite(WriteAndReadFormat writeAndReadFormat) throws IOException {
        int max = Math.max(configuration.rowsPerSecond, 1000) / 1000;
        List loadPeriods = getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
        PCollection apply = this.writePipeline.apply("Read from source", Read.from(new SyntheticUnboundedSource(configuration)));
        if (max > 1) {
            apply = (PCollection) apply.apply("One input to multiple outputs", ParDo.of(new IOStressTestBase.MultiplierDoFn(max, loadPeriods))).apply("Reshuffle fanout", Reshuffle.of()).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(WRITE_ELEMENT_METRIC_NAME)));
        }
        switch (writeAndReadFormat) {
            case STRING:
                apply.apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoString())).apply(WRITE_TO_PUBSUB_STEP_NAME, PubsubIO.writeStrings().to(topicName.toString()));
                break;
            case AVRO:
                apply.apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoGenericClass())).apply(WRITE_TO_PUBSUB_STEP_NAME, PubsubIO.writeAvros(GenericClass.class).to(topicName.toString()));
                break;
            case PROTO:
                apply.apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoPrimitiveProto())).apply(WRITE_TO_PUBSUB_STEP_NAME, PubsubIO.writeProtos(Proto3SchemaMessages.Primitive.class).to(topicName.toString()));
                break;
            case PUBSUB_MESSAGE:
                apply.apply(MAP_RECORDS_STEP_NAME, ParDo.of(new MapKVtoPubSubMessage())).apply(WRITE_TO_PUBSUB_STEP_NAME, PubsubIO.writeMessages().to(topicName.toString()));
                break;
        }
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("write-pubsub").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", configuration.runner).addParameter("autoscalingAlgorithm", DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED.toString()).addParameter("numWorkers", String.valueOf(configuration.numWorkers)).addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers)).addParameter("streaming", "true").addParameter("experiments", "use_runner_v2").build());
    }

    private PipelineLauncher.LaunchInfo testRead(WriteAndReadFormat writeAndReadFormat) throws IOException {
        PubsubIO.Read read = null;
        switch (writeAndReadFormat) {
            case STRING:
                read = PubsubIO.readStrings().fromSubscription(subscription.toString());
                break;
            case AVRO:
                read = PubsubIO.readAvros(GenericClass.class).fromSubscription(subscription.toString());
                break;
            case PROTO:
                read = PubsubIO.readProtos(Proto3SchemaMessages.Primitive.class).fromSubscription(subscription.toString());
                break;
            case PUBSUB_MESSAGE:
                read = PubsubIO.readMessages().fromSubscription(subscription.toString());
                break;
        }
        this.readPipeline.apply("Read from PubSub", read).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("read-pubsub").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.readPipeline).addParameter("runner", configuration.runner).addParameter("streaming", "true").addParameter("experiments", "use_runner_v2").addParameter("numWorkers", String.valueOf(configuration.numWorkers)).addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers)).build());
    }

    private void cancelJobIfRunning(PipelineLauncher.LaunchInfo launchInfo) throws IOException {
        if (this.pipelineLauncher.getJobStatus(project, region, launchInfo.jobId()) == PipelineLauncher.JobState.RUNNING) {
            this.pipelineLauncher.cancelJob(project, region, launchInfo.jobId());
        }
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("medium", Configuration.fromJsonString("{\"numRecords\":2000000,\"rowsPerSecond\":25000,\"minutes\":10,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"numRecords\":20000000,\"rowsPerSecond\":25000,\"minutes\":40,\"valueSizeBytes\":1000,\"pipelineTimeout\":70,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
