package org.apache.beam.it.gcp.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.it.gcp.bigquery.AutoValue_BigQueryStreamingLT_TestConfiguration;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.class */
public class BigQueryStreamingLT extends IOLoadTestBase {
    private TestConfiguration config;
    private Integer crashIntervalSeconds;

    @Rule
    public final transient TestPipeline fileLoadsPipeline = TestPipeline.create();

    @Rule
    public final transient TestPipeline storageApiPipeline = TestPipeline.create();
    static final String FIELD_PREFIX = "byte_field_";
    static final String RECORD_FIELD_PREFIX = "record_byte_field_";
    static final String NESTED_FIELD_PREFIX = "nested_byte_field_";
    static final String REPEATED_FIELD_PREFIX = "repeated_byte_field_";
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryStreamingLT.class);
    private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStreamingLT");
    private static final String BIG_QUERY_DATASET_ID = "storage_api_sink_load_test_" + System.nanoTime();
    private static final Map<String, TestConfiguration> TEST_CONFIGS = ImmutableMap.of("local", TestConfiguration.of(5, 5, 2, 1000, "DirectRunner", null), "small", TestConfiguration.of(10, 10, 5, 1000, "DataflowRunner", null), "medium", TestConfiguration.of(20, 20, 10, 5000, "DataflowRunner", null), "large", TestConfiguration.of(30, 50, 20, 10000, "DataflowRunner", null));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT$CrashingBigQueryServices.class */
    public static class CrashingBigQueryServices extends BigQueryServicesImpl {
        public final Integer crashIntervalSeconds;

        /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT$CrashingBigQueryServices$CrashingWriteStreamService.class */
        private class CrashingWriteStreamService extends BigQueryServicesImpl.WriteStreamServiceImpl {
            private Instant lastCrash;

            public CrashingWriteStreamService(BigQueryOptions bigQueryOptions) {
                super(bigQueryOptions);
            }

            public ApiFuture<FlushRowsResponse> flush(String str, long j) throws IOException, InterruptedException {
                maybeCrash();
                return super.flush(str, j);
            }

            private void maybeCrash() {
                if (CrashingBigQueryServices.this.crashIntervalSeconds.intValue() != -1) {
                    Instant instant = this.lastCrash;
                    if (instant == null) {
                        this.lastCrash = Instant.now();
                        return;
                    }
                    if (Instant.now().isAfter(instant.plusSeconds(CrashingBigQueryServices.this.crashIntervalSeconds.intValue()))) {
                        this.lastCrash = Instant.now();
                        if (ThreadLocalRandom.current().nextInt(100) < 30) {
                            if (ThreadLocalRandom.current().nextBoolean()) {
                                throw new RuntimeException("Throwing a random exception! This is for testing retry resilience.");
                            }
                            BigQueryStreamingLT.LOG.error("Crashing this worker! This is for testing retry resilience.");
                            System.exit(0);
                        }
                    }
                }
            }
        }

        public CrashingBigQueryServices(Integer num) {
            this.crashIntervalSeconds = num;
        }

        public BigQueryServices.WriteStreamService getWriteStreamService(BigQueryOptions bigQueryOptions) {
            return new CrashingWriteStreamService(bigQueryOptions);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT$GenerateTableRow.class */
    public static class GenerateTableRow implements SerializableFunction<Long, TableRow> {
        private final int numFields;
        private final int sizePerField;
        static final /* synthetic */ boolean $assertionsDisabled;

        public GenerateTableRow(int i, int i2) {
            if (!$assertionsDisabled && i < 0) {
                throw new AssertionError();
            }
            this.numFields = i;
            this.sizePerField = i2;
        }

        /* JADX WARN: Type inference failed for: r2v11, types: [java.lang.Object[], byte[]] */
        public TableRow apply(Long l) {
            TableRow tableRow = new TableRow();
            tableRow.set("id", l);
            byte[] array = getPayload(this.sizePerField, l.longValue()).array();
            int i = 1;
            for (int i2 = 1; i2 <= this.numFields; i2++) {
                if (i == 4) {
                    tableRow.set(BigQueryStreamingLT.RECORD_FIELD_PREFIX + i2, new TableRow().set("nested_byte_field_1", Arrays.copyOfRange(array, 0, this.sizePerField / 2)).set("nested_byte_field_2", Arrays.copyOfRange(array, this.sizePerField / 2, this.sizePerField)));
                } else if (i == 5) {
                    tableRow.set(BigQueryStreamingLT.REPEATED_FIELD_PREFIX + i2, Arrays.asList(new byte[]{Arrays.copyOfRange(array, 0, this.sizePerField / 3), Arrays.copyOfRange(array, this.sizePerField / 3, (this.sizePerField * 2) / 3), Arrays.copyOfRange(array, (this.sizePerField * 2) / 3, this.sizePerField)}));
                    i = 0;
                } else {
                    tableRow.set(BigQueryStreamingLT.FIELD_PREFIX + i2, array);
                }
                i++;
            }
            return tableRow;
        }

        private ByteBuffer getPayload(int i, long j) {
            if (i <= 0) {
                return null;
            }
            byte[] bArr = new byte[i];
            Random random = (Random) ThreadLocal.withInitial(() -> {
                return new Random(j);
            }).get();
            random.setSeed(j);
            random.nextBytes(bArr);
            return ByteBuffer.wrap(bArr);
        }

        static {
            $assertionsDisabled = !BigQueryStreamingLT.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT$MultiplierDoFn.class */
    public static class MultiplierDoFn extends DoFn<Long, Long> {
        private long multiplier;

        MultiplierDoFn(long j) {
            this.multiplier = j;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Long l, DoFn.OutputReceiver<Long> outputReceiver) {
            for (int i = 0; i < this.multiplier; i++) {
                outputReceiver.output(l);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT$TestConfiguration.class */
    public static abstract class TestConfiguration {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT$TestConfiguration$Builder.class */
        public static abstract class Builder {
            abstract Builder setMinutes(int i);

            abstract Builder setByteSizePerField(int i);

            abstract Builder setNumFields(int i);

            abstract Builder setRowsPerSecond(int i);

            abstract Builder setRunner(String str);

            abstract Builder setExpectedTable(String str);

            abstract TestConfiguration build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getMinutes();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getByteSizePerField();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getNumFields();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getRowsPerSecond();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getRunner();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getExpectedTable();

        static TestConfiguration of(int i, int i2, int i3, int i4, String str, String str2) {
            return new AutoValue_BigQueryStreamingLT_TestConfiguration.Builder().setMinutes(i).setByteSizePerField(i2).setNumFields(i3).setRowsPerSecond(i4).setRunner(str).setExpectedTable(str2).build();
        }

        abstract Builder toBuilder();
    }

    @BeforeClass
    public static void setUpTestClass() throws IOException, InterruptedException {
        PipelineOptionsFactory.register(TestPipelineOptions.class);
        BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID);
    }

    @Before
    public void setUpTest() {
        String property = TestProperties.getProperty("configuration", "small", TestProperties.Type.PROPERTY);
        this.config = TEST_CONFIGS.get(property);
        if (this.config == null) {
            throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Known configs: %s", property, TEST_CONFIGS.keySet()));
        }
        if (!Strings.isNullOrEmpty(this.tempBucketName)) {
            String format = String.format("gs://%s/temp/", this.tempBucketName);
            this.fileLoadsPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(format);
            this.fileLoadsPipeline.getOptions().setTempLocation(format);
        }
        String property2 = TestProperties.getProperty("expectedTable", "", TestProperties.Type.PROPERTY);
        if (!Strings.isNullOrEmpty(property2)) {
            this.config.toBuilder().setExpectedTable(property2).build();
        }
        this.crashIntervalSeconds = Integer.valueOf(Integer.parseInt(TestProperties.getProperty("crashIntervalSeconds", "-1", TestProperties.Type.PROPERTY)));
    }

    @AfterClass
    public static void cleanup() {
        BQ_CLIENT.deleteDataset(project, BIG_QUERY_DATASET_ID);
    }

    @Test
    public void testExactlyOnceStreaming() throws IOException, InterruptedException {
        runTest(BigQueryIO.Write.Method.STORAGE_WRITE_API);
    }

    @Test
    @Ignore
    public void testAtLeastOnceStreaming() throws IOException, InterruptedException {
        runTest(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE);
    }

    public void runTest(BigQueryIO.Write.Method method) throws IOException, InterruptedException {
        long millis = Duration.standardMinutes(this.config.getMinutes().intValue()).getMillis();
        int max = Math.max(this.config.getRowsPerSecond().intValue(), 1000);
        long j = max / 1000;
        long j2 = (j * millis) / 1;
        boolean z = this.config.getRunner().equalsIgnoreCase(DataflowRunner.class.getSimpleName()) && this.crashIntervalSeconds.intValue() <= 0;
        String expectedTable = this.config.getExpectedTable();
        GenerateTableRow generateTableRow = new GenerateTableRow(this.config.getNumFields().intValue(), this.config.getByteSizePerField().intValue());
        TableSchema generateTableSchema = generateTableSchema(this.config.getNumFields().intValue());
        if (Strings.isNullOrEmpty(expectedTable)) {
            String format = String.format("fileloads-%s-records", withScaleSymbol(j2));
            expectedTable = String.format("%s.%s.%s", project, BIG_QUERY_DATASET_ID, format);
            LOG.info("No expected table was set. Will run a batch job to load {} rows to {}. This will be used as the source of truth.", Long.valueOf(j2), expectedTable);
            this.fileLoadsPipeline.apply(GenerateSequence.from(0L).to(j2)).apply("Write to source of truth", BigQueryIO.write().to(expectedTable).withFormatFunction(generateTableRow).withMethod(BigQueryIO.Write.Method.FILE_LOADS).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE).withSchema(generateTableSchema));
            if (z) {
                this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("test-" + format).setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.fileLoadsPipeline).addParameter("runner", this.config.getRunner()).addParameter("maxNumWorkers", TestProperties.getProperty("maxNumWorkers", "10", TestProperties.Type.PROPERTY)).build());
            } else {
                this.fileLoadsPipeline.run();
            }
        }
        String format2 = String.format("storageapi%s-load-%sqps-%smin-%stotal", method == BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE ? "-atleastonce" : "", withScaleSymbol(max), this.config.getMinutes(), withScaleSymbol(j2));
        String format3 = String.format("%s.%s.%s", project, BIG_QUERY_DATASET_ID, format2);
        LOG.info("Preparing a source generating at a rate of {} rows per second for a period of {} minutes. This results in a total of {} rows written to {}.", new Object[]{Integer.valueOf(max), this.config.getMinutes(), Long.valueOf(j2), format3});
        PCollection apply = this.storageApiPipeline.apply(PeriodicImpulse.create().stopAfter(Duration.millis(millis - 1)).withInterval(Duration.millis(1))).apply("Extract row IDs", MapElements.into(TypeDescriptors.longs()).via(instant -> {
            return Long.valueOf(instant.getMillis() % j2);
        }));
        if (j > 1) {
            apply = (PCollection) apply.apply(String.format("One input to %s outputs", Long.valueOf(j)), ParDo.of(new MultiplierDoFn(j))).apply("Reshuffle fanout", Reshuffle.viaRandomKey());
        }
        BigQueryIO.Write withSchema = BigQueryIO.write().to(format3).withFormatFunction(generateTableRow).withMethod(method).withTriggeringFrequency(Duration.standardSeconds(1L)).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND).withSchema(generateTableSchema);
        if (this.crashIntervalSeconds.intValue() > 0) {
            LOG.info("A crash interval of {} seconds has been set. The Storage API sink will periodically crash.", this.crashIntervalSeconds);
            withSchema = withSchema.withTestServices(new CrashingBigQueryServices(this.crashIntervalSeconds));
        }
        apply.apply(withSchema);
        if (z) {
            PipelineLauncher.LaunchInfo launch = this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("test-" + format2).setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.storageApiPipeline).addParameter("runner", this.config.getRunner()).addParameter("streaming", "true").addParameter("experiments", "enable_streaming_engine").addParameter("maxNumWorkers", TestProperties.getProperty("maxNumWorkers", "10", TestProperties.Type.PROPERTY)).build());
            Assert.assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, this.pipelineOperator.waitUntilDoneAndFinish(PipelineOperator.Config.builder().setJobId(launch.jobId()).setProject(project).setRegion(region).setTimeoutAfter(java.time.Duration.ofMinutes(this.config.getMinutes().intValue() * 2)).setCheckAfter(java.time.Duration.ofSeconds((this.config.getMinutes().intValue() * 60) / 20)).build()));
            Assert.assertEquals(PipelineLauncher.JobState.DONE, this.pipelineLauncher.getJobStatus(project, region, launch.jobId()));
            try {
                exportMetricsToBigQuery(launch, getMetrics(launch, LoadTestBase.MetricsConfiguration.builder().setInputPCollection(j > 1 ? "Extract row IDs.out0" : "Reshuffle fanout.out0").build()));
            } catch (Exception e) {
                LOG.error("Encountered an error while exporting metrics to BigQuery:\n{}", e);
            }
        } else {
            this.storageApiPipeline.run().waitUntilFinish();
        }
        LOG.info("Write pipeline finished writing to {}. Will now perform accuracy checks against the rows in {}.", format3, expectedTable);
        checkCorrectness((String) generateTableSchema.getFields().stream().map((v0) -> {
            return v0.getName();
        }).filter(str -> {
            return str.startsWith(FIELD_PREFIX);
        }).collect(Collectors.joining(", ")), format3, expectedTable);
        if (method == BigQueryIO.Write.Method.STORAGE_WRITE_API) {
            checkNonDuplication(format3, expectedTable, j2);
        }
    }

    public void checkCorrectness(String str, String str2, String str3) throws IOException, InterruptedException {
        String tableSpec = BigQueryUtils.toTableSpec(BigQueryUtils.toTableReference(str2));
        String format = String.format("WITH \nstorage_api_table AS (SELECT %s FROM `%s`), \nexpected_table AS (SELECT %s FROM `%s`), \nrows_mismatched AS (SELECT * FROM expected_table EXCEPT DISTINCT SELECT * FROM storage_api_table) \nSELECT COUNT(*) FROM rows_mismatched", str, tableSpec, str, BigQueryUtils.toTableSpec(BigQueryUtils.toTableReference(str3)));
        LOG.info("Executing query to check correctness:\n{}", format);
        long parseLong = Long.parseLong((String) ((TableRow) Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened(format, "google.com:clouddfe", true, true))).get("f0_"));
        LOG.info("Number of mismatched rows: {}", Long.valueOf(parseLong));
        Assert.assertEquals(String.format("Saw %s rows that are missing from %s.", Long.valueOf(parseLong), tableSpec), 0L, parseLong);
    }

    public void checkNonDuplication(String str, String str2, long j) throws IOException, InterruptedException {
        String format = String.format("SELECT \n(SELECT COUNT(*) FROM  `%s`) AS actualCount,\n(SELECT COUNT(*) FROM  `%s`) AS expectedCount", str, str2);
        LOG.info("Executing query to check non-duplication:\n{}", format);
        TableRow tableRow = (TableRow) Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened(format, "google.com:clouddfe", true, true));
        long parseLong = Long.parseLong((String) tableRow.get("actualCount"));
        Assert.assertEquals("Comparing actual table count and expected table count.", Long.parseLong((String) tableRow.get("expectedCount")), parseLong);
        Assert.assertEquals("Comparing actual table count and calculated expected count.", j, parseLong);
    }

    public String withScaleSymbol(long j) {
        List asList = Arrays.asList("", "K", "M", "B", "T", "Q");
        int i = 0;
        while (j / 1000 > 0) {
            i++;
            j /= 1000;
        }
        return String.format("%s%s", Long.valueOf(j), asList.get(i));
    }

    public static TableSchema generateTableSchema(int i) {
        ArrayList arrayList = new ArrayList(i);
        arrayList.add(new TableFieldSchema().setType("INTEGER").setName("id"));
        int i2 = 1;
        for (int i3 = 1; i3 <= i; i3++) {
            TableFieldSchema tableFieldSchema = new TableFieldSchema();
            if (i2 == 4) {
                tableFieldSchema.setType("RECORD").setName(RECORD_FIELD_PREFIX + i3).setFields(Arrays.asList(new TableFieldSchema().setType("BYTES").setName("nested_byte_field_1"), new TableFieldSchema().setType("BYTES").setName("nested_byte_field_2")));
            } else if (i2 == 5) {
                tableFieldSchema.setType("BYTES").setMode("REPEATED").setName(REPEATED_FIELD_PREFIX + i3);
                i2 = 0;
            } else {
                tableFieldSchema.setType("BYTES").setName(FIELD_PREFIX + i3);
            }
            i2++;
            arrayList.add(tableFieldSchema);
        }
        return new TableSchema().setFields(arrayList);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1419191254:
                if (implMethodName.equals("lambda$runTest$2ee7c920$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT") && serializedLambda.getImplMethodSignature().equals("(JLorg/joda/time/Instant;)Ljava/lang/Long;")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return instant -> {
                        return Long.valueOf(instant.getMillis() % longValue);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
