package org.apache.beam.sdk.io.aws2.kinesis;

import io.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.common.InitialPositionInStream;

/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPoolTest.class */
public class EFOShardSubscribersPoolTest {
    private static final String STREAM = "stream-01";
    private static final String CONSUMER = "consumer-01";
    private KinesisIO.Read readSpec;
    private String consumerArn;
    private EFOStubbedKinesisAsyncClient kinesis;
    private EFOShardSubscribersPool pool;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPoolTest$KinesisRecordView.class */
    public static class KinesisRecordView {
        private final String shardId;
        private final String sequenceNumber;
        private final long subSequenceNumber;

        KinesisRecordView(String str, String str2, long j) {
            this.shardId = str;
            this.sequenceNumber = str2;
            this.subSequenceNumber = j;
        }

        static KinesisRecordView fromKinesisRecord(KinesisRecord kinesisRecord) {
            return new KinesisRecordView(kinesisRecord.getShardId(), kinesisRecord.getSequenceNumber(), kinesisRecord.getSubSequenceNumber());
        }

        static KinesisRecordView[] generate(String str, int i, int i2) {
            ArrayList arrayList = new ArrayList();
            for (int i3 = i; i3 < i + i2; i3++) {
                arrayList.add(new KinesisRecordView(str, String.valueOf(i3), 0L));
            }
            return (KinesisRecordView[]) arrayList.toArray(new KinesisRecordView[0]);
        }

