package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.class */
public class BigQueryToTableIT {
    private static String project;
    private static final String NEW_TYPES_QUERY_TABLE_NAME = "types";
    private static final int MAX_RETRY = 5;
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryToTableIT.class);
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryToTableIT");
    private static final String BIG_QUERY_DATASET_ID = "bq_query_to_table_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
    private static final TableSchema LEGACY_QUERY_TABLE_SCHEMA = new TableSchema().setFields(ImmutableList.of(new TableFieldSchema().setName("fruit").setType("STRING")));
    private static final TableSchema NEW_TYPES_QUERY_TABLE_SCHEMA = new TableSchema().setFields(ImmutableList.of(new TableFieldSchema().setName("bytes").setType("BYTES"), new TableFieldSchema().setName("date").setType("DATE"), new TableFieldSchema().setName("time").setType("TIME")));
    private static final List<Map<String, Object>> NEW_TYPES_QUERY_TABLE_DATA = ImmutableList.of(ImmutableMap.of("bytes", "abc=", "date", "2000-01-01", "time", "00:00:00"), ImmutableMap.of("bytes", "dec=", "date", "3000-12-31", "time", "23:59:59.990000"), ImmutableMap.of("bytes", "xyw=", "date", "2011-01-01", "time", "23:59:59.999999"));

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT$BigQueryToTableOptions.class */
    public interface BigQueryToTableOptions extends TestPipelineOptions, ExperimentalOptions {
        @Description("The BigQuery query to be used for creating the source")
        @Validation.Required
        String getQuery();

        void setQuery(String str);

        @Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
        @Validation.Required
        String getOutput();

        void setOutput(String str);

        @Description("BigQuery output table schema.")
        @Validation.Required
        TableSchema getOutputSchema();

        void setOutputSchema(TableSchema tableSchema);

        @Description("Whether to force reshuffle.")
        @Default.Boolean(false)
        boolean getReshuffle();

        void setReshuffle(boolean z);

        @Description("Whether to use the Standard SQL dialect when querying BigQuery.")
        @Default.Boolean(false)
        boolean getUsingStandardSql();

        void setUsingStandardSql(boolean z);
    }

    private void runBigQueryToTablePipeline(BigQueryToTableOptions bigQueryToTableOptions) {
        Pipeline create = Pipeline.create(bigQueryToTableOptions);
        BigQueryIO.Read fromQuery = BigQueryIO.read().fromQuery(bigQueryToTableOptions.getQuery());
        if (bigQueryToTableOptions.getUsingStandardSql()) {
            fromQuery = fromQuery.usingStandardSql();
        }
        PCollection apply = create.apply(fromQuery);
        if (bigQueryToTableOptions.getReshuffle()) {
            apply = (PCollection) apply.apply(WithKeys.of((Void) null)).setCoder(KvCoder.of(VoidCoder.of(), TableRowJsonCoder.of())).apply(Reshuffle.of()).apply(Values.create());
        }
        apply.apply(BigQueryIO.writeTableRows().to(bigQueryToTableOptions.getOutput()).withSchema(bigQueryToTableOptions.getOutputSchema()).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
        create.run().waitUntilFinish();
    }

    private BigQueryToTableOptions setupLegacyQueryTest(String str) {
        BigQueryToTableOptions as = TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class);
        as.setTempLocation(as.getTempRoot() + "/bq_it_temp");
        as.setQuery("SELECT * FROM (SELECT \"apple\" as fruit), (SELECT \"orange\" as fruit),");
        as.setOutput(str);
        as.setOutputSchema(LEGACY_QUERY_TABLE_SCHEMA);
        return as;
    }

    private BigQueryToTableOptions setupNewTypesQueryTest(String str) {
        BigQueryToTableOptions as = TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class);
        as.setTempLocation(as.getTempRoot() + "/bq_it_temp");
        as.setQuery(String.format("SELECT bytes, date, time FROM [%s:%s.%s]", project, BIG_QUERY_DATASET_ID, NEW_TYPES_QUERY_TABLE_NAME));
        as.setOutput(str);
        as.setOutputSchema(NEW_TYPES_QUERY_TABLE_SCHEMA);
        return as;
    }

    private BigQueryToTableOptions setupStandardQueryTest(String str) {
        BigQueryToTableOptions bigQueryToTableOptions = setupLegacyQueryTest(str);
        bigQueryToTableOptions.setTempLocation(bigQueryToTableOptions.getTempRoot() + "/bq_it_temp");
        bigQueryToTableOptions.setQuery("SELECT * FROM (SELECT \"apple\" as fruit) UNION ALL (SELECT \"orange\" as fruit)");
        bigQueryToTableOptions.setUsingStandardSql(true);
        return bigQueryToTableOptions;
    }

    private List<TableRow> getTableRowsFromQuery(String str, int i) throws Exception {
        FluentBackoff withInitialBackoff = FluentBackoff.DEFAULT.withMaxRetries(i).withInitialBackoff(Duration.standardSeconds(1L));
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff gcpBackOff = BackOffAdapter.toGcpBackOff(withInitialBackoff.backoff());
        do {
            LOG.info("Starting querying {}", str);
            QueryResponse queryWithRetries = BQ_CLIENT.queryWithRetries(str, project);
            if (queryWithRetries.getRows() != null) {
                LOG.info("Got table content with query {}", str);
                return queryWithRetries.getRows();
            }
        } while (BackOffUtils.next(sleeper, gcpBackOff));
        LOG.info("Got empty table for query {} with retry {}", str, Integer.valueOf(i));
        return Collections.emptyList();
    }

    private void verifyLegacyQueryRes(String str) throws Exception {
        Assert.assertEquals(ImmutableList.of("apple", "orange"), (List) getTableRowsFromQuery(String.format("SELECT fruit from [%s];", str), MAX_RETRY).stream().flatMap(tableRow -> {
            return tableRow.getF().stream().map(tableCell -> {
                return tableCell.getV().toString();
            });
        }).sorted().collect(Collectors.toList()));
    }

    private void verifyNewTypesQueryRes(String str) throws Exception {
        ImmutableList of = ImmutableList.of("abc=,2000-01-01,00:00:00", "dec=,3000-12-31,23:59:59.990000", "xyw=,2011-01-01,23:59:59.999999");
        BQ_CLIENT.queryWithRetries(String.format("SELECT bytes, date, time FROM [%s];", str), project);
        Assert.assertEquals(of, (List) getTableRowsFromQuery(String.format("SELECT bytes, date, time FROM [%s];", str), MAX_RETRY).stream().map(tableRow -> {
            String str2 = "";
            for (TableCell tableCell : tableRow.getF()) {
                str2 = str2.isEmpty() ? tableCell.getV().toString() : str2 + "," + tableCell.getV().toString();
            }
            return str2;
        }).sorted().collect(Collectors.toList()));
    }

    private void verifyStandardQueryRes(String str) throws Exception {
        verifyLegacyQueryRes(str);
    }

    @BeforeClass
    public static void setupTestEnvironment() throws Exception {
        PipelineOptionsFactory.register(BigQueryToTableOptions.class);
        project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
        BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID);
        BQ_CLIENT.createNewTable(project, BIG_QUERY_DATASET_ID, new Table().setSchema(NEW_TYPES_QUERY_TABLE_SCHEMA).setTableReference(new TableReference().setTableId(NEW_TYPES_QUERY_TABLE_NAME).setDatasetId(BIG_QUERY_DATASET_ID).setProjectId(project)));
        BQ_CLIENT.insertDataToTable(project, BIG_QUERY_DATASET_ID, NEW_TYPES_QUERY_TABLE_NAME, NEW_TYPES_QUERY_TABLE_DATA);
    }

    @AfterClass
    public static void cleanup() {
        LOG.info("Start to clean up tables and datasets.");
        BQ_CLIENT.deleteDataset(project, BIG_QUERY_DATASET_ID);
    }

    @Test
    public void testLegacyQueryWithoutReshuffle() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testLegacyQueryWithoutReshuffle";
        runBigQueryToTablePipeline(setupLegacyQueryTest(str));
        verifyLegacyQueryRes(str);
    }

    @Test
    public void testNewTypesQueryWithoutReshuffle() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testNewTypesQueryWithoutReshuffle";
        runBigQueryToTablePipeline(setupNewTypesQueryTest(str));
        verifyNewTypesQueryRes(str);
    }

    @Test
    public void testNewTypesQueryWithReshuffle() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testNewTypesQueryWithReshuffle";
        BigQueryToTableOptions bigQueryToTableOptions = setupNewTypesQueryTest(str);
        bigQueryToTableOptions.setReshuffle(true);
        runBigQueryToTablePipeline(bigQueryToTableOptions);
        verifyNewTypesQueryRes(str);
    }

    @Test
    public void testStandardQueryWithoutCustom() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testStandardQueryWithoutCustom";
        runBigQueryToTablePipeline(setupStandardQueryTest(str));
        verifyStandardQueryRes(str);
    }

    @Test
    @Category({DataflowPortabilityApiUnsupported.class})
    public void testNewTypesQueryWithoutReshuffleWithCustom() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testNewTypesQueryWithoutReshuffleWithCustom";
        BigQueryToTableOptions bigQueryToTableOptions = setupNewTypesQueryTest(str);
        bigQueryToTableOptions.setExperiments(ImmutableList.of("enable_custom_bigquery_sink", "enable_custom_bigquery_source"));
        runBigQueryToTablePipeline(bigQueryToTableOptions);
        verifyNewTypesQueryRes(str);
    }

    @Test
    @Category({DataflowPortabilityApiUnsupported.class})
    public void testLegacyQueryWithoutReshuffleWithCustom() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testLegacyQueryWithoutReshuffleWithCustom";
        BigQueryToTableOptions bigQueryToTableOptions = setupLegacyQueryTest(str);
        bigQueryToTableOptions.setExperiments(ImmutableList.of("enable_custom_bigquery_sink", "enable_custom_bigquery_source"));
        runBigQueryToTablePipeline(bigQueryToTableOptions);
        verifyLegacyQueryRes(str);
    }

    @Test
    @Category({DataflowPortabilityApiUnsupported.class})
    public void testStandardQueryWithoutReshuffleWithCustom() throws Exception {
        String str = project + ":" + BIG_QUERY_DATASET_ID + ".testStandardQueryWithoutReshuffleWithCustom";
        BigQueryToTableOptions bigQueryToTableOptions = setupStandardQueryTest(str);
        bigQueryToTableOptions.setExperiments(ImmutableList.of("enable_custom_bigquery_sink", "enable_custom_bigquery_source"));
        runBigQueryToTablePipeline(bigQueryToTableOptions);
        verifyStandardQueryRes(str);
    }
}
