package org.apache.beam.it.gcp.spanner;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.Mutation;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.time.Duration;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/gcp/spanner/SpannerIOST.class */
public class SpannerIOST extends IOStressTestBase {
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
    private static final int INSTANCE_NODE_COUNT = 30;
    private String tableName;
    private String testConfigName;
    private Configuration configuration;
    private InfluxDBSettings influxDBSettings;
    private SpannerResourceManager resourceManager;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/spanner/SpannerIOST$Configuration.class */
    public static class Configuration extends SyntheticSourceOptions {

        @JsonProperty
        public int numColumns = 1;

        @JsonProperty
        public int pipelineTimeout = 20;

        @JsonProperty
        public String runner = "DirectRunner";

        @JsonProperty
        public int numWorkers = 20;

        @JsonProperty
        public int maxNumWorkers = 100;

        @JsonProperty
        public int rowsPerSecond = 1000;

        @JsonProperty
        public int minutes = 15;

        @JsonProperty
        public boolean exportMetricsToInfluxDB = true;

        @JsonProperty
        public String influxMeasurement = SpannerIOST.class.getName();

        @JsonProperty
        public String influxHost;

        @JsonProperty
        public String influxDatabase;

        Configuration() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/spanner/SpannerIOST$GenerateMutations.class */
    public static class GenerateMutations extends DoFn<KV<byte[], byte[]>, Mutation> implements Serializable {
        private final String table;
        private final int numBytesCol;
        private final int sizePerCol;

        public GenerateMutations(String str, int i, int i2) {
            Preconditions.checkArgument(i2 >= i);
            this.table = str;
            this.numBytesCol = i;
            this.sizePerCol = i2 / i;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<byte[], byte[]>, Mutation>.ProcessContext processContext) {
            Mutation.WriteBuilder newInsertOrUpdateBuilder = Mutation.newInsertOrUpdateBuilder(this.table);
            newInsertOrUpdateBuilder.set("Id").to("key-" + UUID.randomUUID() + UUID.randomUUID() + Instant.now().getMillis());
            Random random = new Random();
            byte[] bArr = new byte[this.sizePerCol];
            for (int i = 0; i < this.numBytesCol; i++) {
                String format = String.format("COL%d", Integer.valueOf(i + 1));
                random.nextBytes(bArr);
                newInsertOrUpdateBuilder.set(format).to(ByteArray.copyFrom(bArr));
            }
            processContext.output(newInsertOrUpdateBuilder.build());
        }
    }

    @Before
    public void setup() throws IOException {
        this.tableName = "io_spanner_" + DateTimeFormatter.ofPattern("MMddHHmmssSSS").withZone(ZoneId.of("UTC")).format(java.time.Instant.now()) + UUID.randomUUID().toString().replace("-", "").substring(0, 10);
        this.resourceManager = SpannerResourceManager.builder(this.testName, project, region, INSTANCE_NODE_COUNT).build();
        this.testConfigName = TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS_PRESET.get(this.testConfigName);
        if (this.configuration == null) {
            try {
                this.configuration = Configuration.fromJsonString(this.testConfigName, Configuration.class);
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", this.testConfigName, TEST_CONFIGS_PRESET.keySet()));
            }
        }
        this.resourceManager.executeDdlStatement(createTableStatement(this.tableName, this.configuration.numColumns, (int) this.configuration.valueSizeBytes));
        if (this.configuration.exportMetricsToInfluxDB) {
            this.configuration.influxHost = TestProperties.getProperty("influxHost", "", TestProperties.Type.PROPERTY);
            this.configuration.influxDatabase = TestProperties.getProperty("influxDatabase", "", TestProperties.Type.PROPERTY);
            this.configuration.influxMeasurement = TestProperties.getProperty("influxMeasurement", "", TestProperties.Type.PROPERTY);
        }
    }

    @After
    public void teardown() {
        ResourceManagerUtils.cleanResources(new ResourceManager[]{this.resourceManager});
    }

