package org.apache.beam.it.gcp.bigtable;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.threeten.bp.Duration;

/* loaded from: input_file:org/apache/beam/it/gcp/bigtable/BigTableIOST.class */
public final class BigTableIOST extends IOStressTestBase {
    private static final String WRITE_ELEMENT_METRIC_NAME = "write_count";
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    private static final String COLUMN_FAMILY_NAME = "cf";
    private static final long TABLE_MAX_AGE_MINUTES = 800;
    private BigtableResourceManager resourceManager;
    private InfluxDBSettings influxDBSettings;
    private Configuration configuration;
    private String testConfigName;
    private String tableId;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigtable/BigTableIOST$Configuration.class */
    public static class Configuration extends SyntheticSourceOptions {

        @JsonProperty
        public int pipelineTimeout = 20;

        @JsonProperty
        public String runner = "DirectRunner";

        @JsonProperty
        public int numWorkers = 20;

        @JsonProperty
        public int maxNumWorkers = 100;

        @JsonProperty
        public int rowsPerSecond = 1000;

        @JsonProperty
        public int minutes = 15;

        @JsonProperty
        public boolean exportMetricsToInfluxDB = true;

        @JsonProperty
        public String influxMeasurement = BigTableIOST.class.getName();

        @JsonProperty
        public String influxHost;

        @JsonProperty
        public String influxDatabase;

        Configuration() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigtable/BigTableIOST$MapToBigTableFormat.class */
    public static class MapToBigTableFormat extends DoFn<Instant, KV<ByteString, Iterable<Mutation>>> implements Serializable {
        private final int valueSizeBytes;
        private final int totalRows;

        public MapToBigTableFormat(int i, int i2) {
            this.valueSizeBytes = i;
            this.totalRows = i2;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Instant, KV<ByteString, Iterable<Mutation>>>.ProcessContext processContext) {
            long millis = ((Instant) Objects.requireNonNull((Instant) processContext.element())).getMillis() % this.totalRows;
            ByteString copyFromUtf8 = ByteString.copyFromUtf8(String.format("key%s", millis + "-" + UUID.randomUUID() + "-" + UUID.randomUUID() + "-" + Instant.now().getMillis()));
            Random random = new Random(millis);
            byte[] bArr = new byte[this.valueSizeBytes];
            random.nextBytes(bArr);
            processContext.output(KV.of(copyFromUtf8, ImmutableList.of(Mutation.newBuilder().setSetCell(Mutation.SetCell.newBuilder().setValue(ByteString.copyFrom(bArr)).setTimestampMicros(java.time.Instant.now().toEpochMilli() * 1000).setFamilyName(BigTableIOST.COLUMN_FAMILY_NAME)).build())));
        }
    }

