/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.ShardReadersPool;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.StartingPoint;
import org.apache.beam.sdk.io.aws2.kinesis.TestHelpers;
import org.apache.beam.sdk.io.aws2.kinesis.TimeUtil;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(value=MockitoJUnitRunner.class)
public class ShardReadersPoolExtendedTest {
    private static final String STREAM = "stream-0";
    private static final String SHARD_0 = "0";
    private static final int GET_RECORDS_LIMIT = 100;
    @Mock
    private KinesisClient kinesis;
    @Mock
    private CloudWatchClient cloudWatch;
    private ShardReadersPool shardReadersPool;
    private SimplifiedKinesisClient simplifiedKinesisClient;

    @Before
    public void setUp() {
        this.simplifiedKinesisClient = new SimplifiedKinesisClient(() -> this.kinesis, () -> this.cloudWatch, Integer.valueOf(100));
    }

    @Test
    public void testNextRecordReturnsRecords() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = this.initCheckpoint(ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 0L);
        this.shardReadersPool = this.initPool(initialCheckpoint);
        List<List<Record>> records = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, records);
        TestHelpers.mockRecords(this.kinesis, records, 3);
        this.shardReadersPool.start();
        Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, Long.valueOf(0L))});
        this.consumeAndCheckNonAggregatedRecords(1, 3);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsNonAggregatedRecordsIfSubSeqNumIsPositive() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = this.initCheckpoint(ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 125L);
        this.shardReadersPool = this.initPool(initialCheckpoint);
        List<List<Record>> records = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, records);
        TestHelpers.mockRecords(this.kinesis, records, 3);
        this.shardReadersPool.start();
        this.consumeAndCheckNonAggregatedRecords(1, 3);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsRecordsWhenStartedAtTrimHorizon() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = this.initCheckpoint(ShardIteratorType.TRIM_HORIZON, null, null);
        this.shardReadersPool = this.initPool(initialCheckpoint);
        List<List<Record>> records = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, records);
        TestHelpers.mockRecords(this.kinesis, records, 3);
        this.shardReadersPool.start();
        Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, new StartingPoint(InitialPositionInStream.TRIM_HORIZON))});
        this.consumeAndCheckNonAggregatedRecords(0, 3);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsRecordsWhenStartedAtLatest() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = this.initCheckpoint(ShardIteratorType.LATEST, null, null);
        this.shardReadersPool = this.initPool(initialCheckpoint);
        List<List<Record>> records = TestHelpers.createRecords(1, 3);
        TestHelpers.mockShardIterators(this.kinesis, records);
        TestHelpers.mockRecords(this.kinesis, records, 3);
        this.shardReadersPool.start();
        Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.LATEST, null, null)});
        this.consumeAndCheckNonAggregatedRecords(0, 3);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsDeAggregatedRecords() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = new KinesisReaderCheckpoint((Iterable)ImmutableList.of((Object)new ShardCheckpoint(STREAM, SHARD_0, new StartingPoint(InitialPositionInStream.LATEST))));
        this.shardReadersPool = this.initPool(initialCheckpoint);
        List<List<Record>> records = TestHelpers.createAggregatedRecords(1, 6);
        TestHelpers.mockShardIterators(this.kinesis, records);
        TestHelpers.mockRecords(this.kinesis, records, 1);
        this.shardReadersPool.start();
        Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, new StartingPoint(InitialPositionInStream.LATEST))});
        this.consumerAndCheckAggregatedRecords(0L, 3L);
        KinesisReaderCheckpoint intermediateCheckpoint = this.shardReadersPool.getCheckpointMark();
        this.shardReadersPool.stop();
        this.shardReadersPool = this.initPool(intermediateCheckpoint);
        this.shardReadersPool.start();
        this.consumerAndCheckAggregatedRecords(3L, 6L);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void testNextRecordReturnsDeAggregatedRecordsWhenStartedAfterSeqNum() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = this.initCheckpoint(ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, 2L);
        List<List<Record>> records = TestHelpers.createAggregatedRecords(1, 6);
        TestHelpers.mockShardIterators(this.kinesis, records);
        TestHelpers.mockRecords(this.kinesis, records, 1);
        this.shardReadersPool = this.initPool(initialCheckpoint);
        this.shardReadersPool.start();
        Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, Long.valueOf(2L))});
        this.consumerAndCheckAggregatedRecords(3L, 6L);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isFalse();
    }

    @Test
    public void poolWatermarkReturnsTsOfOldestAcknowledgedRecord() throws TransientKinesisException {
        KinesisReaderCheckpoint initialCheckpoint = new KinesisReaderCheckpoint((Iterable)ImmutableList.of((Object)new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.TRIM_HORIZON, null, null), (Object)new ShardCheckpoint(STREAM, "1", ShardIteratorType.TRIM_HORIZON, null, null)));
        this.shardReadersPool = this.initPool(initialCheckpoint);
        List<Record> shard0records0 = TestHelpers.recordWithMinutesAgo(5);
        List<Record> shard1records0 = TestHelpers.recordWithMinutesAgo(4);
        List<Record> shard0records1 = TestHelpers.recordWithMinutesAgo(3);
        ImmutableList records = ImmutableList.of((Object)ImmutableList.builder().addAll(shard0records0).addAll(shard0records1).build(), shard1records0);
        org.joda.time.Instant ts1 = TimeUtil.toJoda((Instant)shard1records0.get(0).approximateArrivalTimestamp());
        TestHelpers.mockShardIterators(this.kinesis, (List<List<Record>>)records);
        TestHelpers.mockRecords(this.kinesis, (List<List<Record>>)records, 1);
        this.shardReadersPool.start();
        Assertions.assertThat((Comparable)this.shardReadersPool.getWatermark()).isEqualTo((Object)BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isTrue();
        Assertions.assertThat((Comparable)this.shardReadersPool.getWatermark()).isEqualTo((Object)BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isTrue();
        Assertions.assertThat((Comparable)this.shardReadersPool.getWatermark()).isGreaterThanOrEqualTo((Comparable)BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat((boolean)this.shardReadersPool.nextRecord().isPresent()).isTrue();
        Assertions.assertThat((Comparable)this.shardReadersPool.getWatermark()).isEqualTo((Object)ts1);
    }

    @After
    public void clean() throws Exception {
        this.shardReadersPool.stop();
        this.simplifiedKinesisClient.close();
        ((KinesisClient)Mockito.verify((Object)this.kinesis)).close();
        Mockito.verifyNoInteractions((Object[])new Object[]{this.cloudWatch});
    }

    private static KinesisIO.Read spec() {
        return KinesisIO.read().withStreamName(STREAM);
    }

    private ShardReadersPool initPool(KinesisReaderCheckpoint initialCheckpoint) {
        return new ShardReadersPool(ShardReadersPoolExtendedTest.spec(), this.simplifiedKinesisClient, initialCheckpoint);
    }

    private KinesisReaderCheckpoint initCheckpoint(ShardIteratorType type, String seqNum, Long subSeqNum) {
        return new KinesisReaderCheckpoint((Iterable)ImmutableList.of((Object)new ShardCheckpoint(STREAM, SHARD_0, type, seqNum, subSeqNum)));
    }

    private void consumeAndCheckNonAggregatedRecords(int startSeqNum, int endSeqNum) {
        for (int i = startSeqNum; i < endSeqNum; ++i) {
            KinesisRecord kinesisRecord = (KinesisRecord)this.shardReadersPool.nextRecord().get();
            Assertions.assertThat((String)kinesisRecord.getSequenceNumber()).isEqualTo((Object)String.valueOf(i));
            Assertions.assertThat((long)kinesisRecord.getSubSequenceNumber()).isEqualTo(0L);
            Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, String.valueOf(i), Long.valueOf(0L))});
        }
    }

    private void consumerAndCheckAggregatedRecords(long startSubSeqNum, long endSubSeqNum) {
        for (long i = startSubSeqNum; i < endSubSeqNum; ++i) {
            KinesisRecord kinesisRecord = (KinesisRecord)this.shardReadersPool.nextRecord().get();
            Assertions.assertThat((String)kinesisRecord.getSequenceNumber()).isEqualTo((Object)SHARD_0);
            Assertions.assertThat((long)kinesisRecord.getSubSequenceNumber()).isEqualTo(i);
            Assertions.assertThat((Iterable)this.shardReadersPool.getCheckpointMark()).containsExactlyInAnyOrder((Object[])new ShardCheckpoint[]{new ShardCheckpoint(STREAM, SHARD_0, ShardIteratorType.AFTER_SEQUENCE_NUMBER, SHARD_0, Long.valueOf(i))});
        }
    }
}