    @Test
    public void runTest() throws IOException, ParseException, InterruptedException {
        if (this.configuration.exportMetricsToInfluxDB) {
            this.influxDBSettings = InfluxDBSettings.builder().withHost(this.configuration.influxHost).withDatabase(this.configuration.influxDatabase).withMeasurement(this.configuration.influxMeasurement + "_" + this.testConfigName).get();
        }
        PipelineLauncher.LaunchInfo generateDataAndWrite = generateDataAndWrite();
        Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(generateDataAndWrite, Duration.ofMinutes(this.configuration.pipelineTimeout))));
        PipelineLauncher.LaunchInfo readData = readData();
        Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(readData, Duration.ofMinutes(this.configuration.pipelineTimeout))));
        try {
            Assert.assertTrue(this.pipelineLauncher.getMetric(project, region, generateDataAndWrite.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME)).doubleValue() >= this.pipelineLauncher.getMetric(project, region, readData.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue());
            if (this.pipelineLauncher.getJobStatus(project, region, generateDataAndWrite.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, generateDataAndWrite.jobId());
            }
            LoadTestBase.MetricsConfiguration build = LoadTestBase.MetricsConfiguration.builder().setInputPCollection("Map records.out0").setInputPCollectionV2("Map records/ParMultiDo(GenerateMutations).out0").setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
            LoadTestBase.MetricsConfiguration build2 = LoadTestBase.MetricsConfiguration.builder().setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
            exportMetrics(generateDataAndWrite, build, this.configuration.exportMetricsToInfluxDB, this.influxDBSettings);
            exportMetrics(readData, build2, this.configuration.exportMetricsToInfluxDB, this.influxDBSettings);
        } catch (Throwable th) {
            if (this.pipelineLauncher.getJobStatus(project, region, generateDataAndWrite.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, generateDataAndWrite.jobId());
            }
            throw th;
        }
    }

    private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
        int max = Math.max(this.configuration.rowsPerSecond, 1000) / 1000;
        List loadPeriods = getLoadPeriods(this.configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
        PCollection apply = this.writePipeline.apply(Read.from(new SyntheticUnboundedSource(this.configuration)));
        if (max > 1) {
            apply = (PCollection) apply.apply("One input to multiple outputs", ParDo.of(new IOStressTestBase.MultiplierDoFn(max, loadPeriods))).apply("Reshuffle fanout", Reshuffle.of());
        }
        apply.apply("Map records to Spanner format", ParDo.of(new GenerateMutations(this.tableName, this.configuration.numColumns, (int) this.configuration.valueSizeBytes))).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(WRITE_ELEMENT_METRIC_NAME))).apply("Write to Spanner", SpannerIO.write().withProjectId(project).withInstanceId(this.resourceManager.getInstanceId()).withDatabaseId(this.resourceManager.getDatabaseId()).withCommitDeadline(org.joda.time.Duration.standardMinutes(2L)));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("write-spanner").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", this.configuration.runner).addParameter("autoscalingAlgorithm", DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED.toString()).addParameter("numWorkers", String.valueOf(this.configuration.numWorkers)).addParameter("maxNumWorkers", String.valueOf(this.configuration.maxNumWorkers)).addParameter("streaming", "true").build());
    }

    private PipelineLauncher.LaunchInfo readData() throws IOException {
        this.readPipeline.apply("Read from Spanner", SpannerIO.read().withProjectId(project).withInstanceId(this.resourceManager.getInstanceId()).withDatabaseId(this.resourceManager.getDatabaseId()).withQuery(String.format("SELECT * FROM %s", this.tableName))).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("read-spanner").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.readPipeline).addParameter("runner", this.configuration.runner).build());
    }

    static String createTableStatement(String str, int i, int i2) {
        int i3 = i2 / i;
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("CREATE TABLE %s (Id STRING(MAX)", str));
        for (int i4 = 0; i4 < i; i4++) {
            sb.append(String.format(",\n COL%d BYTES(%d)", Integer.valueOf(i4 + 1), Integer.valueOf(i3)));
        }
        sb.append(") PRIMARY KEY(Id)");
        return sb.toString();
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("medium", Configuration.fromJsonString("{\"numRecords\":1000000,\"rowsPerSecond\":25000,\"minutes\":10,\"valueSizeBytes\":1000,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"numRecords\":10000000,\"rowsPerSecond\":25000,\"minutes\":30,\"valueSizeBytes\":1000,\"pipelineTimeout\":300,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