    @Before
    public void setup() throws IOException {
        this.resourceManager = BigtableResourceManager.builder(this.testName, project, CREDENTIALS_PROVIDER).build();
        this.tableId = BigtableResourceManagerUtils.generateTableId(this.testName);
        this.resourceManager.createTable(this.tableId, ImmutableList.of(COLUMN_FAMILY_NAME), Duration.ofMinutes(TABLE_MAX_AGE_MINUTES));
        this.testConfigName = TestProperties.getProperty("configuration", "medium", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS_PRESET.get(this.testConfigName);
        if (this.configuration == null) {
            try {
                this.configuration = Configuration.fromJsonString(this.testConfigName, Configuration.class);
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", this.testConfigName, TEST_CONFIGS_PRESET.keySet()));
            }
        }
        if (!Strings.isNullOrEmpty(this.tempBucketName)) {
            String format = String.format("gs://%s/temp/", this.tempBucketName);
            this.writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.writePipeline.getOptions().setTempLocation(format);
            this.readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.readPipeline.getOptions().setTempLocation(format);
        }
        this.writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        if (this.configuration.exportMetricsToInfluxDB) {
            this.configuration.influxHost = TestProperties.getProperty("influxHost", "", TestProperties.Type.PROPERTY);
            this.configuration.influxDatabase = TestProperties.getProperty("influxDatabase", "", TestProperties.Type.PROPERTY);
            this.configuration.influxMeasurement = TestProperties.getProperty("influxMeasurement", "", TestProperties.Type.PROPERTY);
        }
    }

    @After
    public void teardown() {
        ResourceManagerUtils.cleanResources(new ResourceManager[]{this.resourceManager});
    }

    @Test
    public void runTest() throws IOException {
        if (this.configuration.exportMetricsToInfluxDB) {
            this.influxDBSettings = InfluxDBSettings.builder().withHost(this.configuration.influxHost).withDatabase(this.configuration.influxDatabase).withMeasurement(this.configuration.influxMeasurement + "_" + this.testConfigName).get();
        }
        PipelineLauncher.LaunchInfo generateDataAndWrite = generateDataAndWrite();
        Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(generateDataAndWrite, java.time.Duration.ofMinutes(this.configuration.pipelineTimeout))));
        PipelineLauncher.LaunchInfo readData = readData();
        Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(readData, java.time.Duration.ofMinutes(this.configuration.pipelineTimeout))));
        try {
            Assert.assertTrue(this.pipelineLauncher.getMetric(project, region, generateDataAndWrite.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME)).doubleValue() >= this.pipelineLauncher.getMetric(project, region, readData.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue());
            if (this.pipelineLauncher.getJobStatus(project, region, generateDataAndWrite.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, generateDataAndWrite.jobId());
            }
            LoadTestBase.MetricsConfiguration build = LoadTestBase.MetricsConfiguration.builder().setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
            LoadTestBase.MetricsConfiguration build2 = LoadTestBase.MetricsConfiguration.builder().setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
            exportMetrics(generateDataAndWrite, build, this.configuration.exportMetricsToInfluxDB, this.influxDBSettings);
            exportMetrics(readData, build2, this.configuration.exportMetricsToInfluxDB, this.influxDBSettings);
        } catch (Throwable th) {
            if (this.pipelineLauncher.getJobStatus(project, region, generateDataAndWrite.jobId()) == PipelineLauncher.JobState.RUNNING) {
                this.pipelineLauncher.cancelJob(project, region, generateDataAndWrite.jobId());
            }
            throw th;
        }
    }

    private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
        int max = Math.max(this.configuration.rowsPerSecond, 1000) / 1000;
        long millis = org.joda.time.Duration.standardMinutes(this.configuration.minutes).getMillis();
        long j = (max * millis) / 1;
        List loadPeriods = getLoadPeriods(this.configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
        PCollection apply = this.writePipeline.apply(PeriodicImpulse.create().stopAfter(org.joda.time.Duration.millis(millis - 1)).withInterval(org.joda.time.Duration.millis(1)));
        if (max > 1) {
            apply = (PCollection) apply.apply("One input to multiple outputs", ParDo.of(new IOStressTestBase.MultiplierDoFn(max, loadPeriods))).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(WRITE_ELEMENT_METRIC_NAME)));
        }
        apply.apply("Map records to BigTable format", ParDo.of(new MapToBigTableFormat((int) this.configuration.valueSizeBytes, (int) j))).apply("Write to BigTable", BigtableIO.write().withProjectId(project).withInstanceId(this.resourceManager.getInstanceId()).withTableId(this.tableId));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("write-bigtable").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", this.configuration.runner).addParameter("autoscalingAlgorithm", DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED.toString()).addParameter("numWorkers", String.valueOf(this.configuration.numWorkers)).addParameter("maxNumWorkers", String.valueOf(this.configuration.maxNumWorkers)).addParameter("experiments", "use_runner_v2").build());
    }

    private PipelineLauncher.LaunchInfo readData() throws IOException {
        this.readPipeline.apply("Read from BigTable", BigtableIO.read().withoutValidation().withProjectId(project).withInstanceId(this.resourceManager.getInstanceId()).withTableId(this.tableId)).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));
        return this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("read-bigtable").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.readPipeline).addParameter("runner", this.configuration.runner).addParameter("autoscalingAlgorithm", DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED.toString()).addParameter("numWorkers", String.valueOf(this.configuration.numWorkers)).addParameter("maxNumWorkers", String.valueOf(this.configuration.maxNumWorkers)).build());
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("medium", Configuration.fromJsonString("{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
