package com.google.cloud.spark.bigquery.integration;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.spark.bigquery.PartitionOverwriteMode;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.integration.model.Data;
import com.google.cloud.spark.bigquery.integration.model.Friend;
import com.google.cloud.spark.bigquery.integration.model.Link;
import com.google.cloud.spark.bigquery.integration.model.Person;
import com.google.cloud.spark.bigquery.integration.model.RangeData;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.truth.Truth;
import com.google.inject.ProvisionException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.SQLDataTypes;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import scala.Some;

/* loaded from: input_file:com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.class */
abstract class WriteIntegrationTestBase extends SparkBigQueryIntegrationTestBase {
    // JVM default time zone captured when this class is initialized; restored after each test by resetDefaultTimeZone().
    private static final TimeZone DEFAULT_TZ = TimeZone.getDefault();
    // Shared counter; overwriteSinglePartition() appends its value to the name of the table it creates.
    protected static AtomicInteger id = new AtomicInteger(0);
    // Write method under test; passed to the connector through the "writeMethod" option.
    protected final SparkBigQueryConfig.WriteMethod writeMethod;
    // Exception type the ErrorIfExists scenario expects when the destination table already exists.
    protected Class<? extends Exception> expectedExceptionOnExistingTable;
    // BigQuery client obtained from the default BigQueryOptions; used to inspect the written tables.
    protected BigQuery bq;
    // Optional Spark data type supplied by the subclass constructor.
    // NOTE(review): presumably the TIMESTAMP_NTZ type where the Spark version supports it; usage is not visible here - confirm.
    protected Optional<DataType> timeStampNTZType;

    /**
     * Creates a test base for the given write method, expecting
     * {@link IllegalArgumentException} when writing to an existing table and
     * leaving {@code timeStampNTZType} empty.
     *
     * @param writeMethod the connector write method under test
     */
    public WriteIntegrationTestBase(SparkBigQueryConfig.WriteMethod writeMethod) {
        this(
            writeMethod,
            IllegalArgumentException.class,
            Optional.empty());
    }

    /**
     * Creates a test base for the given write method, expecting
     * {@link IllegalArgumentException} when writing to an existing table and
     * storing {@code dataType} as the {@code timeStampNTZType}.
     *
     * @param writeMethod the connector write method under test
     * @param dataType the Spark data type to keep in {@code timeStampNTZType}; must not be null
     */
    public WriteIntegrationTestBase(SparkBigQueryConfig.WriteMethod writeMethod, DataType dataType) {
        this(
            writeMethod,
            IllegalArgumentException.class,
            Optional.of(dataType));
    }

    public WriteIntegrationTestBase(SparkBigQueryConfig.WriteMethod writeMethod, Class<? extends Exception> cls, Optional<DataType> optional) {
        this.writeMethod = writeMethod;
        this.expectedExceptionOnExistingTable = cls;
        this.bq = BigQueryOptions.getDefaultInstance().getService();
        this.timeStampNTZType = optional;
    }

    /**
     * Builds a Spark {@link Metadata} object holding every entry of the given
     * map as a string value.
     *
     * @param map key/value pairs to copy into the metadata
     * @return the built metadata
     */
    private Metadata metadata(Map<String, String> map) {
        MetadataBuilder builder = new MetadataBuilder();
        map.forEach(builder::putString);
        return builder.build();
    }

    /** Assigns a fresh, nanoTime-based table name to {@code testTable} before every test. */
    @Before
    public void createTestTableName() {
        long uniqueSuffix = System.nanoTime();
        this.testTable = "test_" + uniqueSuffix;
    }

    /**
     * Restores the JVM default time zone captured in {@code DEFAULT_TZ} after
     * every test, undoing any change a test may have made.
     */
    @After
    public void resetDefaultTimeZone() {
        TimeZone originalZone = DEFAULT_TZ;
        TimeZone.setDefault(originalZone);
    }

    /**
     * Creates a destination table by running the given statement template.
     *
     * @param str a {@link String#format} template taking the dataset and the
     *     generated table name, in that order
     * @return the generated (nanoTime-based) destination table name
     */
    private String createDiffInSchemaDestTable(String str) {
        final String destTableName = "dest_table_" + System.nanoTime();
        final String statement = String.format(str, testDataset, destTableName);
        IntegrationTestUtils.runQuery(statement);
        return destTableName;
    }

    /**
     * Builds the two-row {@link Person} data frame ("Abc" and "Def") that the
     * write tests use as their first batch.
     *
     * @return a data frame with two person rows
     */
    protected Dataset<Row> initialData() {
        Person abc = new Person("Abc", Arrays.asList(new Friend(10, Arrays.asList(new Link("www.abc.com")))));
        Person def = new Person("Def", Arrays.asList(new Friend(12, Arrays.asList(new Link("www.def.com")))));
        return this.spark
            .createDataset(Arrays.asList(abc, def), Encoders.bean(Person.class))
            .toDF();
    }

    /**
     * Builds the two-row {@link Person} data frame ("Xyz" and "Pqr") that the
     * write tests use as their second batch.
     *
     * @return a data frame with two person rows
     */
    protected Dataset<Row> additonalData() {
        Person xyz = new Person("Xyz", Arrays.asList(new Friend(10, Arrays.asList(new Link("www.xyz.com")))));
        Person pqr = new Person("Pqr", Arrays.asList(new Friend(12, Arrays.asList(new Link("www.pqr.com")))));
        return this.spark
            .createDataset(Arrays.asList(xyz, pqr), Encoders.bean(Person.class))
            .toDF();
    }

    /**
     * Counts the rows of the current test table.
     *
     * @return the number of rows in {@code testTable}
     * @throws InterruptedException if the BigQuery query is interrupted
     */
    protected int testTableNumberOfRows() throws InterruptedException {
        String currentTable = this.testTable;
        return testTableNumberOfRows(currentTable);
    }

    /**
     * Counts the rows of a table in the test dataset by running
     * {@code select *} against it and reading the total row count.
     *
     * @param str name of the table inside the test dataset
     * @return the total row count, narrowed to {@code int}
     * @throws InterruptedException if the BigQuery query is interrupted
     */
    protected int testTableNumberOfRows(String str) throws InterruptedException {
        String sql = String.format("select * from %s.%s", testDataset.toString(), str);
        TableResult result = this.bq.query(QueryJobConfiguration.of(sql));
        return (int) result.getTotalRows();
    }

    /**
     * Fetches the definition of the {@code <testTable>_partitioned} table from
     * the test dataset.
     *
     * @return the table's standard definition
     */
    private StandardTableDefinition testPartitionedTableDefinition() {
        String datasetName = testDataset.toString();
        String partitionedTableName = this.testTable + "_partitioned";
        Table partitionedTable = this.bq.getTable(datasetName, partitionedTableName);
        return partitionedTable.getDefinition();
    }

    /**
     * Writes the data frame to the test table using "avro" as the intermediate
     * format.
     *
     * @param dataset the rows to write
     * @param saveMode the Spark save mode
     * @param str value for the {@code writeAtLeastOnce} option
     */
    protected void writeToBigQueryAvroFormat(Dataset<Row> dataset, SaveMode saveMode, String str) {
        final String intermediateFormat = "avro";
        writeToBigQuery(dataset, saveMode, intermediateFormat, str);
    }

    /**
     * Writes the data frame to the test table with {@code writeAtLeastOnce}
     * set to "False".
     *
     * @param dataset the rows to write
     * @param saveMode the Spark save mode
     * @param str value for the {@code intermediateFormat} option
     */
    protected void writeToBigQuery(Dataset<Row> dataset, SaveMode saveMode, String str) {
        final String writeAtLeastOnce = "False";
        writeToBigQuery(dataset, saveMode, str, writeAtLeastOnce);
    }

