package org.apache.beam.it.gcp.spanner;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.Mutation;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/gcp/spanner/SpannerIOLT.class */
public class SpannerIOLT extends IOLoadTestBase {

    // Pipeline used by testWrite() to generate rows and write them to Spanner.
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    // Pipeline used by testRead() to query the table back and count the rows.
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    // Name of the per-test Spanner table; assigned in setup().
    private String tableName;
    // Spanner resource manager built in setup() and handed to cleanResources() in teardown().
    private SpannerResourceManager resourceManager;
    // Active test configuration: either a preset or one parsed from a JSON string in setup().
    private Configuration configuration;
    // Name of the counter passed to CountingFn in the read pipeline and queried when asserting the row count.
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    // Named configuration presets ("local", "medium", "large"); populated in the static initializer.
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/spanner/SpannerIOLT$Configuration.class */
    /**
     * Options for this load test, populated from JSON via {@code fromJsonString}.
     *
     * <p>Extends {@link SyntheticSourceOptions}; the {@code numRecords} and {@code valueSizeBytes}
     * values read by the test are not declared here and come from the superclass hierarchy.
     */
    public static class Configuration extends SyntheticSourceOptions {

        /** Number of BYTES columns created in the table and filled per row. Defaults to 1. */
        @JsonProperty
        public int numColumns = 1;

        /** Timeout for each pipeline run, in minutes (passed to {@code Duration.ofMinutes}). Defaults to 20. */
        @JsonProperty
        public int pipelineTimeout = 20;

        /** Value of the {@code runner} launch parameter. Defaults to {@code "DirectRunner"}. */
        @JsonProperty
        public String runner = "DirectRunner";

        /** No-arg constructor; fields keep the defaults above unless overridden by the JSON input. */
        Configuration() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/spanner/SpannerIOLT$GenerateMutations.class */
    /**
     * Maps each input {@code Long} to an insert-or-update {@link Mutation} for the target table.
     *
     * <p>The row key column {@code Id} is set to the input element, and columns {@code COL1} ..
     * {@code COLn} are filled with pseudo-random bytes. The random generator is seeded with the
     * element value, so the same element always yields the same payload.
     */
    public static class GenerateMutations extends DoFn<Long, Mutation> implements Serializable {
        private final String table;
        private final int numBytesCol;
        private final int sizePerCol;

        /**
         * @param str name of the table the mutations target
         * @param i number of BYTES columns to populate; must be positive
         * @param i2 total payload size per row in bytes, split evenly across the columns; must be
         *     at least {@code i}
         * @throws IllegalArgumentException if {@code i} is not positive or {@code i2 < i}
         */
        public GenerateMutations(String str, int i, int i2) {
            // Reject a non-positive column count up front; otherwise the division below fails
            // with an ArithmeticException (i == 0) or yields a negative array size (i < 0).
            Preconditions.checkArgument(i > 0, "Number of columns must be positive, got %s", i);
            Preconditions.checkArgument(
                    i2 >= i, "Total value size (%s) must be at least the number of columns (%s)", i2, i);
            this.table = str;
            this.numBytesCol = i;
            this.sizePerCol = i2 / i;
        }

