package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.class */
public class FileLoadsStreamingIT {

    /**
     * Parameter injected by the {@code Parameterized} runner (values come from {@code data()}).
     * When true the write supplies its own schema with CREATE_IF_NEEDED / WRITE_TRUNCATE; when
     * false the destination table is created up front and written with CREATE_NEVER / WRITE_APPEND.
     */
    @Parameterized.Parameter(0)
    public boolean useInputSchema;
    /** Row count for one test run: impulses span seconds 0..49 at 1 s intervals and 50 expected rows are built. */
    private static final int TOTAL_N = 50;
    private static final Logger LOG = LoggerFactory.getLogger(FileLoadsStreamingIT.class);
    /** Shared BigQuery helper client; the string argument is presumably an application name (TODO confirm). */
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("FileLoadsStreamingIT");
    /** GCP project taken from the testing pipeline options. */
    private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
    /** Dataset id made distinct per class load by appending {@code System.nanoTime()}. */
    private static final String BIG_QUERY_DATASET_ID = "file_loads_streaming_it_" + System.nanoTime();
    /**
     * Each entry is used both as a column name and as that column's BigQuery type when schemas
     * are built, and selects the value generator in {@code GenerateRowFunc}.
     */
    private static final String[] FIELDS = {"BOOL", "BOOLEAN", "BYTES", "INT64", "INTEGER", "FLOAT", "FLOAT64", "NUMERIC", "STRING", "DATE", "TIMESTAMP"};

    /** Supplies the running test's method name, from which table ids are derived. */
    @Rule
    public TestName testName = new TestName();
    /** Unseeded RNG used to shuffle the field order of the input schema in {@code runStreaming}. */
    private final Random randomGenerator = new Random();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT$GenerateRowFunc.class */
    public static class GenerateRowFunc implements SerializableFunction<Long, TableRow> {
        private final List<String> fieldNames;

