package org.apache.beam.it.gcp.bigquery;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.it.truthmatchers.PipelineAsserts;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT.class */
public final class BigQueryIOLT extends IOLoadTestBase {
    private static BigQueryResourceManager resourceManager;
    private static String tableQualifier;
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    private Configuration configuration;
    private String tempLocation;
    private TableSchema schema;
    private static final String READ_PCOLLECTION = "Counting element.out0";
    private static final String WRITE_PCOLLECTION = "Map records.out0";

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT$AvroFormatFn.class */
    public static class AvroFormatFn extends FormatFn<AvroWriteRequest<byte[]>, GenericRecord> {
        protected final boolean isWrapBytes;

        public AvroFormatFn(int i, boolean z) {
            super(i);
            this.isWrapBytes = z;
        }

        private Object maybeWrapBytes(byte[] bArr) {
            return this.isWrapBytes ? ByteBuffer.wrap(bArr) : bArr;
        }

        public GenericRecord apply(AvroWriteRequest<byte[]> avroWriteRequest) {
            byte[] bArr = (byte[]) Objects.requireNonNull((byte[]) avroWriteRequest.getElement());
            GenericData.Record record = new GenericData.Record(avroWriteRequest.getSchema());
            if (this.numColumns == 1) {
                record.put("data_0", maybeWrapBytes(bArr));
            } else {
                int length = bArr.length / this.numColumns;
                int i = 0;
                for (int i2 = 0; i2 < this.numColumns - 1; i2++) {
                    record.put("data_" + i2, maybeWrapBytes(Arrays.copyOfRange(bArr, i, i + length)));
                    i += length;
                }
                record.put("data_" + (this.numColumns - 1), maybeWrapBytes(Arrays.copyOfRange(bArr, i, bArr.length)));
            }
            return record;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT$Configuration.class */
    public static class Configuration extends SyntheticSourceOptions {

        @JsonProperty
        public int numColumns = 1;

        @JsonProperty
        public int pipelineTimeout = 20;

        @JsonProperty
        public String runner = "DirectRunner";

        @JsonProperty
        public String readMethod = "DEFAULT";

        @JsonProperty
        public String writeMethod = "DEFAULT";

        @JsonProperty
        public String writeFormat = "AVRO";

        Configuration() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT$FormatFn.class */
    public static abstract class FormatFn<InputT, OutputT> implements SerializableFunction<InputT, OutputT> {
        protected final int numColumns;

        public FormatFn(int i) {
            this.numColumns = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT$JsonFormatFn.class */
    public static class JsonFormatFn extends FormatFn<byte[], TableRow> {
        public JsonFormatFn(int i) {
            super(i);
        }

        public TableRow apply(byte[] bArr) {
            TableRow tableRow = new TableRow();
            Base64.Encoder encoder = Base64.getEncoder();
            if (this.numColumns == 1) {
                tableRow.set("data_0", encoder.encodeToString(bArr));
            } else {
                int length = bArr.length / this.numColumns;
                int i = 0;
                for (int i2 = 0; i2 < this.numColumns - 1; i2++) {
                    tableRow.set("data_" + i2, encoder.encodeToString(Arrays.copyOfRange(bArr, i, i + length)));
                    i += length;
                }
                tableRow.set("data_" + (this.numColumns - 1), encoder.encodeToString(Arrays.copyOfRange(bArr, i, bArr.length)));
            }
            return tableRow;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT$MapKVToV.class */
    public static class MapKVToV extends DoFn<KV<byte[], byte[]>, byte[]> {
        private MapKVToV() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<byte[], byte[]>, byte[]>.ProcessContext processContext) {
            processContext.output((byte[]) ((KV) Objects.requireNonNull((KV) processContext.element())).getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryIOLT$WriteFormat.class */
    public enum WriteFormat {
        AVRO,
        JSON
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        resourceManager = BigQueryResourceManager.builder("io-bigquery-lt", project, CREDENTIALS).setCredentials(CREDENTIALS).build();
        resourceManager.createDataset(region);
    }

    @Before
    public void setup() {
        tableQualifier = String.format("%s:%s.%s", project, resourceManager.getDatasetId(), "io-bq-table-" + DateTimeFormatter.ofPattern("MMddHHmmssSSS").withZone(ZoneId.of("UTC")).format(Instant.now()) + UUID.randomUUID().toString().substring(0, 10));
        String property = TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS_PRESET.get(property);
        if (this.configuration == null) {
            try {
                this.configuration = Configuration.fromJsonString(property, Configuration.class);
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", property, TEST_CONFIGS_PRESET.keySet()));
            }
        }
        ArrayList arrayList = new ArrayList(this.configuration.numColumns);
        for (int i = 0; i < this.configuration.numColumns; i++) {
            arrayList.add(new TableFieldSchema().setName("data_" + i).setType("BYTES"));
        }
        this.schema = new TableSchema().setFields(arrayList);
        if (Strings.isNullOrEmpty(this.tempBucketName)) {
            return;
        }
        this.tempLocation = String.format("gs://%s/temp/", this.tempBucketName);
        this.writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(this.tempLocation);
        this.writePipeline.getOptions().setTempLocation(this.tempLocation);
        this.readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(this.tempLocation);
        this.readPipeline.getOptions().setTempLocation(this.tempLocation);
    }

    @AfterClass
    public static void tearDownClass() {
        ResourceManagerUtils.cleanResources(new ResourceManager[]{resourceManager});
    }

    @Test
    public void testAvroFileLoadsWriteThenRead() throws IOException {
        this.configuration.writeFormat = "AVRO";
        this.configuration.writeMethod = "FILE_LOADS";
        testWriteAndRead();
    }

    @Test
    public void testJsonFileLoadsWriteThenRead() throws IOException {
        this.configuration.writeFormat = "JSON";
        this.configuration.writeMethod = "FILE_LOADS";
        testWriteAndRead();
    }

    @Test
    @Ignore("Avro streaming write is not supported as of Beam v2.45.0")
    public void testAvroStreamingWriteThenRead() throws IOException {
        this.configuration.writeFormat = "AVRO";
        this.configuration.writeMethod = "STREAMING_INSERTS";
        testWriteAndRead();
    }

    @Test
    public void testJsonStreamingWriteThenRead() throws IOException {
        this.configuration.writeFormat = "JSON";
        this.configuration.writeMethod = "STREAMING_INSERTS";
        testWriteAndRead();
    }

    @Test
    public void testStorageAPIWriteThenRead() throws IOException {
        this.configuration.readMethod = "DIRECT_READ";
        this.configuration.writeFormat = "AVRO";
        this.configuration.writeMethod = "STORAGE_WRITE_API";
        testWriteAndRead();
    }

    public void testWriteAndRead() throws IOException {
        BigQueryIO.Write<byte[]> write = null;
        switch (WriteFormat.valueOf(this.configuration.writeFormat)) {
            case AVRO:
                write = BigQueryIO.write().withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE).withAvroFormatFunction(new AvroFormatFn(this.configuration.numColumns, !"STORAGE_WRITE_API".equalsIgnoreCase(this.configuration.writeMethod)));
                break;
            case JSON:
                write = BigQueryIO.write().withSuccessfulInsertsPropagation(false).withFormatFunction(new JsonFormatFn(this.configuration.numColumns));
                break;
        }
        testWrite(write);
        testRead();
    }

    private void testWrite(BigQueryIO.Write<byte[]> write) throws IOException {
        BigQueryIO.Write.Method valueOf = BigQueryIO.Write.Method.valueOf(this.configuration.writeMethod);
        if (valueOf == BigQueryIO.Write.Method.STREAMING_INSERTS) {
            this.writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
        }
        this.writePipeline.apply("Read from source", Read.from(new SyntheticBoundedSource(this.configuration))).apply("Map records", ParDo.of(new MapKVToV())).apply("Write to BQ", write.to(tableQualifier).withMethod(valueOf).withSchema(this.schema).withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(this.tempLocation)));
        PipelineLauncher.LaunchInfo launch = this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("test-bigquery-write").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", this.configuration.runner).build());
        Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDone(createConfig(launch, Duration.ofMinutes(this.configuration.pipelineTimeout))));
        try {
            exportMetricsToBigQuery(launch, getMetrics(launch, LoadTestBase.MetricsConfiguration.builder().setInputPCollection(WRITE_PCOLLECTION).build()));
        } catch (InterruptedException | ParseException e) {
            throw new RuntimeException(e);
        }
    }

    private void testRead() throws IOException {
        this.readPipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(tableQualifier).withMethod(BigQueryIO.TypedRead.Method.valueOf(this.configuration.readMethod))).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));
        PipelineLauncher.LaunchInfo launch = this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("test-bigquery-read").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.readPipeline).addParameter("runner", this.configuration.runner).build());
        PipelineAsserts.assertThatResult(this.pipelineOperator.waitUntilDone(createConfig(launch, Duration.ofMinutes(this.configuration.pipelineTimeout)))).isLaunchFinished();
        Assert.assertEquals(this.configuration.numRecords, this.pipelineLauncher.getMetric(project, region, launch.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue(), 0.5d);
        try {
            exportMetricsToBigQuery(launch, getMetrics(launch, LoadTestBase.MetricsConfiguration.builder().setOutputPCollection(READ_PCOLLECTION).build()));
        } catch (InterruptedException | ParseException e) {
            throw new RuntimeException(e);
        }
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("local", Configuration.fromJsonString("{\"numRecords\":1000,\"valueSizeBytes\":1000,\"pipelineTimeout\":2,\"runner\":\"DirectRunner\"}", Configuration.class), "medium", Configuration.fromJsonString("{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
