package org.apache.beam.it.gcp.bigquery;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOST.class */
public final class BigQueryIOST extends IOStressTestBase {
    /** Name of the counter attached to the "Counting element" step and later read back from the job. */
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    /** Write-method name that both tests store in {@code configuration.writeMethod}. */
    private static final String STORAGE_WRITE_API_METHOD = "STORAGE_WRITE_API";
    /** Shared BigQuery resource manager; built in {@code beforeClass} and released in {@code tearDownClass}. */
    private static BigQueryResourceManager resourceManager;
    /** Destination table name, regenerated in {@code setup} before every test. */
    private static String tableName;
    /** Destination in {@code project:dataset.table} form, derived from {@link #tableName} in {@code setup}. */
    private static String tableQualifier;
    /** InfluxDB export settings; only (re)assigned in {@code runTest} when metric export is enabled. */
    private static InfluxDBSettings influxDBSettings;
    /** Active test configuration: a preset or one parsed from the "configuration" property. */
    private Configuration configuration;
    /** {@code gs://<bucket>/temp/} location; left unset when no temp bucket name is available. */
    private String tempLocation;
    /** Raw value of the "configuration" test property (preset name or JSON text). */
    private String testConfigName;
    /** Table schema with {@code numColumns} BYTES fields named {@code data_0 .. data_(n-1)}. */
    private TableSchema schema;

    /** Pipeline under test; JUnit requires rule fields to be public. */
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();
    /** Named configuration presets ("medium", "large"), populated by the static initializer. */
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOST$AvroFormatFn.class */
    /**
     * Turns a raw byte payload into an Avro record whose fields are named
     * {@code data_0 .. data_(numColumns-1)}.
     *
     * <p>With a single column the payload is stored as-is. Otherwise it is cut into
     * {@code numColumns} slices of {@code payload.length / numColumns} bytes, with the last
     * slice also receiving whatever remainder is left over.
     */
    public static class AvroFormatFn extends FormatFn<AvroWriteRequest<byte[]>, GenericRecord> {
        /** When set, every column value is wrapped in a {@link ByteBuffer} before being stored. */
        protected final boolean isWrapBytes;

        /**
         * @param columnCount number of {@code data_N} fields to populate
         * @param wrapBytes whether values are stored as {@link ByteBuffer} rather than {@code byte[]}
         */
        public AvroFormatFn(int columnCount, boolean wrapBytes) {
            super(columnCount);
            this.isWrapBytes = wrapBytes;
        }

        /** Returns the slice itself, or a {@link ByteBuffer} view of it when wrapping is enabled. */
        private Object maybeWrapBytes(byte[] slice) {
            if (this.isWrapBytes) {
                return ByteBuffer.wrap(slice);
            }
            return slice;
        }

