package com.google.cloud.spark.bigquery.integration;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.integration.model.Data;
import com.google.cloud.spark.bigquery.integration.model.Friend;
import com.google.cloud.spark.bigquery.integration.model.Link;
import com.google.cloud.spark.bigquery.integration.model.Person;
import com.google.common.truth.Truth;
import com.google.inject.ProvisionException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import scala.Some;

/* loaded from: input_file:com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.class */
abstract class WriteIntegrationTestBase extends SparkBigQueryIntegrationTestBase {
    protected static AtomicInteger id = new AtomicInteger(0);
    protected final SparkBigQueryConfig.WriteMethod writeMethod;
    protected Class<? extends Exception> expectedExceptionOnExistingTable;
    protected BigQuery bq;

    public WriteIntegrationTestBase(SparkBigQueryConfig.WriteMethod writeMethod) {
        this(writeMethod, IllegalArgumentException.class);
    }

    public WriteIntegrationTestBase(SparkBigQueryConfig.WriteMethod writeMethod, Class<? extends Exception> cls) {
        this.writeMethod = writeMethod;
        this.expectedExceptionOnExistingTable = cls;
        this.bq = BigQueryOptions.getDefaultInstance().getService();
    }

    private Metadata metadata(Map<String, String> map) {
        MetadataBuilder metadataBuilder = new MetadataBuilder();
        map.forEach((str, str2) -> {
            metadataBuilder.putString(str, str2);
        });
        return metadataBuilder.build();
    }

    @Before
    public void createTestTableName() {
        this.testTable = "test_" + System.nanoTime();
    }

    private String createDiffInSchemaDestTable() {
        String str = "dest_table_" + System.nanoTime();
        IntegrationTestUtils.runQuery(String.format(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE, testDataset, str));
        return str;
    }

    protected Dataset<Row> initialData() {
        return this.spark.createDataset(Arrays.asList(new Person("Abc", Arrays.asList(new Friend(10, Arrays.asList(new Link("www.abc.com"))))), new Person("Def", Arrays.asList(new Friend(12, Arrays.asList(new Link("www.def.com")))))), Encoders.bean(Person.class)).toDF();
    }

    protected Dataset<Row> additonalData() {
        return this.spark.createDataset(Arrays.asList(new Person("Xyz", Arrays.asList(new Friend(10, Arrays.asList(new Link("www.xyz.com"))))), new Person("Pqr", Arrays.asList(new Friend(12, Arrays.asList(new Link("www.pqr.com")))))), Encoders.bean(Person.class)).toDF();
    }

    protected int testTableNumberOfRows() throws InterruptedException {
        return (int) this.bq.query(QueryJobConfiguration.of(String.format("select * from %s.%s", testDataset.toString(), this.testTable)), new BigQuery.JobOption[0]).getTotalRows();
    }

    private StandardTableDefinition testPartitionedTableDefinition() {
        return this.bq.getTable(testDataset.toString(), this.testTable + "_partitioned", new BigQuery.TableOption[0]).getDefinition();
    }

    protected void writeToBigQuery(Dataset<Row> dataset, SaveMode saveMode) {
        writeToBigQuery(dataset, saveMode, "avro");
    }

    protected void writeToBigQuery(Dataset<Row> dataset, SaveMode saveMode, String str) {
        dataset.write().format("bigquery").mode(saveMode).option("table", fullTableName()).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("intermediateFormat", str).option("writeMethod", this.writeMethod.toString()).save();
    }

    Dataset<Row> readAllTypesTable() {
        return this.spark.read().format("bigquery").option("dataset", testDataset.toString()).option("table", "all_types").load();
    }

    @Test
    public void testWriteToBigQuery_AppendSaveMode() throws InterruptedException {
        writeToBigQuery(initialData(), SaveMode.Append);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
        writeToBigQuery(additonalData(), SaveMode.Append);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(4);
        Truth.assertThat(Boolean.valueOf(additionalDataValuesExist())).isTrue();
    }

    @Test
    public void testWriteToBigQuery_EnableListInference() throws InterruptedException {
        Dataset<Row> initialData = initialData();
        initialData.write().format("bigquery").mode(SaveMode.Append).option("table", fullTableName()).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("intermediateFormat", "parquet").option("writeMethod", this.writeMethod.toString()).option("enableListInference", true).save();
        Assert.assertEquals(SchemaConverters.toBigQuerySchema(initialData.schema()), SchemaConverters.toBigQuerySchema(this.spark.read().format("bigquery").option("dataset", testDataset.toString()).option("table", this.testTable).load().schema()));
    }