    /**
     * Writes the data frame to the test table through the "bigquery" format,
     * using the configured write method and the temporary GCS bucket.
     *
     * @param dataset the rows to write
     * @param saveMode the Spark save mode
     * @param str value for the {@code intermediateFormat} option
     * @param str2 value for the {@code writeAtLeastOnce} option
     */
    protected void writeToBigQuery(Dataset<Row> dataset, SaveMode saveMode, String str, String str2) {
        dataset
            .write()
            .format("bigquery")
            .mode(saveMode)
            .option("table", fullTableName())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("intermediateFormat", str)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str2)
            .save();
    }

    /**
     * Loads the {@code all_types} table of the test dataset through the
     * "bigquery" format.
     *
     * @return the loaded data frame
     */
    Dataset<Row> readAllTypesTable() {
        return this.spark
            .read()
            .format("bigquery")
            .option("dataset", testDataset.toString())
            .option("table", "all_types")
            .load();
    }

    /**
     * Append scenario: two consecutive appends must accumulate rows (2, then 4)
     * and both batches must be present.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws InterruptedException if a verification query is interrupted
     */
    private void writeToBigQuery_AppendSaveMode_Internal(String str) throws InterruptedException {
        // First batch.
        writeToBigQueryAvroFormat(initialData(), SaveMode.Append, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();

        // Second batch is added on top of the first.
        writeToBigQueryAvroFormat(additonalData(), SaveMode.Append, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(4);
        Truth.assertThat(additionalDataValuesExist()).isTrue();
    }

    /** Runs the append scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuery_AppendSaveMode() throws InterruptedException {
        final String writeAtLeastOnce = "False";
        writeToBigQuery_AppendSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the append scenario with {@code writeAtLeastOnce} set to "True";
     * skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuery_AppendSaveMode_AtLeastOnce() throws InterruptedException {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuery_AppendSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Writes the initial data with two {@code bigQueryTableLabel.*} options and
     * verifies that exactly those two labels end up on the created table.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     */
    private void writeToBigQuery_WithTableLabels_Internal(String str) {
        initialData()
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("table", fullTableName())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("intermediateFormat", "avro")
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .option("bigQueryTableLabel.alice", "bob")
            .option("bigQueryTableLabel.foo", "bar")
            .save();

        Table writtenTable = this.bq.getTable(testDataset.toString(), this.testTable);
        Map<String, String> tableLabels = writtenTable.getLabels();
        Assert.assertEquals(2L, tableLabels.size());
        Assert.assertEquals("bob", tableLabels.get("alice"));
        Assert.assertEquals("bar", tableLabels.get("foo"));
    }

    /** Runs the table-labels scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuery_WithTableLabels() {
        final String writeAtLeastOnce = "False";
        writeToBigQuery_WithTableLabels_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the table-labels scenario with {@code writeAtLeastOnce} set to
     * "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuery_WithTableLabels_AtLeastOnce() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuery_WithTableLabels_Internal(writeAtLeastOnce);
    }

    /**
     * Writes the initial data as parquet with {@code enableListInference}
     * turned on, reads the table back, and checks that both Spark schemas
     * convert to the same BigQuery schema.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws InterruptedException declared for signature compatibility
     */
    private void writeToBigQuery_EnableListInference_Internal(String str) throws InterruptedException {
        Dataset<Row> written = initialData();
        written
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("table", fullTableName())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("intermediateFormat", "parquet")
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .option("enableListInference", true)
            .save();

        Dataset<Row> readBack = this.spark
            .read()
            .format("bigquery")
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .load();

        SchemaConverters converters = SchemaConverters.from(SchemaConvertersConfiguration.createDefault());
        Schema expectedSchema = converters.toBigQuerySchema(written.schema());
        Schema actualSchema = converters.toBigQuerySchema(readBack.schema());
        Assert.assertEquals(expectedSchema, actualSchema);
    }

    /** Runs the list-inference scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuery_EnableListInference() throws InterruptedException {
        final String writeAtLeastOnce = "False";
        writeToBigQuery_EnableListInference_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the list-inference scenario with {@code writeAtLeastOnce} set to
     * "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuery_EnableListInference_AtLeastOnce() throws InterruptedException {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuery_EnableListInference_Internal(writeAtLeastOnce);
    }

    /**
     * ErrorIfExists scenario: the first write creates the table with two rows;
     * a second write to the now-existing table must throw
     * {@code expectedExceptionOnExistingTable}.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws InterruptedException if a verification query is interrupted
     */
    private void writeToBigQuery_ErrorIfExistsSaveMode_Internal(String str) throws InterruptedException {
        writeToBigQueryAvroFormat(initialData(), SaveMode.ErrorIfExists, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();

        Assert.assertThrows(
            this.expectedExceptionOnExistingTable,
            () -> writeToBigQueryAvroFormat(additonalData(), SaveMode.ErrorIfExists, str));
    }

    /** Runs the ErrorIfExists scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedException {
        final String writeAtLeastOnce = "False";
        writeToBigQuery_ErrorIfExistsSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the ErrorIfExists scenario with {@code writeAtLeastOnce} set to
     * "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuery_ErrorIfExistsSaveMode_AtLeastOnce() throws InterruptedException {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuery_ErrorIfExistsSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Ignore scenario: the first write creates the table with two rows; a
     * second write with {@link SaveMode#Ignore} must leave it untouched.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws InterruptedException if a verification query is interrupted
     */
    private void writeToBigQuery_IgnoreSaveMode_Internal(String str) throws InterruptedException {
        // First write populates the table.
        writeToBigQueryAvroFormat(initialData(), SaveMode.Ignore, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();

        // Second write is ignored: same rows, no additional data.
        writeToBigQueryAvroFormat(additonalData(), SaveMode.Ignore, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();
        Truth.assertThat(additionalDataValuesExist()).isFalse();
    }

    /** Runs the Ignore scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuery_IgnoreSaveMode() throws InterruptedException {
        final String writeAtLeastOnce = "False";
        writeToBigQuery_IgnoreSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the Ignore scenario with {@code writeAtLeastOnce} set to "True";
     * skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuery_IgnoreSaveMode_AtLeastOnce() throws InterruptedException {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuery_IgnoreSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Overwrite scenario: the second write must replace the first batch, so
     * the table ends with only the additional data (two rows).
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws InterruptedException if the pause or a verification query is interrupted
     */
    private void writeToBigQuery_OverwriteSaveMode_Internal(String str) throws InterruptedException {
        writeToBigQueryAvroFormat(initialData(), SaveMode.Overwrite, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();

        // NOTE(review): fixed two-minute pause between the two writes; the reason is
        // not visible here (possibly to let previously written data settle) - confirm
        // before shortening or removing it.
        final long pauseMillis = 120000L;
        Thread.sleep(pauseMillis);

        writeToBigQueryAvroFormat(additonalData(), SaveMode.Overwrite, str);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isFalse();
        Truth.assertThat(additionalDataValuesExist()).isTrue();
    }

    /** Runs the Overwrite scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuery_OverwriteSaveMode() throws InterruptedException {
        final String writeAtLeastOnce = "False";
        writeToBigQuery_OverwriteSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the Overwrite scenario with {@code writeAtLeastOnce} set to "True";
     * skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuery_OverwriteSaveMode_AtLeastOnce() throws InterruptedException {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuery_OverwriteSaveMode_Internal(writeAtLeastOnce);
    }

    /**
     * Writes the initial data with "avro" as the intermediate format and
     * verifies both rows arrived.
     */
    @Test
    public void testWriteToBigQuery_AvroFormat() throws InterruptedException {
        final String intermediateFormat = "avro";
        writeToBigQuery(initialData(), SaveMode.ErrorIfExists, intermediateFormat);
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();
    }

    /**
     * Writes the initial data by passing the table name directly to
     * {@code save(...)} instead of the "table" option, then verifies both rows
     * arrived.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws InterruptedException if a verification query is interrupted
     */
    private void writeToBigQuerySimplifiedApi_Internal(String str) throws InterruptedException {
        initialData()
            .write()
            .format("bigquery")
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .save(fullTableName());

        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();
    }

    /** Runs the simplified-API scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteToBigQuerySimplifiedApi() throws InterruptedException {
        final String writeAtLeastOnce = "False";
        writeToBigQuerySimplifiedApi_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the simplified-API scenario with {@code writeAtLeastOnce} set to
     * "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteToBigQuerySimplifiedApi_AtLeastOnce() throws InterruptedException {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeToBigQuerySimplifiedApi_Internal(writeAtLeastOnce);
    }

    /**
     * Verifies that {@code temporaryGcsBucket} can be supplied through the
     * Spark session configuration instead of a writer option.
     *
     * <p>The setting is removed again in a {@code finally} block so that it
     * does not leak into other tests sharing the same session, mirroring the
     * set/unset pattern used by {@code testWriteWithTableLabels}.
     *
     * @throws InterruptedException if a verification query is interrupted
     */
    @Test
    public void testWriteToBigQueryAddingTheSettingsToSparkConf() throws InterruptedException {
        final String bucketKey = "temporaryGcsBucket";
        this.spark.conf().set(bucketKey, TestConstants.TEMPORARY_GCS_BUCKET);
        try {
            initialData()
                .write()
                .format("bigquery")
                .option("table", fullTableName())
                .option("writeMethod", this.writeMethod.toString())
                .save();
        } finally {
            // Always clean up the session-wide setting, even when the write fails.
            this.spark.conf().unset(bucketKey);
        }
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);
        Truth.assertThat(initialDataValuesExist()).isTrue();
    }

    /**
     * DIRECT-only scenario: appends {@code src_table} into a freshly created,
     * empty destination table and expects exactly one row afterwards.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws Exception if the write or a verification query fails
     */
    private void directWriteToBigQueryWithDiffInSchema_Internal(String str) throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);
        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(0);

        Dataset<Row> source = this.spark
            .read()
            .format("bigquery")
            .option("table", testDataset + ".src_table")
            .load();
        source
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /** Runs the direct diff-in-schema scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception {
        final String writeAtLeastOnce = "False";
        directWriteToBigQueryWithDiffInSchema_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the direct diff-in-schema scenario with {@code writeAtLeastOnce}
     * set to "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testDirectWriteToBigQueryWithDiffInSchema_AtLeastOnce() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        directWriteToBigQueryWithDiffInSchema_Internal(writeAtLeastOnce);
    }

    /**
     * DIRECT-only scenario: appends {@code src_table} into a freshly created
     * destination table with {@code enableModeCheckForSchemaFields} turned off
     * and expects exactly one row afterwards.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws Exception if the write or the verification query fails
     */
    private void directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal(String str) throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);

        Dataset<Row> source = this.spark
            .read()
            .format("bigquery")
            .option("table", testDataset + ".src_table")
            .load();
        source
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .option("enableModeCheckForSchemaFields", false)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /** Runs the direct mode-check-disabled scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception {
        final String writeAtLeastOnce = "False";
        directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the direct mode-check-disabled scenario with
     * {@code writeAtLeastOnce} set to "True"; skipped unless the write method
     * is DIRECT.
     */
    @Test
    public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_AtLeastOnce() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal(writeAtLeastOnce);
    }

    /**
     * DIRECT-only scenario: appends {@code src_table_with_description} into a
     * freshly created, empty destination table and expects exactly one row
     * afterwards.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws Exception if the write or a verification query fails
     */
    private void directWriteToBigQueryWithDiffInDescription_Internal(String str) throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);
        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(0);

        Dataset<Row> source = this.spark
            .read()
            .format("bigquery")
            .option("table", testDataset + ".src_table_with_description")
            .load();
        source
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /** Runs the direct diff-in-description scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception {
        final String writeAtLeastOnce = "False";
        directWriteToBigQueryWithDiffInDescription_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the direct diff-in-description scenario with
     * {@code writeAtLeastOnce} set to "True"; skipped unless the write method
     * is DIRECT.
     */
    @Test
    public void testDirectWriteToBigQueryWithDiffInDescription_AtLeastOnce() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        directWriteToBigQueryWithDiffInDescription_Internal(writeAtLeastOnce);
    }

    /**
     * INDIRECT-only: appends {@code src_table} into a freshly created
     * destination table with {@code enableModeCheckForSchemaFields} turned on
     * and expects exactly one row afterwards.
     */
    @Test
    public void testInDirectWriteToBigQueryWithDiffInSchemaAndModeCheck() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);

        Dataset<Row> source = this.spark
            .read()
            .format("bigquery")
            .option("table", testDataset + ".src_table")
            .load();
        source
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("writeMethod", this.writeMethod.toString())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("enableModeCheckForSchemaFields", true)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /**
     * INDIRECT-only: appends {@code src_table} into a freshly created
     * destination table with {@code enableModeCheckForSchemaFields} turned off
     * and expects exactly one row afterwards.
     */
    @Test
    public void testIndirectWriteToBigQueryWithDiffInSchemaNullableFieldAndDisableModeCheck() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);

        Dataset<Row> source = this.spark
            .read()
            .format("bigquery")
            .option("table", testDataset + ".src_table")
            .load();
        source
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("writeMethod", this.writeMethod.toString())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("enableModeCheckForSchemaFields", false)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /**
     * INDIRECT-only: appends {@code src_table_with_description} into a freshly
     * created destination table and expects exactly one row afterwards.
     */
    @Test
    public void testInDirectWriteToBigQueryWithDiffInDescription() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);

        Dataset<Row> source = this.spark
            .read()
            .format("bigquery")
            .option("table", testDataset + ".src_table_with_description")
            .load();
        source
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /**
     * Appends a nullable integer column holding one value and one null into a
     * destination table created from the nullable-field template and expects
     * two rows afterwards.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws Exception if the write or the verification query fails
     */
    private void writeDFNullableToBigQueryNullable_Internal(String str) throws Exception {
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_NULLABLE_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(25), RowFactory.create((Object) null));
        StructType schema = structType(StructField.apply("int_null", DataTypes.IntegerType, true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        df.write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(2);
    }

    /** Runs the nullable-to-nullable scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteDFNullableToBigQueryNullable() throws Exception {
        final String writeAtLeastOnce = "False";
        writeDFNullableToBigQueryNullable_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the nullable-to-nullable scenario with {@code writeAtLeastOnce}
     * set to "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteDFNullableToBigQueryNullable_AtLeastOnce() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeDFNullableToBigQueryNullable_Internal(writeAtLeastOnce);
    }

    /**
     * DIRECT-only scenario: appends a nullable integer column that contains
     * only a non-null value into a destination table created from the
     * required-field template and expects one row afterwards.
     *
     * @param str value for the {@code writeAtLeastOnce} option
     * @throws Exception if the write or the verification query fails
     */
    private void writeDFNullableWithNonNullDataToBigQueryRequired_Internal(String str) throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REQUIRED_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(25));
        StructType schema = structType(StructField.apply("int_req", DataTypes.IntegerType, true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        df.write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .option("writeAtLeastOnce", str)
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /** Runs the nullable-to-required scenario with {@code writeAtLeastOnce} set to "False". */
    @Test
    public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Exception {
        final String writeAtLeastOnce = "False";
        writeDFNullableWithNonNullDataToBigQueryRequired_Internal(writeAtLeastOnce);
    }

    /**
     * Runs the nullable-to-required scenario with {@code writeAtLeastOnce}
     * set to "True"; skipped unless the write method is DIRECT.
     */
    @Test
    public void testWriteDFNullableWithNonNullDataToBigQueryRequired_AtLeastOnce() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        final String writeAtLeastOnce = "True";
        writeDFNullableWithNonNullDataToBigQueryRequired_Internal(writeAtLeastOnce);
    }

    /**
     * DIRECT-only: appending a null value into a destination table created
     * from the required-field template must fail with some {@link Exception}.
     */
    @Test
    public void testWriteNullableDFWithNullDataToBigQueryRequired() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REQUIRED_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create((Object) null));
        StructType schema = structType(StructField.apply("int_req", DataTypes.IntegerType, true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        // NOTE(review): the first argument of this assertThrows overload is the message
        // reported when NO exception is thrown; it is not matched against the thrown
        // exception's message. Confirm whether a message check was intended.
        Assert.assertThrows(
            "INVALID_ARGUMENT: Errors found while processing rows.",
            Exception.class,
            () -> df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("writeMethod", this.writeMethod.toString())
                .save(testDataset + "." + destTable));
    }

    /**
     * DIRECT-only: appending a nullable scalar integer column into a
     * destination table created from the repeated-field template must fail
     * with {@link ProvisionException}.
     */
    @Test
    public void testWriteNullableDFToBigQueryRepeated() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REPEATED_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(10));
        StructType schema = structType(StructField.apply("int_rep", DataTypes.IntegerType, true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        Assert.assertThrows(
            ProvisionException.class,
            () -> df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("writeMethod", this.writeMethod.toString())
                .save(testDataset + "." + destTable));
    }

    /**
     * Appends a non-nullable integer column into a destination table created
     * from the nullable-field template and expects one row afterwards.
     */
    @Test
    public void testWriteRequiredDFToBigQueryNullable() throws Exception {
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_NULLABLE_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(25));
        StructType schema = structType(StructField.apply("int_null", DataTypes.IntegerType, false, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        df.write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /**
     * Appends a non-nullable integer column into a destination table created
     * from the required-field template and expects one row afterwards.
     */
    @Test
    public void testWriteRequiredDFToBigQueryRequired() throws Exception {
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REQUIRED_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(25));
        StructType schema = structType(StructField.apply("int_req", DataTypes.IntegerType, false, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        df.write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save(testDataset + "." + destTable);

        Truth.assertThat(testTableNumberOfRows(destTable)).isEqualTo(1);
    }

    /**
     * DIRECT-only: appending a non-nullable scalar integer column into a
     * destination table created from the repeated-field template must fail
     * with {@link ProvisionException}.
     */
    @Test
    public void testWriteRequiredDFToBigQueryRepeated() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REPEATED_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(10));
        StructType schema = structType(StructField.apply("int_rep", DataTypes.IntegerType, false, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        Assert.assertThrows(
            ProvisionException.class,
            () -> df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("writeMethod", this.writeMethod.toString())
                .save(testDataset + "." + destTable));
    }

    /**
     * DIRECT-only: appending an integer-array column into a destination table
     * created from the nullable-field template must fail with
     * {@link ProvisionException}.
     */
    @Test
    public void testWriteRepeatedDFToBigQueryNullable() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_NULLABLE_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(Arrays.asList(1, 2)), RowFactory.create((Object) null));
        StructType schema = structType(StructField.apply("int_null", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        Assert.assertThrows(
            ProvisionException.class,
            () -> df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("writeMethod", this.writeMethod.toString())
                .save(testDataset + "." + destTable));
    }

    /**
     * DIRECT-only: appending an integer-array column into a destination table
     * created from the required-field template must fail with
     * {@link ProvisionException}.
     */
    @Test
    public void testWriteRepeatedDFToBigQueryRequired() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        String destTable = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REQUIRED_FIELD);

        List<Row> rows = Arrays.asList(RowFactory.create(Arrays.asList(1, 2)));
        StructType schema = structType(StructField.apply("int_req", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        Assert.assertThrows(
            ProvisionException.class,
            () -> df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("writeMethod", this.writeMethod.toString())
                .save(testDataset + "." + destTable));
    }

    /**
     * Appends an integer-array column (one populated row, one null row) into a
     * destination table created from the repeated-field template. The test
     * passes when the write completes without throwing; no row count is
     * asserted.
     */
    @Test
    public void testWriteRepeatedDFToBigQueryRepeated() {
        List<Row> rows = Arrays.asList(RowFactory.create(Arrays.asList(1, 2)), RowFactory.create((Object) null));
        StructType schema = structType(StructField.apply("int_rep", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty()));
        Dataset<Row> df = this.spark.createDataFrame(rows, schema);

        df.write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save(testDataset + "." + createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REPEATED_FIELD));
    }

    /**
     * INDIRECT-only: copies the 'Sublime' rows of the public
     * {@code libraries_io.projects} table into the partitioned test table and
     * verifies the resulting time-partitioning field and clustering field.
     */
    @Test
    public void testWriteToBigQueryPartitionedAndClusteredTable() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        Dataset<Row> sublimeProjects = this.spark
            .read()
            .format("bigquery")
            .option("table", "bigquery-public-data.libraries_io.projects")
            .load()
            .where("platform = 'Sublime'");
        sublimeProjects
            .write()
            .format("bigquery")
            .option("table", fullTableNamePartitioned())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("partitionField", "created_timestamp")
            .option("clusteredFields", "platform")
            .option("writeMethod", this.writeMethod.toString())
            .mode(SaveMode.Overwrite)
            .save();

        StandardTableDefinition definition = testPartitionedTableDefinition();
        Truth.assertThat(definition.getTimePartitioning().getField()).isEqualTo("created_timestamp");
        Truth.assertThat(definition.getClustering().getFields()).contains("platform");
    }

    /**
     * Copies the 'Sublime' rows of the public {@code libraries_io.projects}
     * table into the test table with {@code clusteredFields=platform} and
     * verifies the created table is clustered on that field.
     */
    @Test
    public void testWriteToBigQueryClusteredTable() {
        Dataset<Row> sublimeProjects = this.spark
            .read()
            .format("bigquery")
            .option("table", "bigquery-public-data.libraries_io.projects")
            .load()
            .where("platform = 'Sublime'");
        sublimeProjects
            .write()
            .format("bigquery")
            .option("table", fullTableName())
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("clusteredFields", "platform")
            .option("writeMethod", this.writeMethod.toString())
            .mode(SaveMode.Append)
            .save();

        Table writtenTable = this.bq.getTable(testDataset.toString(), this.testTable);
        // getDefinition() has a generic return type; binding it to a typed local is
        // required for getClustering() to resolve (a chained call infers the base
        // TableDefinition type, which does not declare it).
        StandardTableDefinition definition = writtenTable.getDefinition();
        Truth.assertThat(definition.getClustering().getFields()).contains("platform");
    }

    /**
     * Sets the {@code bigQueryTableLabel.foo} entry on the Spark session configuration, writes the
     * initial data, and verifies that the created table carries the label {@code foo=bar}.
     *
     * @throws Exception if reading the initial data or the write fails
     */
    @Test
    public void testWriteWithTableLabels() throws Exception {
        Dataset<Row> df = initialData();
        spark.conf().set("bigQueryTableLabel.foo", "bar");
        try {
            df.write()
                    .format("bigquery")
                    .option("writeMethod", writeMethod.toString())
                    .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                    .option("dataset", testDataset.toString())
                    .option("table", testTable)
                    .save();
        } finally {
            // Always remove the label from the session configuration, even when the write fails,
            // so that it cannot leak into tests that reuse the same Spark session.
            spark.conf().unset("bigQueryTableLabel.foo");
        }

        Table table = IntegrationTestUtils.getBigquery().getTable(TableId.of(testDataset.toString(), testTable));
        Truth.assertThat(table).isNotNull();
        Map<String, String> labels = table.getLabels();
        Truth.assertThat(labels).isNotNull();
        Truth.assertThat(labels).containsEntry("foo", "bar");
    }

    /**
     * Creates a day-partitioned table, inserts two rows with SQL, overwrites the
     * {@code 20200701} partition through the connector, and checks the resulting content.
     *
     * <p>After the overwrite, the table is expected to hold exactly two rows: the untouched
     * {@code 'bar'} row and the newly written {@code 'baz'} row.
     *
     * @param structField the Spark field definition used for the first column of the written
     *     DataFrame (the second column is always a nullable string {@code some_text})
     * @return the DataFrame loaded back from {@code fullTableName()} after the overwrite
     * @throws RuntimeException wrapping an {@link InterruptedException} raised while the insert
     *     query is running
     */
    protected Dataset<Row> overwriteSinglePartition(StructField structField) {
        // NOTE(review): the table created here is named fullTableNamePartitioned() + "_" + <id>,
        // while the insert, write and read below all target fullTableName() - confirm that this
        // mismatch is intended.
        String createdTableName = fullTableNamePartitioned() + "_" + id.getAndIncrement();
        Schema schema =
                Schema.of(
                        Field.of("the_date", LegacySQLTypeName.DATE),
                        Field.of("some_text", LegacySQLTypeName.STRING));
        StandardTableDefinition definition =
                StandardTableDefinition.newBuilder()
                        .setSchema(schema)
                        .setTimePartitioning(
                                TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("the_date").build())
                        .build();
        bq.create(TableInfo.of(TableId.of(testDataset.toString(), createdTableName), definition));

        try {
            // Plain concatenation: the statement has no format arguments, so routing it through
            // String.format would only risk misinterpreting a '%' in the table name.
            String insertStatement =
                    "insert into `" + fullTableName() + "` (the_date, some_text) values ('2020-07-01', 'foo'), ('2020-07-02', 'bar')";
            bq.query(QueryJobConfiguration.of(insertStatement));

            StructType schemaToWrite =
                    new StructType(
                            new StructField[] {
                                structField,
                                new StructField("some_text", DataTypes.StringType, true, Metadata.empty())
                            });
            Dataset<Row> replacement =
                    spark.createDataFrame(
                            Arrays.asList(RowFactory.create(Date.valueOf("2020-07-01"), "baz")), schemaToWrite);
            replacement
                    .write()
                    .format("bigquery")
                    .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                    .option("datePartition", "20200701")
                    .mode("overwrite")
                    .save(fullTableName());

            Dataset<Row> loaded = spark.read().format("bigquery").load(fullTableName());
            List<Row> rows = loaded.collectAsList();
            Truth.assertThat(rows).hasSize(2);
            long barCount = rows.stream().filter(row -> row.getString(1).equals("bar")).count();
            Truth.assertThat(Long.valueOf(barCount)).isEqualTo(1);
            long bazCount = rows.stream().filter(row -> row.getString(1).equals("baz")).count();
            Truth.assertThat(Long.valueOf(bazCount)).isEqualTo(1);
            return loaded;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs {@link #overwriteSinglePartition(StructField)} with a plain nullable {@code the_date}
     * date field. This method carries no {@code @Test} annotation here.
     */
    public void testOverwriteSinglePartition() {
        StructField dateField = new StructField("the_date", DataTypes.DateType, true, Metadata.empty());
        overwriteSinglePartition(dateField);
    }

    /**
     * Runs {@link #overwriteSinglePartition(StructField)} with a commented {@code the_date} field
     * and checks that the first field of the loaded schema carries the same comment. This method
     * carries no {@code @Test} annotation here.
     */
    public void testOverwriteSinglePartitionWithComment() {
        StructField commentedDateField =
                new StructField("the_date", DataTypes.DateType, true, Metadata.empty())
                        .withComment("the partition field");
        Dataset<Row> loaded = overwriteSinglePartition(commentedDateField);
        StructField firstLoadedField = loaded.schema().fields()[0];
        Truth.assertThat(firstLoadedField.getComment()).isEqualTo(Some.apply("the partition field"));
    }

    /**
     * Writes four single-column DataFrames whose field carries, in turn, a description in its
     * metadata, a comment, both, or neither, each to its own {@code _<index>} suffixed table. Then
     * reads every table back and compares the description-or-comment of the first field with the
     * expected text (or expects it to be absent for the last case).
     */
    @Test
    public void testWriteToBigQueryWithDescription() {
        Metadata descriptionMetadata = Metadata.fromJson("{\"description\": \"test description\"}");
        StructType[] schemas = {
            structType(new StructField("c1", DataTypes.IntegerType, true, descriptionMetadata)),
            structType(new StructField("c1", DataTypes.IntegerType, true, Metadata.empty()).withComment("test comment")),
            structType(new StructField("c1", DataTypes.IntegerType, true, descriptionMetadata).withComment("test comment")),
            structType(new StructField("c1", DataTypes.IntegerType, true, Metadata.empty()))
        };
        // expected text per schema above; null means no description or comment is expected
        String[] expectedTexts = {"test description", "test comment", "test comment", null};

        for (int i = 0; i < schemas.length; i++) {
            Dataset<Row> df =
                    spark.createDataFrame(Arrays.asList(RowFactory.create(100), RowFactory.create(200)), schemas[i]);
            df.write()
                    .format("bigquery")
                    .mode(SaveMode.Overwrite)
                    .option("table", fullTableName() + "_" + i)
                    .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                    .option("intermediateFormat", "parquet")
                    .option("writeMethod", writeMethod.toString())
                    .save();

            StructField firstLoadedField =
                    spark.read()
                            .format("bigquery")
                            .option("dataset", testDataset.toString())
                            .option("table", testTable + "_" + i)
                            .load()
                            .schema()
                            .fields()[0];
            Optional actualText = SchemaConverters.getDescriptionOrCommentOfField(firstLoadedField, Optional.empty());

            if (expectedTexts[i] == null) {
                Truth.assertThat(Boolean.valueOf(actualText.isPresent())).isFalse();
            } else {
                Truth.assertThat(Boolean.valueOf(actualText.isPresent())).isTrue();
                Truth.assertThat((String) actualText.orElse("")).isEqualTo(expectedTexts[i]);
            }
        }
    }

    /**
     * Appends an empty {@code Link} DataFrame through the Avro write helper and expects the test
     * table to report zero rows.
     *
     * @throws Exception if the write or the row count lookup fails
     */
    @Test
    public void testWriteEmptyDataFrame() throws Exception {
        Dataset<Row> emptyLinks = spark.createDataFrame(Collections.emptyList(), Link.class);
        writeToBigQueryAvroFormat(emptyLinks, SaveMode.Append, "False");
        int rowCount = testTableNumberOfRows();
        Truth.assertThat(Integer.valueOf(rowCount)).isEqualTo(0);
    }

    /**
     * Builds a {@link StructType} from the given fields, in the order supplied.
     *
     * @param structFieldArr the fields of the schema
     * @return a new struct type wrapping {@code structFieldArr}
     */
    protected StructType structType(StructField... structFieldArr) {
        StructType schema = new StructType(structFieldArr);
        return schema;
    }

    /** Runs the shared partition test with partition type {@code HOUR}. */
    @Test
    public void testPartition_Hourly() {
        final String partitionType = "HOUR";
        testPartition(partitionType);
    }

    /** Runs the shared partition test with partition type {@code DAY}. */
    @Test
    public void testPartition_Daily() {
        final String partitionType = "DAY";
        testPartition(partitionType);
    }

    /** Runs the shared partition test with partition type {@code MONTH}. */
    @Test
    public void testPartition_Monthly() {
        final String partitionType = "MONTH";
        testPartition(partitionType);
    }

    /** Runs the shared partition test with partition type {@code YEAR}. */
    @Test
    public void testPartition_Yearly() {
        final String partitionType = "YEAR";
        testPartition(partitionType);
    }

    /**
     * Writes three {@code Data} rows to a table partitioned on {@code ts} with the given partition
     * type (partition filter required) and checks that three rows can be read back.
     *
     * <p>Only runs when the write method under test is INDIRECT.
     *
     * @param str the value passed as the {@code partitionType} option; it is also used as the
     *     suffix of the destination table name
     */
    private void testPartition(String str) {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        List<Data> records =
                Arrays.asList(
                        new Data("a", Timestamp.valueOf("2020-01-01 01:01:01")),
                        new Data("b", Timestamp.valueOf("2020-01-02 02:02:02")),
                        new Data("c", Timestamp.valueOf("2020-01-03 03:03:03")));
        Dataset<Row> df = spark.createDataset(records, Encoders.bean(Data.class)).toDF();
        String destination = testDataset.toString() + "." + testTable + "_" + str;

        df.write()
                .format("bigquery")
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("partitionField", "ts")
                .option("partitionType", str)
                .option("partitionRequireFilter", "true")
                .option("table", destination)
                .option("writeMethod", writeMethod.toString())
                .save();

        long loadedCount = spark.read().format("bigquery").load(destination).count();
        Truth.assertThat(Long.valueOf(loadedCount)).isEqualTo(3);
    }

    /**
     * Writes three {@code RangeData} rows to a table that is range-partitioned on {@code rng}
     * (start 1, end 21, interval 2), then checks the row count and that the table definition
     * reports the same range and partition field.
     *
     * <p>Only runs when the write method under test is INDIRECT.
     */
    @Test
    public void testPartitionRange() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        List<RangeData> records =
                Arrays.asList(new RangeData("a", 1L), new RangeData("b", 5L), new RangeData("c", 11L));
        Dataset<Row> df = spark.createDataset(records, Encoders.bean(RangeData.class)).toDF();
        String destination = testDataset.toString() + "." + testTable + "_range";

        df.write()
                .format("bigquery")
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("partitionField", "rng")
                .option("partitionRangeStart", "1")
                .option("partitionRangeEnd", "21")
                .option("partitionRangeInterval", "2")
                .option("partitionRequireFilter", "true")
                .option("table", destination)
                .option("writeMethod", writeMethod.toString())
                .save();

        long loadedCount = spark.read().format("bigquery").load(destination).count();
        Truth.assertThat(Long.valueOf(loadedCount)).isEqualTo(3);

        Table table = bq.getTable(TableId.of(testDataset.toString(), testTable + "_range"));
        Truth.assertThat(table).isNotNull();
        Assert.assertTrue(table.getDefinition() instanceof StandardTableDefinition);
        StandardTableDefinition definition = table.getDefinition();
        Truth.assertThat(definition.getRangePartitioning()).isNotNull();

        RangePartitioning.Range expectedRange =
                RangePartitioning.Range.newBuilder().setStart(1L).setEnd(21L).setInterval(2L).build();
        Truth.assertThat(definition.getRangePartitioning().getRange()).isEqualTo(expectedRange);
        Truth.assertThat(definition.getRangePartitioning().getField()).isEqualTo("rng");
    }

    /**
     * Writes the all-types table to the test table, reads it back in Arrow format with
     * {@code cache()}, and compares the first row (twice) and the schema with the source.
     *
     * <p>Only runs when the write method under test is INDIRECT.
     */
    @Test
    public void testCacheDataFrameInDataSource() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        Dataset<Row> allTypesTable = readAllTypesTable();
        writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro", "False");

        Dataset<Row> cached =
                spark.read()
                        .format("bigquery")
                        .option("dataset", testDataset.toString())
                        .option("table", testTable)
                        .option("readDataFormat", "arrow")
                        .load()
                        .cache();

        Truth.assertThat(cached.head()).isEqualTo(allTypesTable.head());
        // The same check is deliberately repeated; presumably the second head() is answered from
        // the cache populated by the first one - confirm.
        Truth.assertThat(cached.head()).isEqualTo(allTypesTable.head());
        Truth.assertThat(cached.schema()).isEqualTo(allTypesTable.schema());
    }

    /**
     * Appends two JSON strings, in a string column tagged with {@code sqlType=JSON} metadata, to a
     * table that does not exist yet. Verifies that the single created column has type JSON and
     * that {@code jf.value} can be queried from it.
     *
     * <p>Only runs when the write method under test is INDIRECT.
     *
     * @throws Exception if the write or one of the lookups fails
     */
    @Test
    public void testWriteJsonToANewTable() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        Dataset<Row> df =
                spark.createDataFrame(
                        Arrays.asList(
                                RowFactory.create("{\"key\":\"foo\",\"value\":1}"),
                                RowFactory.create("{\"key\":\"bar\",\"value\":2}")),
                        structType(
                                StructField.apply(
                                        "jf", DataTypes.StringType, true, Metadata.fromJson("{\"sqlType\":\"JSON\"}"))));
        df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("intermediateFormat", "AVRO")
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("writeMethod", writeMethod.toString())
                .save();

        Table table = bq.getTable(TableId.of(testDataset.toString(), testTable));
        Truth.assertThat(table).isNotNull();
        Schema schema = table.getDefinition().getSchema();
        Truth.assertThat(schema).isNotNull();
        Truth.assertThat(schema.getFields()).hasSize(1);
        Truth.assertThat(schema.getFields().get(0).getType()).isEqualTo(LegacySQLTypeName.JSON);

        String query = String.format("SELECT jf.value FROM `%s.%s`", testDataset.toString(), testTable);
        List<String> values =
                spark.read()
                        .format("bigquery")
                        .option("viewsEnabled", "true")
                        .option("materializationDataset", testDataset.toString())
                        .load(query)
                        .collectAsList()
                        .stream()
                        .map(row -> row.getString(0))
                        .collect(Collectors.toList());
        Truth.assertThat(values).containsExactly("1", "2");
    }

    /**
     * Creates a table with a single JSON column {@code jf}, appends two JSON strings to it from a
     * string column tagged with {@code sqlType=JSON} metadata, and verifies that {@code jf.value}
     * can be queried from the result.
     *
     * <p>Only runs when the write method under test is INDIRECT.
     *
     * @throws Exception if the table creation, the write or the query fails
     */
    @Test
    public void testWriteJsonToAnExistingTable() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        TableInfo tableInfo =
                TableInfo.of(
                        TableId.of(testDataset.toString(), testTable),
                        StandardTableDefinition.of(Schema.of(Field.of("jf", LegacySQLTypeName.JSON))));
        Table created = bq.create(tableInfo);
        Truth.assertThat(created).isNotNull();

        Dataset<Row> df =
                spark.createDataFrame(
                        Arrays.asList(
                                RowFactory.create("{\"key\":\"foo\",\"value\":1}"),
                                RowFactory.create("{\"key\":\"bar\",\"value\":2}")),
                        structType(
                                StructField.apply(
                                        "jf", DataTypes.StringType, true, Metadata.fromJson("{\"sqlType\":\"JSON\"}"))));
        df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("intermediateFormat", "AVRO")
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("writeMethod", writeMethod.toString())
                .save();

        String query = String.format("SELECT jf.value FROM `%s.%s`", testDataset.toString(), testTable);
        List<String> values =
                spark.read()
                        .format("bigquery")
                        .option("viewsEnabled", "true")
                        .option("materializationDataset", testDataset.toString())
                        .load(query)
                        .collectAsList()
                        .stream()
                        .map(row -> row.getString(0))
                        .collect(Collectors.toList());
        Truth.assertThat(values).containsExactly("1", "2");
    }

    /**
     * Appends two rows holding a {@code map<string, long>} column {@code mf} to a new table and
     * verifies how the map is stored: a single REPEATED RECORD field with a REQUIRED string
     * {@code key} and a NULLABLE integer {@code value}. A follow-up query then checks the total
     * number of keys (3), the number of rows (2) and the value stored under key {@code b} (2).
     *
     * @throws Exception if the write or one of the lookups fails
     */
    @Test
    public void testWriteMapToANewTable() throws Exception {
        // Autoboxed literals replace the deprecated Long(long) constructor.
        Dataset<Row> df =
                spark.createDataFrame(
                        Arrays.asList(
                                RowFactory.create(ImmutableMap.of("a", 1L, "b", 2L)),
                                RowFactory.create(ImmutableMap.of("c", 3L))),
                        structType(
                                StructField.apply(
                                        "mf",
                                        DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType),
                                        false,
                                        Metadata.empty())));
        df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("intermediateFormat", "AVRO")
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("writeMethod", writeMethod.toString())
                .save();

        Table table = bq.getTable(TableId.of(testDataset.toString(), testTable));
        Truth.assertThat(table).isNotNull();
        Schema schema = table.getDefinition().getSchema();
        Truth.assertThat(schema).isNotNull();
        Truth.assertThat(schema.getFields()).hasSize(1);
        Field mapField = schema.getFields().get(0);
        Truth.assertThat(mapField.getType()).isEqualTo(LegacySQLTypeName.RECORD);
        Truth.assertThat(mapField.getMode()).isEqualTo(Field.Mode.REPEATED);
        Truth.assertThat(mapField.getSubFields())
                .containsExactlyElementsIn(
                        Arrays.asList(
                                Field.newBuilder("key", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(),
                                Field.newBuilder("value", LegacySQLTypeName.INTEGER).setMode(Field.Mode.NULLABLE).build()));

        // String.replace substitutes literally; replaceAll would treat the table name as a regex
        // replacement string and misread any '$' or '\' in it.
        String query =
                "SELECT\n  (SELECT COUNT(f.key) FROM TABLE, UNNEST(mf) AS f) AS total_keys,\n  (SELECT COUNT(*) FROM TABLE) AS total_rows,\n  (SELECT f.value FROM TABLE, UNNEST(mf) AS f WHERE f.key='b') AS b_value;"
                        .replace("TABLE", testDataset.toString() + "." + testTable);
        Row result =
                spark.read()
                        .format("bigquery")
                        .option("viewsEnabled", "true")
                        .option("materializationDataset", testDataset.toString())
                        .load(query)
                        .head();
        Truth.assertThat(Long.valueOf(result.getLong(0))).isEqualTo(3L);
        Truth.assertThat(Long.valueOf(result.getLong(1))).isEqualTo(2L);
        Truth.assertThat(Long.valueOf(result.getLong(2))).isEqualTo(2L);
    }

    /**
     * Overwrites the test table with two {@code (value, ds)} rows, then appends one row that has
     * an extra {@code new_field} column with {@code allowFieldAddition} and
     * {@code allowFieldRelaxation} enabled. Expects three rows afterwards: two with a null third
     * column and one whose third column is {@code newVal1}.
     *
     * <p>Only runs when the write method under test is INDIRECT.
     *
     * @throws Exception if a write or the row count lookup fails
     */
    @Test
    public void testAllowFieldAddition() throws Exception {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));

        StructType initialSchema =
                structType(
                        StructField.apply("value", DataTypes.StringType, true, Metadata.empty()),
                        StructField.apply("ds", DataTypes.DateType, true, Metadata.empty()));
        Dataset<Row> initialRows =
                spark.createDataFrame(
                        Arrays.asList(
                                RowFactory.create("val1", Date.valueOf("2023-04-13")),
                                RowFactory.create("val2", Date.valueOf("2023-04-14"))),
                        initialSchema);
        initialRows
                .write()
                .format("bigquery")
                .mode(SaveMode.Overwrite)
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("intermediateFormat", "avro")
                .option("writeMethod", writeMethod.toString())
                .save();
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(2);

        StructType widenedSchema =
                structType(
                        StructField.apply("value", DataTypes.StringType, true, Metadata.empty()),
                        StructField.apply("ds", DataTypes.DateType, true, Metadata.empty()),
                        StructField.apply("new_field", DataTypes.StringType, true, Metadata.empty()));
        Dataset<Row> widenedRows =
                spark.createDataFrame(
                        Arrays.asList(RowFactory.create("val3", Date.valueOf("2023-04-15"), "newVal1")),
                        widenedSchema);
        widenedRows
                .write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("writeMethod", writeMethod.toString())
                .option("allowFieldAddition", "true")
                .option("allowFieldRelaxation", "true")
                .save();
        Truth.assertThat(Integer.valueOf(testTableNumberOfRows())).isEqualTo(3);

        List<Row> rows =
                spark.read()
                        .format("bigquery")
                        .option("dataset", testDataset.toString())
                        .option("table", testTable)
                        .load()
                        .collectAsList();
        Truth.assertThat(rows).hasSize(3);
        long withoutNewField = rows.stream().filter(row -> row.getString(2) == null).count();
        Truth.assertThat(Long.valueOf(withoutNewField)).isEqualTo(2);
        long withNewField = rows.stream().filter(row -> "newVal1".equals(row.getString(2))).count();
        Truth.assertThat(Long.valueOf(withNewField)).isEqualTo(1);
    }

    /**
     * Appends the initial data with {@code destinationTableKmsKeyName} taken from the
     * {@code BIGQUERY_KMS_KEY_NAME} environment variable and verifies that the created table's
     * encryption configuration reports that key.
     *
     * @throws NullPointerException if {@code BIGQUERY_KMS_KEY_NAME} is not set
     * @throws Exception if reading the initial data or the write fails
     */
    @Test
    public void testWriteToCmekManagedTable() throws Exception {
        String kmsKeyName =
                Preconditions.checkNotNull(
                        System.getenv("BIGQUERY_KMS_KEY_NAME"),
                        "Please set the BIGQUERY_KMS_KEY_NAME to point to a pre-generated and configured KMS key");

        Dataset<Row> df = initialData();
        df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("table", fullTableName())
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("writeMethod", writeMethod.toString())
                .option("destinationTableKmsKeyName", kmsKeyName)
                .save();

        Table table = IntegrationTestUtils.getBigquery().getTable(TableId.of(testDataset.toString(), testTable));
        Truth.assertThat(table).isNotNull();
        Truth.assertThat(table.getEncryptionConfiguration()).isNotNull();
        Truth.assertThat(table.getEncryptionConfiguration().getKmsKeyName()).isEqualTo(kmsKeyName);
    }

    /**
     * Creates a table with {@code NUMERIC(10,2)} and {@code BIGNUMERIC(20,15)} columns, appends a
     * row whose Spark decimals have smaller precision and scale, and checks that the values read
     * back carry the scale of the table columns.
     *
     * @throws Exception if the table creation, the write or the read fails
     */
    @Test
    public void testWriteNumericsToWiderFields() throws Exception {
        IntegrationTestUtils.runQuery(
                String.format(
                        "CREATE TABLE `%s.%s` (num NUMERIC(10,2), bignum BIGNUMERIC(20,15))", testDataset, testTable));

        StructType narrowSchema =
                structType(
                        StructField.apply("num", DataTypes.createDecimalType(6, 1), true, Metadata.empty()),
                        StructField.apply("bignum", DataTypes.createDecimalType(10, 5), true, Metadata.empty()));
        Dataset<Row> df =
                spark.createDataFrame(
                        Arrays.asList(RowFactory.create(Decimal.apply("12345.6"), Decimal.apply("12345.12345"))),
                        narrowSchema);
        df.write()
                .format("bigquery")
                .mode(SaveMode.Append)
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("writeMethod", writeMethod.toString())
                .save();

        List<Row> rows =
                spark.read()
                        .format("bigquery")
                        .option("dataset", testDataset.toString())
                        .option("table", testTable)
                        .load()
                        .collectAsList();
        Truth.assertThat(rows).hasSize(1);
        Row only = rows.get(0);
        Truth.assertThat(only.getDecimal(only.fieldIndex("num"))).isEqualTo(new BigDecimal("12345.60"));
        Truth.assertThat(only.getDecimal(only.fieldIndex("bignum")))
                .isEqualTo(new BigDecimal("12345.123450000000000"));
    }

    /**
     * Creates a table with a {@code TIME} column, writes the string {@code "10:00:00"} into it
     * using the given save mode, and expects to read back the long value {@code 36000000000}.
     *
     * <p>Only runs when the write method under test is DIRECT.
     *
     * @param saveMode the Spark save mode used for the write
     */
    private void testWriteStringToTimeField_internal(SaveMode saveMode) {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        IntegrationTestUtils.runQuery(
                String.format("CREATE TABLE `%s.%s` (name STRING, wake_up_time TIME)", testDataset, testTable));

        StructType stringSchema =
                structType(
                        StructField.apply("name", DataTypes.StringType, true, Metadata.empty()),
                        StructField.apply("wake_up_time", DataTypes.StringType, true, Metadata.empty()));
        Dataset<Row> df =
                spark.createDataFrame(Arrays.asList(RowFactory.create("abc", "10:00:00")), stringSchema);
        df.write()
                .format("bigquery")
                .mode(saveMode)
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("writeMethod", writeMethod.toString())
                .save();

        List<Row> rows =
                spark.read()
                        .format("bigquery")
                        .option("dataset", testDataset.toString())
                        .option("table", testTable)
                        .load()
                        .collectAsList();
        Truth.assertThat(rows).hasSize(1);
        Row only = rows.get(0);
        Truth.assertThat(only.getString(only.fieldIndex("name"))).isEqualTo("abc");
        long wakeUpTime = only.getLong(only.fieldIndex("wake_up_time"));
        Truth.assertThat(Long.valueOf(wakeUpTime)).isEqualTo(36000000000L);
    }

    /** Runs the string-to-TIME write check with {@code SaveMode.Overwrite}. */
    @Test
    public void testWriteStringToTimeField_OverwriteSaveMode() {
        final SaveMode mode = SaveMode.Overwrite;
        testWriteStringToTimeField_internal(mode);
    }

    /** Runs the string-to-TIME write check with {@code SaveMode.Append}. */
    @Test
    public void testWriteStringToTimeField_AppendSaveMode() {
        final SaveMode mode = SaveMode.Append;
        testWriteStringToTimeField_internal(mode);
    }

    /**
     * Creates a table with a {@code DATETIME} column, writes the string
     * {@code "0001-01-01T01:22:24.999888"} into it using the given save mode, and checks the value
     * read back. When {@code timeStampNTZType} is present the value is expected as a
     * {@link LocalDateTime}; otherwise it is expected as the original string.
     *
     * <p>Only runs when the write method under test is DIRECT.
     *
     * @param saveMode the Spark save mode used for the write
     */
    private void testWriteStringToDateTimeField_internal(SaveMode saveMode) {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        IntegrationTestUtils.runQuery(
                String.format("CREATE TABLE `%s.%s` (name STRING, datetime1 DATETIME)", testDataset, testTable));

        StructType stringSchema =
                structType(
                        StructField.apply("name", DataTypes.StringType, true, Metadata.empty()),
                        StructField.apply("datetime1", DataTypes.StringType, true, Metadata.empty()));
        Dataset<Row> df =
                spark.createDataFrame(
                        Arrays.asList(RowFactory.create("abc", "0001-01-01T01:22:24.999888")), stringSchema);
        df.write()
                .format("bigquery")
                .mode(saveMode)
                .option("dataset", testDataset.toString())
                .option("table", testTable)
                .option("writeMethod", writeMethod.toString())
                .save();

        List<Row> rows =
                spark.read()
                        .format("bigquery")
                        .option("dataset", testDataset.toString())
                        .option("table", testTable)
                        .load()
                        .collectAsList();
        Truth.assertThat(rows).hasSize(1);
        Row only = rows.get(0);
        Truth.assertThat(only.getString(only.fieldIndex("name"))).isEqualTo("abc");

        if (!timeStampNTZType.isPresent()) {
            Truth.assertThat(only.get(only.fieldIndex("datetime1"))).isEqualTo("0001-01-01T01:22:24.999888");
            return;
        }
        LocalDateTime expected = LocalDateTime.of(1, 1, 1, 1, 22, 24).plus(999888L, ChronoUnit.MICROS);
        Truth.assertThat(only.get(only.fieldIndex("datetime1"))).isEqualTo(expected);
    }

    /** Runs the string-to-DATETIME write check with {@code SaveMode.Overwrite}. */
    @Test
    public void testWriteStringToDateTimeField_OverwriteSaveMode() {
        final SaveMode mode = SaveMode.Overwrite;
        testWriteStringToDateTimeField_internal(mode);
    }

    /** Runs the string-to-DATETIME write check with {@code SaveMode.Append}. */
    @Test
    public void testWriteStringToDateTimeField_AppendSaveMode() {
        final SaveMode mode = SaveMode.Append;
        testWriteStringToDateTimeField_internal(mode);
    }

    /**
     * Writes a timestamp while the JVM default time zone is {@code IST}, reads it back while the
     * default is {@code PST}, and expects the loaded value to equal the written one.
     *
     * <p>Only runs when the write method under test is DIRECT. The default time zone that was in
     * effect before the test is restored afterwards, so the temporary zones cannot leak into other
     * tests running in the same JVM.
     */
    @Test
    public void testWriteToTimestampField() {
        Assume.assumeThat(writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        TimeZone originalTimeZone = TimeZone.getDefault();
        try {
            IntegrationTestUtils.runQuery(
                    String.format("CREATE TABLE `%s.%s` (name STRING, timestamp1 TIMESTAMP)", testDataset, testTable));

            TimeZone.setDefault(TimeZone.getTimeZone("IST"));
            Timestamp written = Timestamp.valueOf("1501-01-01 01:22:24.999888");
            StructType schema =
                    structType(
                            StructField.apply("name", DataTypes.StringType, true, Metadata.empty()),
                            StructField.apply("timestamp1", DataTypes.TimestampType, true, Metadata.empty()));
            Dataset<Row> df = spark.createDataFrame(Arrays.asList(RowFactory.create("abc", written)), schema);
            df.write()
                    .format("bigquery")
                    .mode("append")
                    .option("dataset", testDataset.toString())
                    .option("table", testTable)
                    .option("writeMethod", writeMethod.toString())
                    .save();

            // Read under a different default zone than the one used for writing.
            TimeZone.setDefault(TimeZone.getTimeZone("PST"));
            List<Row> rows =
                    spark.read()
                            .format("bigquery")
                            .option("dataset", testDataset.toString())
                            .option("table", testTable)
                            .load()
                            .collectAsList();
            Truth.assertThat(rows).hasSize(1);
            Row only = rows.get(0);
            Truth.assertThat(only.getString(only.fieldIndex("name"))).isEqualTo("abc");
            Truth.assertThat(only.get(only.fieldIndex("timestamp1"))).isEqualTo(written);
        } finally {
            // TimeZone.setDefault is JVM-global; undo it regardless of the test outcome.
            TimeZone.setDefault(originalTimeZone);
        }
    }

    /**
     * Writes the given dataset to the test table in overwrite mode with
     * {@code spark.sql.sources.partitionOverwriteMode} set to DYNAMIC, then returns a DataFrame
     * that loads the same table.
     *
     * @param dataset the rows to write
     * @return a DataFrame reading the test table after the write
     */
    protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> dataset) {
        String datasetName = testDataset.toString();
        dataset
                .write()
                .format("bigquery")
                .mode(SaveMode.Overwrite)
                .option("dataset", datasetName)
                .option("table", testTable)
                .option("writeMethod", writeMethod.toString())
                .option("spark.sql.sources.partitionOverwriteMode", PartitionOverwriteMode.DYNAMIC.toString())
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .save();

        Dataset<Row> reloaded =
                spark.read()
                        .format("bigquery")
                        .option("dataset", testDataset.toString())
                        .option("table", testTable)
                        .load();
        return reloaded;
    }

    /**
     * Dynamic partition overwrite on a table partitioned by hour of a TIMESTAMP column.
     *
     * <p>The table is seeded with order ids 1, 2 and 3 (2 and 3 share the 10:00 hour). Writing ids
     * 10 (10:15, same day) and 20 (a later day) is expected to leave exactly ids 1, 10 and 20.
     *
     * <p>The test runs with the JVM default time zone set to UTC and restores the previous default
     * afterwards so that the change cannot leak into other tests.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionTimestampByHour() {
        TimeZone originalTimeZone = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
            IntegrationTestUtils.runQuery(String.format("CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) PARTITION BY timestamp_trunc(order_date_time, HOUR) AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), (2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')])", testDataset, testTable, "order_id", "order_date_time"));

            StructType schema =
                    structType(
                            StructField.apply("order_id", DataTypes.IntegerType, true, Metadata.empty()),
                            StructField.apply("order_date_time", DataTypes.TimestampType, true, Metadata.empty()));
            Dataset<Row> newOrders =
                    spark.createDataFrame(
                            Arrays.asList(
                                    RowFactory.create(10, Timestamp.valueOf("2023-09-28 10:15:00")),
                                    RowFactory.create(20, Timestamp.valueOf("2023-09-30 12:00:00"))),
                            schema);
            Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(newOrders);
            Truth.assertThat(Long.valueOf(result.count())).isEqualTo(3);

            // Sort by order id so the rows can be checked positionally.
            List<Row> rows = result.collectAsList();
            rows.sort(Comparator.comparingLong((Row row) -> row.getLong(row.fieldIndex("order_id"))));
            Row first = rows.get(0);
            Row second = rows.get(1);
            Row third = rows.get(2);

            Truth.assertThat(Long.valueOf(first.getLong(first.fieldIndex("order_id")))).isEqualTo(1);
            Truth.assertThat(first.getTimestamp(first.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-28 1:00:00"));
            Truth.assertThat(Long.valueOf(second.getLong(second.fieldIndex("order_id")))).isEqualTo(10);
            Truth.assertThat(second.getTimestamp(second.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-28 10:15:00"));
            Truth.assertThat(Long.valueOf(third.getLong(third.fieldIndex("order_id")))).isEqualTo(20);
            Truth.assertThat(third.getTimestamp(third.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-30 12:00:00"));
        } finally {
            // TimeZone.setDefault is JVM-global; undo it regardless of the test outcome.
            TimeZone.setDefault(originalTimeZone);
        }
    }

    /**
     * Dynamic partition overwrite on a table partitioned by date of a TIMESTAMP column.
     *
     * <p>The table is seeded with order ids 1 (2023-09-28), 2 and 3 (both 2023-09-29). Writing ids
     * 10 (2023-09-29) and 20 (2023-09-30) is expected to leave exactly ids 1, 10 and 20.
     *
     * <p>The test runs with the JVM default time zone set to UTC and restores the previous default
     * afterwards so that the change cannot leak into other tests.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionTimestampByDay() {
        TimeZone originalTimeZone = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
            IntegrationTestUtils.runQuery(String.format("CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) PARTITION BY DATE(order_date_time) AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), (2, TIMESTAMP '2023-09-29 10:00:00 UTC'), (3, TIMESTAMP '2023-09-29 17:00:00 UTC')])", testDataset, testTable, "order_id", "order_date_time"));

            StructType schema =
                    structType(
                            StructField.apply("order_id", DataTypes.IntegerType, true, Metadata.empty()),
                            StructField.apply("order_date_time", DataTypes.TimestampType, true, Metadata.empty()));
            Dataset<Row> newOrders =
                    spark.createDataFrame(
                            Arrays.asList(
                                    RowFactory.create(10, Timestamp.valueOf("2023-09-29 2:00:00")),
                                    RowFactory.create(20, Timestamp.valueOf("2023-09-30 12:00:00"))),
                            schema);
            Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(newOrders);
            Truth.assertThat(Long.valueOf(result.count())).isEqualTo(3);

            // Sort by order id so the rows can be checked positionally.
            List<Row> rows = result.collectAsList();
            rows.sort(Comparator.comparingLong((Row row) -> row.getLong(row.fieldIndex("order_id"))));
            Row first = rows.get(0);
            Row second = rows.get(1);
            Row third = rows.get(2);

            Truth.assertThat(Long.valueOf(first.getLong(first.fieldIndex("order_id")))).isEqualTo(1);
            Truth.assertThat(first.getTimestamp(first.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-28 1:00:00"));
            Truth.assertThat(Long.valueOf(second.getLong(second.fieldIndex("order_id")))).isEqualTo(10);
            Truth.assertThat(second.getTimestamp(second.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-29 2:00:00"));
            Truth.assertThat(Long.valueOf(third.getLong(third.fieldIndex("order_id")))).isEqualTo(20);
            Truth.assertThat(third.getTimestamp(third.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-30 12:00:00"));
        } finally {
            // TimeZone.setDefault is JVM-global; undo it regardless of the test outcome.
            TimeZone.setDefault(originalTimeZone);
        }
    }

    /**
     * Dynamic partition overwrite on a table partitioned by month of a TIMESTAMP column.
     *
     * <p>The table is seeded with order ids 1 (September 2023), 2 and 3 (both October 2023).
     * Writing ids 10 (October 2023) and 20 (November 2023) is expected to leave exactly ids 1, 10
     * and 20.
     *
     * <p>The test runs with the JVM default time zone set to UTC and restores the previous default
     * afterwards so that the change cannot leak into other tests.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
        TimeZone originalTimeZone = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
            IntegrationTestUtils.runQuery(String.format("CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) PARTITION BY timestamp_trunc(order_date_time, MONTH) AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), (2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (3, TIMESTAMP '2023-10-25 12:00:00 UTC')])", testDataset, testTable, "order_id", "order_date_time"));

            StructType schema =
                    structType(
                            StructField.apply("order_id", DataTypes.IntegerType, true, Metadata.empty()),
                            StructField.apply("order_date_time", DataTypes.TimestampType, true, Metadata.empty()));
            Dataset<Row> newOrders =
                    spark.createDataFrame(
                            Arrays.asList(
                                    RowFactory.create(10, Timestamp.valueOf("2023-10-29 2:00:00")),
                                    RowFactory.create(20, Timestamp.valueOf("2023-11-30 12:00:00"))),
                            schema);
            Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(newOrders);
            Truth.assertThat(Long.valueOf(result.count())).isEqualTo(3);

            // Sort by order id so the rows can be checked positionally.
            List<Row> rows = result.collectAsList();
            rows.sort(Comparator.comparingLong((Row row) -> row.getLong(row.fieldIndex("order_id"))));
            Row first = rows.get(0);
            Row second = rows.get(1);
            Row third = rows.get(2);

            Truth.assertThat(Long.valueOf(first.getLong(first.fieldIndex("order_id")))).isEqualTo(1);
            Truth.assertThat(first.getTimestamp(first.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-09-28 1:00:00"));
            Truth.assertThat(Long.valueOf(second.getLong(second.fieldIndex("order_id")))).isEqualTo(10);
            Truth.assertThat(second.getTimestamp(second.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-10-29 2:00:00"));
            Truth.assertThat(Long.valueOf(third.getLong(third.fieldIndex("order_id")))).isEqualTo(20);
            Truth.assertThat(third.getTimestamp(third.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-11-30 12:00:00"));
        } finally {
            // TimeZone.setDefault is JVM-global; undo it regardless of the test outcome.
            TimeZone.setDefault(originalTimeZone);
        }
    }

    /**
     * Dynamic partition overwrite on a table partitioned by year of a TIMESTAMP column.
     *
     * <p>The table is seeded with one row in 2022 (order id 1) and two rows in 2023. Writing ids
     * 10 (2023) and 20 (2024) is expected to leave exactly ids 1, 10 and 20.
     *
     * <p>The test runs with the JVM default time zone set to UTC and restores the previous default
     * afterwards so that the change cannot leak into other tests.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionTimestampByYear() {
        TimeZone originalTimeZone = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
            // NOTE(review): both 2023 seed rows use order id 2, unlike the sibling tests that use
            // 2 and 3 - confirm whether that is intended. Both rows are expected to be replaced.
            IntegrationTestUtils.runQuery(String.format("CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) PARTITION BY timestamp_trunc(order_date_time, YEAR) AS SELECT * FROM UNNEST([(1, TIMESTAMP '2022-09-28 1:00:00 UTC'), (2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (2, TIMESTAMP '2023-10-25 12:00:00 UTC')])", testDataset, testTable, "order_id", "order_date_time"));

            StructType schema =
                    structType(
                            StructField.apply("order_id", DataTypes.IntegerType, true, Metadata.empty()),
                            StructField.apply("order_date_time", DataTypes.TimestampType, true, Metadata.empty()));
            Dataset<Row> newOrders =
                    spark.createDataFrame(
                            Arrays.asList(
                                    RowFactory.create(10, Timestamp.valueOf("2023-10-29 2:00:00")),
                                    RowFactory.create(20, Timestamp.valueOf("2024-11-30 12:00:00"))),
                            schema);
            Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(newOrders);
            Truth.assertThat(Long.valueOf(result.count())).isEqualTo(3);

            // Sort by order id so the rows can be checked positionally.
            List<Row> rows = result.collectAsList();
            rows.sort(Comparator.comparingLong((Row row) -> row.getLong(row.fieldIndex("order_id"))));
            Row first = rows.get(0);
            Row second = rows.get(1);
            Row third = rows.get(2);

            Truth.assertThat(Long.valueOf(first.getLong(first.fieldIndex("order_id")))).isEqualTo(1);
            Truth.assertThat(first.getTimestamp(first.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2022-09-28 1:00:00"));
            Truth.assertThat(Long.valueOf(second.getLong(second.fieldIndex("order_id")))).isEqualTo(10);
            Truth.assertThat(second.getTimestamp(second.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2023-10-29 2:00:00"));
            Truth.assertThat(Long.valueOf(third.getLong(third.fieldIndex("order_id")))).isEqualTo(20);
            Truth.assertThat(third.getTimestamp(third.fieldIndex("order_date_time"))).isEqualTo(Timestamp.valueOf("2024-11-30 12:00:00"));
        } finally {
            // TimeZone.setDefault is JVM-global; undo it regardless of the test outcome.
            TimeZone.setDefault(originalTimeZone);
        }
    }

    /**
     * Dynamic partition overwrite on a DATE column partitioned by day.
     *
     * <p>Seeds rows for 2023-09-28 and 2023-09-29, then writes rows for 2023-09-29 and
     * 2023-09-30. The test asserts three rows remain: the seeded 2023-09-28 row and the two
     * written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateByDay() {
        final String orderId = "order_id";
        final String orderDate = "order_date";
        // Date.valueOf interprets its argument in the JVM default zone, which is pinned to UTC
        // here. Note this is a JVM-wide change and is not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) PARTITION BY order_date "
                    + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), (2, DATE('2023-09-29'))])",
                testDataset, this.testTable, orderId, orderDate));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, Date.valueOf("2023-09-29")),
                    RowFactory.create(20, Date.valueOf("2023-09-30"))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        Date[] expectedDates = {
            Date.valueOf("2023-09-28"), Date.valueOf("2023-09-29"), Date.valueOf("2023-09-30")
        };
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.getDate(row.fieldIndex(orderDate))).isEqualTo(expectedDates[i]);
        }
    }

    /**
     * Dynamic partition overwrite on a DATE column partitioned by month.
     *
     * <p>Seeds one row in September 2023 and two in October 2023, then writes one row in
     * October 2023 and one in November 2023. The test asserts three rows remain: the seeded
     * September row and the two written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateByMonth() {
        final String orderId = "order_id";
        final String orderDate = "order_date";
        // Date.valueOf interprets its argument in the JVM default zone, which is pinned to UTC
        // here. Note this is a JVM-wide change and is not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) PARTITION BY DATE_TRUNC(order_date, MONTH) "
                    + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), (2, DATE('2023-10-29')), (2, DATE('2023-10-28'))])",
                testDataset, this.testTable, orderId, orderDate));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, Date.valueOf("2023-10-20")),
                    RowFactory.create(20, Date.valueOf("2023-11-30"))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        Date[] expectedDates = {
            Date.valueOf("2023-09-28"), Date.valueOf("2023-10-20"), Date.valueOf("2023-11-30")
        };
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.getDate(row.fieldIndex(orderDate))).isEqualTo(expectedDates[i]);
        }
    }

    /**
     * Dynamic partition overwrite on a DATE column partitioned by year.
     *
     * <p>Seeds one row in 2022 and two in 2023, then writes one row in 2023 and one in 2024.
     * The test asserts three rows remain: the seeded 2022 row and the two written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateByYear() {
        final String orderId = "order_id";
        final String orderDate = "order_date";
        // Date.valueOf interprets its argument in the JVM default zone, which is pinned to UTC
        // here. Note this is a JVM-wide change and is not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) PARTITION BY DATE_TRUNC(order_date, YEAR) "
                    + "AS SELECT * FROM UNNEST([(1, DATE('2022-09-28')), (2, DATE('2023-10-29')), (2, DATE('2023-11-28'))])",
                testDataset, this.testTable, orderId, orderDate));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, Date.valueOf("2023-10-20")),
                    RowFactory.create(20, Date.valueOf("2024-11-30"))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        Date[] expectedDates = {
            Date.valueOf("2022-09-28"), Date.valueOf("2023-10-20"), Date.valueOf("2024-11-30")
        };
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.getDate(row.fieldIndex(orderDate))).isEqualTo(expectedDates[i]);
        }
    }

    /**
     * Dynamic partition overwrite on a DATETIME column partitioned by hour.
     *
     * <p>Skipped unless {@code timeStampNTZType} is present. Seeds one row at 01:00 and two rows
     * in the 10:00 hour of 2023-09-28, then writes one row in that same 10:00 hour and one on
     * 2023-09-30. The test asserts three rows remain: the seeded 01:00 row and the two written
     * rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));
        final String orderId = "order_id";
        final String orderDateTime = "order_date_time";
        // JVM-wide default zone is set to UTC and not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) PARTITION BY timestamp_trunc(order_date_time, HOUR) "
                    + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), (2, DATETIME '2023-09-28 10:00:00'), (3, DATETIME '2023-09-28 10:30:00')])",
                testDataset, this.testTable, orderId, orderDateTime));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, LocalDateTime.of(2023, 9, 28, 10, 15, 0)),
                    RowFactory.create(20, LocalDateTime.of(2023, 9, 30, 12, 0, 0))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDateTime, this.timeStampNTZType.get(), true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        // The values are compared through toString() rather than by type.
        String[] expectedDateTimes = {"2023-09-28T01:00", "2023-09-28T10:15", "2023-09-30T12:00"};
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.get(row.fieldIndex(orderDateTime)).toString()).isEqualTo(expectedDateTimes[i]);
        }
    }

    /**
     * Dynamic partition overwrite on a DATETIME column partitioned by day.
     *
     * <p>Skipped unless {@code timeStampNTZType} is present. Seeds one row on 2023-09-28 and two
     * on 2023-09-29, then writes one row on 2023-09-29 and one on 2023-09-30. The test asserts
     * three rows remain: the seeded 2023-09-28 row and the two written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));
        final String orderId = "order_id";
        final String orderDateTime = "order_date_time";
        // JVM-wide default zone is set to UTC and not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) PARTITION BY timestamp_trunc(order_date_time, DAY) "
                    + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), (2, DATETIME '2023-09-29 10:00:00'), (3, DATETIME '2023-09-29 17:30:00')])",
                testDataset, this.testTable, orderId, orderDateTime));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, LocalDateTime.of(2023, 9, 29, 10, 15, 0)),
                    RowFactory.create(20, LocalDateTime.of(2023, 9, 30, 12, 0, 0))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDateTime, this.timeStampNTZType.get(), true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        // The values are compared through toString() rather than by type.
        String[] expectedDateTimes = {"2023-09-28T01:00", "2023-09-29T10:15", "2023-09-30T12:00"};
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.get(row.fieldIndex(orderDateTime)).toString()).isEqualTo(expectedDateTimes[i]);
        }
    }

    /**
     * Dynamic partition overwrite on a DATETIME column partitioned by month.
     *
     * <p>Skipped unless {@code timeStampNTZType} is present. Seeds one row in September 2023 and
     * two in October 2023, then writes one row in October 2023 and one in November 2023. The
     * test asserts three rows remain: the seeded September row and the two written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));
        final String orderId = "order_id";
        final String orderDateTime = "order_date_time";
        // JVM-wide default zone is set to UTC and not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) PARTITION BY timestamp_trunc(order_date_time, MONTH) "
                    + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), (2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-10-29 17:30:00')])",
                testDataset, this.testTable, orderId, orderDateTime));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, LocalDateTime.of(2023, 10, 20, 10, 15, 0)),
                    RowFactory.create(20, LocalDateTime.of(2023, 11, 30, 12, 0, 0))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDateTime, this.timeStampNTZType.get(), true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        // The values are compared through toString() rather than by type.
        String[] expectedDateTimes = {"2023-09-28T01:00", "2023-10-20T10:15", "2023-11-30T12:00"};
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.get(row.fieldIndex(orderDateTime)).toString()).isEqualTo(expectedDateTimes[i]);
        }
    }

    /**
     * Dynamic partition overwrite on a DATETIME column partitioned by year.
     *
     * <p>Skipped unless {@code timeStampNTZType} is present. Seeds one row in 2022 and two in
     * 2023, then writes one row in 2023 and one in 2024. The test asserts three rows remain: the
     * seeded 2022 row and the two written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));
        final String orderId = "order_id";
        final String orderDateTime = "order_date_time";
        // JVM-wide default zone is set to UTC and not restored afterwards.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) PARTITION BY timestamp_trunc(order_date_time, YEAR) "
                    + "AS SELECT * FROM UNNEST([(1, DATETIME '2022-09-28 1:00:00'), (2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-11-29 17:30:00')])",
                testDataset, this.testTable, orderId, orderDateTime));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, LocalDateTime.of(2023, 10, 20, 10, 15, 0)),
                    RowFactory.create(20, LocalDateTime.of(2024, 11, 30, 12, 0, 0))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDateTime, this.timeStampNTZType.get(), true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {1, 10, 20};
        // The values are compared through toString() rather than by type.
        String[] expectedDateTimes = {"2022-09-28T01:00", "2023-10-20T10:15", "2024-11-30T12:00"};
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.get(row.fieldIndex(orderDateTime)).toString()).isEqualTo(expectedDateTimes[i]);
        }
    }

    /**
     * Dynamic partition overwrite against a table created without any PARTITION BY clause.
     *
     * <p>Seeds three rows, then writes two. The test asserts that only the two written rows
     * remain, i.e. none of the seeded rows survive.
     */
    @Test
    public void testOverwriteDynamicPartition_noTimePartitioning() {
        final String orderId = "order_id";
        final String orderDateTime = "order_date_time";
        // Timestamp.valueOf interprets its argument in the JVM default zone; the seeded literals
        // are UTC, so the default zone is pinned to UTC. Note this is JVM-wide and not restored.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
                    + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), (2, TIMESTAMP '2023-09-29 10:00:00 UTC'), (3, TIMESTAMP '2023-09-29 17:00:00 UTC')])",
                testDataset, this.testTable, orderId, orderDateTime));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(10, Timestamp.valueOf("2023-09-29 2:00:00")),
                    RowFactory.create(20, Timestamp.valueOf("2023-09-30 12:00:00"))),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(2);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        long[] expectedIds = {10, 20};
        Timestamp[] expectedTimes = {
            Timestamp.valueOf("2023-09-29 2:00:00"), Timestamp.valueOf("2023-09-30 12:00:00")
        };
        for (int i = 0; i < expectedIds.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expectedIds[i]);
            Truth.assertThat(row.getTimestamp(row.fieldIndex(orderDateTime))).isEqualTo(expectedTimes[i]);
        }
    }

    /**
     * Dynamic partition overwrite on an integer-range partitioned table
     * ({@code RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10))}).
     *
     * <p>Seeds order ids 1, 8, 21 and 83, then writes ids 4, 20, 85 and 90. The test asserts
     * five rows remain: the seeded id 21 plus the four written rows.
     */
    @Test
    public void testOverwriteDynamicPartition_rangePartitioned() {
        final String orderId = "order_id";
        final String orderCount = "order_count";
        // No date/time values are used in this test; the UTC pin is kept from the original.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
                    + "AS SELECT * FROM UNNEST([(1, 1000), (8, 1005), ( 21, 1010), (83, 1020)])",
                testDataset, this.testTable, orderId, orderCount));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(4, 2000),
                    RowFactory.create(20, 2050),
                    RowFactory.create(85, 3000),
                    RowFactory.create(90, 3050)),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(5);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        // Expected {order_id, order_count} pairs in ascending order_id order.
        long[][] expected = {{4, 2000}, {20, 2050}, {21, 1010}, {85, 3000}, {90, 3050}};
        for (int i = 0; i < expected.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expected[i][0]);
            Truth.assertThat(row.getLong(row.fieldIndex(orderCount))).isEqualTo(expected[i][1]);
        }
    }

    /**
     * Dynamic partition overwrite on a range-partitioned table where the written data contains
     * an id below the range start.
     *
     * <p>Seeds ids 1, 2 and 150, then writes ids 4 and -10. The test asserts only the two
     * written rows remain, so the seeded id 150 (above the range) is expected to be removed
     * together with the in-range seeded rows.
     */
    @Test
    public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanStart() {
        final String orderId = "order_id";
        final String orderCount = "order_count";
        // No date/time values are used in this test; the UTC pin is kept from the original.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
                    + "AS SELECT * FROM UNNEST([(1, 1000), (2, 1005), ( 150, 1010)])",
                testDataset, this.testTable, orderId, orderCount));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(RowFactory.create(4, 2000), RowFactory.create(-10, 2050)),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(2);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        // Expected {order_id, order_count} pairs in ascending order_id order.
        long[][] expected = {{-10, 2050}, {4, 2000}};
        for (int i = 0; i < expected.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expected[i][0]);
            Truth.assertThat(row.getLong(row.fieldIndex(orderCount))).isEqualTo(expected[i][1]);
        }
    }

    /**
     * Dynamic partition overwrite on a range-partitioned table where the written data contains
     * an id above the range end.
     *
     * <p>Seeds ids 1, 2 and -1, then writes ids 4 and 105. The test asserts only the two written
     * rows remain, so the seeded id -1 (below the range) is expected to be removed together with
     * the in-range seeded rows.
     */
    @Test
    public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterThanEnd() {
        final String orderId = "order_id";
        final String orderCount = "order_count";
        // No date/time values are used in this test; the UTC pin is kept from the original.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
                    + "AS SELECT * FROM UNNEST([(1, 1000), (2, 1005), ( -1, 1010)])",
                testDataset, this.testTable, orderId, orderCount));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(RowFactory.create(4, 2000), RowFactory.create(105, 2050)),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(2);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        // Expected {order_id, order_count} pairs in ascending order_id order.
        long[][] expected = {{4, 2000}, {105, 2050}};
        for (int i = 0; i < expected.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expected[i][0]);
            Truth.assertThat(row.getLong(row.fieldIndex(orderCount))).isEqualTo(expected[i][1]);
        }
    }

    /**
     * Dynamic partition overwrite on a range-partitioned table with a seeded row at the range
     * end value.
     *
     * <p>Seeds ids 1, 11 and 100, then writes ids -1 and 5. The test asserts three rows remain:
     * the two written rows and the seeded id 11. The seeded id 100 is therefore expected to be
     * removed along with seeded id 1.
     */
    @Test
    public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
        final String orderId = "order_id";
        final String orderCount = "order_count";
        // No date/time values are used in this test; the UTC pin is kept from the original.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
                    + "AS SELECT * FROM UNNEST([(1, 1000), (11, 1005), ( 100, 1010)])",
                testDataset, this.testTable, orderId, orderCount));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(RowFactory.create(-1, 2000), RowFactory.create(5, 2050)),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        rows.sort(Comparator.comparingLong(row -> row.getLong(row.fieldIndex(orderId))));

        // Expected {order_id, order_count} pairs in ascending order_id order.
        long[][] expected = {{-1, 2000}, {5, 2050}, {11, 1005}};
        for (int i = 0; i < expected.length; i++) {
            Row row = rows.get(i);
            Truth.assertThat(row.getLong(row.fieldIndex(orderId))).isEqualTo(expected[i][0]);
            Truth.assertThat(row.getLong(row.fieldIndex(orderCount))).isEqualTo(expected[i][1]);
        }
    }

    /**
     * Dynamic partition overwrite on a range-partitioned table where both the seeded and the
     * written data contain a NULL partition key.
     *
     * <p>Seeds ids NULL and 11, then writes ids NULL and 5. The test asserts three rows remain:
     * the written NULL row (order_count 2000), the written id 5, and the seeded id 11.
     */
    @Test
    public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
        final String orderId = "order_id";
        final String orderCount = "order_count";
        // No date/time values are used in this test; the UTC pin is kept from the original.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        IntegrationTestUtils.runQuery(
            String.format(
                "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
                    + "AS SELECT * FROM UNNEST([(NULL, 1000), (11, 1005)])",
                testDataset, this.testTable, orderId, orderCount));

        Dataset<Row> toWrite =
            this.spark.createDataFrame(
                Arrays.asList(RowFactory.create(null, 2000), RowFactory.create(5, 2050)),
                structType(
                    StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
                    StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
        Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(toWrite);
        Truth.assertThat(result.count()).isEqualTo(3);

        // Typed list: lets the sort key lambda see Row instead of Object and removes the casts.
        List<Row> rows = result.collectAsList();
        // A NULL order_id sorts first by mapping it to Long.MIN_VALUE.
        rows.sort(
            Comparator.comparingLong(
                row -> {
                    Object id = row.get(row.fieldIndex(orderId));
                    return id == null ? Long.MIN_VALUE : (Long) id;
                }));

        Row nullKeyRow = rows.get(0);
        Row second = rows.get(1);
        Row third = rows.get(2);
        Truth.assertThat(nullKeyRow.get(nullKeyRow.fieldIndex(orderId))).isNull();
        Truth.assertThat(nullKeyRow.getLong(nullKeyRow.fieldIndex(orderCount))).isEqualTo(2000);
        Truth.assertThat(second.getLong(second.fieldIndex(orderId))).isEqualTo(5);
        Truth.assertThat(second.getLong(second.fieldIndex(orderCount))).isEqualTo(2050);
        Truth.assertThat(third.getLong(third.fieldIndex(orderId))).isEqualTo(11);
        Truth.assertThat(third.getLong(third.fieldIndex(orderCount))).isEqualTo(1005);
    }

    /**
     * Appends a DataFrame whose schema is a subset of the destination table's schema.
     *
     * <p>First appends two rows with columns {@code key}, {@code value} and {@code ds} and
     * checks the table holds two rows. Then appends one row carrying only {@code key} and
     * {@code value} and checks the table holds three rows.
     *
     * <p>NOTE(review): this method carries no {@code @Test} annotation in this class; confirm
     * whether that is intentional (e.g. it is invoked or annotated from elsewhere).
     *
     * @throws Exception if either write or the row count lookup fails
     */
    public void testWriteSchemaSubset() throws Exception {
        final String keyColumn = "key";
        final String valueColumn = "value";

        // Initial write: full three-column schema.
        this.spark
            .createDataFrame(
                Arrays.asList(
                    RowFactory.create("key1", "val1", Date.valueOf("2023-04-13")),
                    RowFactory.create("key2", "val2", Date.valueOf("2023-04-14"))),
                structType(
                    StructField.apply(keyColumn, DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply(valueColumn, DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply("ds", DataTypes.DateType, true, Metadata.empty())))
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save();
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);

        // Second write: the "ds" column is omitted.
        this.spark
            .createDataFrame(
                Arrays.asList(RowFactory.create("key3", "val3")),
                structType(
                    StructField.apply(keyColumn, DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply(valueColumn, DataTypes.StringType, true, Metadata.empty())))
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save();
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(3);
    }

    /**
     * Appends rows that introduce a new nested (struct) column to an existing table.
     *
     * <p>Runs only for the INDIRECT write method. The table is first overwritten with two rows
     * of {@code value} and {@code ds}. Two further rows that add a {@code nested_col} struct
     * are then appended with {@code allowFieldAddition} and {@code allowFieldRelaxation} enabled.
     * The test asserts the table holds four rows: two with a null {@code nested_col} and two
     * with the written struct value.
     *
     * @throws Exception if a write, the row count lookup or the read fails
     */
    @Test
    public void allowFieldAdditionWithNestedColumns() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        final String nestedColumn = "nested_col";

        // Initial table contents: no nested column yet.
        this.spark
            .createDataFrame(
                Arrays.asList(
                    RowFactory.create("val1", Date.valueOf("2023-04-13")),
                    RowFactory.create("val2", Date.valueOf("2023-04-14"))),
                structType(
                    StructField.apply("value", DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply("ds", DataTypes.DateType, true, Metadata.empty())))
            .write()
            .format("bigquery")
            .mode(SaveMode.Overwrite)
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .save();
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);

        // Append rows whose schema adds the nested struct column.
        this.spark
            .createDataFrame(
                Arrays.asList(
                    RowFactory.create("val5", Date.valueOf("2023-04-15"), RowFactory.create("str1", "str2")),
                    RowFactory.create("val6", Date.valueOf("2023-04-16"), RowFactory.create("str1", "str2"))),
                structType(
                    StructField.apply("value", DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply("ds", DataTypes.DateType, true, Metadata.empty()),
                    StructField.apply(
                        nestedColumn,
                        structType(
                            StructField.apply("sub_field1", DataTypes.StringType, true, Metadata.empty()),
                            StructField.apply("sub_field2", DataTypes.StringType, true, Metadata.empty())),
                        true,
                        Metadata.empty())))
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .option("allowFieldAddition", "true")
            .option("allowFieldRelaxation", "true")
            .save();

        // Typed list so the stream lambdas below operate on Row rather than Object.
        List<Row> rows =
            this.spark
                .read()
                .format("bigquery")
                .option("dataset", testDataset.toString())
                .option("table", this.testTable)
                .load()
                .collectAsList();
        Truth.assertThat(rows).hasSize(4);

        long withoutNested = rows.stream().filter(row -> row.getAs(nestedColumn) == null).count();
        Truth.assertThat(withoutNested).isEqualTo(2);

        long withNested =
            rows.stream()
                .filter(
                    row ->
                        row.getAs(nestedColumn) != null
                            && row.getAs(nestedColumn).equals(RowFactory.create("str1", "str2")))
                .count();
        Truth.assertThat(withNested).isEqualTo(2);
    }

    /**
     * Appends rows that add a new sub-field to an existing nested (struct) column.
     *
     * <p>Runs only for the INDIRECT write method. Two rows with a two-field {@code nested_col}
     * struct are appended first. Two further rows whose struct has a third field,
     * {@code sub_field3}, are then appended. Both writes enable {@code allowFieldAddition} and
     * {@code allowFieldRelaxation}. The test asserts the table holds four rows, all with a
     * non-null struct: two where the third field is null and two where it is {@code "str3"}.
     *
     * @throws Exception if a write, the row count lookup or the read fails
     */
    @Test
    public void allowFieldAdditionIntoNestedColumns() throws Exception {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        final String nestedColumn = "nested_col";

        // First write: nested struct with two sub-fields.
        this.spark
            .createDataFrame(
                Arrays.asList(
                    RowFactory.create("val5", Date.valueOf("2023-04-15"), RowFactory.create("str1", "str2")),
                    RowFactory.create("val6", Date.valueOf("2023-04-16"), RowFactory.create("str1", "str2"))),
                structType(
                    StructField.apply("value", DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply("ds", DataTypes.DateType, true, Metadata.empty()),
                    StructField.apply(
                        nestedColumn,
                        structType(
                            StructField.apply("sub_field1", DataTypes.StringType, true, Metadata.empty()),
                            StructField.apply("sub_field2", DataTypes.StringType, true, Metadata.empty())),
                        true,
                        Metadata.empty())))
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .option("allowFieldAddition", "true")
            .option("allowFieldRelaxation", "true")
            .save();
        Truth.assertThat(testTableNumberOfRows()).isEqualTo(2);

        // Second write: the nested struct gains a third sub-field.
        this.spark
            .createDataFrame(
                Arrays.asList(
                    RowFactory.create("val5", Date.valueOf("2023-04-15"), RowFactory.create("str1", "str2", "str3")),
                    RowFactory.create("val6", Date.valueOf("2023-04-16"), RowFactory.create("str1", "str2", "str3"))),
                structType(
                    StructField.apply("value", DataTypes.StringType, true, Metadata.empty()),
                    StructField.apply("ds", DataTypes.DateType, true, Metadata.empty()),
                    StructField.apply(
                        nestedColumn,
                        structType(
                            StructField.apply("sub_field1", DataTypes.StringType, true, Metadata.empty()),
                            StructField.apply("sub_field2", DataTypes.StringType, true, Metadata.empty()),
                            StructField.apply("sub_field3", DataTypes.StringType, true, Metadata.empty())),
                        true,
                        Metadata.empty())))
            .write()
            .format("bigquery")
            .mode(SaveMode.Append)
            .option("dataset", testDataset.toString())
            .option("table", this.testTable)
            .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
            .option("writeMethod", this.writeMethod.toString())
            .option("allowFieldAddition", "true")
            .option("allowFieldRelaxation", "true")
            .save();

        // Typed list so the stream lambdas below operate on Row rather than Object.
        List<Row> rows =
            this.spark
                .read()
                .format("bigquery")
                .option("dataset", testDataset.toString())
                .option("table", this.testTable)
                .load()
                .collectAsList();
        Truth.assertThat(rows).hasSize(4);

        long withNested = rows.stream().filter(row -> row.getAs(nestedColumn) != null).count();
        Truth.assertThat(withNested).isEqualTo(4);

        // Rows from the first write are expected to read back with a null third sub-field.
        long withoutThirdField =
            rows.stream()
                .filter(row -> row.getAs(nestedColumn).equals(RowFactory.create("str1", "str2", null)))
                .count();
        Truth.assertThat(withoutThirdField).isEqualTo(2);

        long withThirdField =
            rows.stream()
                .filter(row -> row.getAs(nestedColumn).equals(RowFactory.create("str1", "str2", "str3")))
                .count();
        Truth.assertThat(withThirdField).isEqualTo(2);
    }

    /**
     * Writes a DataFrame containing a Spark ML vector column to the test table and checks that the
     * column is read back as a vector.
     *
     * <p>The test is skipped unless the Spark version string starts with {@code "3."}. The
     * {@code features} column is produced by a {@link VectorAssembler} over {@code num1} and
     * {@code num2} followed by a fitted {@link MinMaxScaler}. After the round trip the
     * {@code features} field must have the ML vector data type, three rows must be returned, and the
     * first row's {@code features} value must be a {@code Vector} instance.
     */
    @Test
    public void testWriteSparkMlTypes() {
        Assume.assumeThat(package$.MODULE$.SPARK_VERSION(), CoreMatchers.startsWith("3."));

        StructType inputSchema = new StructType(new StructField[] {
            StructField.apply("Seqno", DataTypes.StringType, true, Metadata.empty()),
            StructField.apply("date1", DataTypes.StringType, true, Metadata.empty()),
            StructField.apply("num1", DataTypes.IntegerType, true, Metadata.empty()),
            StructField.apply("num2", DataTypes.IntegerType, true, Metadata.empty()),
            StructField.apply("amt1", DataTypes.DoubleType, true, Metadata.empty())
        });
        List<Row> inputRows = Arrays.asList(
                RowFactory.create("1", "20230515", 12345, 5678, 1234.12345d),
                RowFactory.create("2", "20230515", 14789, 25836, 1234.12345d),
                RowFactory.create("3", "20230515", 54321, 98765, 1234.12345d));
        Dataset<Row> input = this.spark.createDataFrame(inputRows, inputSchema);

        // Assemble num1/num2 into a single vector column.
        VectorAssembler assembler = new VectorAssembler();
        assembler.setInputCols(new String[] {"num1", "num2"});
        assembler.setOutputCol("features_vector");
        Dataset<Row> assembled = assembler.transform(input);

        // Scale the assembled vector into the "features" column.
        MinMaxScaler scaler = new MinMaxScaler();
        scaler.setInputCol("features_vector");
        scaler.setOutputCol("features");
        MinMaxScalerModel scalerModel = scaler.fit(assembled);
        Dataset<Row> scaled = scalerModel.transform(assembled);

        // NOTE(review): the PipelineModel is constructed but never used afterwards; the call is
        // kept so the test performs the same steps as before.
        new PipelineModel("pipeline", Arrays.asList(assembler, scalerModel));

        scaled.show(false);
        scaled.write()
                .format("bigquery")
                .option("writeMethod", this.writeMethod.toString())
                .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
                .option("dataset", testDataset.toString())
                .option("table", this.testTable)
                .save();

        Dataset<Row> stored = this.spark.read()
                .format("bigquery")
                .option("dataset", testDataset.toString())
                .option("table", this.testTable)
                .load();
        Truth.assertThat(stored.schema().apply("features").dataType()).isEqualTo(SQLDataTypes.VectorType());
        stored.show(false);

        List<Row> storedRows = stored.collectAsList();
        Truth.assertThat(storedRows).hasSize(3);
        Row firstRow = storedRows.get(0);
        Truth.assertThat(firstRow.get(firstRow.fieldIndex("features"))).isInstanceOf(Vector.class);
    }

    /**
     * Writes one timestamp-without-time-zone value with the DIRECT write method and checks how it
     * is stored in BigQuery.
     *
     * <p>The test is skipped unless the write method is DIRECT and {@code timeStampNTZType} is
     * present. The queried column must be of type {@code DATETIME} and its first value must equal
     * {@code "2023-09-01T12:23:34.268543"}.
     *
     * @throws InterruptedException if the verification query is interrupted
     */
    @Test
    public void testTimestampNTZDirectWriteToBigQuery() throws InterruptedException {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.DIRECT));
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));

        Row singleRow = RowFactory.create(LocalDateTime.of(2023, 9, 1, 12, 23, 34, 268543000));
        StructType schema = new StructType(new StructField[] {
            StructField.apply("foo", this.timeStampNTZType.get(), true, Metadata.empty())
        });
        this.spark.createDataFrame(Arrays.asList(singleRow), schema)
                .write()
                .format("bigquery")
                .mode(SaveMode.Overwrite)
                .option("table", testDataset.toString() + "." + this.testTable)
                .option("writeMethod", SparkBigQueryConfig.WriteMethod.DIRECT.toString())
                .save();

        String sql = String.format("Select foo from %s", fullTableName());
        TableResult stored = IntegrationTestUtils.getBigquery().query(QueryJobConfiguration.of(sql));

        Field fooField = stored.getSchema().getFields().get(0);
        Truth.assertThat(fooField.getType()).isEqualTo(LegacySQLTypeName.DATETIME);

        FieldValueList firstRow = stored.streamValues().findFirst().get();
        Truth.assertThat(firstRow.get(0).getValue()).isEqualTo("2023-09-01T12:23:34.268543");
    }

    /**
     * Writes one timestamp-without-time-zone value with the INDIRECT write method, passing
     * {@code "avro"} to the shared insert helper, and checks how it is stored in BigQuery.
     *
     * <p>The test is skipped unless the write method is INDIRECT and {@code timeStampNTZType} is
     * present. The queried column must be of type {@code DATETIME} and its first value must equal
     * {@code "2023-09-01T12:23:34.268543"}.
     *
     * @throws InterruptedException if the verification query is interrupted
     */
    @Test
    public void testTimestampNTZIndirectWriteToBigQueryAvroFormat() throws InterruptedException {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));

        LocalDateTime written = LocalDateTime.of(2023, 9, 1, 12, 23, 34, 268543000);
        TableResult stored = insertAndGetTimestampNTZToBigQuery(written, "avro");

        Field fooField = stored.getSchema().getFields().get(0);
        Truth.assertThat(fooField.getType()).isEqualTo(LegacySQLTypeName.DATETIME);

        FieldValueList firstRow = stored.streamValues().findFirst().get();
        Truth.assertThat(firstRow.get(0).getValue()).isEqualTo("2023-09-01T12:23:34.268543");
    }

    /**
     * Writes one timestamp-without-time-zone value with the INDIRECT write method, passing
     * {@code "parquet"} to the shared insert helper, and checks how it is stored in BigQuery.
     *
     * <p>The test is skipped unless the write method is INDIRECT and {@code timeStampNTZType} is
     * present. The queried column must be of type {@code DATETIME} and its first value must equal
     * {@code "2023-09-15T12:36:34.268543"}.
     *
     * @throws InterruptedException if the verification query is interrupted
     */
    @Test
    public void testTimestampNTZIndirectWriteToBigQueryParquetFormat() throws InterruptedException {
        Assume.assumeThat(this.writeMethod, CoreMatchers.equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
        Assume.assumeThat(this.timeStampNTZType.isPresent(), CoreMatchers.is(true));

        LocalDateTime written = LocalDateTime.of(2023, 9, 15, 12, 36, 34, 268543000);
        TableResult stored = insertAndGetTimestampNTZToBigQuery(written, "parquet");

        Field fooField = stored.getSchema().getFields().get(0);
        Truth.assertThat(fooField.getType()).isEqualTo(LegacySQLTypeName.DATETIME);

        FieldValueList firstRow = stored.streamValues().findFirst().get();
        Truth.assertThat(firstRow.get(0).getValue()).isEqualTo("2023-09-15T12:36:34.268543");
    }

    /**
     * Checks that the table description reported by BigQuery is the same before a write, after an
     * append, and after a subsequent read.
     *
     * <p>The table is created with a DDL statement, its description is captured, two rows are
     * appended through {@code writeToBigQueryAvroFormat}, and the description is compared again.
     * The table is then read (at least one row is expected) and the description is compared a
     * final time.
     */
    @Test
    public void testTableDescriptionRemainsUnchanged() {
        String ddl = String.format("CREATE TABLE `%s.%s` (name STRING, age INT64)", testDataset, this.testTable);
        IntegrationTestUtils.runQuery(ddl);
        String description = this.bq.getTable(testDataset.toString(), this.testTable).getDescription();

        StructType schema = new StructType()
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType);
        Dataset<Row> people = this.spark.createDataFrame(
                Arrays.asList(RowFactory.create("foo", 10), RowFactory.create("bar", 20)),
                schema);
        // NOTE(review): the meaning of the "True" argument is defined by writeToBigQueryAvroFormat,
        // which is outside this block — confirm against that helper.
        writeToBigQueryAvroFormat(people, SaveMode.Append, "True");

        // The description must be unchanged by the append ...
        String descriptionAfterWrite =
                this.bq.getTable(testDataset.toString(), this.testTable).getDescription();
        Truth.assertThat(description).isEqualTo(descriptionAfterWrite);

        long rowCount = this.spark.read()
                .format("bigquery")
                .option("dataset", testDataset.toString())
                .option("table", this.testTable)
                .load()
                .count();
        Truth.assertThat(rowCount).isAtLeast(1);

        // ... and by the read that followed it.
        String descriptionAfterRead =
                this.bq.getTable(testDataset.toString(), this.testTable).getDescription();
        Truth.assertThat(description).isEqualTo(descriptionAfterRead);
    }

    /**
     * Overwrites the test table with a single-row, single-column ({@code foo}) DataFrame holding
     * the given value, then queries that column back from BigQuery.
     *
     * @param localDateTime the value stored in the {@code foo} column
     * @param str forwarded unchanged as the third argument of {@code writeToBigQuery}; the callers
     *     in this class pass {@code "avro"} or {@code "parquet"}
     * @return the result of {@code Select foo from <fullTableName()>}
     * @throws IllegalArgumentException if {@code timeStampNTZType} is not present
     * @throws InterruptedException if the query is interrupted
     */
    private TableResult insertAndGetTimestampNTZToBigQuery(LocalDateTime localDateTime, String str) throws InterruptedException {
        Preconditions.checkArgument(this.timeStampNTZType.isPresent(), "timestampNTZType not present");

        Row singleRow = RowFactory.create(localDateTime);
        StructType schema = new StructType(new StructField[] {
            StructField.apply("foo", this.timeStampNTZType.get(), true, Metadata.empty())
        });
        Dataset<Row> frame = this.spark.createDataFrame(Arrays.asList(singleRow), schema);
        writeToBigQuery(frame, SaveMode.Overwrite, str);

        String sql = String.format("Select foo from %s", fullTableName());
        return IntegrationTestUtils.getBigquery().query(QueryJobConfiguration.of(sql));
    }

    /**
     * Counts the rows of the test table whose {@code name} column equals the given value.
     *
     * @param str the value compared against the {@code name} column; it is interpolated verbatim
     *     into the query text between single quotes
     * @return the total row count reported by the query result
     * @throws RuntimeException wrapping the {@link InterruptedException} if the query is
     *     interrupted; the thread's interrupt flag is restored before rethrowing
     */
    protected long numberOfRowsWith(String str) {
        String sql = String.format("select name from %s where name='%s'", fullTableName(), str);
        try {
            return this.bq.query(QueryJobConfiguration.of(sql)).getTotalRows();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the
            // interruption after it has been converted to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the name of the test table qualified by the test dataset.
     *
     * @return {@code testDataset.toString()}, a dot, and {@code testTable}, concatenated
     */
    protected String fullTableName() {
        StringBuilder qualified = new StringBuilder();
        qualified.append(testDataset.toString());
        qualified.append(".");
        qualified.append(this.testTable);
        return qualified.toString();
    }

    /**
     * Builds the name used for the partitioned variant of the test table.
     *
     * @return {@link #fullTableName()} followed by {@code "_partitioned"}
     */
    protected String fullTableNamePartitioned() {
        return String.format("%s_partitioned", fullTableName());
    }

    /**
     * Tells whether the test table holds exactly one row whose {@code name} is {@code "Xyz"}.
     *
     * @return {@code true} if {@code numberOfRowsWith("Xyz")} is 1, {@code false} otherwise
     */
    protected boolean additionalDataValuesExist() {
        final long matchingRows = numberOfRowsWith("Xyz");
        return matchingRows == 1L;
    }

    /**
     * Tells whether the test table holds exactly one row whose {@code name} is {@code "Abc"}.
     *
     * @return {@code true} if {@code numberOfRowsWith("Abc")} is 1, {@code false} otherwise
     */
    protected boolean initialDataValuesExist() {
        final long matchingRows = numberOfRowsWith("Abc");
        return matchingRows == 1L;
    }
}
