package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT.class */
public class StorageApiSinkSchemaUpdateIT {

    // First test parameter (see data()): when true, the write is configured with an explicit
    // input schema via withSchema(...).
    @Parameterized.Parameter(0)
    public boolean useInputSchema;

    // Second test parameter (see data()): when true, the pipeline includes the UpdateSchemaDoFn
    // step that updates the destination table's schema while rows are flowing.
    @Parameterized.Parameter(1)
    public boolean changeTableSchema;
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkSchemaUpdateIT.class);
    // Shared BigQuery test client used for dataset/table management and verification queries.
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("StorageApiSinkSchemaChangeIT");
    // GCP project taken from the testing pipeline options.
    private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
    // Dataset id made unique per class load by appending System.nanoTime().
    private static final String BIG_QUERY_DATASET_ID = "storage_api_sink_schema_change_" + System.nanoTime();
    // Field names used to build table schemas; the part before the first '_' is used as the
    // BigQuery column type (see makeTableSchemaFromTypes).
    private static final String[] FIELDS = {"BOOL", "BOOLEAN", "BYTES", "INT64", "INTEGER", "FLOAT", "FLOAT64", "NUMERIC", "STRING", "DATE", "TIMESTAMP"};
    // Number of distinct row ids expected in the table when unknown values are ignored.
    private static final int TOTAL_N = 70;
    // Number of distinct row ids expected when unknown values are NOT ignored; also the id
    // threshold used by checkRowsWithUpdatedSchema to decide which rows may carry the extra field.
    private static final int ORIGINAL_N = 60;
    // BigQuery location read from TestBigQueryOptions in setUpTestEnvironment().
    private static String bigQueryLocation;

    // Exposes the running test's method name; createTable derives the table id from it.
    @Rule
    public TestName testName = new TestName();
    // Unseeded RNG used to shuffle the input schema's field order and to pick the extra field.
    private final Random randomGenerator = new Random();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT$GenerateRowFunc.class */
    public static class GenerateRowFunc implements SerializableFunction<Long, TableRow> {
        private final List<String> fieldNames;
        private final List<String> fieldNamesWithExtra;

        public GenerateRowFunc(List<String> list, List<String> list2) {
            this.fieldNames = list;
            this.fieldNamesWithExtra = list2;
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0064. Please report as an issue. */
        public TableRow apply(Long l) {
            TableRow tableRow = new TableRow();
            tableRow.set("id", l);
            for (String str : l.longValue() < 60 ? this.fieldNames : this.fieldNamesWithExtra) {
                String str2 = (String) Iterables.get(Splitter.on('_').split(str), 0);
                boolean z = -1;
                switch (str2.hashCode()) {
                    case -1838656495:
                        if (str2.equals("STRING")) {
                            z = 10;
                            break;
                        }
                        break;
                    case -1618932450:
                        if (str2.equals("INTEGER")) {
                            z = 4;
                            break;
                        }
                        break;
                    case -1453246218:
                        if (str2.equals("TIMESTAMP")) {
                            z = 9;
                            break;
                        }
                        break;
                    case -1282431251:
                        if (str2.equals("NUMERIC")) {
                            z = 7;
                            break;
                        }
                        break;
                    case -48459270:
                        if (str2.equals("FLOAT64")) {
                            z = 6;
                            break;
                        }
                        break;
                    case 2044650:
                        if (str2.equals("BOOL")) {
                            z = false;
                            break;
                        }
                        break;
                    case 2090926:
                        if (str2.equals("DATE")) {
                            z = 8;
                            break;
                        }
                        break;
                    case 63686731:
                        if (str2.equals("BYTES")) {
                            z = 2;
                            break;
                        }
                        break;
                    case 66988604:
                        if (str2.equals("FLOAT")) {
                            z = 5;
                            break;
                        }
                        break;
                    case 69823181:
                        if (str2.equals("INT64")) {
                            z = 3;
                            break;
                        }
                        break;
                    case 782694408:
                        if (str2.equals("BOOLEAN")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                    case true:
                        if (l.longValue() % 2 == 0) {
                            tableRow.set(str, false);
                            break;
                        } else {
                            tableRow.set(str, true);
                            break;
                        }
                    case true:
                        tableRow.set(str, String.format("test_blob_%s", l).getBytes(StandardCharsets.UTF_8));
                        break;
                    case true:
                    case true:
                        tableRow.set(str, Long.valueOf(l.longValue() + 10));
                        break;
                    case true:
                    case true:
                        tableRow.set(str, Double.valueOf(0.5d + l.longValue()));
                        break;
                    case true:
                        tableRow.set(str, Double.valueOf(l.longValue() + 0.12345d));
                        break;
                    case true:
                        tableRow.set(str, "2022-01-01");
                        break;
                    case true:
                        tableRow.set(str, "2022-01-01T10:10:10.012Z");
                        break;
                    case true:
                        tableRow.set(str, "test_string" + l);
                        break;
                    default:
                        tableRow.set(str, "unknown" + l);
                        break;
                }
            }
            return tableRow;
        }
    }

    /**
     * Pass-through stateful {@link DoFn} that updates a BigQuery table's schema once.
     *
     * <p>A per-key counter is kept in state. When an element arrives while the counter reads
     * exactly 10, the table's schema is updated to the schema supplied at construction. Every
     * element's value is emitted unchanged.
     */
    public static class UpdateSchemaDoFn extends DoFn<KV<Integer, TableRow>, TableRow> {
        private final String projectId;
        private final String datasetId;
        private final String tableId;
        // The schema is held as JSON text and parsed back when the update is issued.
        private final String newSchema;
        private static final String ROW_COUNTER = "rowCounter";
        // Created in setup(); transient, so it is not part of the serialized DoFn.
        private transient BigqueryClient bqClient = null;

        @DoFn.StateId(ROW_COUNTER)
        private final StateSpec<ValueState<Integer>> counter = StateSpecs.value();

        /**
         * @param str project id of the table to update
         * @param str2 dataset id of the table to update
         * @param str3 table id of the table to update
         * @param tableSchema schema to apply when the update fires
         */
        public UpdateSchemaDoFn(String str, String str2, String str3, TableSchema tableSchema) {
            projectId = str;
            datasetId = str2;
            tableId = str3;
            newSchema = BigQueryHelpers.toJsonString(tableSchema);
        }

        /** Creates the BigQuery client used to issue the schema update. */
        @DoFn.Setup
        public void setup() {
            bqClient = new BigqueryClient("StorageApiSinkSchemaChangeIT");
        }

        /**
         * Emits the element's value, updating the table schema first when the counter reads 10.
         *
         * @param processContext context supplying the element and receiving the output
         * @param valueState number of elements seen so far; unset is treated as 0
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<Integer, TableRow>, TableRow>.ProcessContext processContext, @DoFn.StateId(ROW_COUNTER) ValueState<Integer> valueState) throws Exception {
            Integer stored = valueState.read();
            int seenSoFar = stored == null ? 0 : stored.intValue();
            if (seenSoFar == 10) {
                TableSchema updatedSchema = BigQueryHelpers.fromJsonString(newSchema, TableSchema.class);
                bqClient.updateTableSchema(projectId, datasetId, tableId, updatedSchema);
            }
            valueState.write(Integer.valueOf(seenSoFar + 1));
            processContext.output(processContext.element().getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT$VerifyPCollectionSize.class */
    public static class VerifyPCollectionSize implements SerializableFunction<Iterable<BigQueryStorageApiInsertError>, Void> {
        int expectedSize;
        String extraField;

        VerifyPCollectionSize(int i, String str) {
            this.expectedSize = i;
            this.extraField = str;
        }

        public Void apply(Iterable<BigQueryStorageApiInsertError> iterable) {
            ArrayList arrayList = new ArrayList();
            for (BigQueryStorageApiInsertError bigQueryStorageApiInsertError : iterable) {
                arrayList.add(bigQueryStorageApiInsertError);
                Assert.assertTrue(String.format("Didn't find expected [%s] error in failed message: %s", "SchemaTooNarrowException", bigQueryStorageApiInsertError), bigQueryStorageApiInsertError.getErrorMessage().contains("SchemaTooNarrowException"));
                Assert.assertTrue(String.format("Didn't find expected [%s] schema field in failed message: %s", "SchemaTooNarrowException", bigQueryStorageApiInsertError), bigQueryStorageApiInsertError.getErrorMessage().contains(this.extraField));
            }
            StorageApiSinkSchemaUpdateIT.LOG.info("Found {} failed rows in DLQ", Integer.valueOf(arrayList.size()));
            Assert.assertEquals(this.expectedSize, arrayList.size());
            return null;
        }
    }

    /**
     * Supplies the test parameters: every combination of {@code useInputSchema} and
     * {@code changeTableSchema}, in the order (false, false), (false, true), (true, false),
     * (true, true).
     */
    @Parameterized.Parameters
    public static Iterable<Object[]> data() {
        boolean[] flagValues = {false, true};
        ImmutableList.Builder<Object[]> combinations = ImmutableList.builder();
        for (boolean inputSchema : flagValues) {
            for (boolean schemaChange : flagValues) {
                combinations.add(new Object[] {inputSchema, schemaChange});
            }
        }
        return combinations.build();
    }

    /**
     * Reads the BigQuery location from the testing pipeline options and creates the dataset
     * shared by all tests in this class.
     */
    @BeforeClass
    public static void setUpTestEnvironment() throws IOException, InterruptedException {
        TestBigQueryOptions testOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);
        bigQueryLocation = testOptions.getBigQueryLocation();
        final Long noExpiration = null;
        BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, noExpiration, bigQueryLocation);
    }

    /** Logs and then deletes the dataset created for this test class. */
    @AfterClass
    public static void cleanUp() {
        final String datasetId = BIG_QUERY_DATASET_ID;
        LOG.info("Cleaning up dataset {} and tables.", datasetId);
        BQ_CLIENT.deleteDataset(PROJECT, datasetId);
    }

    /**
     * (Re)creates the destination table for the current test.
     *
     * <p>The table id is the test method name up to the first {@code '['}, suffixed with
     * {@code "WithInputSchema"} and/or {@code "OnSchemaChange"} according to the test parameters.
     * Any existing table with that id is deleted first.
     *
     * @param tableSchema schema of the new table
     * @return the id of the created table
     */
    private String createTable(TableSchema tableSchema) throws IOException, InterruptedException {
        String methodName = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0);
        StringBuilder idBuilder = new StringBuilder(methodName);
        if (useInputSchema) {
            idBuilder.append("WithInputSchema");
        }
        if (changeTableSchema) {
            idBuilder.append("OnSchemaChange");
        }
        String tableId = idBuilder.toString();

        TableReference reference = new TableReference()
                .setProjectId(PROJECT)
                .setDatasetId(BIG_QUERY_DATASET_ID)
                .setTableId(tableId);
        Table table = new Table().setSchema(tableSchema).setTableReference(reference);

        BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId);
        BQ_CLIENT.createNewTable(PROJECT, BIG_QUERY_DATASET_ID, table);
        return tableId;
    }

    /**
     * Builds a table schema with a REQUIRED INTEGER {@code id} column followed by one column per
     * given field name.
     *
     * <p>Each column is named after the field name, and its type is the part of that name before
     * the first {@code '_'}. Columns are REQUIRED unless their name is in {@code set}.
     *
     * @param list field names, in column order
     * @param set field names to mark NULLABLE; may be {@code null} for none
     * @return the assembled schema
     */
    private static TableSchema makeTableSchemaFromTypes(List<String> list, Set<String> set) {
        ImmutableList.Builder<TableFieldSchema> columns = ImmutableList.builder();
        columns.add(new TableFieldSchema().setType("INTEGER").setName("id").setMode("REQUIRED"));
        for (String fieldName : list) {
            String fieldType = Iterables.get(Splitter.on('_').split(fieldName), 0);
            boolean nullable = set != null && set.contains(fieldName);
            String mode = nullable ? "NULLABLE" : "REQUIRED";
            columns.add(new TableFieldSchema().setType(fieldType).setName(fieldName).setMode(mode));
        }
        return new TableSchema().setFields(columns.build());
    }

    /**
     * Runs a streaming pipeline that writes {@code TOTAL_N} generated rows to a freshly created
     * table and then verifies the table contents and the failed-insert output.
     *
     * <p>Rows with ids below {@code ORIGINAL_N} use the original fields; later rows also carry
     * one randomly chosen extra field. When {@code changeTableSchema} is set, an
     * {@link UpdateSchemaDoFn} step adds that extra field to the table while the pipeline runs.
     *
     * <p>On a runner whose class name contains {@code "DataflowRunner"} the test is skipped
     * unless {@code changeTableSchema}, {@code useInputSchema} and {@code z} are all true.
     *
     * @param method the BigQuery write method under test
     * @param z whether the write enables auto schema update
     * @param z2 whether the write ignores unknown values; this also decides whether the
     *     failed-insert output is expected to be empty
     */
    private void runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method method, boolean z, boolean z2) throws Exception {
        final boolean useAutoSchemaUpdate = z;
        final boolean ignoreUnknownValues = z2;

        Pipeline pipeline = Pipeline.create(TestPipeline.testingPipelineOptions());
        pipeline.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
        pipeline.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3);
        ExperimentalOptions.addExperiment(pipeline.getOptions().as(ExperimentalOptions.class), "enable_streaming_engine");
        if (pipeline.getOptions().getRunner().getName().contains("DataflowRunner")) {
            Assume.assumeTrue("Skipping in favor of more relevant test case", this.changeTableSchema && this.useInputSchema && useAutoSchemaUpdate);
        }

        // Three field lists: the original, a shuffled copy used as the input schema, and the
        // original plus one extra field named after a randomly chosen existing field.
        List<String> fieldNames = new ArrayList<>(Arrays.asList(FIELDS));
        List<String> shuffledFieldNames = new ArrayList<>(fieldNames);
        Collections.shuffle(shuffledFieldNames, this.randomGenerator);
        List<String> fieldNamesWithExtra = new ArrayList<>(fieldNames);
        String extraField = fieldNames.get(this.randomGenerator.nextInt(fieldNames.size())) + "_EXTRA";
        fieldNamesWithExtra.add(extraField);

        TableSchema tableSchema = makeTableSchemaFromTypes(fieldNames, null);
        TableSchema inputSchema = makeTableSchemaFromTypes(shuffledFieldNames, null);
        // The extra column is NULLABLE in the updated schema.
        TableSchema updatedSchema = makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField));

        String tableId = createTable(tableSchema);
        String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId;

        BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
                .to(tableSpec)
                .withAutoSchemaUpdate(useAutoSchemaUpdate)
                .withMethod(method)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) {
            write = write.withTriggeringFrequency(Duration.standardSeconds(1L));
        }
        if (this.useInputSchema) {
            write = write.withSchema(inputSchema);
        }
        if (ignoreUnknownValues) {
            write = write.ignoreUnknownValues();
        }

        Instant start = new Instant(0L);
        // Emit one element per second instead of one per millisecond when the table schema
        // changes and either auto-update is on or no input schema is supplied.
        // NOTE(review): presumably this gives the sink time to observe the new schema - confirm.
        boolean slowEmission = this.changeTableSchema && (useAutoSchemaUpdate || !this.useInputSchema);
        Duration interval = slowEmission ? Duration.standardSeconds(1L) : Duration.millis(1L);
        Duration stopAfter = slowEmission ? Duration.standardSeconds(TOTAL_N - 1) : Duration.millis(TOTAL_N - 1);
        // Row id = number of intervals elapsed since `start`. The intersection cast makes the
        // lambda Serializable so it can be captured by the pipeline's map function.
        Function<Instant, Long> idFromInstant = slowEmission
                ? (Function<Instant, Long> & Serializable) ts -> ts.getMillis() / 1000
                : (Function<Instant, Long> & Serializable) ts -> ts.getMillis();

        GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNames, fieldNamesWithExtra);
        PCollection<TableRow> rows = pipeline
                .apply("Generate Instants", PeriodicImpulse.create().startAt(start).stopAt(start.plus(stopAfter)).withInterval(interval).catchUpToNow(false))
                .apply("Create TableRows", MapElements.into(TypeDescriptor.of(TableRow.class)).via(ts -> generateRowFunc.apply(idFromInstant.apply(ts))));
        if (this.changeTableSchema) {
            // UpdateSchemaDoFn consumes KV elements, so every row is given the same key first.
            rows = rows
                    .apply("Add a dummy key", WithKeys.of(1))
                    .apply("Update Schema", ParDo.of(new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, updatedSchema)));
        }

        WriteResult result = rows.apply("Stream to BigQuery", write);
        if (ignoreUnknownValues) {
            PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty();
        } else {
            // Without ignoreUnknownValues, each row carrying the extra field is expected to fail.
            int expectedFailures = TOTAL_N - ORIGINAL_N;
            PAssert.that(String.format("Check DLQ has %s schema errors", expectedFailures), result.getFailedStorageApiInserts()).satisfies(new VerifyPCollectionSize(expectedFailures, extraField));
        }
        pipeline.run().waitUntilFinish();

        checkRowCompleteness(tableSpec, ignoreUnknownValues ? TOTAL_N : ORIGINAL_N, method == BigQueryIO.Write.Method.STORAGE_WRITE_API);
        if (ignoreUnknownValues) {
            checkRowsWithUpdatedSchema(tableSpec, extraField, useAutoSchemaUpdate);
        }
    }

    /**
     * Queries the table for its distinct and total {@code id} counts and asserts on them.
     *
     * @param str table spec substituted into the query's {@code FROM [...]} clause
     * @param i expected number of distinct ids
     * @param z when true, additionally asserts that the total count equals the distinct count
     */
    private static void checkRowCompleteness(String str, int i, boolean z) throws IOException, InterruptedException {
        String countQuery = String.format("SELECT COUNT(DISTINCT(id)), COUNT(id) FROM [%s]", str);
        TableRow counts = Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened(countQuery, PROJECT, true, false, bigQueryLocation));
        int distinctCount = Integer.parseInt((String) counts.get("f0_"));
        int totalCount = Integer.parseInt((String) counts.get("f1_"));
        LOG.info("total distinct count = {}, total count = {}", Integer.valueOf(distinctCount), Integer.valueOf(totalCount));
        Assert.assertEquals(i, distinctCount);
        if (!z) {
            return;
        }
        Assert.assertEquals(distinctCount, totalCount);
    }

    /**
     * Reads every row of the table and asserts whether the extra field is present.
     *
     * <p>The field must be non-null for rows whose id is at least {@code ORIGINAL_N} when both
     * {@code z} and {@code changeTableSchema} are true; in every other case it must be null.
     *
     * @param str table spec substituted into the query's {@code FROM [...]} clause
     * @param str2 name of the extra field
     * @param z whether auto schema update was enabled for the write
     */
    public void checkRowsWithUpdatedSchema(String str, String str2, boolean z) throws IOException, InterruptedException {
        String selectAll = String.format("SELECT * FROM [%s]", str);
        for (TableRow row : BQ_CLIENT.queryUnflattened(selectAll, PROJECT, true, false, bigQueryLocation)) {
            int id = Integer.parseInt((String) row.get("id"));
            boolean expectExtraField = id >= ORIGINAL_N && z && this.changeTableSchema;
            if (!expectExtraField) {
                Assert.assertTrue(String.format("Expected row to NOT have field %s:\n%s", str2, row), row.get(str2) == null);
                continue;
            }
            Assert.assertTrue(String.format("Expected row to have field %s:\n%s", str2, row), row.get(str2) != null);
        }
    }

    /** STORAGE_WRITE_API with auto schema update off and unknown values not ignored. */
    @Test
    public void testExactlyOnce() throws Exception {
        final boolean useAutoSchemaUpdate = false;
        final boolean ignoreUnknownValues = false;
        runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method.STORAGE_WRITE_API, useAutoSchemaUpdate, ignoreUnknownValues);
    }

    /** STORAGE_WRITE_API with auto schema update off and unknown values ignored. */
    @Test
    public void testExactlyOnceWithIgnoreUnknownValues() throws Exception {
        final boolean useAutoSchemaUpdate = false;
        final boolean ignoreUnknownValues = true;
        runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method.STORAGE_WRITE_API, useAutoSchemaUpdate, ignoreUnknownValues);
    }

    /** STORAGE_WRITE_API with auto schema update on and unknown values ignored. */
    @Test
    public void testExactlyOnceWithAutoSchemaUpdate() throws Exception {
        final boolean useAutoSchemaUpdate = true;
        final boolean ignoreUnknownValues = true;
        runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method.STORAGE_WRITE_API, useAutoSchemaUpdate, ignoreUnknownValues);
    }

    /** STORAGE_API_AT_LEAST_ONCE with auto schema update off and unknown values not ignored. */
    @Test
    public void testAtLeastOnce() throws Exception {
        final boolean useAutoSchemaUpdate = false;
        final boolean ignoreUnknownValues = false;
        runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE, useAutoSchemaUpdate, ignoreUnknownValues);
    }

    /** STORAGE_API_AT_LEAST_ONCE with auto schema update off and unknown values ignored. */
    @Test
    public void testAtLeastOnceWithIgnoreUnknownValues() throws Exception {
        final boolean useAutoSchemaUpdate = false;
        final boolean ignoreUnknownValues = true;
        runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE, useAutoSchemaUpdate, ignoreUnknownValues);
    }

    /** STORAGE_API_AT_LEAST_ONCE with auto schema update on and unknown values ignored. */
    @Test
    public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception {
        final boolean useAutoSchemaUpdate = true;
        final boolean ignoreUnknownValues = true;
        runStreamingPipelineWithSchemaChange(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE, useAutoSchemaUpdate, ignoreUnknownValues);
    }

    /**
     * Lambda deserialization hook (marked synthetic). Matches the given
     * {@link SerializedLambda}'s implementation method name, functional interface, implementing
     * class and signatures against the three lambdas used in
     * {@code runStreamingPipelineWithSchemaChange} and returns an equivalent lambda.
     *
     * <p>NOTE(review): this appears to be the decompiled form of a compiler-generated method.
     * As written it is not valid Java: {@code z} is declared {@code boolean} but assigned
     * {@code -1} and {@code 2}, and the second switch repeats the {@code case true} label.
     * Confirm whether this method should exist in source at all.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at -1 when no known method name matches.
        boolean z = -1;
        // First pass: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -267797812:
                if (implMethodName.equals("lambda$runStreamingPipelineWithSchemaChange$370c0add$1")) {
                    z = true;
                    break;
                }
                break;
            case -267797811:
                if (implMethodName.equals("lambda$runStreamingPipelineWithSchemaChange$370c0add$2")) {
                    z = false;
                    break;
                }
                break;
            case -53445333:
                if (implMethodName.equals("lambda$runStreamingPipelineWithSchemaChange$9c13ee94$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Second pass: verify the full lambda shape before recreating it.
        switch (z) {
            case false:
                // "...370c0add$2": Instant -> epoch milliseconds.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Ljava/lang/Long;")) {
                    return instant3 -> {
                        return Long.valueOf(instant3.getMillis());
                    };
                }
                break;
            case true:
                // "...370c0add$1": Instant -> epoch milliseconds divided by 1000.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Ljava/lang/Long;")) {
                    return instant2 -> {
                        return Long.valueOf(instant2.getMillis() / 1000);
                    };
                }
                break;
            case true:
                // "...9c13ee94$1": Instant -> TableRow, using the two captured arguments
                // (the row generator and the Instant-to-id function).
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkSchemaUpdateIT$GenerateRowFunc;Ljava/util/function/Function;Lorg/joda/time/Instant;)Lcom/google/api/services/bigquery/model/TableRow;")) {
                    GenerateRowFunc generateRowFunc = (GenerateRowFunc) serializedLambda.getCapturedArg(0);
                    Function function = (Function) serializedLambda.getCapturedArg(1);
                    return instant4 -> {
                        return generateRowFunc.apply((Long) function.apply(instant4));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