        /**
         * Builds the record for one write request.
         *
         * @param request write request carrying the payload and the Avro schema to use
         * @return a record with one field per configured column
         * @throws NullPointerException if the request element is null
         */
        public GenericRecord apply(AvroWriteRequest<byte[]> request) {
            byte[] payload = Objects.requireNonNull(request.getElement());
            GenericData.Record row = new GenericData.Record(request.getSchema());

            if (this.numColumns == 1) {
                row.put("data_0", maybeWrapBytes(payload));
                return row;
            }

            int sliceSize = payload.length / this.numColumns;
            int lastColumn = this.numColumns - 1;
            int offset = 0;
            int column = 0;
            while (column < lastColumn) {
                int end = offset + sliceSize;
                row.put("data_" + column, maybeWrapBytes(Arrays.copyOfRange(payload, offset, end)));
                offset = end;
                column++;
            }
            // The final column absorbs the remainder of the integer division.
            row.put("data_" + lastColumn, maybeWrapBytes(Arrays.copyOfRange(payload, offset, payload.length)));
            return row;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOST$Configuration.class */
    /**
     * Test options: the synthetic-source options plus the knobs this stress test reads.
     * Instances are created from JSON via {@code fromJsonString}.
     */
    public static class Configuration extends SyntheticSourceOptions {

        /** Number of BYTES columns in the table; each payload is split across this many fields. */
        @JsonProperty
        public int numColumns = 1;

        /** How long to wait for the pipeline, in minutes. */
        @JsonProperty
        public int pipelineTimeout = 20;

        /** Value passed as the "runner" launch parameter. */
        @JsonProperty
        public String runner = "DirectRunner";

        /** Value passed as the "numWorkers" launch parameter. */
        @JsonProperty
        public int numWorkers = 20;

        /** Value passed as the "maxNumWorkers" launch parameter. */
        @JsonProperty
        public int maxNumWorkers = 100;

        /** Name of a {@code BigQueryIO.Write.Method} constant; the tests overwrite it before running. */
        @JsonProperty
        public String writeMethod = "DEFAULT";

        /** Name of a {@link WriteFormat} constant; the tests overwrite it before running. */
        @JsonProperty
        public String writeFormat = WriteFormat.AVRO.name();

        /** Target row rate; values above 1000 turn on the fan-out step with multiplier rate / 1000. */
        @JsonProperty
        public int rowsPerSecond = 1000;

        /** Duration handed to {@code getLoadPeriods} when building the load schedule. */
        @JsonProperty
        public int minutes = 15;

        /** Whether {@code setup} and {@code runTest} configure InfluxDB export of the collected metrics. */
        @JsonProperty
        public boolean exportMetricsToInfluxDB = true;

        /** Measurement name prefix; replaced from the "influxMeasurement" property when export is on. */
        @JsonProperty
        public String influxMeasurement = BigQueryIOST.class.getName();

        /** InfluxDB host; filled from the "influxHost" property when export is on. */
        @JsonProperty
        public String influxHost;

        /** InfluxDB database; filled from the "influxDatabase" property when export is on. */
        @JsonProperty
        public String influxDatabase;

        /** Package-private no-arg constructor. */
        Configuration() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOST$FormatFn.class */
    /**
     * Common base for the row formatters: remembers how many {@code data_N} columns each
     * payload has to be spread over.
     *
     * @param <InputT> element type handed to the formatter
     * @param <OutputT> row type produced by the formatter
     */
    public static abstract class FormatFn<InputT, OutputT> implements SerializableFunction<InputT, OutputT> {
        /** Number of columns the payload is divided into. */
        protected final int numColumns;

        /**
         * @param columnCount number of columns subclasses should populate
         */
        public FormatFn(int columnCount) {
            this.numColumns = columnCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOST$JsonFormatFn.class */
    /**
     * Turns a raw byte payload into a {@link TableRow} whose fields are named
     * {@code data_0 .. data_(numColumns-1)} and hold Base64-encoded strings.
     *
     * <p>With a single column the whole payload is encoded. Otherwise it is cut into
     * {@code numColumns} slices of {@code payload.length / numColumns} bytes, with the last
     * slice also receiving the remainder.
     */
    public static class JsonFormatFn extends FormatFn<byte[], TableRow> {
        /**
         * @param columnCount number of {@code data_N} fields to populate
         */
        public JsonFormatFn(int columnCount) {
            super(columnCount);
        }

        /**
         * Builds the row for one payload.
         *
         * @param payload raw bytes to distribute across the columns
         * @return a row with one Base64 string per configured column
         */
        public TableRow apply(byte[] payload) {
            Base64.Encoder base64 = Base64.getEncoder();
            TableRow row = new TableRow();

            if (this.numColumns == 1) {
                row.set("data_0", base64.encodeToString(payload));
                return row;
            }

            int sliceSize = payload.length / this.numColumns;
            int lastColumn = this.numColumns - 1;
            int offset = 0;
            int column = 0;
            while (column < lastColumn) {
                int end = offset + sliceSize;
                row.set("data_" + column, base64.encodeToString(Arrays.copyOfRange(payload, offset, end)));
                offset = end;
                column++;
            }
            // The final column absorbs the remainder of the integer division.
            row.set("data_" + lastColumn, base64.encodeToString(Arrays.copyOfRange(payload, offset, payload.length)));
            return row;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOST$WriteFormat.class */
    /** Row encodings the test can use; the constant names are what {@code writeFormat} stores. */
    public enum WriteFormat {
        /** Rows are produced by the Avro formatter. */
        AVRO,
        /** Rows are produced by the JSON ({@code TableRow}) formatter. */
        JSON;
    }

    /**
     * Builds the shared BigQuery resource manager and creates the dataset in the test region.
     * The manager is stored before the dataset call so it can still be cleaned up if that call fails.
     */
    @BeforeClass
    public static void beforeClass() {
        resourceManager =
            BigQueryResourceManager.builder("io-bigquery-lt", project, CREDENTIALS)
                .build();
        resourceManager.createDataset(region);
    }

    /**
     * Prepares per-test state.
     *
     * <ol>
     *   <li>Generates a fresh table name (UTC timestamp plus a UUID fragment) and its qualifier.
     *   <li>Resolves the configuration: the "configuration" property is first looked up among the
     *       presets and, failing that, parsed as a JSON document.
     *   <li>Builds a schema of {@code numColumns} BYTES fields named {@code data_N}.
     *   <li>Points the pipeline's temp root/location at {@code gs://<bucket>/temp/} when a temp
     *       bucket name is available.
     *   <li>Loads the InfluxDB connection properties when metric export is enabled.
     * </ol>
     *
     * @throws IllegalArgumentException if the "configuration" property is neither a preset name
     *     nor parseable JSON; the parse failure is attached as the cause
     */
    @Before
    public void setup() {
        tableName = "io-bq-source-table-" + DateTimeFormatter.ofPattern("MMddHHmmssSSS").withZone(ZoneId.of("UTC")).format(Instant.now()) + UUID.randomUUID().toString().substring(0, 10);
        tableQualifier = String.format("%s:%s.%s", project, resourceManager.getDatasetId(), tableName);
        this.testConfigName = TestProperties.getProperty("configuration", "medium", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS_PRESET.get(this.testConfigName);
        if (this.configuration == null) {
            try {
                this.configuration = Configuration.fromJsonString(this.testConfigName, Configuration.class);
            } catch (IOException e) {
                // Keep the parser's exception as the cause so the actual JSON problem is visible.
                throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", this.testConfigName, TEST_CONFIGS_PRESET.keySet()), e);
            }
        }
        List<TableFieldSchema> fields = new ArrayList<>(this.configuration.numColumns);
        for (int i = 0; i < this.configuration.numColumns; i++) {
            fields.add(new TableFieldSchema().setName("data_" + i).setType("BYTES"));
        }
        this.schema = new TableSchema().setFields(fields);
        if (!Strings.isNullOrEmpty(this.tempBucketName)) {
            this.tempLocation = String.format("gs://%s/temp/", this.tempBucketName);
            this.writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(this.tempLocation);
            this.writePipeline.getOptions().setTempLocation(this.tempLocation);
        }
        if (this.configuration.exportMetricsToInfluxDB) {
            this.configuration.influxHost = TestProperties.getProperty("influxHost", "", TestProperties.Type.PROPERTY);
            this.configuration.influxDatabase = TestProperties.getProperty("influxDatabase", "", TestProperties.Type.PROPERTY);
            this.configuration.influxMeasurement = TestProperties.getProperty("influxMeasurement", "", TestProperties.Type.PROPERTY);
        }
    }

    /** Hands the shared BigQuery resource manager to the cleanup utility after all tests have run. */
    @AfterClass
    public static void tearDownClass() {
        ResourceManager[] managers = {resourceManager};
        ResourceManagerUtils.cleanResources(managers);
    }

    /**
     * Runs the stress pipeline with Avro-formatted rows written through the Storage Write API.
     *
     * @throws IOException propagated from {@code runTest}
     */
    @Test
    public void testAvroStorageAPIWrite() throws IOException {
        Configuration config = this.configuration;
        config.writeMethod = STORAGE_WRITE_API_METHOD;
        config.writeFormat = WriteFormat.AVRO.name();
        runTest();
    }

    /**
     * Runs the stress pipeline with JSON-formatted rows written through the Storage Write API.
     *
     * @throws IOException propagated from {@code runTest}
     */
    @Test
    public void testJsonStorageAPIWrite() throws IOException {
        Configuration config = this.configuration;
        config.writeMethod = STORAGE_WRITE_API_METHOD;
        config.writeFormat = WriteFormat.JSON.name();
        runTest();
    }

    /**
     * Builds the {@link BigQueryIO.Write} transform for the configured format and method, then
     * hands it to {@code generateDataAndWrite}.
     *
     * <p>When metric export is enabled, the InfluxDB settings are rebuilt first with a measurement
     * name of {@code <influxMeasurement>_<configName>_<writeFormat>_<writeMethod>}.
     *
     * @throws IOException propagated from {@code generateDataAndWrite}
     * @throws IllegalArgumentException if {@code writeFormat} is not a {@link WriteFormat} name
     * @throws IllegalStateException if the format has no write transform defined here
     */
    private void runTest() throws IOException {
        if (this.configuration.exportMetricsToInfluxDB) {
            String measurement = this.configuration.influxMeasurement + "_" + this.testConfigName + "_" + this.configuration.writeFormat + "_" + this.configuration.writeMethod;
            influxDBSettings = InfluxDBSettings.builder().withHost(this.configuration.influxHost).withDatabase(this.configuration.influxDatabase).withMeasurement(measurement).get();
        }
        WriteFormat format = WriteFormat.valueOf(this.configuration.writeFormat);
        BigQueryIO.Write<byte[]> write;
        switch (format) {
            case AVRO:
                // The explicit <byte[]> witness is required: as a chained receiver, write() would
                // otherwise be inferred as Write<Object> and reject the byte[] format function.
                // Bytes are wrapped for every write method except the Storage Write API.
                write = BigQueryIO.<byte[]>write().withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).withAvroFormatFunction(new AvroFormatFn(this.configuration.numColumns, !STORAGE_WRITE_API_METHOD.equalsIgnoreCase(this.configuration.writeMethod)));
                break;
            case JSON:
                write = BigQueryIO.<byte[]>write().withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).withSuccessfulInsertsPropagation(false).withFormatFunction(new JsonFormatFn(this.configuration.numColumns));
                break;
            default:
                // Fail loudly instead of dereferencing a null transform if a format is added later.
                throw new IllegalStateException("Unsupported write format: " + format);
        }
        if (this.configuration.writeMethod.equals(STORAGE_WRITE_API_METHOD)) {
            write = write.withTriggeringFrequency(Duration.standardSeconds(60L));
        }
        generateDataAndWrite(write);
    }

    /**
     * Assembles the streaming write pipeline, launches it, waits for it, and checks the outcome.
     *
     * <p>Pipeline shape: synthetic unbounded source, then an optional fan-out plus reshuffle
     * (only when the multiplier exceeds 1), then value extraction, an element counter, and the
     * BigQuery write.
     *
     * <p>After the wait, the test fails if the result is {@code LAUNCH_FAILED} or if the
     * "read_count" counter is smaller than the number of rows found in the destination table.
     * Finally the collected metrics are exported.
     *
     * <p>NOTE(review): the raw {@code List} / {@code PCollection} types below look like generic
     * arguments that were lost; confirm the intended element types before refactoring.
     *
     * @param write the partially configured write transform; destination, method, schema and the
     *     GCS temp location are added here
     * @throws IOException declared by the method; presumably raised by launch or metric calls (confirm)
     * @throws IllegalArgumentException if {@code writeMethod} is not a {@code BigQueryIO.Write.Method} name
     */
    private void generateDataAndWrite(BigQueryIO.Write<byte[]> write) throws IOException {
        BigQueryIO.Write.Method valueOf = BigQueryIO.Write.Method.valueOf(this.configuration.writeMethod);
        this.writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        // Fan-out multiplier: 1 for rates up to 1000 rows/s, otherwise rowsPerSecond / 1000 (integer division).
        int max = Math.max(this.configuration.rowsPerSecond, 1000) / 1000;
        List loadPeriods = getLoadPeriods(this.configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
        PCollection apply = this.writePipeline.apply(Read.from(new SyntheticUnboundedSource(this.configuration)));
        if (max > 1) {
            apply = (PCollection) apply.apply("One input to multiple outputs", ParDo.of(new IOStressTestBase.MultiplierDoFn(max, loadPeriods))).apply("Reshuffle fanout", Reshuffle.of());
        }
        // NOTE(review): tempLocation is only assigned in setup() when a temp bucket name is present;
        // confirm it cannot be null when it reaches StaticValueProvider.of.
        apply.apply("Extract values", Values.create()).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME))).apply("Write to BQ", write.to(tableQualifier).withMethod(valueOf).withSchema(this.schema).withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(this.tempLocation)));
        PipelineLauncher.LaunchInfo launch = this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("write-bigquery").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", this.configuration.runner).addParameter("autoscalingAlgorithm", DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED.toString()).addParameter("numWorkers", String.valueOf(this.configuration.numWorkers)).addParameter("maxNumWorkers", String.valueOf(this.configuration.maxNumWorkers)).addParameter("experiments", "enable_streaming_engine").build());
        // Only a LAUNCH_FAILED result fails the test here; pipelineTimeout is interpreted as minutes.
        Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(launch, java.time.Duration.ofMinutes(this.configuration.pipelineTimeout))));
        // The counted elements must be at least the table's row count; equality is not required.
        Assert.assertTrue(this.pipelineLauncher.getMetric(project, region, launch.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue() >= ((double) resourceManager.getRowCount(tableName).longValue()));
        // NOTE(review): the input PCollection name refers to the "Reshuffle fanout" step, which is
        // only added when max > 1; confirm the export behaves as intended at lower rates.
        exportMetrics(launch, LoadTestBase.MetricsConfiguration.builder().setInputPCollection("Reshuffle fanout/ExpandIterable.out0").setOutputPCollection("Counting element.out0").build(), this.configuration.exportMetricsToInfluxDB, influxDBSettings);
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("medium", Configuration.fromJsonString("{\"numColumns\":5,\"rowsPerSecond\":25000,\"minutes\":30,\"numRecords\":2500000,\"valueSizeBytes\":1000,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"numColumns\":10,\"rowsPerSecond\":50000,\"minutes\":60,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":120,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
