package com.google.cloud.spanner.watcher;

import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TimebasedShardProvider;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* Tests for SpannerTableTailer, run against the mock Spanner server inherited from AbstractMockServerTest. */
public class SpannerTableTailerTest extends AbstractMockServerTest {
    /** Upper bound of the outer loop in {@code testStressReceiveMultipleChanges}. */
    private static final int STRESS_TEST_RUNS = 1;

    /**
     * Row-change callback used by the stress test. It counts every row it is handed, remembers
     * the greatest commit timestamp seen so far, and counts down a latch that the test can
     * replace between rounds.
     *
     * <p>{@code latch} and {@code lastSeenCommitTimestamp} are {@code volatile}: the test code
     * replaces the latch and reads the timestamp, while {@link #rowChange} is presumably invoked
     * from a watcher thread (the tests block on the latch while rows arrive). Without
     * {@code volatile} the callback could keep counting down a stale latch.
     */
    private static final class TestChangeCallback implements SpannerTableChangeWatcher.RowChangeCallback {
        /** Total number of rows delivered to this callback. */
        private final AtomicInteger receivedRows = new AtomicInteger();
        /** Latch for the current round; swapped by {@link #setCountDown(int)}. */
        private volatile CountDownLatch latch;
        /** Greatest commit timestamp passed to {@link #rowChange} so far. */
        private volatile Timestamp lastSeenCommitTimestamp = Timestamp.MIN_VALUE;

        /**
         * @param initialCount number of rows the first latch waits for
         */
        private TestChangeCallback(int initialCount) {
            this.latch = new CountDownLatch(initialCount);
        }

        /**
         * Installs a fresh latch for the next round.
         *
         * @param i number of rows the new latch waits for
         */
        public void setCountDown(int i) {
            this.latch = new CountDownLatch(i);
        }

        /**
         * @return the latch of the current round
         */
        public CountDownLatch getLatch() {
            return this.latch;
        }