    @Test
    public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedException {
        writeToBigQuery(initialData(), SaveMode.ErrorIfExists);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
        Assert.assertThrows(this.expectedExceptionOnExistingTable, () -> {
            writeToBigQuery(additonalData(), SaveMode.ErrorIfExists);
        });
    }

    @Test
    public void testWriteToBigQuery_IgnoreSaveMode() throws InterruptedException {
        writeToBigQuery(initialData(), SaveMode.Ignore);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
        writeToBigQuery(additonalData(), SaveMode.Ignore);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
        Truth.assertThat(Boolean.valueOf(additionalDataValuesExist())).isFalse();
    }

    @Test
    public void testWriteToBigQuery_OverwriteSaveMode() throws InterruptedException {
        writeToBigQuery(initialData(), SaveMode.Overwrite);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
        Thread.sleep(120000L);
        writeToBigQuery(additonalData(), SaveMode.Overwrite);
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isFalse();
        Truth.assertThat(Boolean.valueOf(additionalDataValuesExist())).isTrue();
    }

    @Test
    public void testWriteToBigQuery_AvroFormat() throws InterruptedException {
        writeToBigQuery(initialData(), SaveMode.ErrorIfExists, "avro");
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
    }

    @Test
    public void testWriteToBigQuerySimplifiedApi() throws InterruptedException {
        initialData().write().format("bigquery").option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).save(fullTableName());
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
    }

    @Test
    public void testWriteToBigQueryAddingTheSettingsToSparkConf() throws InterruptedException {
        this.spark.conf().set("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET);
        initialData().write().format("bigquery").option("table", fullTableName()).option("writeMethod", this.writeMethod.toString()).save();
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);
        Truth.assertThat(Boolean.valueOf(initialDataValuesExist())).isTrue();
    }

    @Test
    public void testDirectWriteToBigQueryWithDiffInSchema() {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String createDiffInSchemaDestTable = createDiffInSchemaDestTable();
        Dataset load = this.spark.read().format("bigquery").option("table", testDataset + ".src_table").load();
        Assert.assertThrows(ProvisionException.class, () -> {
            load.write().format("bigquery").mode(SaveMode.Append).option("writeMethod", this.writeMethod.toString()).save(testDataset + "." + createDiffInSchemaDestTable);
        });
    }

    @Test
    public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String createDiffInSchemaDestTable = createDiffInSchemaDestTable();
        this.spark.read().format("bigquery").option("table", testDataset + ".src_table").load().write().format("bigquery").mode(SaveMode.Append).option("writeMethod", this.writeMethod.toString()).option("enableModeCheckForSchemaFields", false).save(testDataset + "." + createDiffInSchemaDestTable);
        Truth.assertThat(Integer.valueOf((int) this.bq.query(QueryJobConfiguration.of(String.format("select * from %s.%s", testDataset.toString(), createDiffInSchemaDestTable)), new BigQuery.JobOption[0]).getTotalRows())).isEqualTo(1);
    }

    @Test
    public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String createDiffInSchemaDestTable = createDiffInSchemaDestTable();
        Dataset load = this.spark.read().format("bigquery").option("table", testDataset + ".src_table_with_description").load();
        Assert.assertThrows(ProvisionException.class, () -> {
            load.write().format("bigquery").mode(SaveMode.Append).option("writeMethod", this.writeMethod.toString()).save(testDataset + "." + createDiffInSchemaDestTable);
        });
    }

    @Test
    public void testInDirectWriteToBigQueryWithDiffInSchemaAndModeCheck() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        String createDiffInSchemaDestTable = createDiffInSchemaDestTable();
        this.spark.read().format("bigquery").option("table", testDataset + ".src_table").load().write().format("bigquery").mode(SaveMode.Append).option("writeMethod", this.writeMethod.toString()).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("enableModeCheckForSchemaFields", true).save(testDataset + "." + createDiffInSchemaDestTable);
        Truth.assertThat(Integer.valueOf((int) this.bq.query(QueryJobConfiguration.of(String.format("select * from %s.%s", testDataset.toString(), createDiffInSchemaDestTable)), new BigQuery.JobOption[0]).getTotalRows())).isEqualTo(1);
    }

    @Test
    public void testIndirectWriteToBigQueryWithDiffInSchemaNullableFieldAndDisableModeCheck() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        String createDiffInSchemaDestTable = createDiffInSchemaDestTable();
        this.spark.read().format("bigquery").option("table", testDataset + ".src_table").load().write().format("bigquery").mode(SaveMode.Append).option("writeMethod", this.writeMethod.toString()).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("enableModeCheckForSchemaFields", false).save(testDataset + "." + createDiffInSchemaDestTable);
        Truth.assertThat(Integer.valueOf((int) this.bq.query(QueryJobConfiguration.of(String.format("select * from %s.%s", testDataset.toString(), createDiffInSchemaDestTable)), new BigQuery.JobOption[0]).getTotalRows())).isEqualTo(1);
    }

    @Test
    public void testInDirectWriteToBigQueryWithDiffInDescription() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        String createDiffInSchemaDestTable = createDiffInSchemaDestTable();
        this.spark.read().format("bigquery").option("table", testDataset + ".src_table_with_description").load().write().format("bigquery").mode(SaveMode.Append).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("writeMethod", this.writeMethod.toString()).save(testDataset + "." + createDiffInSchemaDestTable);
        Truth.assertThat(Integer.valueOf((int) this.bq.query(QueryJobConfiguration.of(String.format("select * from %s.%s", testDataset.toString(), createDiffInSchemaDestTable)), new BigQuery.JobOption[0]).getTotalRows())).isEqualTo(1);
    }

    @Test
    public void testWriteToBigQueryPartitionedAndClusteredTable() {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        this.spark.read().format("bigquery").option("table", "bigquery-public-data.libraries_io.projects").load().where("platform = 'Sublime'").write().format("bigquery").option("table", fullTableNamePartitioned()).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("partitionField", "created_timestamp").option("clusteredFields", "platform").option("writeMethod", this.writeMethod.toString()).mode(SaveMode.Overwrite).save();
        StandardTableDefinition testPartitionedTableDefinition = testPartitionedTableDefinition();
        Truth.assertThat(testPartitionedTableDefinition.getTimePartitioning().getField()).isEqualTo("created_timestamp");
        Truth.assertThat(testPartitionedTableDefinition.getClustering().getFields()).contains("platform");
    }

    protected Dataset<Row> overwriteSinglePartition(StructField structField) {
        this.bq.create(TableInfo.of(TableId.of(testDataset.toString(), fullTableNamePartitioned() + "_" + id.getAndIncrement()), StandardTableDefinition.newBuilder().setSchema(Schema.of(new Field[]{Field.of("the_date", LegacySQLTypeName.DATE, new Field[0]), Field.of("some_text", LegacySQLTypeName.STRING, new Field[0])})).setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("the_date").build()).build()), new BigQuery.TableOption[0]);
        try {
            this.bq.query(QueryJobConfiguration.of(String.format("insert into `" + fullTableName() + "` (the_date, some_text) values ('2020-07-01', 'foo'), ('2020-07-02', 'bar')", new Object[0])), new BigQuery.JobOption[0]);
            this.spark.createDataFrame(Arrays.asList(RowFactory.create(new Object[]{Date.valueOf("2020-07-01"), "baz"})), new StructType(new StructField[]{structField, new StructField("some_text", DataTypes.StringType, true, Metadata.empty())})).write().format("bigquery").option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("datePartition", "20200701").mode("overwrite").save(fullTableName());
            Dataset<Row> load = this.spark.read().format("bigquery").load(fullTableName());
            List collectAsList = load.collectAsList();
            Truth.assertThat(collectAsList).hasSize(2);
            Truth.assertThat(Long.valueOf(collectAsList.stream().filter(row -> {
                return row.getString(1).equals("bar");
            }).count())).isEqualTo(1);
            Truth.assertThat(Long.valueOf(collectAsList.stream().filter(row2 -> {
                return row2.getString(1).equals("baz");
            }).count())).isEqualTo(1);
            return load;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void testOverwriteSinglePartition() {
        overwriteSinglePartition(new StructField("the_date", DataTypes.DateType, true, Metadata.empty()));
    }

    public void testOverwriteSinglePartitionWithComment() {
        Truth.assertThat(overwriteSinglePartition(new StructField("the_date", DataTypes.DateType, true, Metadata.empty()).withComment("the partition field")).schema().fields()[0].getComment()).isEqualTo(Some.apply("the partition field"));
    }

    @Test
    public void testWriteToBigQueryWithDescription() {
        Metadata fromJson = Metadata.fromJson("{\"description\": \"test description\"}");
        StructType[] structTypeArr = {structType(new StructField("c1", DataTypes.IntegerType, true, fromJson)), structType(new StructField("c1", DataTypes.IntegerType, true, Metadata.empty()).withComment("test comment")), structType(new StructField("c1", DataTypes.IntegerType, true, fromJson).withComment("test comment")), structType(new StructField("c1", DataTypes.IntegerType, true, Metadata.empty()))};
        String[] strArr = {"test description", "test comment", "test comment", null};
        for (int i = 0; i < structTypeArr.length; i++) {
            this.spark.createDataFrame(Arrays.asList(RowFactory.create(new Object[]{100}), RowFactory.create(new Object[]{200})), structTypeArr[i]).write().format("bigquery").mode(SaveMode.Overwrite).option("table", fullTableName() + "_" + i).option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("intermediateFormat", "parquet").option("writeMethod", this.writeMethod.toString()).save();
            Optional descriptionOrCommentOfField = SchemaConverters.getDescriptionOrCommentOfField(this.spark.read().format("bigquery").option("dataset", testDataset.toString()).option("table", this.testTable + "_" + i).load().schema().fields()[0]);
            if (strArr[i] != null) {
                Truth.assertThat(Boolean.valueOf(descriptionOrCommentOfField.isPresent())).isTrue();
                Truth.assertThat((String) descriptionOrCommentOfField.orElse("")).isEqualTo(strArr[i]);
            } else {
                Truth.assertThat(Boolean.valueOf(descriptionOrCommentOfField.isPresent())).isFalse();
            }
        }
    }

    private StructType structType(StructField... structFieldArr) {
        return new StructType(structFieldArr);
    }

    @Test
    public void testPartition_Hourly() {
        testPartition("HOUR");
    }

    @Test
    public void testPartition_Daily() {
        testPartition("DAY");
    }

    @Test
    public void testPartition_Monthly() {
        testPartition("MONTH");
    }

    @Test
    public void testPartition_Yearly() {
        testPartition("YEAR");
    }

    private void testPartition(String str) {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        Dataset df = this.spark.createDataset(Arrays.asList(new Data("a", Timestamp.valueOf("2020-01-01 01:01:01")), new Data("b", Timestamp.valueOf("2020-01-02 02:02:02")), new Data("c", Timestamp.valueOf("2020-01-03 03:03:03"))), Encoders.bean(Data.class)).toDF();
        String str2 = testDataset.toString() + "." + this.testTable + "_" + str;
        df.write().format("bigquery").option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET).option("partitionField", "ts").option("partitionType", str).option("partitionRequireFilter", "true").option("table", str2).option("writeMethod", this.writeMethod.toString()).save();
        Truth.assertThat(Long.valueOf(this.spark.read().format("bigquery").load(str2).count())).isEqualTo(3);
    }

    @Test
    public void testCacheDataFrameInDataSource() {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        Dataset<Row> readAllTypesTable = readAllTypesTable();
        writeToBigQuery(readAllTypesTable, SaveMode.Overwrite, "avro");
        Dataset cache = this.spark.read().format("bigquery").option("dataset", testDataset.toString()).option("table", this.testTable).option("readDataFormat", "arrow").load().cache();
        Truth.assertThat(cache.head()).isEqualTo(readAllTypesTable.head());
        Truth.assertThat(cache.head()).isEqualTo(readAllTypesTable.head());
        Truth.assertThat(cache.schema()).isEqualTo(readAllTypesTable.schema());
    }

    protected long numberOfRowsWith(String str) {
        try {
            return this.bq.query(QueryJobConfiguration.of(String.format("select name from %s where name='%s'", fullTableName(), str)), new BigQuery.JobOption[0]).getTotalRows();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected String fullTableName() {
        return testDataset.toString() + "." + this.testTable;
    }

    protected String fullTableNamePartitioned() {
        return fullTableName() + "_partitioned";
    }

    protected boolean additionalDataValuesExist() {
        return numberOfRowsWith("Xyz") == 1;
    }

    protected boolean initialDataValuesExist() {
        return numberOfRowsWith("Abc") == 1;
    }
}