        public GenerateRowFunc(List<String> list) {
            this.fieldNames = list;
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:5:0x004d. Please report as an issue. */
        public TableRow apply(Long l) {
            TableRow tableRow = new TableRow();
            tableRow.set("id", l);
            for (String str : this.fieldNames) {
                String str2 = (String) Iterables.get(Splitter.on('_').split(str), 0);
                boolean z = -1;
                switch (str2.hashCode()) {
                    case -1838656495:
                        if (str2.equals("STRING")) {
                            z = 10;
                            break;
                        }
                        break;
                    case -1618932450:
                        if (str2.equals("INTEGER")) {
                            z = 4;
                            break;
                        }
                        break;
                    case -1453246218:
                        if (str2.equals("TIMESTAMP")) {
                            z = 9;
                            break;
                        }
                        break;
                    case -1282431251:
                        if (str2.equals("NUMERIC")) {
                            z = 7;
                            break;
                        }
                        break;
                    case -48459270:
                        if (str2.equals("FLOAT64")) {
                            z = 6;
                            break;
                        }
                        break;
                    case 2044650:
                        if (str2.equals("BOOL")) {
                            z = false;
                            break;
                        }
                        break;
                    case 2090926:
                        if (str2.equals("DATE")) {
                            z = 8;
                            break;
                        }
                        break;
                    case 63686731:
                        if (str2.equals("BYTES")) {
                            z = 2;
                            break;
                        }
                        break;
                    case 66988604:
                        if (str2.equals("FLOAT")) {
                            z = 5;
                            break;
                        }
                        break;
                    case 69823181:
                        if (str2.equals("INT64")) {
                            z = 3;
                            break;
                        }
                        break;
                    case 782694408:
                        if (str2.equals("BOOLEAN")) {
                            z = true;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                    case true:
                        if (l.longValue() % 2 == 0) {
                            tableRow.set(str, false);
                            break;
                        } else {
                            tableRow.set(str, true);
                            break;
                        }
                    case true:
                        tableRow.set(str, String.format("test_blob_%s", l).getBytes(StandardCharsets.UTF_8));
                        break;
                    case true:
                    case true:
                        tableRow.set(str, String.valueOf(l.longValue() + 10));
                        break;
                    case true:
                    case true:
                        tableRow.set(str, String.valueOf(0.5d + l.longValue()));
                        break;
                    case true:
                        tableRow.set(str, String.valueOf(l.longValue() + 0.12345d));
                        break;
                    case true:
                        tableRow.set(str, "2022-01-01");
                        break;
                    case true:
                        tableRow.set(str, "2022-01-01 10:10:10.012 UTC");
                        break;
                    case true:
                        tableRow.set(str, "test_string" + l);
                        break;
                    default:
                        tableRow.set(str, "unknown" + l);
                        break;
                }
            }
            return tableRow;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT$TestDynamicDest.class */
    public static class TestDynamicDest extends DynamicDestinations<Long, Long> {
        String tablePrefix;
        List<String> table0Fields;
        List<String> table1Fields;
        List<String> table2Fields;
        boolean useInputSchema;

        public TestDynamicDest(String str, List<String> list, List<String> list2, List<String> list3, boolean z) {
            this.tablePrefix = str;
            this.table0Fields = list;
            this.table1Fields = list2;
            this.table2Fields = list3;
            this.useInputSchema = z;
        }

        public Long getDestination(ValueInSingleWindow<Long> valueInSingleWindow) {
            return Long.valueOf(((Long) valueInSingleWindow.getValue()).longValue() % 3);
        }

        public TableDestination getTable(Long l) {
            return new TableDestination(this.tablePrefix + "-" + l, (String) null);
        }

        public TableSchema getSchema(Long l) {
            if (!this.useInputSchema) {
                return null;
            }
            List list = (List) (l.longValue() == 0 ? this.table0Fields : l.longValue() == 1 ? this.table1Fields : this.table2Fields).stream().map(str -> {
                return new TableFieldSchema().setName(str).setType(str).setMode("REQUIRED");
            }).collect(Collectors.toList());
            list.add(0, new TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED"));
            return new TableSchema().setFields(list);
        }

        /* renamed from: getDestination, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m35getDestination(ValueInSingleWindow valueInSingleWindow) {
            return getDestination((ValueInSingleWindow<Long>) valueInSingleWindow);
        }
    }

    /**
     * Parameter sets for the {@code Parameterized} runner: one run with {@code useInputSchema}
     * false, then one with it true.
     */
    @Parameterized.Parameters
    public static Iterable<Object[]> data() {
        Object[] withoutInputSchema = {Boolean.FALSE};
        Object[] withInputSchema = {Boolean.TRUE};
        return ImmutableList.of(withoutInputSchema, withInputSchema);
    }

    /**
     * Prepares the BigQuery dataset used by every test in this class.
     *
     * @throws IOException if the dataset cannot be created
     * @throws InterruptedException if interrupted while creating the dataset
     */
    @BeforeClass
    public static void setUpTestEnvironment() throws IOException, InterruptedException {
        // Drop the dataset first so creation below starts from a clean slate.
        FileLoadsStreamingIT.cleanUp();
        BQ_CLIENT.createNewDataset(
                PROJECT,
                BIG_QUERY_DATASET_ID);
    }

    /** Deletes this class's BigQuery dataset; also invoked at the start of set-up. */
    @AfterClass
    public static void cleanUp() {
        BQ_CLIENT.deleteDataset(
                PROJECT,
                BIG_QUERY_DATASET_ID);
    }

    /**
     * Builds a table schema with a REQUIRED INTEGER {@code id} column followed by one REQUIRED
     * column per entry of {@code list}, in order. Each entry is used as both the column's name
     * and its type.
     *
     * @param list type names, which double as column names
     * @return the resulting schema
     */
    private static TableSchema makeTableSchemaFromTypes(List<String> list) {
        ImmutableList.Builder<TableFieldSchema> fields = ImmutableList.builder();
        fields.add(new TableFieldSchema().setType("INTEGER").setName("id").setMode("REQUIRED"));
        for (String typeName : list) {
            fields.add(new TableFieldSchema().setType(typeName).setName(typeName).setMode("REQUIRED"));
        }
        return new TableSchema().setFields(fields.build());
    }

    /**
     * Resolves the destination table spec for the current test and, when the pipeline does not
     * supply its own schema, creates that table up front.
     *
     * <p>The base table id is the test method name up to the first {@code '['} (dropping the
     * parameter suffix added by the runner). The table named base id + {@code str} is always
     * deleted first. With {@code useInputSchema} set, nothing is created and
     * {@code "WithInputSchema"} is inserted between the base id and {@code str} in the result.
     *
     * @param tableSchema schema for the pre-created table (unused when {@code useInputSchema})
     * @param str suffix appended to the table id
     * @return the table spec in {@code project.dataset.table} form
     * @throws IOException if a BigQuery call fails
     * @throws InterruptedException if interrupted during a BigQuery call
     */
    private String maybeCreateTable(TableSchema tableSchema, String str) throws IOException, InterruptedException {
        String baseTableId = Iterables.get(Splitter.on('[').split(this.testName.getMethodName()), 0);
        BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, baseTableId + str);

        if (!this.useInputSchema) {
            TableReference reference = new TableReference()
                    .setTableId(baseTableId + str)
                    .setDatasetId(BIG_QUERY_DATASET_ID)
                    .setProjectId(PROJECT);
            Table table = new Table().setSchema(tableSchema).setTableReference(reference);
            BQ_CLIENT.createNewTable(PROJECT, BIG_QUERY_DATASET_ID, table);
            return String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, baseTableId + str);
        }

        String inputSchemaTableId = baseTableId + "WithInputSchema";
        return String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, inputSchemaTableId + str);
    }

    /**
     * Streams {@code TOTAL_N} generated rows into a single BigQuery table with FILE_LOADS and
     * verifies that exactly those rows arrived.
     *
     * <p>The rows are generated with a shuffled field order. When {@code useInputSchema} is set
     * the write supplies that shuffled schema and creates the table (CREATE_IF_NEEDED /
     * WRITE_TRUNCATE); otherwise the table is pre-created with the unshuffled schema and the write
     * appends to it (CREATE_NEVER / WRITE_APPEND). On a Dataflow runner the test is skipped unless
     * {@code useInputSchema} is set.
     *
     * @param i number of file shards; {@code 0} selects auto-sharding
     * @param z when true, caps the bytes per load partition at 250 (the test names refer to this
     *     as the copy-jobs case)
     * @throws IOException if a BigQuery call fails
     * @throws InterruptedException if interrupted during a BigQuery call
     */
    private void runStreaming(int i, boolean z) throws IOException, InterruptedException {
        final int numFileShards = i;
        final boolean limitPartitionBytes = z;

        TestPipelineOptions options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
        options.setTempLocation(options.getTempRoot());
        Pipeline pipeline = Pipeline.create(options);
        if (pipeline.getOptions().getRunner().getName().contains("DataflowRunner")) {
            Assume.assumeTrue("Skipping in favor of more relevant test case", this.useInputSchema);
            ExperimentalOptions.addExperiment(pipeline.getOptions().as(ExperimentalOptions.class), "enable_streaming_engine");
        }

        List<String> originalFields = Arrays.asList(FIELDS);
        List<String> shuffledFields = new ArrayList<>(originalFields);
        Collections.shuffle(shuffledFields, this.randomGenerator);
        TableSchema tableSchema = makeTableSchemaFromTypes(originalFields);
        TableSchema inputSchema = makeTableSchemaFromTypes(shuffledFields);
        String tableSpec = maybeCreateTable(tableSchema, "");

        // One impulse per second over [0, TOTAL_N - 1] seconds yields TOTAL_N elements whose
        // second-of-epoch becomes the row id.
        Instant start = new Instant(0L);
        GenerateRowFunc rowFunc = new GenerateRowFunc(shuffledFields);
        PCollection<TableRow> rows = pipeline
                .apply("Generate Instants", PeriodicImpulse.create()
                        .startAt(start)
                        .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1)))
                        .withInterval(Duration.standardSeconds(1L))
                        .catchUpToNow(false))
                .apply("Create TableRows", MapElements.into(TypeDescriptor.of(TableRow.class))
                        .via(instant -> rowFunc.apply(instant.getMillis() / 1000)));

        BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
                .to(tableSpec)
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(Duration.standardSeconds(10L));
        if (limitPartitionBytes) {
            write = write.withMaxBytesPerPartition(250L);
        }
        if (this.useInputSchema) {
            // The write creates the table itself from the supplied schema.
            write = write
                    .withSchema(inputSchema)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE);
        } else {
            // The table was pre-created by maybeCreateTable.
            write = write
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        }
        write = numFileShards == 0 ? write.withAutoSharding() : write.withNumFileShards(numFileShards);

        rows.apply("Stream loads to BigQuery", write);
        pipeline.run().waitUntilFinish();

        List<TableRow> expectedRows = new ArrayList<>(TOTAL_N);
        for (long id = 0; id < TOTAL_N; id++) {
            expectedRows.add(rowFunc.apply(id));
        }
        checkRowCompleteness(tableSpec, inputSchema, expectedRows);
    }

    /**
     * Queries every row of the given table and asserts that it holds exactly the expected rows,
     * in any order and without duplicates.
     *
     * <p>Both the queried and the expected {@link TableRow}s are converted with
     * {@code BigQueryUtils.toBeamRow} against the same schema before being compared.
     *
     * @param str table spec to query
     * @param tableSchema schema used to convert rows for comparison
     * @param list rows expected to be present
     * @throws IOException if the query fails
     * @throws InterruptedException if interrupted while querying
     */
    private static void checkRowCompleteness(String str, TableSchema tableSchema, List<TableRow> list) throws IOException, InterruptedException {
        List<TableRow> actualTableRows = BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", str), PROJECT, true, false);
        Schema rowSchema = BigQueryUtils.fromTableSchema(tableSchema);

        // Held as Object so the comparison needs no additional type in scope; the Hamcrest
        // matcher compares with equals() either way.
        List<Object> actualRows = actualTableRows.stream()
                .<Object>map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow))
                .collect(Collectors.toList());
        List<Object> expectedRows = list.stream()
                .<Object>map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow))
                .collect(Collectors.toList());

        LOG.info("Actual rows number: {}, expected: {}", Integer.valueOf(actualRows.size()), Integer.valueOf(expectedRows.size()));
        MatcherAssert.assertThat("Comparing expected rows with actual rows", actualRows, Matchers.containsInAnyOrder(expectedRows.toArray()));
        Assert.assertEquals("Checking there is no duplication", expectedRows.size(), actualRows.size());
    }

    /** Single-table streaming load using five file shards and no partition-size cap. */
    @Test
    public void testLoadWithFixedShards() throws IOException, InterruptedException {
        final int numFileShards = 5;
        final boolean limitPartitionBytes = false;
        runStreaming(numFileShards, limitPartitionBytes);
    }

    /** Single-table streaming load using auto-sharding (shard count 0) with the partition-size cap enabled. */
    @Test
    public void testLoadWithAutoShardingAndCopyJobs() throws IOException, InterruptedException {
        final int numFileShards = 0;
        final boolean limitPartitionBytes = true;
        runStreaming(numFileShards, limitPartitionBytes);
    }

    /** Dynamic-destination streaming load using six file shards and no partition-size cap. */
    @Test
    public void testDynamicDestinationsWithFixedShards() throws IOException, InterruptedException {
        final int numFileShards = 6;
        final boolean limitPartitionBytes = false;
        runStreamingToDynamicDestinations(numFileShards, limitPartitionBytes);
    }

    /** Dynamic-destination streaming load using auto-sharding (shard count 0) with the partition-size cap enabled. */
    @Test
    public void testDynamicDestinationsWithAutoShardingAndCopyJobs() throws IOException, InterruptedException {
        final int numFileShards = 0;
        final boolean limitPartitionBytes = true;
        runStreamingToDynamicDestinations(numFileShards, limitPartitionBytes);
    }

    /**
     * Streams {@code TOTAL_N} generated ids to three BigQuery tables through
     * {@link TestDynamicDest} with FILE_LOADS and verifies each table's contents.
     *
     * <p>{@code FIELDS} is split into three slices ([0,4), [4,8), [8,11)), one per table; an id
     * goes to table {@code id % 3} and is formatted with that table's fields. When
     * {@code useInputSchema} is set the tables are created by the write (CREATE_IF_NEEDED /
     * WRITE_TRUNCATE); otherwise they are pre-created and appended to (CREATE_NEVER /
     * WRITE_APPEND). On a Dataflow runner the test is skipped unless {@code useInputSchema} is set.
     *
     * @param i number of file shards; {@code 0} selects auto-sharding
     * @param z when true, caps the bytes per load partition at 150 (the test names refer to this
     *     as the copy-jobs case)
     * @throws IOException if a BigQuery call fails
     * @throws InterruptedException if interrupted during a BigQuery call
     */
    private void runStreamingToDynamicDestinations(int i, boolean z) throws IOException, InterruptedException {
        final int numFileShards = i;
        final boolean limitPartitionBytes = z;

        TestPipelineOptions options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
        options.setTempLocation(options.getTempRoot());
        Pipeline pipeline = Pipeline.create(options);
        if (pipeline.getOptions().getRunner().getName().contains("DataflowRunner")) {
            Assume.assumeTrue("Skipping in favor of more relevant test case", this.useInputSchema);
            ExperimentalOptions.addExperiment(pipeline.getOptions().as(ExperimentalOptions.class), "enable_streaming_engine");
        }

        List<String> allFields = Arrays.asList(FIELDS);
        List<String> fields0 = new ArrayList<>(allFields.subList(0, 4));
        List<String> fields1 = new ArrayList<>(allFields.subList(4, 8));
        List<String> fields2 = new ArrayList<>(allFields.subList(8, 11));
        TableSchema schema0 = makeTableSchemaFromTypes(fields0);
        TableSchema schema1 = makeTableSchemaFromTypes(fields1);
        TableSchema schema2 = makeTableSchemaFromTypes(fields2);
        String table0 = maybeCreateTable(schema0, "-0");
        String table1 = maybeCreateTable(schema1, "-1");
        String table2 = maybeCreateTable(schema2, "-2");
        GenerateRowFunc rowFunc0 = new GenerateRowFunc(fields0);
        GenerateRowFunc rowFunc1 = new GenerateRowFunc(fields1);
        GenerateRowFunc rowFunc2 = new GenerateRowFunc(fields2);

        // Strip the two-character "-0" suffix; TestDynamicDest re-appends "-<destination>".
        String tablePrefix = table0.substring(0, table0.length() - 2);

        // One impulse per second over [0, TOTAL_N - 1] seconds yields TOTAL_N ids.
        Instant start = new Instant(0L);
        PCollection<Long> ids = pipeline
                .apply("Generate Instants", PeriodicImpulse.create()
                        .startAt(start)
                        .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1)))
                        .withInterval(Duration.standardSeconds(1L))
                        .catchUpToNow(false))
                .apply("Create TableRows", MapElements.into(TypeDescriptors.longs())
                        .via(instant -> instant.getMillis() / 1000));

        // The explicit <Long> is required: without it the element type is inferred as Object
        // and the format function cannot treat its argument as a Long.
        BigQueryIO.Write<Long> write = BigQueryIO.<Long>write()
                .to(new TestDynamicDest(tablePrefix, fields0, fields1, fields2, this.useInputSchema))
                .withFormatFunction(id -> {
                    long destination = id.longValue() % 3;
                    if (destination == 0) {
                        return rowFunc0.apply(id);
                    }
                    return destination == 1 ? rowFunc1.apply(id) : rowFunc2.apply(id);
                })
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withTriggeringFrequency(Duration.standardSeconds(10L));
        if (limitPartitionBytes) {
            write = write.withMaxBytesPerPartition(150L);
        }
        if (this.useInputSchema) {
            // Schemas come from TestDynamicDest.getSchema, so the write can create the tables.
            write = write
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE);
        } else {
            // The tables were pre-created by maybeCreateTable.
            write = write
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        }
        write = numFileShards == 0 ? write.withAutoSharding() : write.withNumFileShards(numFileShards);

        ids.apply("Stream loads to dynamic destinations", write);
        pipeline.run().waitUntilFinish();

        List<TableRow> expectedRows0 = new ArrayList<>();
        List<TableRow> expectedRows1 = new ArrayList<>();
        List<TableRow> expectedRows2 = new ArrayList<>();
        for (long id = 0; id < TOTAL_N; id++) {
            long destination = id % 3;
            if (destination == 0) {
                expectedRows0.add(rowFunc0.apply(id));
            } else if (destination == 1) {
                expectedRows1.add(rowFunc1.apply(id));
            } else {
                expectedRows2.add(rowFunc2.apply(id));
            }
        }
        checkRowCompleteness(table0, makeTableSchemaFromTypes(fields0), expectedRows0);
        checkRowCompleteness(table1, makeTableSchemaFromTypes(fields1), expectedRows1);
        checkRowCompleteness(table2, makeTableSchemaFromTypes(fields2), expectedRows2);
    }

    /*
     * Rebuilds one of this class's serializable lambdas from its SerializedLambda form.
     *
     * The implementation method name selects the candidate; the lambda is only recreated when the
     * implementation kind, functional interface, interface method, signatures and implementing
     * class all match the expected values. Any other input ends in IllegalArgumentException.
     *
     * NOTE(review): this method is marked synthetic and looks like decompiler output of the
     * deserialization hook the Java compiler emits for serializable lambdas. `boolean z = -1`,
     * `z = 2` and the repeated `case true:` labels are not valid Java as written. The likely
     * remedy is to delete the method and let the compiler regenerate it - confirm before
     * changing, since the matched names and signatures are compiler-assigned.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First stage: map the implementation method name to a small selector via its hash code.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 211411658:
                if (implMethodName.equals("lambda$runStreaming$b6bdf706$1")) {
                    z = false;
                    break;
                }
                break;
            case 1437295695:
                if (implMethodName.equals("lambda$runStreamingToDynamicDestinations$867317f9$1")) {
                    z = true;
                    break;
                }
                break;
            case 2108715198:
                if (implMethodName.equals("lambda$runStreamingToDynamicDestinations$cbc204a1$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Second stage: validate the remaining metadata, then recreate the matching lambda.
        switch (z) {
            case false:
                // runStreaming: Instant -> TableRow, capturing one GenerateRowFunc.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT$GenerateRowFunc;Lorg/joda/time/Instant;)Lcom/google/api/services/bigquery/model/TableRow;")) {
                    GenerateRowFunc generateRowFunc = (GenerateRowFunc) serializedLambda.getCapturedArg(0);
                    return instant2 -> {
                        return generateRowFunc.apply(Long.valueOf(instant2.getMillis() / 1000));
                    };
                }
                break;
            case true:
                // runStreamingToDynamicDestinations: Instant -> Long, no captured arguments.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Instant;)Ljava/lang/Long;")) {
                    return instant22 -> {
                        return Long.valueOf(instant22.getMillis() / 1000);
                    };
                }
                break;
            case true:
                // runStreamingToDynamicDestinations: Long -> TableRow, capturing three GenerateRowFuncs.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT$GenerateRowFunc;Lorg/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT$GenerateRowFunc;Lorg/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT$GenerateRowFunc;Ljava/lang/Long;)Lcom/google/api/services/bigquery/model/TableRow;")) {
                    GenerateRowFunc generateRowFunc2 = (GenerateRowFunc) serializedLambda.getCapturedArg(0);
                    GenerateRowFunc generateRowFunc3 = (GenerateRowFunc) serializedLambda.getCapturedArg(1);
                    GenerateRowFunc generateRowFunc4 = (GenerateRowFunc) serializedLambda.getCapturedArg(2);
                    return l -> {
                        long longValue = l.longValue() % 3;
                        return longValue == 0 ? generateRowFunc2.apply(l) : longValue == 1 ? generateRowFunc3.apply(l) : generateRowFunc4.apply(l);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