        /**
         * Records one delivered row: advances the last seen commit timestamp if {@code timestamp}
         * is newer, bumps the row counter and counts down the current latch.
         */
        public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
            if (timestamp.compareTo(this.lastSeenCommitTimestamp) > 0) {
                this.lastSeenCommitTimestamp = timestamp;
            }
            this.receivedRows.incrementAndGet();
            // Count down last so that a thread released by the latch observes the updates above.
            this.latch.countDown();
        }
    }

    /**
     * Tails table {@code Foo} with a 10 ms poll interval and checks that, once the tailer has
     * been stopped, the registered callback has been invoked exactly 10 times.
     */
    @Test
    public void testReceiveChanges() throws Exception {
        Spanner client = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger rowCounter = new AtomicInteger();
        final CountDownLatch allRowsSeen = new CountDownLatch(10);

        SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "Foo"))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row changedRow, Timestamp commitTimestamp) {
                rowCounter.incrementAndGet();
                allRowsSeen.countDown();
            }
        });

        tailer.startAsync().awaitRunning();
        // Wait at most five seconds; the count assertion below reports a timeout as a mismatch.
        allRowsSeen.await(5L, TimeUnit.SECONDS);
        tailer.stopAsync().awaitTerminated();

        Truth.assertThat(rowCounter.get()).isEqualTo(10);
    }

    /**
     * Starts a tailer on {@code NonExistingTable} and expects the service to fail while it is
     * still in the {@code STARTING} state.
     */
    @Test
    public void testTableNotFoundDuringInitialization() throws Exception {
        Spanner spanner = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "NonExistingTable"))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, databaseId)
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();

        // Typed future: completes with TRUE on the expected failure, exceptionally otherwise.
        final SettableApiFuture<Boolean> failedWhileStarting = SettableApiFuture.create();
        tailer.addListener(new ApiService.Listener() {
            public void failed(ApiService.State from, Throwable failure) {
                if (from != ApiService.State.STARTING) {
                    failedWhileStarting.setException(new AssertionError("expected from State to be STARTING"));
                } else {
                    failedWhileStarting.set(Boolean.TRUE);
                }
            }
        }, MoreExecutors.directExecutor());

        tailer.startAsync();
        Truth.assertThat(failedWhileStarting.get(5L, TimeUnit.SECONDS)).isTrue();
    }

    /**
     * Lets the tailer deliver 10 rows, then makes the current poll statement fail with
     * {@code NOT_FOUND} and expects the service to transition from {@code RUNNING} to
     * {@code FAILED}.
     */
    @Test
    public void testTableDeleted() throws Exception {
        Spanner spanner = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(10);
        SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "Foo"))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, databaseId)
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                receivedRows.incrementAndGet();
                latch.countDown();
            }
        });

        // Completes with TRUE when the service fails from RUNNING, exceptionally otherwise.
        final SettableApiFuture<Boolean> failedWhileRunning = SettableApiFuture.create();
        tailer.addListener(new ApiService.Listener() {
            public void failed(ApiService.State from, Throwable failure) {
                SpannerTableTailer.logger.warning(String.format("Database change watcher failed.%n    State before failure: %s%n    Error: %s%n", from, failure.getMessage()));
                if (from != ApiService.State.RUNNING) {
                    failedWhileRunning.setException(new AssertionError("expected from State to be RUNNING"));
                } else {
                    failedWhileRunning.set(Boolean.TRUE);
                }
            }
        }, MoreExecutors.directExecutor());

        tailer.startAsync().awaitRunning();
        latch.await(5L, TimeUnit.SECONDS);
        Truth.assertThat(receivedRows.get()).isEqualTo(10);

        // Silence the tailer's logger while the failure is provoked; always restore the level.
        Level previousLevel = SpannerTableTailer.logger.getLevel();
        try {
            SpannerTableTailer.logger.setLevel(Level.OFF);
            mockSpanner.putStatementResult(
                    MockSpannerServiceImpl.StatementResult.exception(
                            getCurrentFooPollStatement(),
                            Status.NOT_FOUND.withDescription("Table not found").asRuntimeException()));
            Truth.assertThat(failedWhileRunning.get(5L, TimeUnit.SECONDS)).isTrue();
        } finally {
            SpannerTableTailer.logger.setLevel(previousLevel);
        }
        Truth.assertThat(tailer.state()).isEqualTo(ApiService.State.FAILED);
    }

    /**
     * Repeats the following scenario {@code STRESS_TEST_RUNS} times: receive the initial 10 rows
     * with a 1 ms poll interval, then for 50 rounds register a random batch of 1 to 10 new rows
     * on the mock server and verify that the running total of received rows matches.
     *
     * <p>The loop steps, the batch-size offset and the "not the last run" check are written with
     * explicit values so they stay correct if {@code STRESS_TEST_RUNS} is raised above 1.
     */
    @Test
    public void testStressReceiveMultipleChanges() throws Exception {
        Random random = new Random();
        for (int run = 0; run < STRESS_TEST_RUNS; run++) {
            Spanner spanner = getSpanner();
            DatabaseId databaseId = DatabaseId.of("p", "i", "d");
            TestChangeCallback callback = new TestChangeCallback(10);
            SpannerTableTailer tailer =
                    SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "Foo"))
                            .setPollInterval(Duration.ofMillis(1L))
                            .setCommitTimestampRepository(
                                    SpannerCommitTimestampRepository.newBuilder(spanner, databaseId)
                                            .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                            .build())
                            .build();
            tailer.addCallback(callback);
            tailer.startAsync();
            callback.getLatch().await(5L, TimeUnit.SECONDS);
            Truth.assertThat(callback.receivedRows.get()).isEqualTo(10);

            int expectedTotal = 10;
            for (int round = 0; round < 50; round++) {
                // Between 1 and 10 new rows per round.
                int batchSize = random.nextInt(10) + 1;
                expectedTotal += batchSize;
                callback.setCountDown(batchSize);
                Timestamp lastSeen = callback.lastSeenCommitTimestamp;
                Timestamp nextCommit = Timestamp.ofTimeSecondsAndNanos(lastSeen.getSeconds() + 1, lastSeen.getNanos());
                // First result: the new batch, returned for a poll from the last seen timestamp.
                // Second result: an empty set for the follow-up poll from the new timestamp.
                mockSpanner.putStatementResults(new MockSpannerServiceImpl.StatementResult[]{
                    MockSpannerServiceImpl.StatementResult.query(
                            SELECT_FOO_STATEMENT.toBuilder().bind("prevCommitTimestamp").to(lastSeen).build(),
                            new RandomResultSetGenerator(batchSize).generateWithFixedCommitTimestamp(nextCommit)),
                    MockSpannerServiceImpl.StatementResult.query(
                            SELECT_FOO_STATEMENT.toBuilder().bind("prevCommitTimestamp").to(nextCommit).build(),
                            new RandomResultSetGenerator(0).generate())
                });
                callback.getLatch().await(5L, TimeUnit.SECONDS);
                Truth.assertThat(callback.receivedRows.get()).isEqualTo(expectedTotal);
            }
            tailer.stopAsync().awaitTerminated();

            // Reset the mock server before the next run; not needed after the final one.
            if (run < STRESS_TEST_RUNS - 1) {
                stopServer();
                startStaticServer();
                setupResults();
            }
        }
    }

    /**
     * Runs a tailer on a caller-supplied scheduled executor and verifies that 10 rows are
     * delivered and that stopping the tailer does not shut the executor down.
     *
     * <p>The executor is shut down in a {@code finally} block so its threads are released even
     * when an assertion fails.
     */
    @Test
    public void testCustomExecutor() throws Exception {
        Spanner spanner = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(10);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);
        try {
            SpannerTableTailer tailer =
                    SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "Foo"))
                            .setExecutor(executor)
                            .setPollInterval(Duration.ofMillis(10L))
                            .setCommitTimestampRepository(
                                    SpannerCommitTimestampRepository.newBuilder(spanner, databaseId)
                                            .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                            .build())
                            .build();
            tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
                public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                    receivedRows.incrementAndGet();
                    latch.countDown();
                }
            });
            tailer.startAsync().awaitRunning();
            latch.await(5L, TimeUnit.SECONDS);
            tailer.stopAsync().awaitTerminated();
            Truth.assertThat(receivedRows.get()).isEqualTo(10);
            // The tailer must leave an executor it did not create running.
            Truth.assertThat(executor.isShutdown()).isFalse();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Tails table {@code Foo} with a day-based {@code TimebasedShardProvider} on column
     * {@code ShardId} and checks that the callback has been invoked exactly 5 times once the
     * tailer is stopped.
     */
    @Test
    public void testAutomaticSharding() throws Exception {
        Spanner client = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger rowCounter = new AtomicInteger();
        final CountDownLatch allRowsSeen = new CountDownLatch(5);

        SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "Foo"))
                        .setShardProvider(TimebasedShardProvider.create("ShardId", TimebasedShardProvider.Interval.DAY))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row changedRow, Timestamp commitTimestamp) {
                rowCounter.incrementAndGet();
                allRowsSeen.countDown();
            }
        });

        tailer.startAsync().awaitRunning();
        // Wait at most five seconds; the count assertion below reports a timeout as a mismatch.
        allRowsSeen.await(5L, TimeUnit.SECONDS);
        tailer.stopAsync().awaitTerminated();

        Truth.assertThat(rowCounter.get()).isEqualTo(5);
    }

    /**
     * Configures the tailer with the commit timestamp column {@code AlternativeCommitTS} and
     * verifies that the single row registered for the corresponding poll statement is delivered.
     *
     * <p>The row count and the last-field index are explicit values and no longer borrow
     * {@code STRESS_TEST_RUNS}, which only happened to equal 1.
     */
    @Test
    public void testFixedCommitTimestampColumn() throws Exception {
        final int expectedRows = 1;
        Timestamp now = Timestamp.now();

        // Replace the last field of the generator's default row type with the alternative column.
        StructType defaultRowType = RandomResultSetGenerator.METADATA.getRowType();
        int lastFieldIndex = defaultRowType.getFieldsCount() - 1;
        ResultSetMetadata metadata =
                RandomResultSetGenerator.METADATA.toBuilder()
                        .setRowType(
                                defaultRowType.toBuilder()
                                        .setFields(
                                                lastFieldIndex,
                                                StructType.Field.newBuilder()
                                                        .setName("AlternativeCommitTS")
                                                        .setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
                                                        .build())
                                        .build())
                        .build();

        Statement initialPoll =
                Statement.newBuilder("SELECT *\nFROM `Foo`\nWHERE `AlternativeCommitTS`>@prevCommitTimestamp\nORDER BY `AlternativeCommitTS`")
                        .bind("prevCommitTimestamp")
                        .to(Timestamp.MIN_VALUE)
                        .build();
        // The initial poll returns the row(s) stamped with 'now'.
        mockSpanner.putStatementResult(
                MockSpannerServiceImpl.StatementResult.query(
                        initialPoll,
                        new RandomResultSetGenerator(expectedRows)
                                .generateWithFixedCommitTimestamp(now)
                                .toBuilder()
                                .setMetadata(metadata)
                                .build()));
        // The follow-up poll from 'now' returns nothing.
        mockSpanner.putStatementResult(
                MockSpannerServiceImpl.StatementResult.query(
                        initialPoll.toBuilder().bind("prevCommitTimestamp").to(now).build(),
                        new RandomResultSetGenerator(0).generate().toBuilder().setMetadata(metadata).build()));

        Spanner spanner = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(expectedRows);
        SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "Foo"))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, databaseId)
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .setCommitTimestampColumn("AlternativeCommitTS")
                        .build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                receivedRows.incrementAndGet();
                latch.countDown();
            }
        });
        tailer.startAsync().awaitRunning();
        latch.await(5L, TimeUnit.SECONDS);
        tailer.stopAsync().awaitTerminated();
        Truth.assertThat(receivedRows.get()).isEqualTo(expectedRows);
    }

    /**
     * Makes the poll statement for the commit timestamp column {@code AlternativeCommitTS} fail
     * with {@code NOT_FOUND} and expects the tailer to fail from the {@code RUNNING} state with a
     * {@link SpannerException} carrying {@link ErrorCode#NOT_FOUND}.
     */
    @Test
    public void testInvalidCommitTimestampColumn() throws Exception {
        mockSpanner.putStatementResult(
                MockSpannerServiceImpl.StatementResult.exception(
                        Statement.newBuilder("SELECT *\nFROM `Foo`\nWHERE `AlternativeCommitTS`>@prevCommitTimestamp\nORDER BY `AlternativeCommitTS`")
                                .bind("prevCommitTimestamp")
                                .to(Timestamp.MIN_VALUE)
                                .build(),
                        Status.NOT_FOUND.withDescription("Column not found").asRuntimeException()));

        Spanner spanner = getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(spanner, TableId.of(databaseId, "Foo"))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, databaseId)
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .setCommitTimestampColumn("AlternativeCommitTS")
                        .build();

        // Completes with TRUE on the expected failure, exceptionally on the first broken expectation.
        final SettableApiFuture<Boolean> failedAsExpected = SettableApiFuture.create();
        tailer.addListener(new ApiService.Listener() {
            public void failed(ApiService.State from, Throwable failure) {
                if (from != ApiService.State.RUNNING) {
                    failedAsExpected.setException(new AssertionError("expected from State to be RUNNING"));
                } else if (!(failure instanceof SpannerException)) {
                    // Checked before the cast below so an unexpected type cannot raise a ClassCastException.
                    failedAsExpected.setException(new AssertionError("expected SpannerException"));
                } else if (((SpannerException) failure).getErrorCode() != ErrorCode.NOT_FOUND) {
                    failedAsExpected.setException(new AssertionError("expected NOT_FOUND"));
                } else {
                    failedAsExpected.set(Boolean.TRUE);
                }
            }
        }, MoreExecutors.directExecutor());

        tailer.startAsync();
        // Five seconds, as in the other tests of this class, so a regression fails fast.
        Truth.assertThat(failedAsExpected.get(5L, TimeUnit.SECONDS)).isTrue();
    }
}
