package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.List;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPoolExtendedTest.class */
public class ShardReadersPoolExtendedTest {
    private static final String STREAM = "stream-0";
    private static final String SHARD_0 = "0";
    private static final int GET_RECORDS_LIMIT = 100;

    @Mock
    private KinesisClient kinesis;

    @Mock
    private CloudWatchClient cloudWatch;
    private ShardReadersPool shardReadersPool;
    private SimplifiedKinesisClient simplifiedKinesisClient;

    @Before
    public void setUp() {
        this.simplifiedKinesisClient = new SimplifiedKinesisClient(() -> {
            return this.kinesis;
        }, () -> {
            return this.cloudWatch;
        }, Integer.valueOf(GET_RECORDS_LIMIT));
    }

    @Test
    public void testNextRecordReturnsRecords() throws TransientKinesisException {
        this.shardReadersPool = initPool(initCheckpoint(ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 0L));
        List<List<Record>> createRecords = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, createRecords);
        TestHelpers.mockRecords(this.kinesis, createRecords, 3);
        this.shardReadersPool.start();
        Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 0L)});
        consumeAndCheckNonAggregatedRecords(1, 3);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsNonAggregatedRecordsIfSubSeqNumIsPositive() throws TransientKinesisException {
        this.shardReadersPool = initPool(initCheckpoint(ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 125L));
        List<List<Record>> createRecords = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, createRecords);
        TestHelpers.mockRecords(this.kinesis, createRecords, 3);
        this.shardReadersPool.start();
        consumeAndCheckNonAggregatedRecords(1, 3);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsRecordsWhenStartedAtTrimHorizon() throws TransientKinesisException {
        this.shardReadersPool = initPool(initCheckpoint(ShardIteratorType.TRIM_HORIZON, null, null));
        List<List<Record>> createRecords = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, createRecords);
        TestHelpers.mockRecords(this.kinesis, createRecords, 3);
        this.shardReadersPool.start();
        Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, new StartingPoint(InitialPositionInStream.TRIM_HORIZON))});
        consumeAndCheckNonAggregatedRecords(0, 3);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsRecordsWhenStartedAtLatest() throws TransientKinesisException {
        this.shardReadersPool = initPool(initCheckpoint(ShardIteratorType.LATEST, null, null));
        List<List<Record>> createRecords = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, createRecords);
        TestHelpers.mockRecords(this.kinesis, createRecords, 3);
        this.shardReadersPool.start();
        Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.LATEST, (String) null, (Long) null)});
        consumeAndCheckNonAggregatedRecords(0, 3);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsDeAggregatedRecords() throws TransientKinesisException {
        this.shardReadersPool = initPool(new KinesisReaderCheckpoint(ImmutableList.of(new ShardCheckpoint(STREAM, SHARD_0, new StartingPoint(InitialPositionInStream.LATEST)))));
        List<List<Record>> createAggregatedRecords = TestHelpers.createAggregatedRecords(1, 6);
        TestHelpers.mockShardIterators(this.kinesis, createAggregatedRecords);
        TestHelpers.mockRecords(this.kinesis, createAggregatedRecords, 1);
        this.shardReadersPool.start();
        Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, new StartingPoint(InitialPositionInStream.LATEST))});
        consumerAndCheckAggregatedRecords(0L, 3L);
        KinesisReaderCheckpoint checkpointMark = this.shardReadersPool.getCheckpointMark();
        this.shardReadersPool.stop();
        this.shardReadersPool = initPool(checkpointMark);
        this.shardReadersPool.start();
        consumerAndCheckAggregatedRecords(3L, 6L);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsDeAggregatedRecordsWhenStartedAfterSeqNum() throws TransientKinesisException {
        KinesisReaderCheckpoint initCheckpoint = initCheckpoint(ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 2L);
        List<List<Record>> createAggregatedRecords = TestHelpers.createAggregatedRecords(1, 6);
        TestHelpers.mockShardIterators(this.kinesis, createAggregatedRecords);
        TestHelpers.mockRecords(this.kinesis, createAggregatedRecords, 1);
        this.shardReadersPool = initPool(initCheckpoint);
        this.shardReadersPool.start();
        Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 2L)});
        consumerAndCheckAggregatedRecords(3L, 6L);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void poolWatermarkReturnsTsOfOldestAcknowledgedRecord() throws TransientKinesisException {
        this.shardReadersPool = initPool(new KinesisReaderCheckpoint(ImmutableList.of(new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.TRIM_HORIZON, (String) null, (Long) null), new ShardCheckpoint(STREAM, "1", ShardIteratorType.TRIM_HORIZON, (String) null, (Long) null))));
        List<Record> recordWithMinutesAgo = TestHelpers.recordWithMinutesAgo(5);
        List<Record> recordWithMinutesAgo2 = TestHelpers.recordWithMinutesAgo(4);
        ImmutableList of = ImmutableList.of(ImmutableList.builder().addAll(recordWithMinutesAgo).addAll(TestHelpers.recordWithMinutesAgo(3)).build(), recordWithMinutesAgo2);
        Instant joda = TimeUtil.toJoda(recordWithMinutesAgo2.get(0).approximateArrivalTimestamp());
        TestHelpers.mockShardIterators(this.kinesis, of);
        TestHelpers.mockRecords(this.kinesis, of, 1);
        this.shardReadersPool.start();
        Assertions.assertThat(this.shardReadersPool.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isTrue();
        Assertions.assertThat(this.shardReadersPool.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isTrue();
        Assertions.assertThat(this.shardReadersPool.getWatermark()).isGreaterThanOrEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat(this.shardReadersPool.nextRecord().isPresent()).isTrue();
        Assertions.assertThat(this.shardReadersPool.getWatermark()).isEqualTo(joda);
    }

    @After
    public void clean() throws Exception {
        this.shardReadersPool.stop();
        this.simplifiedKinesisClient.close();
        ((KinesisClient) Mockito.verify(this.kinesis)).close();
        Mockito.verifyNoInteractions(new Object[]{this.cloudWatch});
    }

    private static KinesisIO.Read spec() {
        return KinesisIO.read().withStreamName(STREAM);
    }

    private ShardReadersPool initPool(KinesisReaderCheckpoint kinesisReaderCheckpoint) {
        return new ShardReadersPool(spec(), this.simplifiedKinesisClient, kinesisReaderCheckpoint);
    }

    private KinesisReaderCheckpoint initCheckpoint(ShardIteratorType shardIteratorType, String str, Long l) {
        return new KinesisReaderCheckpoint(ImmutableList.of(new ShardCheckpoint(STREAM, SHARD_0, shardIteratorType, str, l)));
    }

    private void consumeAndCheckNonAggregatedRecords(int i, int i2) {
        for (int i3 = i; i3 < i2; i3++) {
            KinesisRecord kinesisRecord = (KinesisRecord) this.shardReadersPool.nextRecord().get();
            Assertions.assertThat(kinesisRecord.getSequenceNumber()).isEqualTo(String.valueOf(i3));
            Assertions.assertThat(kinesisRecord.getSubSequenceNumber()).isEqualTo(0L);
            Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, String.valueOf(i3), 0L)});
        }
    }

    private void consumerAndCheckAggregatedRecords(long j, long j2) {
        long j3 = j;
        while (true) {
            long j4 = j3;
            if (j4 >= j2) {
                return;
            }
            KinesisRecord kinesisRecord = (KinesisRecord) this.shardReadersPool.nextRecord().get();
            Assertions.assertThat(kinesisRecord.getSequenceNumber()).isEqualTo(SHARD_0);
            Assertions.assertThat(kinesisRecord.getSubSequenceNumber()).isEqualTo(j4);
            Assertions.assertThat(this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, Long.valueOf(j4))});
            j3 = j4 + 1;
        }
    }
}
