package org.apache.beam.sdk.io.gcp.spanner.changestreams;

import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.protobuf.ListValue;
import com.google.protobuf.NullValue;
import com.google.protobuf.Value;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.class */
public class SpannerChangeStreamErrorTest implements Serializable {
    public static final String SPANNER_HOST = "my-host";
    private static final String TEST_PROJECT = "my-project";
    private static final String TEST_INSTANCE = "my-instance";
    private static final String TEST_DATABASE = "my-database";
    private static final String TEST_TABLE = "my-metadata-table";
    private static final String TEST_CHANGE_STREAM = "my-change-stream";

    @Rule
    public transient Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    @Rule
    public final transient ExpectedException thrown = ExpectedException.none();
    private MockSpannerServiceImpl mockSpannerService;
    private MockServiceHelper serviceHelper;
    private static final ResultSetMetadata PARTITION_METADATA_RESULT_SET_METADATA = ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setName("PartitionToken").setType(Type.newBuilder().setCode(TypeCode.STRING)).build()).addFields(StructType.Field.newBuilder().setName("ParentTokens").setType(Type.newBuilder().setCode(TypeCode.ARRAY).setArrayElementType(Type.newBuilder().setCode(TypeCode.STRING))).build()).addFields(StructType.Field.newBuilder().setName("StartTimestamp").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).addFields(StructType.Field.newBuilder().setName("EndTimestamp").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).addFields(StructType.Field.newBuilder().setName("HeartbeatMillis").setType(Type.newBuilder().setCode(TypeCode.INT64))).addFields(StructType.Field.newBuilder().setName("State").setType(Type.newBuilder().setCode(TypeCode.STRING))).addFields(StructType.Field.newBuilder().setName("Watermark").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).addFields(StructType.Field.newBuilder().setName("CreatedAt").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).addFields(StructType.Field.newBuilder().setName("ScheduledAt").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).addFields(StructType.Field.newBuilder().setName("RunningAt").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).addFields(StructType.Field.newBuilder().setName("FinishedAt").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP))).build()).build();

    @Before
    public void setUp() throws Exception {
        this.mockSpannerService = new MockSpannerServiceImpl();
        this.serviceHelper = new MockServiceHelper(SPANNER_HOST, Collections.singletonList(this.mockSpannerService));
        this.serviceHelper.start();
        this.serviceHelper.reset();
    }

    @After
    public void tearDown() throws NoSuchFieldException, IllegalAccessException {
        this.serviceHelper.reset();
        this.serviceHelper.stop();
        this.mockSpannerService.reset();
    }

    @Test
    @Ignore("BEAM-12164 Reenable this test when databaseClient.getDialect returns the right message.")
    public void testResourceExhaustedDoesNotRetry() {
        this.mockSpannerService.setExecuteStreamingSqlExecutionTime(MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
        Timestamp ofTimeSecondsAndNanos = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        try {
            this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(getSpannerConfig()).withChangeStreamName(TEST_CHANGE_STREAM).withMetadataDatabase(TEST_DATABASE).withMetadataTable(TEST_TABLE).withInclusiveStartAt(ofTimeSecondsAndNanos).withInclusiveEndAt(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1)));
            this.pipeline.run().waitUntilFinish();
        } finally {
            this.thrown.expect(SpannerException.class);
            this.thrown.expectMessage("RESOURCE_EXHAUSTED - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(Integer.valueOf(0)));
        }
    }

    @Test
    @Ignore("BEAM-12164 Reenable this test when databaseClient.getDialect returns the right message.")
    public void testUnavailableExceptionRetries() throws InterruptedException {
        DirectOptions as = PipelineOptionsFactory.as(DirectOptions.class);
        as.setBlockOnRun(false);
        as.setRunner(DirectRunner.class);
        Pipeline create = TestPipeline.create(as);
        this.mockSpannerService.setExecuteStreamingSqlExecutionTime(MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(Status.UNAVAILABLE.asRuntimeException()));
        Timestamp ofTimeSecondsAndNanos = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        try {
            create.apply(SpannerIO.readChangeStream().withSpannerConfig(getSpannerConfig()).withChangeStreamName(TEST_CHANGE_STREAM).withMetadataDatabase(TEST_DATABASE).withMetadataTable(TEST_TABLE).withInclusiveStartAt(ofTimeSecondsAndNanos).withInclusiveEndAt(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1)));
            PipelineResult run = create.run();
            while (run.getState() != PipelineResult.State.RUNNING) {
                Thread.sleep(50L);
            }
            Assert.assertNull(run.waitUntilFinish(Duration.millis(500L)));
            this.thrown.expectMessage("UNAVAILABLE - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(0));
        } catch (Throwable th) {
            this.thrown.expectMessage("UNAVAILABLE - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(0));
            throw th;
        }
    }

    @Test
    @Ignore("BEAM-12164 Reenable this test when databaseClient.getDialect returns the right message.")
    public void testAbortedExceptionNotRetried() {
        this.mockSpannerService.setExecuteStreamingSqlExecutionTime(MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(Status.ABORTED.asRuntimeException()));
        Timestamp ofTimeSecondsAndNanos = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        try {
            this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(getSpannerConfig()).withChangeStreamName(TEST_CHANGE_STREAM).withMetadataDatabase(TEST_DATABASE).withMetadataTable(TEST_TABLE).withInclusiveStartAt(ofTimeSecondsAndNanos).withInclusiveEndAt(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1)));
            this.pipeline.run().waitUntilFinish();
        } finally {
            this.thrown.expect(SpannerException.class);
            this.thrown.expectMessage("ABORTED - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(Integer.valueOf(0)));
        }
    }

    @Test
    public void testAbortedExceptionNotRetriedithDefaultsForStreamSqlRetrySettings() {
        this.mockSpannerService.setExecuteStreamingSqlExecutionTime(MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(Status.ABORTED.asRuntimeException()));
        Timestamp ofTimeSecondsAndNanos = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        try {
            this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(SpannerConfig.create().withEmulatorHost(ValueProvider.StaticValueProvider.of(SPANNER_HOST)).withIsLocalChannelProvider(ValueProvider.StaticValueProvider.of(true)).withCommitRetrySettings((RetrySettings) null).withExecuteStreamingSqlRetrySettings((RetrySettings) null).withProjectId(TEST_PROJECT).withInstanceId(TEST_INSTANCE).withDatabaseId(TEST_DATABASE)).withChangeStreamName(TEST_CHANGE_STREAM).withMetadataDatabase(TEST_DATABASE).withMetadataTable(TEST_TABLE).withInclusiveStartAt(ofTimeSecondsAndNanos).withInclusiveEndAt(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1)));
            this.pipeline.run().waitUntilFinish();
            this.thrown.expect(SpannerException.class);
            this.thrown.expectMessage("ABORTED - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(0));
        } catch (Throwable th) {
            this.thrown.expect(SpannerException.class);
            this.thrown.expectMessage("ABORTED - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(0));
            throw th;
        }
    }

    @Test
    public void testUnknownExceptionDoesNotRetry() {
        this.mockSpannerService.setExecuteStreamingSqlExecutionTime(MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(Status.UNKNOWN.asRuntimeException()));
        Timestamp ofTimeSecondsAndNanos = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        try {
            this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(getSpannerConfig()).withChangeStreamName(TEST_CHANGE_STREAM).withMetadataDatabase(TEST_DATABASE).withMetadataTable(TEST_TABLE).withInclusiveStartAt(ofTimeSecondsAndNanos).withInclusiveEndAt(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1)));
            this.pipeline.run().waitUntilFinish();
        } finally {
            this.thrown.expect(SpannerException.class);
            this.thrown.expectMessage("UNKNOWN - Statement: 'SELECT 'POSTGRESQL' AS DIALECT");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(Integer.valueOf(0)));
        }
    }

    @Test
    @Ignore("BEAM-12164 Reenable this test when databaseClient.getDialect works.")
    public void testInvalidRecordReceived() {
        Timestamp ofTimeSecondsAndNanos = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        Timestamp ofTimeSecondsAndNanos2 = Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1);
        mockGetDialect();
        mockTableExists();
        mockGetWatermark(ofTimeSecondsAndNanos);
        mockGetPartitionsAfter(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() - 1), mockGetParentPartition(ofTimeSecondsAndNanos, ofTimeSecondsAndNanos2));
        mockGetPartitionsAfter(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos()), ResultSet.newBuilder().setMetadata(PARTITION_METADATA_RESULT_SET_METADATA).build());
        mockGetPartitionsAfter(Timestamp.ofTimeSecondsAndNanos(ofTimeSecondsAndNanos.getSeconds(), ofTimeSecondsAndNanos.getNanos() + 1), ResultSet.newBuilder().setMetadata(PARTITION_METADATA_RESULT_SET_METADATA).build());
        mockInvalidChangeStreamRecordReceived(ofTimeSecondsAndNanos, ofTimeSecondsAndNanos2);
        try {
            this.pipeline.apply(SpannerIO.readChangeStream().withSpannerConfig(getSpannerConfig()).withChangeStreamName(TEST_CHANGE_STREAM).withMetadataDatabase(TEST_DATABASE).withMetadataTable(TEST_TABLE).withInclusiveStartAt(ofTimeSecondsAndNanos).withInclusiveEndAt(ofTimeSecondsAndNanos2));
            this.pipeline.run().waitUntilFinish();
            this.thrown.expect(Pipeline.PipelineExecutionException.class);
            this.thrown.expectMessage("Field not found");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(0));
        } catch (Throwable th) {
            this.thrown.expect(Pipeline.PipelineExecutionException.class);
            this.thrown.expectMessage("Field not found");
            MatcherAssert.assertThat(Integer.valueOf(this.mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class)), Matchers.equalTo(0));
            throw th;
        }
    }

    private void mockInvalidChangeStreamRecordReceived(Timestamp timestamp, Timestamp timestamp2) {
        this.mockSpannerService.putStatementResult(MockSpannerServiceImpl.StatementResult.query(((Statement.Builder) ((Statement.Builder) ((Statement.Builder) ((Statement.Builder) Statement.newBuilder("SELECT * FROM READ_my-change-stream(   start_timestamp => @startTimestamp,   end_timestamp => @endTimestamp,   partition_token => @partitionToken,   read_options => null,   heartbeat_milliseconds => @heartbeatMillis)").bind("startTimestamp").to(timestamp)).bind("endTimestamp").to(timestamp2)).bind("partitionToken").to((String) null)).bind("heartbeatMillis").to(500L)).build(), ResultSet.newBuilder().addRows(ListValue.newBuilder().addValues(Value.newBuilder().setListValue(ListValue.newBuilder().addValues(Value.newBuilder().setListValue(ListValue.newBuilder().addValues(Value.newBuilder().setListValue(ListValue.newBuilder().addValues(Value.newBuilder().setStringValue("bad_value"))))))))).setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setName("COL1").setType(Type.newBuilder().setCode(TypeCode.ARRAY).setArrayElementType(Type.newBuilder().setCode(TypeCode.STRUCT).setStructType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setName("field_name").setType(Type.newBuilder().setCode(TypeCode.STRUCT).setStructType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setType(Type.newBuilder().setCode(TypeCode.STRING))))))))))).build()).build()));
    }

    private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet resultSet) {
        this.mockSpannerService.putStatementResult(MockSpannerServiceImpl.StatementResult.query(((Statement.Builder) Statement.newBuilder("SELECT * FROM my-metadata-table WHERE CreatedAt > @timestamp ORDER BY CreatedAt ASC, StartTimestamp ASC").bind("timestamp").to(Timestamp.ofTimeSecondsAndNanos(timestamp.getSeconds(), timestamp.getNanos()))).build(), resultSet));
    }

    private void mockGetWatermark(Timestamp timestamp) {
        this.mockSpannerService.putStatementResult(MockSpannerServiceImpl.StatementResult.query(((Statement.Builder) Statement.newBuilder("SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1").bind("state").to(PartitionMetadata.State.FINISHED.name())).build(), ResultSet.newBuilder().addRows(ListValue.newBuilder().addValues(Value.newBuilder().setStringValue(timestamp.toString()).build()).build()).setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setName("Watermark").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build()).build()).build()).build()).build()));
    }

    private ResultSet mockGetParentPartition(Timestamp timestamp, Timestamp timestamp2) {
        Statement build = ((Statement.Builder) Statement.newBuilder("SELECT * FROM my-metadata-table WHERE PartitionToken = @partition").bind("partition").to("Parent0")).build();
        ResultSet build2 = ResultSet.newBuilder().addRows(ListValue.newBuilder().addValues(Value.newBuilder().setStringValue("Parent0")).addValues(Value.newBuilder().setListValue(ListValue.newBuilder().build())).addValues(Value.newBuilder().setStringValue(timestamp.toString())).addValues(Value.newBuilder().setStringValue(timestamp2.toString())).addValues(Value.newBuilder().setStringValue("500")).addValues(Value.newBuilder().setStringValue(PartitionMetadata.State.CREATED.name())).addValues(Value.newBuilder().setStringValue(timestamp.toString())).addValues(Value.newBuilder().setStringValue(timestamp.toString())).addValues(Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build()).addValues(Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build()).addValues(Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build()).build()).setMetadata(PARTITION_METADATA_RESULT_SET_METADATA).build();
        this.mockSpannerService.putStatementResult(MockSpannerServiceImpl.StatementResult.query(build, build2));
        return build2;
    }

    private void mockTableExists() {
        this.mockSpannerService.putStatementResult(MockSpannerServiceImpl.StatementResult.query(Statement.of("SELECT t.table_name FROM information_schema.tables AS t WHERE t.table_catalog = '' AND t.table_schema = '' AND t.table_name = 'my-metadata-table'"), ResultSet.newBuilder().addRows(ListValue.newBuilder().addValues(Value.newBuilder().setStringValue(TEST_TABLE).build()).build()).setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setName("table_name").setType(Type.newBuilder().setCode(TypeCode.STRING).build()).build()).build()).build()).build()));
    }

    private void mockGetDialect() {
        this.mockSpannerService.putStatementResult(MockSpannerServiceImpl.StatementResult.query(Statement.newBuilder("SELECT 'POSTGRESQL' AS DIALECT\nFROM INFORMATION_SCHEMA.SCHEMATA\nWHERE SCHEMA_NAME='information_schema'\nUNION ALL\nSELECT 'GOOGLE_STANDARD_SQL' AS DIALECT\nFROM INFORMATION_SCHEMA.SCHEMATA\nWHERE SCHEMA_NAME='INFORMATION_SCHEMA' AND CATALOG_NAME=''").build(), ResultSet.newBuilder().addRows(ListValue.newBuilder().addValues(Value.newBuilder().setStringValue("GOOGLE_STANDARD_SQL").build()).build()).setMetadata(ResultSetMetadata.newBuilder().setRowType(StructType.newBuilder().addFields(StructType.Field.newBuilder().setName("dialect").setType(Type.newBuilder().setCode(TypeCode.STRING).build()).build()).build()).build()).build()));
    }

    private SpannerConfig getSpannerConfig() {
        RetrySettings build = RetrySettings.newBuilder().setInitialRetryDelay(org.threeten.bp.Duration.ofMillis(250L)).setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(1L)).setRetryDelayMultiplier(5.0d).setTotalTimeout(org.threeten.bp.Duration.ofSeconds(1L)).build();
        return SpannerConfig.create().withEmulatorHost(ValueProvider.StaticValueProvider.of(SPANNER_HOST)).withIsLocalChannelProvider(ValueProvider.StaticValueProvider.of(true)).withCommitRetrySettings(build).withExecuteStreamingSqlRetrySettings(build).withProjectId(TEST_PROJECT).withInstanceId(TEST_INSTANCE).withDatabaseId(TEST_DATABASE);
    }
}