        /** Emits one mutation for the current element. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext processContext) {
            Long id = Objects.requireNonNull(processContext.element());
            Mutation.WriteBuilder mutation = Mutation.newInsertOrUpdateBuilder(this.table);
            mutation.set("Id").to(id);

            // Seeded with the element so the generated bytes are reproducible per row.
            Random random = new Random(id);
            // Scratch buffer reused across columns; ByteArray.copyFrom snapshots it each time.
            byte[] payload = new byte[this.sizePerCol];
            for (int column = 1; column <= this.numBytesCol; column++) {
                random.nextBytes(payload);
                // Plain concatenation keeps the column name ASCII regardless of the default locale.
                mutation.set("COL" + column).to(ByteArray.copyFrom(payload));
            }
            processContext.output(mutation.build());
        }
    }

    /**
     * Prepares a test run: picks a unique table name, builds the Spanner resource manager,
     * resolves the test configuration and creates the table.
     *
     * <p>The {@code configuration} property is first looked up among the presets; if it is not a
     * preset name it is parsed as a JSON configuration string.
     *
     * @throws IllegalArgumentException if the property is neither a preset name nor valid JSON;
     *     the underlying parse failure is attached as the cause
     */
    @Before
    public void setup() throws IOException {
        // Table name = fixed prefix + UTC timestamp + 10 hex chars of a random UUID.
        String timestamp = DateTimeFormatter.ofPattern("MMddHHmmssSSS").withZone(ZoneId.of("UTC")).format(Instant.now());
        String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
        this.tableName = "io_spanner_" + timestamp + randomSuffix;

        this.resourceManager = SpannerResourceManager.builder(this.testName, project, region).build();

        String testConfig = TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS_PRESET.get(testConfig);
        if (this.configuration == null) {
            try {
                this.configuration = Configuration.fromJsonString(testConfig, Configuration.class);
            } catch (IOException e) {
                // Keep the parse error as the cause so the reason the JSON was rejected is visible.
                throw new IllegalArgumentException(
                        String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", testConfig, TEST_CONFIGS_PRESET.keySet()),
                        e);
            }
        }

        String ddl = createTableStatement(this.tableName, this.configuration.numColumns, (int) this.configuration.valueSizeBytes);
        this.resourceManager.executeDdlStatement(ddl);
    }

    /** Hands the Spanner resource manager to {@code ResourceManagerUtils.cleanResources} after each test. */
    @After
    public void teardown() {
        ResourceManager[] managers = {this.resourceManager};
        ResourceManagerUtils.cleanResources(managers);
    }

    /**
     * Runs the write pipeline, then the read pipeline, and verifies the read side.
     *
     * <p>Both launches must not end in {@code LAUNCH_FAILED}. The read job must finish in state
     * {@code DONE}, and its element counter must equal the configured number of records. Metrics
     * for both jobs are then exported to BigQuery.
     */
    @Test
    public void testSpannerWriteAndRead() throws IOException {
        Duration timeout = Duration.ofMinutes(this.configuration.pipelineTimeout);

        PipelineLauncher.LaunchInfo writeInfo = testWrite();
        Assert.assertNotEquals(
                PipelineOperator.Result.LAUNCH_FAILED,
                this.pipelineOperator.waitUntilDone(createConfig(writeInfo, timeout)));

        PipelineLauncher.LaunchInfo readInfo = testRead();
        Assert.assertNotEquals(
                PipelineOperator.Result.LAUNCH_FAILED,
                this.pipelineOperator.waitUntilDone(createConfig(readInfo, timeout)));
        Assert.assertEquals(
                PipelineLauncher.JobState.DONE,
                this.pipelineLauncher.getJobStatus(project, region, readInfo.jobId()));

        String counterName = getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME);
        Double readCount = this.pipelineLauncher.getMetric(project, region, readInfo.jobId(), counterName);
        // Fail with a readable message instead of a bare NullPointerException on unboxing.
        Assert.assertNotNull("Metric " + counterName + " was not reported by the read job", readCount);
        Assert.assertEquals(this.configuration.numRecords, readCount.doubleValue(), 0.5d);

