/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.spanner.changestreams;

import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.Statement;
import com.google.protobuf.ListValue;
import com.google.protobuf.NullValue;
import com.google.protobuf.Value;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(value=JUnit4.class)
public class SpannerChangeStreamErrorTest
implements Serializable {
    /** Host name given to the mock service helper and used as the emulator host in the config. */
    public static final String SPANNER_HOST = "my-host";

    // Identifiers used to build the SpannerConfig and the change stream read under test.
    private static final String TEST_PROJECT = "my-project";
    private static final String TEST_INSTANCE = "my-instance";
    private static final String TEST_DATABASE = "my-database";
    private static final String TEST_TABLE = "my-metadata-table";
    private static final String TEST_CHANGE_STREAM = "my-change-stream";

    /** Shared test pipeline; abandoned-node enforcement is switched off. */
    @Rule
    public final transient TestPipeline pipeline =
        TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /** Rule through which the tests declare the exception they expect the pipeline run to throw. */
    @Rule
    public final transient ExpectedException thrown = ExpectedException.none();

    // Both are (re)created for every test in setUp().
    private MockSpannerServiceImpl mockSpannerService;
    private MockServiceHelper serviceHelper;

    /**
     * Row type of the result sets returned by the mocked partition metadata table queries: eleven
     * columns, in the order in which {@code mockGetParentPartition} fills its row.
     */
    private static final ResultSetMetadata PARTITION_METADATA_RESULT_SET_METADATA =
        ResultSetMetadata.newBuilder()
            .setRowType(
                StructType.newBuilder()
                    .addFields(partitionColumn("PartitionToken", scalarType(TypeCode.STRING)))
                    .addFields(
                        partitionColumn(
                            "ParentTokens",
                            Type.newBuilder()
                                .setCode(TypeCode.ARRAY)
                                .setArrayElementType(scalarType(TypeCode.STRING))))
                    .addFields(partitionColumn("StartTimestamp", scalarType(TypeCode.TIMESTAMP)))
                    .addFields(partitionColumn("EndTimestamp", scalarType(TypeCode.TIMESTAMP)))
                    .addFields(partitionColumn("HeartbeatMillis", scalarType(TypeCode.INT64)))
                    .addFields(partitionColumn("State", scalarType(TypeCode.STRING)))
                    .addFields(partitionColumn("Watermark", scalarType(TypeCode.TIMESTAMP)))
                    .addFields(partitionColumn("CreatedAt", scalarType(TypeCode.TIMESTAMP)))
                    .addFields(partitionColumn("ScheduledAt", scalarType(TypeCode.TIMESTAMP)))
                    .addFields(partitionColumn("RunningAt", scalarType(TypeCode.TIMESTAMP)))
                    .addFields(partitionColumn("FinishedAt", scalarType(TypeCode.TIMESTAMP)))
                    .build())
            .build();

    /** Builds one named column of the partition metadata row type. */
    private static StructType.Field partitionColumn(String columnName, Type.Builder columnType) {
        return StructType.Field.newBuilder().setName(columnName).setType(columnType).build();
    }

    /** Returns a fresh type builder carrying only the given type code. */
    private static Type.Builder scalarType(TypeCode code) {
        return Type.newBuilder().setCode(code);
    }

    /**
     * Creates a fresh mock Spanner service for each test, registers it with a
     * {@link MockServiceHelper} bound to {@link #SPANNER_HOST}, then starts and resets the helper.
     *
     * @throws Exception declared by the original signature; kept for compatibility
     */
    @Before
    public void setUp() throws Exception {
        // The fields are assigned before start() so tearDown() always finds them populated.
        mockSpannerService = new MockSpannerServiceImpl();
        serviceHelper =
            new MockServiceHelper(SPANNER_HOST, Collections.singletonList(mockSpannerService));
        serviceHelper.start();
        serviceHelper.reset();
    }

    /**
     * Resets and stops the service helper started in {@code setUp()}, then resets the mock Spanner
     * service so that no configured results or execution times leak into the next test.
     */
    @After
    public void tearDown() throws NoSuchFieldException, IllegalAccessException {
        serviceHelper.reset();
        serviceHelper.stop();
        mockSpannerService.reset();
    }

    /**
     * Configures the mock service with a sticky {@code RESOURCE_EXHAUSTED} error for streaming SQL
     * execution and expects running the change stream read to fail with a
     * {@link Pipeline.PipelineExecutionException} whose message contains that error code's name.
     */
    @Test
    public void testResourceExhaustedDoesNotRetry() {
        mockSpannerService.setExecuteStreamingSqlExecutionTime(
            MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(
                Status.RESOURCE_EXHAUSTED.asRuntimeException()));

        // Read window of a single nanosecond: [0s + 1000ns, 0s + 1001ns].
        Timestamp readFrom = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        Timestamp readUntil =
            Timestamp.ofTimeSecondsAndNanos(readFrom.getSeconds(), readFrom.getNanos() + 1);

        try {
            pipeline.apply(
                SpannerIO.readChangeStream()
                    .withSpannerConfig(getSpannerConfig())
                    .withChangeStreamName(TEST_CHANGE_STREAM)
                    .withMetadataDatabase(TEST_DATABASE)
                    .withMetadataTable(TEST_TABLE)
                    .withInclusiveStartAt(readFrom)
                    .withInclusiveEndAt(readUntil));
            pipeline.run().waitUntilFinish();
        } finally {
            // Registered while the failure is still propagating, i.e. before the rule inspects it.
            thrown.expect(Pipeline.PipelineExecutionException.class);
            thrown.expectMessage(ErrorCode.RESOURCE_EXHAUSTED.name());
        }
    }

    /**
     * Configures the mock service with a sticky {@code UNAVAILABLE} error for streaming SQL
     * execution, runs the change stream read on a non-blocking {@link DirectRunner} pipeline and
     * checks that the pipeline is still unfinished shortly after it started running and that more
     * than one {@link ExecuteSqlRequest} reached the mock service.
     *
     * <p>Currently disabled, see BEAM-14152.
     *
     * @throws InterruptedException if the thread is interrupted while polling the pipeline state
     */
    @Test
    @Ignore(value="BEAM-14152")
    public void testUnavailableExceptionRetries() throws InterruptedException {
        DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
        options.setBlockOnRun(false);
        options.setRunner(DirectRunner.class);
        Pipeline nonBlockingPipeline = TestPipeline.create(options);

        mockSpannerService.setExecuteStreamingSqlExecutionTime(
            MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(
                Status.UNAVAILABLE.asRuntimeException()));

        Timestamp startTimestamp = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        Timestamp endTimestamp =
            Timestamp.ofTimeSecondsAndNanos(
                startTimestamp.getSeconds(), startTimestamp.getNanos() + 1);

        try {
            nonBlockingPipeline.apply(
                SpannerIO.readChangeStream()
                    .withSpannerConfig(getSpannerConfig())
                    .withChangeStreamName(TEST_CHANGE_STREAM)
                    .withMetadataDatabase(TEST_DATABASE)
                    .withMetadataTable(TEST_TABLE)
                    .withInclusiveStartAt(startTimestamp)
                    .withInclusiveEndAt(endTimestamp));
            PipelineResult result = nonBlockingPipeline.run();

            // Poll until the pipeline reports RUNNING, but give up as soon as it reaches a
            // terminal state: a pipeline that fails or finishes before RUNNING is observed would
            // otherwise keep this loop spinning forever. The assertion below then fails the test.
            PipelineResult.State state = result.getState();
            while (state == null
                || (state != PipelineResult.State.RUNNING && !state.isTerminal())) {
                Thread.sleep(50L);
                state = result.getState();
            }

            // A null result means the pipeline had not finished within the 5 ms wait.
            Assert.assertNull(result.waitUntilFinish(org.joda.time.Duration.millis(5L)));
        } finally {
            MatcherAssert.assertThat(
                mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class),
                Matchers.greaterThan(1));
        }
    }

    /**
     * Configures the mock service with a sticky {@code ABORTED} error for streaming SQL execution.
     * After the run, asserts that more than one {@link ExecuteSqlRequest} was received and expects
     * a {@link Pipeline.PipelineExecutionException} whose message contains {@code ABORTED}.
     */
    @Test
    public void testAbortedExceptionRetries() {
        mockSpannerService.setExecuteStreamingSqlExecutionTime(
            MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(
                Status.ABORTED.asRuntimeException()));

        // Read window of a single nanosecond: [0s + 1000ns, 0s + 1001ns].
        Timestamp readFrom = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        Timestamp readUntil =
            Timestamp.ofTimeSecondsAndNanos(readFrom.getSeconds(), readFrom.getNanos() + 1);

        try {
            pipeline.apply(
                SpannerIO.readChangeStream()
                    .withSpannerConfig(getSpannerConfig())
                    .withChangeStreamName(TEST_CHANGE_STREAM)
                    .withMetadataDatabase(TEST_DATABASE)
                    .withMetadataTable(TEST_TABLE)
                    .withInclusiveStartAt(readFrom)
                    .withInclusiveEndAt(readUntil));
            pipeline.run().waitUntilFinish();
        } finally {
            // The request count is checked first; the expectations are registered afterwards,
            // while the pipeline failure is still propagating.
            int sqlRequests = mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class);
            MatcherAssert.assertThat(sqlRequests, Matchers.greaterThan(1));
            thrown.expect(Pipeline.PipelineExecutionException.class);
            thrown.expectMessage(ErrorCode.ABORTED.name());
        }
    }

    /**
     * Configures the mock service with a sticky {@code UNKNOWN} error for streaming SQL execution.
     * After the run, asserts that exactly one {@link ExecuteSqlRequest} was received and expects a
     * {@link Pipeline.PipelineExecutionException} whose message contains {@code UNKNOWN}.
     */
    @Test
    public void testUnknownExceptionDoesNotRetry() {
        mockSpannerService.setExecuteStreamingSqlExecutionTime(
            MockSpannerServiceImpl.SimulatedExecutionTime.ofStickyException(
                Status.UNKNOWN.asRuntimeException()));

        // Read window of a single nanosecond: [0s + 1000ns, 0s + 1001ns].
        Timestamp readFrom = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        Timestamp readUntil =
            Timestamp.ofTimeSecondsAndNanos(readFrom.getSeconds(), readFrom.getNanos() + 1);

        try {
            pipeline.apply(
                SpannerIO.readChangeStream()
                    .withSpannerConfig(getSpannerConfig())
                    .withChangeStreamName(TEST_CHANGE_STREAM)
                    .withMetadataDatabase(TEST_DATABASE)
                    .withMetadataTable(TEST_TABLE)
                    .withInclusiveStartAt(readFrom)
                    .withInclusiveEndAt(readUntil));
            pipeline.run().waitUntilFinish();
        } finally {
            // The request count is checked first; the expectations are registered afterwards,
            // while the pipeline failure is still propagating.
            int sqlRequests = mockSpannerService.countRequestsOfType(ExecuteSqlRequest.class);
            MatcherAssert.assertThat(sqlRequests, Matchers.equalTo(1));
            thrown.expect(Pipeline.PipelineExecutionException.class);
            thrown.expectMessage(ErrorCode.UNKNOWN.name());
        }
    }

    /**
     * Mocks the metadata table queries (table existence, watermark, parent partition and
     * "partitions created after" lookups) plus a change stream query that returns a row with an
     * unexpected shape, then expects the pipeline run to fail with a
     * {@link Pipeline.PipelineExecutionException} whose message contains "Field not found".
     */
    @Test
    public void testInvalidRecordReceived() {
        Timestamp readFrom = Timestamp.ofTimeSecondsAndNanos(0L, 1000);
        Timestamp readUntil =
            Timestamp.ofTimeSecondsAndNanos(readFrom.getSeconds(), readFrom.getNanos() + 1);
        long seconds = readFrom.getSeconds();
        int nanos = readFrom.getNanos();

        mockTableExists();
        mockGetWatermark(readFrom);
        ResultSet parentPartitionRows = mockGetParentPartition(readFrom, readUntil);

        // Result set with the partition metadata columns but no rows.
        ResultSet noPartitionRows =
            ResultSet.newBuilder().setMetadata(PARTITION_METADATA_RESULT_SET_METADATA).build();

        // One nanosecond before the start yields the parent partition row; the start itself and
        // one nanosecond after it yield nothing.
        mockGetPartitionsAfter(
            Timestamp.ofTimeSecondsAndNanos(seconds, nanos - 1), parentPartitionRows);
        mockGetPartitionsAfter(Timestamp.ofTimeSecondsAndNanos(seconds, nanos), noPartitionRows);
        mockGetPartitionsAfter(
            Timestamp.ofTimeSecondsAndNanos(seconds, nanos + 1), noPartitionRows);

        mockInvalidChangeStreamRecordReceived(readFrom, readUntil);

        try {
            pipeline.apply(
                SpannerIO.readChangeStream()
                    .withSpannerConfig(getSpannerConfig())
                    .withChangeStreamName(TEST_CHANGE_STREAM)
                    .withMetadataDatabase(TEST_DATABASE)
                    .withMetadataTable(TEST_TABLE)
                    .withInclusiveStartAt(readFrom)
                    .withInclusiveEndAt(readUntil));
            pipeline.run().waitUntilFinish();
        } finally {
            // Registered while the failure is still propagating, i.e. before the rule inspects it.
            thrown.expect(Pipeline.PipelineExecutionException.class);
            thrown.expectMessage("Field not found");
        }
    }

    /**
     * Registers a result for the change stream query (null partition token, 500 ms heartbeat)
     * consisting of a single row whose only value is the string "bad_value" wrapped in three
     * nested lists, described by a one-column {@code ARRAY<STRUCT<field_name STRUCT<STRING>>>}
     * row type.
     *
     * @param now value bound to {@code @startTimestamp}
     * @param after3Seconds value bound to {@code @endTimestamp}
     */
    private void mockInvalidChangeStreamRecordReceived(Timestamp now, Timestamp after3Seconds) {
        Statement query =
            Statement.newBuilder("SELECT * FROM READ_my-change-stream(   start_timestamp => @startTimestamp,   end_timestamp => @endTimestamp,   partition_token => @partitionToken,   read_options => null,   heartbeat_milliseconds => @heartbeatMillis)")
                .bind("startTimestamp").to(now)
                .bind("endTimestamp").to(after3Seconds)
                .bind("partitionToken").to((String)null)
                .bind("heartbeatMillis").to(500L)
                .build();

        // Row type, assembled from the innermost struct outwards.
        StructType.Builder unnamedStringStruct =
            StructType.newBuilder()
                .addFields(
                    StructType.Field.newBuilder()
                        .setType(Type.newBuilder().setCode(TypeCode.STRING)));
        StructType.Builder elementStruct =
            StructType.newBuilder()
                .addFields(
                    StructType.Field.newBuilder()
                        .setName("field_name")
                        .setType(
                            Type.newBuilder()
                                .setCode(TypeCode.STRUCT)
                                .setStructType(unnamedStringStruct)));
        Type.Builder arrayOfStructs =
            Type.newBuilder()
                .setCode(TypeCode.ARRAY)
                .setArrayElementType(
                    Type.newBuilder().setCode(TypeCode.STRUCT).setStructType(elementStruct));
        ResultSetMetadata metadata =
            ResultSetMetadata.newBuilder()
                .setRowType(
                    StructType.newBuilder()
                        .addFields(
                            StructType.Field.newBuilder()
                                .setName("COL1")
                                .setType(arrayOfStructs)))
                .build();

        // Row payload: "bad_value" wrapped in three levels of single-element lists.
        Value.Builder payload = Value.newBuilder().setStringValue("bad_value");
        for (int level = 0; level < 3; level++) {
            payload = Value.newBuilder().setListValue(ListValue.newBuilder().addValues(payload));
        }
        ResultSet rows =
            ResultSet.newBuilder()
                .addRows(ListValue.newBuilder().addValues(payload))
                .setMetadata(metadata)
                .build();

        mockSpannerService.putStatementResult(
            MockSpannerServiceImpl.StatementResult.query(query, rows));
    }

    /**
     * Registers {@code getPartitionResultSet} as the mock answer to the metadata table query that
     * selects rows with {@code CreatedAt} later than the given timestamp.
     *
     * @param timestamp value bound to {@code @timestamp}
     * @param getPartitionResultSet result set the mock service returns for that statement
     */
    private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionResultSet) {
        Timestamp createdAfter =
            Timestamp.ofTimeSecondsAndNanos(timestamp.getSeconds(), timestamp.getNanos());
        Statement query =
            Statement.newBuilder("SELECT * FROM my-metadata-table WHERE CreatedAt > @timestamp ORDER BY CreatedAt ASC, StartTimestamp ASC")
                .bind("timestamp").to(createdAfter)
                .build();
        mockSpannerService.putStatementResult(
            MockSpannerServiceImpl.StatementResult.query(query, getPartitionResultSet));
    }

    /**
     * Registers a single-row, single-column ({@code Watermark TIMESTAMP}) result for the query
     * that selects the smallest watermark among partitions whose state is not {@code FINISHED}.
     *
     * @param watermark timestamp whose string form is returned as the only value
     */
    private void mockGetWatermark(Timestamp watermark) {
        Statement query =
            Statement.newBuilder("SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
                .bind("state").to(PartitionMetadata.State.FINISHED.name())
                .build();

        StructType.Field watermarkColumn =
            StructType.Field.newBuilder()
                .setName("Watermark")
                .setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
                .build();
        ResultSetMetadata metadata =
            ResultSetMetadata.newBuilder()
                .setRowType(StructType.newBuilder().addFields(watermarkColumn).build())
                .build();

        Value watermarkValue = Value.newBuilder().setStringValue(watermark.toString()).build();
        ResultSet rows =
            ResultSet.newBuilder()
                .addRows(ListValue.newBuilder().addValues(watermarkValue).build())
                .setMetadata(metadata)
                .build();

        mockSpannerService.putStatementResult(
            MockSpannerServiceImpl.StatementResult.query(query, rows));
    }

    /**
     * Registers a one-row result for the metadata table lookup of partition token "Parent0" and
     * returns that result set so callers can reuse it for other mocked queries.
     *
     * <p>The row holds, in column order: the token, an empty parent-token list, the start and end
     * timestamps, heartbeat "500", state {@code CREATED}, the start timestamp as both watermark
     * and creation time, and nulls for the three remaining timestamp columns.
     *
     * @param startTimestamp used for the StartTimestamp, Watermark and CreatedAt values
     * @param after3Seconds used for the EndTimestamp value
     * @return the result set registered with the mock service
     */
    private ResultSet mockGetParentPartition(Timestamp startTimestamp, Timestamp after3Seconds) {
        Statement query =
            Statement.newBuilder("SELECT * FROM my-metadata-table WHERE PartitionToken = @partition")
                .bind("partition").to("Parent0")
                .build();

        String startText = startTimestamp.toString();
        Value nullValue = Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();

        ListValue row =
            ListValue.newBuilder()
                .addValues(Value.newBuilder().setStringValue("Parent0"))
                .addValues(Value.newBuilder().setListValue(ListValue.newBuilder().build()))
                .addValues(Value.newBuilder().setStringValue(startText))
                .addValues(Value.newBuilder().setStringValue(after3Seconds.toString()))
                .addValues(Value.newBuilder().setStringValue("500"))
                .addValues(
                    Value.newBuilder().setStringValue(PartitionMetadata.State.CREATED.name()))
                .addValues(Value.newBuilder().setStringValue(startText))
                .addValues(Value.newBuilder().setStringValue(startText))
                .addValues(nullValue)
                .addValues(nullValue)
                .addValues(nullValue)
                .build();

        ResultSet getPartitionResultSet =
            ResultSet.newBuilder()
                .addRows(row)
                .setMetadata(PARTITION_METADATA_RESULT_SET_METADATA)
                .build();
        mockSpannerService.putStatementResult(
            MockSpannerServiceImpl.StatementResult.query(query, getPartitionResultSet));
        return getPartitionResultSet;
    }

    /**
     * Registers a one-row result containing the metadata table name for the
     * {@code information_schema.tables} lookup of {@code 'my-metadata-table'}.
     */
    private void mockTableExists() {
        Statement query =
            Statement.of("SELECT t.table_name FROM information_schema.tables AS t WHERE t.table_catalog = '' AND t.table_schema = '' AND t.table_name = 'my-metadata-table'");

        StructType.Field tableNameColumn =
            StructType.Field.newBuilder()
                .setName("table_name")
                .setType(Type.newBuilder().setCode(TypeCode.STRING).build())
                .build();
        ResultSetMetadata metadata =
            ResultSetMetadata.newBuilder()
                .setRowType(StructType.newBuilder().addFields(tableNameColumn).build())
                .build();

        Value tableName = Value.newBuilder().setStringValue(TEST_TABLE).build();
        ResultSet rows =
            ResultSet.newBuilder()
                .addRows(ListValue.newBuilder().addValues(tableName).build())
                .setMetadata(metadata)
                .build();

        mockSpannerService.putStatementResult(
            MockSpannerServiceImpl.StatementResult.query(query, rows));
    }

    /**
     * Builds the {@link SpannerConfig} used by every test: it targets {@link #SPANNER_HOST} as the
     * emulator host with the local channel provider enabled and applies short retry settings
     * (250 ms initial delay, multiplier 5.0, 1 s max delay, 1 s total timeout) to both commit and
     * streaming SQL calls.
     *
     * @return the configuration pointing at the test project, instance and database
     */
    private SpannerConfig getSpannerConfig() {
        RetrySettings shortRetries =
            RetrySettings.newBuilder()
                .setInitialRetryDelay(Duration.ofMillis(250L))
                .setMaxRetryDelay(Duration.ofSeconds(1L))
                .setRetryDelayMultiplier(5.0)
                .setTotalTimeout(Duration.ofSeconds(1L))
                .build();

        return SpannerConfig.create()
            .withEmulatorHost(ValueProvider.StaticValueProvider.of(SPANNER_HOST))
            .withIsLocalChannelProvider(ValueProvider.StaticValueProvider.of(true))
            .withCommitRetrySettings(shortRetries)
            .withExecuteStreamingSqlRetrySettings(shortRetries)
            .withProjectId(TEST_PROJECT)
            .withInstanceId(TEST_INSTANCE)
            .withDatabaseId(TEST_DATABASE);
    }
}