        static KinesisRecordView[] generateAggregated(String str, int i, int i2) {
            ArrayList arrayList = new ArrayList();
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= i2) {
                    return (KinesisRecordView[]) arrayList.toArray(new KinesisRecordView[0]);
                }
                arrayList.add(new KinesisRecordView(str, String.valueOf(i), j2));
                j = j2 + 1;
            }
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            KinesisRecordView kinesisRecordView = (KinesisRecordView) obj;
            return this.subSequenceNumber == kinesisRecordView.subSequenceNumber && this.shardId.equals(kinesisRecordView.shardId) && this.sequenceNumber.equals(kinesisRecordView.sequenceNumber);
        }

        public int hashCode() {
            return Objects.hash(this.shardId, this.sequenceNumber, Long.valueOf(this.subSequenceNumber));
        }

        public String toString() {
            return "KinesisRecordView{shardId='" + this.shardId + "', sequenceNumber='" + this.sequenceNumber + "', subSequenceNumber=" + this.subSequenceNumber + '}';
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPoolTest$PoolAssertion.class */
    private static class PoolAssertion {
        private final EFOShardSubscribersPool pool;

        PoolAssertion(EFOShardSubscribersPool eFOShardSubscribersPool) {
            this.pool = eFOShardSubscribersPool;
        }

        static PoolAssertion assertPool(EFOShardSubscribersPool eFOShardSubscribersPool) {
            return new PoolAssertion(eFOShardSubscribersPool);
        }

        void givesCheckPointedRecords(ShardAssertion... shardAssertionArr) throws Exception {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (ShardAssertion shardAssertion : shardAssertionArr) {
                arrayList.addAll(shardAssertion.expectedRecords);
                if (shardAssertion.expectedRecords.size() > 0) {
                    arrayList2.add(new ShardCheckpoint(EFOShardSubscribersPoolTest.STREAM, shardAssertion.shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER, shardAssertion.expectedLastSequenceNumber, Long.valueOf(shardAssertion.expectedRecords.get(shardAssertion.expectedRecords.size() - 1).subSequenceNumber)));
                } else {
                    arrayList2.add(new ShardCheckpoint(EFOShardSubscribersPoolTest.STREAM, shardAssertion.shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER, shardAssertion.expectedLastSequenceNumber, 0L));
                }
            }
            ArrayList arrayList3 = new ArrayList();
            for (int i = 0; i < arrayList.size(); i++) {
                List<KinesisRecord> waitForRecords = EFOShardSubscribersPoolTest.waitForRecords(this.pool, 1);
                if (waitForRecords.size() == 0) {
                    throw new RuntimeException(String.format("Unable to fetch %s th record", Integer.valueOf(i)));
                }
                KinesisRecord kinesisRecord = waitForRecords.get(0);
                arrayList3.add(KinesisRecordView.fromKinesisRecord(kinesisRecord));
                Assertions.assertThat(this.pool.getCheckpointMark()).contains(new ShardCheckpoint[]{new ShardCheckpoint(EFOShardSubscribersPoolTest.STREAM, kinesisRecord.getShardId(), ShardIteratorType.AFTER_SEQUENCE_NUMBER, kinesisRecord.getSequenceNumber(), Long.valueOf(kinesisRecord.getSubSequenceNumber()))});
            }
            EFOShardSubscribersPoolTest.nothingElseShouldBeReceived(this.pool);
            Assertions.assertThat(arrayList3).containsExactlyInAnyOrder((KinesisRecordView[]) arrayList.toArray(new KinesisRecordView[0]));
            Assertions.assertThat(this.pool.getCheckpointMark()).containsExactlyInAnyOrder((ShardCheckpoint[]) arrayList2.toArray(new ShardCheckpoint[0]));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscribersPoolTest$ShardAssertion.class */
    public static class ShardAssertion {
        final String shardId;
        final List<KinesisRecordView> expectedRecords = new ArrayList();
        String expectedLastSequenceNumber;

        ShardAssertion(String str) {
            this.shardId = str;
        }

        static ShardAssertion shard(String str) {
            return new ShardAssertion(str);
        }

        ShardAssertion gives(KinesisRecordView... kinesisRecordViewArr) {
            this.expectedRecords.addAll(Arrays.asList(kinesisRecordViewArr));
            return this;
        }

        ShardAssertion withLastCheckpointSequenceNumber(int i) {
            this.expectedLastSequenceNumber = String.valueOf(i);
            return this;
        }
    }

    @Before
    public void setUp() {
        this.readSpec = TestHelpers.createReadSpec();
        this.consumerArn = CONSUMER;
    }

    @After
    public void tearDown() {
        this.kinesis.close();
        this.pool.stop();
    }

    @Test
    public void poolReSubscribesAndReadsRecords() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3, 7));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(10, 3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3, 5));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(8, 3));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 0, 10)).withLastCheckpointSequenceNumber(12), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 0, 8)).withLastCheckpointSequenceNumber(10));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "2"), TestHelpers.subscribeAfterSeqNumber("shard-000", "9"), TestHelpers.subscribeAfterSeqNumber("shard-001", "7"), TestHelpers.subscribeAfterSeqNumber("shard-000", "12"), TestHelpers.subscribeAfterSeqNumber("shard-001", "10")});
    }

    @Test
    public void poolReSubscribesAndReadsRecordsAfterCheckPoint() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(11, 3));
        KinesisReaderCheckpoint kinesisReaderCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of(afterCheckpoint("shard-000", "0"), afterCheckpoint("shard-001", "11")));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(kinesisReaderCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 1, 2)).withLastCheckpointSequenceNumber(2), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 12, 2)).withLastCheckpointSequenceNumber(13));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeAtSeqNumber("shard-000", "0"), TestHelpers.subscribeAtSeqNumber("shard-001", "11"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "13")});
    }

    @Test
    public void poolReSubscribesAndReadsRecordsWithTrimHorizon() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(11, 3));
        KinesisReaderCheckpoint kinesisReaderCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of(trimHorizonCheckpoint("shard-000"), trimHorizonCheckpoint("shard-001")));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(kinesisReaderCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 0, 3)).withLastCheckpointSequenceNumber(2), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 11, 3)).withLastCheckpointSequenceNumber(13));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeTrimHorizon("shard-000"), TestHelpers.subscribeTrimHorizon("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "13")});
    }

    @Test
    public void poolReSubscribesAndReadsRecordsWithAtTimestamp() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(11, 3));
        Instant minus = Instant.now().minus(Duration.standardHours(1L));
        Instant minus2 = Instant.now().minus(Duration.standardHours(1L));
        KinesisReaderCheckpoint kinesisReaderCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of(tsCheckpoint("shard-000", minus), tsCheckpoint("shard-001", minus2)));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(kinesisReaderCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 0, 3)).withLastCheckpointSequenceNumber(2), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 11, 3)).withLastCheckpointSequenceNumber(13));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeAtTs("shard-000", minus), TestHelpers.subscribeAtTs("shard-001", minus2), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "13")});
    }

    @Test
    public void poolReSubscribesAndSkipsAllRecordsWithAtTimestampGreaterThanRecords() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(11, 3));
        Instant plus = Instant.now().plus(Duration.standardHours(1L));
        Instant plus2 = Instant.now().plus(Duration.standardHours(1L));
        KinesisReaderCheckpoint kinesisReaderCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of(tsCheckpoint("shard-000", plus), tsCheckpoint("shard-001", plus2)));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(kinesisReaderCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(new KinesisRecordView[0]).withLastCheckpointSequenceNumber(2), ShardAssertion.shard("shard-001").gives(new KinesisRecordView[0]).withLastCheckpointSequenceNumber(13));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeAtTs("shard-000", plus), TestHelpers.subscribeAtTs("shard-001", plus2), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "13")});
    }

    @Test
    public void poolReSubscribesAndReadsRecordsAfterCheckPointWithPositiveSubSeqNumber() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(11, 3));
        KinesisReaderCheckpoint kinesisReaderCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of(afterCheckpoint("shard-000", "0", 1000L), afterCheckpoint("shard-001", "11", Long.MAX_VALUE)));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(kinesisReaderCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 1, 2)).withLastCheckpointSequenceNumber(2), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 12, 2)).withLastCheckpointSequenceNumber(13));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeAtSeqNumber("shard-000", "0"), TestHelpers.subscribeAtSeqNumber("shard-001", "11"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "13")});
    }

    @Test
    public void poolReSubscribesAndReadsManyEvents() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(1);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithRecords(18, 300));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(318, 3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithRecords(75, 200));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(275, 3));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 18, 300)).withLastCheckpointSequenceNumber(320), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 75, 200)).withLastCheckpointSequenceNumber(277));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "317"), TestHelpers.subscribeAfterSeqNumber("shard-001", "274"), TestHelpers.subscribeAfterSeqNumber("shard-000", "320"), TestHelpers.subscribeAfterSeqNumber("shard-001", "277")});
    }

    @Test
    public void handlesAggregatedRecords() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithAggRecords(12, 2));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(13, 1));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithAggRecords(55, 3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(56, 1));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generateAggregated("shard-000", 12, 2)).withLastCheckpointSequenceNumber(13), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generateAggregated("shard-001", 55, 3)).withLastCheckpointSequenceNumber(56));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "12"), TestHelpers.subscribeAfterSeqNumber("shard-001", "55"), TestHelpers.subscribeAfterSeqNumber("shard-000", "13"), TestHelpers.subscribeAfterSeqNumber("shard-001", "56")});
    }

    @Test
    public void doesNotIntroduceDuplicatesWithAggregatedRecordsCheckpoints() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        SubscribeToShardEventStream eventWithAggRecords = TestHelpers.eventWithAggRecords(12, 6);
        this.kinesis.stubSubscribeToShard("shard-000", eventWithAggRecords);
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        validateAggregatedRecords("12", 0, 3);
        KinesisReaderCheckpoint checkpointMark = this.pool.getCheckpointMark();
        Assertions.assertThat(checkpointMark.iterator()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-000", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "12", 3L)});
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeAfterSeqNumber("shard-000", "12")});
        this.pool.stop();
        this.kinesis.close();
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", eventWithAggRecords);
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(checkpointMark);
        validateAggregatedRecords("12", 4, 5);
        Assertions.assertThat(waitForRecords(this.pool, 3).size()).isEqualTo(0);
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeAtSeqNumber("shard-000", "12"), TestHelpers.subscribeAfterSeqNumber("shard-000", "12")});
    }

    private void validateAggregatedRecords(String str, int i, int i2) throws Exception {
        for (int i3 = i; i3 < i2 + 1; i3++) {
            KinesisRecord kinesisRecord = waitForRecords(this.pool, 1).get(0);
            Assertions.assertThat(kinesisRecord.getSequenceNumber()).isEqualTo(str);
            Assertions.assertThat(kinesisRecord.getSubSequenceNumber()).isEqualTo(i3);
            Assertions.assertThat(this.pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-000", ShardIteratorType.AFTER_SEQUENCE_NUMBER, str, Long.valueOf(i3))});
        }
    }

    @Test
    public void skipsEntireAggregatedBatch() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithAggRecords(12, 6));
        KinesisReaderCheckpoint kinesisReaderCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of(new ShardCheckpoint(STREAM, "shard-000", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "12", 5L)));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(kinesisReaderCheckpoint);
        Assertions.assertThat(waitForRecords(this.pool, 10).size()).isEqualTo(0);
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeAtSeqNumber("shard-000", "12"), TestHelpers.subscribeAfterSeqNumber("shard-000", "12")});
    }

    @Test
    public void poolReSubscribesWhenNoRecordsCome() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(31, 3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(8, 3));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(new KinesisRecordView[0]).withLastCheckpointSequenceNumber(33), ShardAssertion.shard("shard-001").gives(new KinesisRecordView[0]).withLastCheckpointSequenceNumber(10));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "33"), TestHelpers.subscribeAfterSeqNumber("shard-001", "10")});
    }

    @Test
    public void poolReSubscribesWhenRecoverableErrorOccurs() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3)).failWith(new ReadTimeoutException());
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3, 7));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(10, 10));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3, 5)).failWith(SdkClientException.create("this is recoverable", new ReadTimeoutException()));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(8, 8));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis, 1);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 0, 10)).withLastCheckpointSequenceNumber(19), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 0, 8)).withLastCheckpointSequenceNumber(15));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "2"), TestHelpers.subscribeAfterSeqNumber("shard-000", "9"), TestHelpers.subscribeAfterSeqNumber("shard-001", "7"), TestHelpers.subscribeAfterSeqNumber("shard-000", "19"), TestHelpers.subscribeAfterSeqNumber("shard-001", "15")});
    }

    @Test
    public void poolReSubscribesWhenManyRecoverableErrorsOccur() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(1);
        for (int i = 0; i < 250; i++) {
            this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithRecords(i, 1)).failWith(new ReadTimeoutException());
        }
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(250, 3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithRecords(333, 250)).failWith(SdkClientException.create("this is recoverable", new ReadTimeoutException()));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(583, 3));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis, 1);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 0, 250)).withLastCheckpointSequenceNumber(252), ShardAssertion.shard("shard-001").gives(KinesisRecordView.generate("shard-001", 333, 250)).withLastCheckpointSequenceNumber(585));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen().size()).isEqualTo(255);
    }

    @Test
    public void poolReSubscribesFromInitialWhenRecoverableErrorOccursImmediately() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[0]).failWith(new ReadTimeoutException());
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(550, 3));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(553, 1));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis, 1);
        this.pool.start(initialLatestCheckpoint);
        PoolAssertion.assertPool(this.pool).givesCheckPointedRecords(ShardAssertion.shard("shard-000").gives(KinesisRecordView.generate("shard-000", 550, 3)).withLastCheckpointSequenceNumber(553));
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeAfterSeqNumber("shard-000", "552"), TestHelpers.subscribeAfterSeqNumber("shard-000", "553")});
    }

    @Test
    public void poolFailsWhenNonRecoverableErrorOccurs() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(7));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(7, 3)).failWith(new RuntimeException("Oh..."));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(3, 10));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(3, 8));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        Throwable assertThrows = Assert.assertThrows(IOException.class, () -> {
            waitForRecords(this.pool, 20);
        });
        Assertions.assertThat(assertThrows.getMessage()).isEqualTo("java.lang.RuntimeException: Oh...");
        Assertions.assertThat(assertThrows.getCause()).isInstanceOf(RuntimeException.class);
        Assertions.assertThat(this.pool.getCheckpointMark()).contains(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-000", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "9", 0L)}).doesNotContain(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-001", new StartingPoint(InitialPositionInStream.LATEST))});
    }

    @Test
    public void poolFailsWhenConsumerDoesNotExist() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3)).failWith(new CompletionException("Err ...", (Throwable) ResourceNotFoundException.builder().cause((Throwable) null).awsErrorDetails(AwsErrorDetails.builder().serviceName("Kinesis").errorCode("ResourceNotFoundException").errorMessage("Consumer consumer-01 not found.").build()).build()));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3, 7));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(3, 10));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(3, 8));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        Throwable assertThrows = Assert.assertThrows(IOException.class, () -> {
            waitForRecords(this.pool, 10);
        });
        Assertions.assertThat(assertThrows.getMessage()).isEqualTo("java.util.concurrent.CompletionException: Err ...");
        Throwable cause = assertThrows.getCause().getCause();
        Assertions.assertThat(cause).isInstanceOf(ResourceNotFoundException.class);
        Assertions.assertThat(cause.getMessage()).isEqualTo("Consumer consumer-01 not found. (Service: Kinesis, Status Code: 0, Request ID: null)");
    }

    @Test
    public void poolHandlesShardUp() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3, 7));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(1, 10));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.reShardEvent(ImmutableList.of("shard-000"), ImmutableList.of("shard-002", "shard-003")));
        this.kinesis.stubSubscribeToShard("shard-002", TestHelpers.eventWithRecords(5));
        this.kinesis.stubSubscribeToShard("shard-002", TestHelpers.eventsWithoutRecords(3, 5));
        this.kinesis.stubSubscribeToShard("shard-003", TestHelpers.eventWithRecords(6));
        this.kinesis.stubSubscribeToShard("shard-003", TestHelpers.eventsWithoutRecords(3, 6));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(3, 5));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(1, 8));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.reShardEvent(ImmutableList.of("shard-001"), ImmutableList.of("shard-004")));
        this.kinesis.stubSubscribeToShard("shard-004", TestHelpers.eventWithRecords(5));
        this.kinesis.stubSubscribeToShard("shard-004", TestHelpers.eventsWithoutRecords(2, 5));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        Assertions.assertThat(waitForRecords(this.pool, 35).size()).isEqualTo(34);
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "2"), TestHelpers.subscribeAfterSeqNumber("shard-000", "9"), TestHelpers.subscribeAfterSeqNumber("shard-001", "7"), TestHelpers.subscribeAfterSeqNumber("shard-000", "10"), TestHelpers.subscribeAfterSeqNumber("shard-001", "8"), TestHelpers.subscribeTrimHorizon("shard-002"), TestHelpers.subscribeTrimHorizon("shard-003"), TestHelpers.subscribeTrimHorizon("shard-004"), TestHelpers.subscribeAfterSeqNumber("shard-002", "4"), TestHelpers.subscribeAfterSeqNumber("shard-003", "5"), TestHelpers.subscribeAfterSeqNumber("shard-004", "4"), TestHelpers.subscribeAfterSeqNumber("shard-002", "7"), TestHelpers.subscribeAfterSeqNumber("shard-003", "8"), TestHelpers.subscribeAfterSeqNumber("shard-004", "6")});
        Assertions.assertThat(this.pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-002", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "7", 0L), new ShardCheckpoint(STREAM, "shard-003", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "8", 0L), new ShardCheckpoint(STREAM, "shard-004", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "6", 0L)});
    }

    @Test
    public void poolHandlesShardUpWithRecords() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(1);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.reShardEventWithRecords(11, 1, ImmutableList.of("shard-000"), ImmutableList.of("shard-001", "shard-002")));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(101, 1));
        this.kinesis.stubSubscribeToShard("shard-002", TestHelpers.eventsWithoutRecords(102, 1));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        Assertions.assertThat(waitForRecords(this.pool, 1).size()).isEqualTo(1);
        Assertions.assertThat(this.pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-001", ShardIteratorType.TRIM_HORIZON, (String) null, (Long) null), new ShardCheckpoint(STREAM, "shard-002", ShardIteratorType.TRIM_HORIZON, (String) null, (Long) null)});
        Assertions.assertThat(waitForRecords(this.pool, 1).size()).isEqualTo(0);
        Assertions.assertThat(this.pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-001", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "101", 0L), new ShardCheckpoint(STREAM, "shard-002", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "102", 0L)});
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeTrimHorizon("shard-001"), TestHelpers.subscribeTrimHorizon("shard-002"), TestHelpers.subscribeAfterSeqNumber("shard-001", "101"), TestHelpers.subscribeAfterSeqNumber("shard-002", "102")});
    }

    @Test
    public void poolHandlesShardDown() throws Exception {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(3, 7));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventsWithoutRecords(1, 10));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.reShardEvent(ImmutableList.of("shard-000", "shard-001"), ImmutableList.of("shard-004")));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(5));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventsWithoutRecords(2, 5));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.reShardEvent(ImmutableList.of("shard-000", "shard-001"), ImmutableList.of("shard-004")));
        this.kinesis.stubSubscribeToShard("shard-002", TestHelpers.eventWithRecords(5));
        this.kinesis.stubSubscribeToShard("shard-002", TestHelpers.eventsWithoutRecords(2, 5));
        this.kinesis.stubSubscribeToShard("shard-002", TestHelpers.reShardEvent(ImmutableList.of("shard-002", "shard-003"), ImmutableList.of("shard-005")));
        this.kinesis.stubSubscribeToShard("shard-003", TestHelpers.eventWithRecords(5));
        this.kinesis.stubSubscribeToShard("shard-003", TestHelpers.eventsWithoutRecords(2, 5));
        this.kinesis.stubSubscribeToShard("shard-003", TestHelpers.reShardEvent(ImmutableList.of("shard-002", "shard-003"), ImmutableList.of("shard-005")));
        this.kinesis.stubSubscribeToShard("shard-004", TestHelpers.eventWithRecords(6));
        this.kinesis.stubSubscribeToShard("shard-004", TestHelpers.eventsWithoutRecords(3, 6));
        this.kinesis.stubSubscribeToShard("shard-005", TestHelpers.eventWithRecords(6));
        this.kinesis.stubSubscribeToShard("shard-005", TestHelpers.eventsWithoutRecords(3, 6));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001", "shard-002", "shard-003"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        Assertions.assertThat(waitForRecords(this.pool, 38).size()).isEqualTo(37);
        Assertions.assertThat(this.kinesis.subscribeRequestsSeen()).containsExactlyInAnyOrder(new SubscribeToShardRequest[]{TestHelpers.subscribeLatest("shard-000"), TestHelpers.subscribeLatest("shard-001"), TestHelpers.subscribeLatest("shard-002"), TestHelpers.subscribeLatest("shard-003"), TestHelpers.subscribeAfterSeqNumber("shard-000", "2"), TestHelpers.subscribeAfterSeqNumber("shard-001", "4"), TestHelpers.subscribeAfterSeqNumber("shard-002", "4"), TestHelpers.subscribeAfterSeqNumber("shard-003", "4"), TestHelpers.subscribeAfterSeqNumber("shard-000", "9"), TestHelpers.subscribeAfterSeqNumber("shard-001", "6"), TestHelpers.subscribeAfterSeqNumber("shard-002", "6"), TestHelpers.subscribeAfterSeqNumber("shard-003", "6"), TestHelpers.subscribeAfterSeqNumber("shard-000", "10"), TestHelpers.subscribeTrimHorizon("shard-004"), TestHelpers.subscribeTrimHorizon("shard-005"), TestHelpers.subscribeAfterSeqNumber("shard-004", "5"), TestHelpers.subscribeAfterSeqNumber("shard-005", "5"), TestHelpers.subscribeAfterSeqNumber("shard-004", "8"), TestHelpers.subscribeAfterSeqNumber("shard-005", "8")});
        Assertions.assertThat(this.pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(new ShardCheckpoint[]{new ShardCheckpoint(STREAM, "shard-005", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "8", 0L), new ShardCheckpoint(STREAM, "shard-004", ShardIteratorType.AFTER_SEQUENCE_NUMBER, "8", 0L)});
    }

    @Test
    public void checkpointEqualsToInitStateIfNothingIsConsumed() {
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(1));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(1));
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        ShardCheckpoint[] shardCheckpointArr = {new ShardCheckpoint(STREAM, "shard-000", new StartingPoint(InitialPositionInStream.LATEST)), new ShardCheckpoint(STREAM, "shard-001", new StartingPoint(InitialPositionInStream.LATEST))};
        Assertions.assertThat(initialLatestCheckpoint.iterator()).containsExactlyInAnyOrder(shardCheckpointArr);
        Assertions.assertThat(this.pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(shardCheckpointArr);
    }

    @Test
    public void poolWatermarkReturnsTsOfOldestAcknowledgedRecord() throws Exception {
        List<Record> recordWithMinutesAgo = TestHelpers.recordWithMinutesAgo(5);
        List<Record> recordWithMinutesAgo2 = TestHelpers.recordWithMinutesAgo(4);
        List<Record> recordWithMinutesAgo3 = TestHelpers.recordWithMinutesAgo(3);
        this.kinesis = new EFOStubbedKinesisAsyncClient(10);
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(recordWithMinutesAgo));
        this.kinesis.stubSubscribeToShard("shard-001", TestHelpers.eventWithRecords(recordWithMinutesAgo2));
        this.kinesis.stubSubscribeToShard("shard-000", TestHelpers.eventWithRecords(recordWithMinutesAgo3));
        Instant joda = TimeUtil.toJoda(recordWithMinutesAgo.get(0).approximateArrivalTimestamp());
        Instant joda2 = TimeUtil.toJoda(recordWithMinutesAgo2.get(0).approximateArrivalTimestamp());
        KinesisReaderCheckpoint initialLatestCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        this.pool = new EFOShardSubscribersPool(this.readSpec, this.consumerArn, this.kinesis);
        this.pool.start(initialLatestCheckpoint);
        Assertions.assertThat(this.pool.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat(waitForRecords(this.pool, 1).size()).isEqualTo(1);
        Assertions.assertThat(this.pool.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assertions.assertThat(waitForRecords(this.pool, 1).size()).isEqualTo(1);
        Assertions.assertThat(this.pool.getWatermark()).isEqualTo(joda);
        Assertions.assertThat(waitForRecords(this.pool, 1).size()).isEqualTo(1);
        Assertions.assertThat(this.pool.getWatermark()).isEqualTo(joda2);
    }

    static List<KinesisRecord> waitForRecords(EFOShardSubscribersPool eFOShardSubscribersPool, int i) throws Exception {
        ArrayList arrayList = new ArrayList(i);
        int i2 = 0;
        while (arrayList.size() < i && i2 < 5) {
            KinesisRecord nextRecord = eFOShardSubscribersPool.getNextRecord();
            if (nextRecord != null) {
                arrayList.add(nextRecord);
                i2 = 0;
            } else {
                i2++;
                Thread.sleep(50L);
            }
        }
        return arrayList;
    }

    private KinesisReaderCheckpoint initialLatestCheckpoint(List<String> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new ShardCheckpoint(STREAM, it.next(), new StartingPoint(InitialPositionInStream.LATEST)));
        }
        return new KinesisReaderCheckpoint(arrayList);
    }

    private ShardCheckpoint afterCheckpoint(String str, String str2) {
        return afterCheckpoint(str, str2, 0L);
    }

    private ShardCheckpoint afterCheckpoint(String str, String str2, long j) {
        return new ShardCheckpoint(STREAM, str, ShardIteratorType.AFTER_SEQUENCE_NUMBER, str2, Long.valueOf(j));
    }

    private ShardCheckpoint trimHorizonCheckpoint(String str) {
        return new ShardCheckpoint(STREAM, str, ShardIteratorType.TRIM_HORIZON, (Instant) null);
    }

    private ShardCheckpoint tsCheckpoint(String str, Instant instant) {
        return new ShardCheckpoint(STREAM, str, ShardIteratorType.AT_TIMESTAMP, instant);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void nothingElseShouldBeReceived(EFOShardSubscribersPool eFOShardSubscribersPool) throws Exception {
        Assertions.assertThat(waitForRecords(eFOShardSubscribersPool, 5)).hasSize(0);
    }
}