        LoadTestBase.MetricsConfiguration metricsConfig = LoadTestBase.MetricsConfiguration.builder()
                .setInputPCollection("Map records.out0")
                .setInputPCollectionV2("Map records/ParMultiDo(GenerateMutations).out0")
                .setOutputPCollection("Counting element.out0")
                .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0")
                .build();
        try {
            exportMetricsToBigQuery(writeInfo, getMetrics(writeInfo, metricsConfig));
            exportMetricsToBigQuery(readInfo, getMetrics(readInfo, metricsConfig));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Assembles the write pipeline (sequence source, mutation generation, Spanner sink) and
     * launches it under the job name {@code write-spanner} with the configured runner.
     *
     * @return launch information for the started write job
     */
    private PipelineLauncher.LaunchInfo testWrite() throws IOException {
        GenerateSequence rowIds = GenerateSequence.from(0L).to(this.configuration.numRecords);
        GenerateMutations toMutation =
                new GenerateMutations(this.tableName, this.configuration.numColumns, (int) this.configuration.valueSizeBytes);
        SpannerIO.Write spannerSink = SpannerIO.write()
                .withProjectId(project)
                .withInstanceId(this.resourceManager.getInstanceId())
                .withDatabaseId(this.resourceManager.getDatabaseId());

        this.writePipeline
                .apply(rowIds)
                .apply("Map records", ParDo.of(toMutation))
                .apply("Write to Spanner", spannerSink);

        PipelineLauncher.LaunchConfig launchConfig = PipelineLauncher.LaunchConfig.builder("write-spanner")
                .setSdk(PipelineLauncher.Sdk.JAVA)
                .setPipeline(this.writePipeline)
                .addParameter("runner", this.configuration.runner)
                .build();
        return this.pipelineLauncher.launch(project, region, launchConfig);
    }

    /**
     * Assembles the read pipeline (full-table query followed by an element counter) and launches
     * it under the job name {@code read-spanner} with the configured runner.
     *
     * @return launch information for the started read job
     */
    private PipelineLauncher.LaunchInfo testRead() throws IOException {
        String selectAll = String.format("SELECT * FROM %s", this.tableName);
        SpannerIO.Read spannerSource = SpannerIO.read()
                .withProjectId(project)
                .withInstanceId(this.resourceManager.getInstanceId())
                .withDatabaseId(this.resourceManager.getDatabaseId())
                .withQuery(selectAll);

        this.readPipeline
                .apply("Read from Spanner", spannerSource)
                .apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));

        PipelineLauncher.LaunchConfig launchConfig = PipelineLauncher.LaunchConfig.builder("read-spanner")
                .setSdk(PipelineLauncher.Sdk.JAVA)
                .setPipeline(this.readPipeline)
                .addParameter("runner", this.configuration.runner)
                .build();
        return this.pipelineLauncher.launch(project, region, launchConfig);
    }

    /**
     * Builds the {@code CREATE TABLE} DDL for the test table: an {@code Id INT64} primary key
     * followed by {@code i} columns named {@code COL1} .. {@code COLi}, each declared as
     * {@code BYTES(i2 / i)}.
     *
     * @param str name of the table to create
     * @param i number of BYTES columns; must be positive
     * @param i2 total value size in bytes, split evenly across the columns; must be at least
     *     {@code i} so that every column gets a non-zero length
     * @return the DDL statement
     * @throws IllegalArgumentException if {@code i} is not positive or {@code i2 < i}
     */
    static String createTableStatement(String str, int i, int i2) {
        if (i <= 0) {
            throw new IllegalArgumentException("Number of columns must be positive, got " + i);
        }
        if (i2 < i) {
            throw new IllegalArgumentException(
                    "Total value size (" + i2 + ") must be at least the number of columns (" + i + ")");
        }
        int bytesPerColumn = i2 / i;
        // Plain appends rather than String.format("%d"): the latter localizes digits under some
        // default locales, which would corrupt column names and lengths in the DDL.
        StringBuilder ddl = new StringBuilder("CREATE TABLE ").append(str).append(" (Id INT64");
        for (int column = 1; column <= i; column++) {
            ddl.append(",\n COL").append(column).append(" BYTES(").append(bytesPerColumn).append(')');
        }
        return ddl.append(") PRIMARY KEY(Id)").toString();
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("local", Configuration.fromJsonString("{\"numRecords\":1000,\"valueSizeBytes\":1000,\"pipelineTimeout\":2,\"runner\":\"DirectRunner\"}", Configuration.class), "medium", Configuration.fromJsonString("{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
