package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.class */
public class BigQueryIOStorageWriteIT {
    private String project;
    private static final String DATASET_ID = "big_query_storage";
    private static final String TABLE_PREFIX = "storage_write_";
    private BigQueryOptions bqOptions;
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT$FillRowFn.class */
    public static class FillRowFn extends DoFn<Long, TableRow> {
        FillRowFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Long, TableRow>.ProcessContext processContext) {
            processContext.output(new TableRow().set("number", processContext.element()).set("str", "aaaaaaaaaa"));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT$WriteMode.class */
    public enum WriteMode {
        EXACT_ONCE,
        AT_LEAST_ONCE
    }

    private void setUpTestEnvironment(WriteMode writeMode) {
        PipelineOptionsFactory.register(BigQueryOptions.class);
        this.bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
        this.bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
        this.bqOptions.setUseStorageWriteApi(true);
        if (writeMode == WriteMode.AT_LEAST_ONCE) {
            this.bqOptions.setUseStorageWriteApiAtLeastOnce(true);
        }
        this.bqOptions.setNumStorageWriteApiStreams(2);
        this.bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
        this.project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
    }

    private GenerateSequence stream(int i) {
        return GenerateSequence.from(0L).to(i).withRate(1L, Duration.millis(10));
    }

    private void runBigQueryIOStorageWritePipeline(int i, WriteMode writeMode, Boolean bool) {
        String str = bool.booleanValue() ? "storage_write_streaming_" + System.currentTimeMillis() : TABLE_PREFIX + System.currentTimeMillis();
        TableSchema fields = new TableSchema().setFields(ImmutableList.of(new TableFieldSchema().setName("number").setType("INTEGER"), new TableFieldSchema().setName("str").setType("STRING")));
        Pipeline create = Pipeline.create(this.bqOptions);
        create.apply("Input", bool.booleanValue() ? stream(i) : GenerateSequence.from(0L).to(i)).apply("GenerateMessage", ParDo.of(new FillRowFn())).apply("WriteToBQ", BigQueryIO.writeTableRows().to(String.format("%s:%s.%s", this.project, DATASET_ID, str)).withSchema(fields).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        create.run().waitUntilFinish();
        try {
            QueryResponse queryWithRetries = BQ_CLIENT.queryWithRetries(String.format("SELECT count(*) FROM [%s.%s];", DATASET_ID, str), this.project);
            if (writeMode == WriteMode.EXACT_ONCE) {
                Assert.assertEquals(Integer.parseInt((String) ((TableCell) ((TableRow) queryWithRetries.getRows().get(0)).getF().get(0)).getV()), i);
            } else {
                Assert.assertTrue(Integer.parseInt((String) ((TableCell) ((TableRow) queryWithRetries.getRows().get(0)).getF().get(0)).getV()) >= i);
            }
        } catch (IOException | InterruptedException e) {
            Assert.fail("Unexpected exception: " + e);
        }
    }

    @Test
    public void testBigQueryStorageWrite30MProto() {
        setUpTestEnvironment(WriteMode.EXACT_ONCE);
        runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE, false);
    }

    @Test
    public void testBigQueryStorageWrite30MProtoALO() {
        setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
        runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE, false);
    }

    @Test
    public void testBigQueryStorageWrite3KProtoStreaming() {
        setUpTestEnvironment(WriteMode.EXACT_ONCE);
        runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true);
    }

    @Test
    public void testBigQueryStorageWrite3KProtoALOStreaming() {
        setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
        runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true);
    }
}
