/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class BigQueryToTableIT {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryToTableIT.class);
    private static String project;
    private static final BigqueryClient BQ_CLIENT;
    private static final String BIG_QUERY_DATASET_ID;
    private static final TableSchema LEGACY_QUERY_TABLE_SCHEMA;
    private static final TableSchema NEW_TYPES_QUERY_TABLE_SCHEMA;
    private static final String NEW_TYPES_QUERY_TABLE_NAME = "types";
    private static final List<Map<String, Object>> NEW_TYPES_QUERY_TABLE_DATA;
    private static final int MAX_RETRY = 5;

    private void runBigQueryToTablePipeline(BigQueryToTableOptions options) {
        Pipeline p = Pipeline.create((PipelineOptions)options);
        BigQueryIO.Read bigQueryRead = BigQueryIO.read().fromQuery(options.getQuery());
        if (options.getUsingStandardSql()) {
            bigQueryRead = bigQueryRead.usingStandardSql();
        }
        PCollection input = (PCollection)p.apply((PTransform)bigQueryRead);
        if (options.getReshuffle()) {
            input = (PCollection)((PCollection)((PCollection)input.apply((PTransform)WithKeys.of((Object)null))).setCoder((Coder)KvCoder.of((Coder)VoidCoder.of(), (Coder)TableRowJsonCoder.of())).apply((PTransform)Reshuffle.of())).apply((PTransform)Values.create());
        }
        input.apply((PTransform)BigQueryIO.writeTableRows().to(options.getOutput()).withSchema(options.getOutputSchema()).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
        p.run().waitUntilFinish();
    }

    private BigQueryToTableOptions setupLegacyQueryTest(String outputTable) {
        BigQueryToTableOptions options = (BigQueryToTableOptions)TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class);
        options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
        options.setQuery("SELECT * FROM (SELECT \"apple\" as fruit), (SELECT \"orange\" as fruit),");
        options.setOutput(outputTable);
        options.setOutputSchema(LEGACY_QUERY_TABLE_SCHEMA);
        return options;
    }

    private BigQueryToTableOptions setupNewTypesQueryTest(String outputTable) {
        BigQueryToTableOptions options = (BigQueryToTableOptions)TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class);
        options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
        options.setQuery(String.format("SELECT bytes, date, time FROM [%s:%s.%s]", project, BIG_QUERY_DATASET_ID, NEW_TYPES_QUERY_TABLE_NAME));
        options.setOutput(outputTable);
        options.setOutputSchema(NEW_TYPES_QUERY_TABLE_SCHEMA);
        return options;
    }

    private BigQueryToTableOptions setupStandardQueryTest(String outputTable) {
        BigQueryToTableOptions options = this.setupLegacyQueryTest(outputTable);
        options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
        options.setQuery("SELECT * FROM (SELECT \"apple\" as fruit) UNION ALL (SELECT \"orange\" as fruit)");
        options.setUsingStandardSql(true);
        return options;
    }

    private List<TableRow> getTableRowsFromQuery(String query, int maxRetry) throws Exception {
        FluentBackoff backoffFactory = FluentBackoff.DEFAULT.withMaxRetries(maxRetry).withInitialBackoff(Duration.standardSeconds((long)1L));
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff backoff = BackOffAdapter.toGcpBackOff((org.apache.beam.sdk.util.BackOff)backoffFactory.backoff());
        do {
            LOG.info("Starting querying {}", (Object)query);
            QueryResponse response = BQ_CLIENT.queryWithRetries(query, project);
            if (response.getRows() == null) continue;
            LOG.info("Got table content with query {}", (Object)query);
            return response.getRows();
        } while (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff));
        LOG.info("Got empty table for query {} with retry {}", (Object)query, (Object)maxRetry);
        return Collections.emptyList();
    }

    private void verifyLegacyQueryRes(String outputTable) throws Exception {
        ImmutableList legacyQueryExpectedRes = ImmutableList.of((Object)"apple", (Object)"orange");
        List<TableRow> tableRows = this.getTableRowsFromQuery(String.format("SELECT fruit from [%s];", outputTable), 5);
        List tableResult = tableRows.stream().flatMap(row -> row.getF().stream().map(cell -> cell.getV().toString())).sorted().collect(Collectors.toList());
        Assert.assertEquals((Object)legacyQueryExpectedRes, tableResult);
    }

    private void verifyNewTypesQueryRes(String outputTable) throws Exception {
        ImmutableList newTypeQueryExpectedRes = ImmutableList.of((Object)"abc=,2000-01-01,00:00:00", (Object)"dec=,3000-12-31,23:59:59.990000", (Object)"xyw=,2011-01-01,23:59:59.999999");
        QueryResponse response = BQ_CLIENT.queryWithRetries(String.format("SELECT bytes, date, time FROM [%s];", outputTable), project);
        List<TableRow> tableRows = this.getTableRowsFromQuery(String.format("SELECT bytes, date, time FROM [%s];", outputTable), 5);
        List tableResult = tableRows.stream().map(row -> {
            String res = "";
            for (TableCell cell : row.getF()) {
                if (res.isEmpty()) {
                    res = cell.getV().toString();
                    continue;
                }
                res = res + "," + cell.getV().toString();
            }
            return res;
        }).sorted().collect(Collectors.toList());
        Assert.assertEquals((Object)newTypeQueryExpectedRes, tableResult);
    }

    private void verifyStandardQueryRes(String outputTable) throws Exception {
        this.verifyLegacyQueryRes(outputTable);
    }

    @BeforeClass
    public static void setupTestEnvironment() throws Exception {
        PipelineOptionsFactory.register(BigQueryToTableOptions.class);
        project = ((GcpOptions)TestPipeline.testingPipelineOptions().as(GcpOptions.class)).getProject();
        BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID);
        BQ_CLIENT.createNewTable(project, BIG_QUERY_DATASET_ID, new Table().setSchema(NEW_TYPES_QUERY_TABLE_SCHEMA).setTableReference(new TableReference().setTableId(NEW_TYPES_QUERY_TABLE_NAME).setDatasetId(BIG_QUERY_DATASET_ID).setProjectId(project)));
        BQ_CLIENT.insertDataToTable(project, BIG_QUERY_DATASET_ID, NEW_TYPES_QUERY_TABLE_NAME, NEW_TYPES_QUERY_TABLE_DATA);
    }

    @AfterClass
    public static void cleanup() {
        LOG.info("Start to clean up tables and datasets.");
        BQ_CLIENT.deleteDataset(project, BIG_QUERY_DATASET_ID);
    }

    @Test
    public void testLegacyQueryWithoutReshuffle() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testLegacyQueryWithoutReshuffle";
        this.runBigQueryToTablePipeline(this.setupLegacyQueryTest(outputTable));
        this.verifyLegacyQueryRes(outputTable);
    }

    @Test
    public void testNewTypesQueryWithoutReshuffle() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testNewTypesQueryWithoutReshuffle";
        this.runBigQueryToTablePipeline(this.setupNewTypesQueryTest(outputTable));
        this.verifyNewTypesQueryRes(outputTable);
    }

    @Test
    public void testNewTypesQueryWithReshuffle() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testNewTypesQueryWithReshuffle";
        BigQueryToTableOptions options = this.setupNewTypesQueryTest(outputTable);
        options.setReshuffle(true);
        this.runBigQueryToTablePipeline(options);
        this.verifyNewTypesQueryRes(outputTable);
    }

    @Test
    public void testStandardQueryWithoutCustom() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testStandardQueryWithoutCustom";
        this.runBigQueryToTablePipeline(this.setupStandardQueryTest(outputTable));
        this.verifyStandardQueryRes(outputTable);
    }

    @Test
    @Category(value={DataflowPortabilityApiUnsupported.class})
    public void testNewTypesQueryWithoutReshuffleWithCustom() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testNewTypesQueryWithoutReshuffleWithCustom";
        BigQueryToTableOptions options = this.setupNewTypesQueryTest(outputTable);
        options.setExperiments((List)ImmutableList.of((Object)"enable_custom_bigquery_sink", (Object)"enable_custom_bigquery_source"));
        this.runBigQueryToTablePipeline(options);
        this.verifyNewTypesQueryRes(outputTable);
    }

    @Test
    @Category(value={DataflowPortabilityApiUnsupported.class})
    public void testLegacyQueryWithoutReshuffleWithCustom() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testLegacyQueryWithoutReshuffleWithCustom";
        BigQueryToTableOptions options = this.setupLegacyQueryTest(outputTable);
        options.setExperiments((List)ImmutableList.of((Object)"enable_custom_bigquery_sink", (Object)"enable_custom_bigquery_source"));
        this.runBigQueryToTablePipeline(options);
        this.verifyLegacyQueryRes(outputTable);
    }

    @Test
    @Category(value={DataflowPortabilityApiUnsupported.class})
    public void testStandardQueryWithoutReshuffleWithCustom() throws Exception {
        String outputTable = project + ":" + BIG_QUERY_DATASET_ID + ".testStandardQueryWithoutReshuffleWithCustom";
        BigQueryToTableOptions options = this.setupStandardQueryTest(outputTable);
        options.setExperiments((List)ImmutableList.of((Object)"enable_custom_bigquery_sink", (Object)"enable_custom_bigquery_source"));
        this.runBigQueryToTablePipeline(options);
        this.verifyStandardQueryRes(outputTable);
    }

    static {
        BQ_CLIENT = new BigqueryClient("BigQueryToTableIT");
        BIG_QUERY_DATASET_ID = "bq_query_to_table_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
        LEGACY_QUERY_TABLE_SCHEMA = new TableSchema().setFields((List)ImmutableList.of((Object)new TableFieldSchema().setName("fruit").setType("STRING")));
        NEW_TYPES_QUERY_TABLE_SCHEMA = new TableSchema().setFields((List)ImmutableList.of((Object)new TableFieldSchema().setName("bytes").setType("BYTES"), (Object)new TableFieldSchema().setName("date").setType("DATE"), (Object)new TableFieldSchema().setName("time").setType("TIME")));
        NEW_TYPES_QUERY_TABLE_DATA = ImmutableList.of((Object)ImmutableMap.of((Object)"bytes", (Object)"abc=", (Object)"date", (Object)"2000-01-01", (Object)"time", (Object)"00:00:00"), (Object)ImmutableMap.of((Object)"bytes", (Object)"dec=", (Object)"date", (Object)"3000-12-31", (Object)"time", (Object)"23:59:59.990000"), (Object)ImmutableMap.of((Object)"bytes", (Object)"xyw=", (Object)"date", (Object)"2011-01-01", (Object)"time", (Object)"23:59:59.999999"));
    }

    public static interface BigQueryToTableOptions
    extends TestPipelineOptions,
    ExperimentalOptions {
        @Description(value="The BigQuery query to be used for creating the source")
        @Validation.Required
        public String getQuery();

        public void setQuery(String var1);

        @Description(value="BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
        @Validation.Required
        public String getOutput();

        public void setOutput(String var1);

        @Description(value="BigQuery output table schema.")
        @Validation.Required
        public TableSchema getOutputSchema();

        public void setOutputSchema(TableSchema var1);

        @Description(value="Whether to force reshuffle.")
        @Default.Boolean(value=false)
        public boolean getReshuffle();

        public void setReshuffle(boolean var1);

        @Description(value="Whether to use the Standard SQL dialect when querying BigQuery.")
        @Default.Boolean(value=false)
        public boolean getUsingStandardSql();

        public void setUsingStandardSql(boolean var1);
    }
}

