package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkFailedRowsIT.class */
public class StorageApiSinkFailedRowsIT {

    @Parameterized.Parameter(0)
    public boolean useStreamingExactlyOnce;

    @Parameterized.Parameter(1)
    public boolean useAtLeastOnce;

    @Parameterized.Parameter(2)
    public boolean useBatch;
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkFailedRowsIT.class);
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("StorageApiSinkFailedRowsIT");
    private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
    private static final String BIG_QUERY_DATASET_ID = "storage_api_sink_failed_rows" + System.nanoTime();
    private static final List<TableFieldSchema> FIELDS = ImmutableList.builder().add(new TableFieldSchema().setType("STRING").setName("str")).add(new TableFieldSchema().setType("INT64").setName("i64")).add(new TableFieldSchema().setType("DATE").setName("date")).add(new TableFieldSchema().setType("STRING").setMaxLength(1L).setName("strone")).add(new TableFieldSchema().setType("BYTES").setName("bytes")).add(new TableFieldSchema().setType("JSON").setName("json")).add(new TableFieldSchema().setType("STRING").setMaxLength(1L).setMode("REPEATED").setName("stronearray")).build();
    private static final TableSchema BASE_TABLE_SCHEMA = new TableSchema().setFields(ImmutableList.builder().addAll(FIELDS).add(new TableFieldSchema().setType("STRUCT").setFields(FIELDS).setName("inner")).build());
    private static final byte[] BIG_BYTES = new byte[11534336];
    private static String bigQueryLocation;

    @Parameterized.Parameters
    public static Iterable<Object[]> data() {
        return ImmutableList.of(new Object[]{true, false, false}, new Object[]{false, true, false}, new Object[]{false, false, true}, new Object[]{true, false, true});
    }

    private BigQueryIO.Write.Method getMethod() {
        return this.useAtLeastOnce ? BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE : BigQueryIO.Write.Method.STORAGE_WRITE_API;
    }

    @BeforeClass
    public static void setUpTestEnvironment() throws IOException, InterruptedException {
        bigQueryLocation = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class).getBigQueryLocation();
        BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, (Long) null, bigQueryLocation);
    }

    @AfterClass
    public static void cleanup() {
        LOG.info("Start to clean up tables and datasets.");
        BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
    }

    @Test
    public void testSchemaMismatchCaughtByBeam() throws IOException, InterruptedException {
        String createTable = createTable(BASE_TABLE_SCHEMA);
        TableRow tableRow = new TableRow().set("str", "foo").set("i64", "42");
        TableRow tableRow2 = new TableRow().set("str", "foo").set("i64", "43");
        ImmutableList of = ImmutableList.of(tableRow.clone().set("inner", new TableRow()), tableRow2.clone().set("inner", new TableRow()), new TableRow().set("inner", tableRow), new TableRow().set("inner", tableRow2));
        TableRow tableRow3 = new TableRow().set("str", "foo").set("i64", "baad");
        TableRow tableRow4 = new TableRow().set("str", "foo").set("i64", "42").set("unknown", "foobar");
        ImmutableList of2 = ImmutableList.of(tableRow3, tableRow4, new TableRow().set("inner", tableRow3), new TableRow().set("inner", tableRow4));
        runPipeline(getMethod(), this.useStreamingExactlyOnce, createTable, Iterables.concat(of, of2), of2);
        assertGoodRowsWritten(createTable, of);
    }

    @Test
    public void testInvalidRowCaughtByBigquery() throws IOException, InterruptedException {
        String createTable = createTable(BASE_TABLE_SCHEMA);
        TableRow tableRow = new TableRow().set("str", "foo").set("i64", "42").set("date", "2022-08-16").set("stronearray", Lists.newArrayList());
        TableRow tableRow2 = new TableRow().set("str", "foo").set("i64", "43").set("stronearray", Lists.newArrayList());
        ImmutableList of = ImmutableList.of(tableRow.clone().set("inner", new TableRow().set("stronearray", Lists.newArrayList())), tableRow2.clone().set("inner", new TableRow().set("stronearray", Lists.newArrayList())), new TableRow().set("inner", tableRow).set("stronearray", Lists.newArrayList()), new TableRow().set("inner", tableRow2).set("stronearray", Lists.newArrayList()));
        TableRow tableRow3 = new TableRow().set("str", "foo").set("i64", "42").set("date", "10001-08-16");
        TableRow tableRow4 = new TableRow().set("str", "foo").set("i64", "42").set("strone", "ab");
        TableRow tableRow5 = new TableRow().set("str", "foo").set("i64", "42").set("json", "BAADF00D");
        ImmutableList of2 = ImmutableList.of(tableRow3, tableRow4, tableRow5, new TableRow().set("str", "foo").set("i64", "42").set("stronearray", Lists.newArrayList(new String[]{"toolong"})), new TableRow().set("bytes", BIG_BYTES), new TableRow().set("inner", tableRow3), new TableRow().set("inner", tableRow4), new TableRow().set("inner", tableRow5));
        runPipeline(getMethod(), this.useStreamingExactlyOnce, createTable, Iterables.concat(of, of2), of2);
        assertGoodRowsWritten(createTable, of);
    }

    private static String createTable(TableSchema tableSchema) throws IOException, InterruptedException {
        String str = "table" + System.nanoTime();
        BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, str);
        BQ_CLIENT.createNewTable(PROJECT, BIG_QUERY_DATASET_ID, new Table().setSchema(tableSchema).setTableReference(new TableReference().setTableId(str).setDatasetId(BIG_QUERY_DATASET_ID).setProjectId(PROJECT)));
        return PROJECT + "." + BIG_QUERY_DATASET_ID + "." + str;
    }

    private void assertGoodRowsWritten(String str, Iterable<TableRow> iterable) throws IOException, InterruptedException {
        int parseInt = Integer.parseInt((String) ((TableRow) Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened(String.format("SELECT COUNT(*) FROM `%s`", str), PROJECT, true, true, bigQueryLocation))).get("f0_"));
        if (this.useAtLeastOnce) {
            MatcherAssert.assertThat(Integer.valueOf(parseInt), Matchers.greaterThanOrEqualTo(Integer.valueOf(Iterables.size(iterable))));
        } else {
            MatcherAssert.assertThat(Integer.valueOf(parseInt), Matchers.equalTo(Integer.valueOf(Iterables.size(iterable))));
        }
    }

    private static void runPipeline(BigQueryIO.Write.Method method, boolean z, String str, Iterable<TableRow> iterable, Iterable<TableRow> iterable2) {
        Pipeline create = Pipeline.create();
        BigQueryIO.Write withCreateDisposition = BigQueryIO.writeTableRows().to(str).withSchema(BASE_TABLE_SCHEMA).withMethod(method).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
        if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) {
            withCreateDisposition = withCreateDisposition.withNumStorageWriteApiStreams(1);
            if (z) {
                withCreateDisposition = withCreateDisposition.withTriggeringFrequency(Duration.standardSeconds(1L));
            }
        }
        PCollection apply = create.apply("Create test cases", Create.of(iterable));
        if (z) {
            apply = apply.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
        }
        PAssert.that(apply.apply("Write using Storage Write API", withCreateDisposition).getFailedStorageApiInserts().apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via((v0) -> {
            return v0.getRow();
        }))).containsInAnyOrder(iterable2);
        create.run().waitUntilFinish();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1249351004:
                if (implMethodName.equals("getRow")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError") && serializedLambda.getImplMethodSignature().equals("()Lcom/google/api/services/bigquery/model/TableRow;")) {
                    return (v0) -> {
                        return v0.getRow();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
