/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import io.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import org.apache.beam.sdk.io.aws2.kinesis.EFOShardSubscribersPool;
import org.apache.beam.sdk.io.aws2.kinesis.EFOStubbedKinesisAsyncClient;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.StartingPoint;
import org.apache.beam.sdk.io.aws2.kinesis.TestHelpers;
import org.apache.beam.sdk.io.aws2.kinesis.TimeUtil;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.kinesis.common.InitialPositionInStream;

public class EFOShardSubscribersPoolTest {
    // Stream name used when building the ShardCheckpoint instances the tests compare against.
    private static final String STREAM = "stream-01";
    // Consumer identifier; setUp() copies it into consumerArn.
    private static final String CONSUMER = "consumer-01";
    // Read spec passed to every pool under test; assigned from TestHelpers.createReadSpec() in setUp().
    private KinesisIO.Read readSpec;
    // Consumer ARN argument passed to the EFOShardSubscribersPool constructor.
    private String consumerArn;
    // Stubbed Kinesis client; each test creates its own instance and tearDown() closes it.
    private EFOStubbedKinesisAsyncClient kinesis;
    // Pool under test; each test creates and starts it and tearDown() stops it.
    private EFOShardSubscribersPool pool;

    /** Initialises the fixtures shared by every test: the read spec and the consumer ARN. */
    @Before
    public void setUp() {
        readSpec = TestHelpers.createReadSpec();
        consumerArn = CONSUMER;
    }

    /**
     * Releases what a test created: closes the stubbed Kinesis client, then stops the pool.
     *
     * <p>Either field may still be {@code null} when a test failed before assigning it, so both
     * are guarded to avoid a secondary {@link NullPointerException} on top of the real failure.
     * The pool is stopped in a {@code finally} block so it is not left running when closing the
     * client throws.
     */
    @After
    public void tearDown() {
        try {
            if (this.kinesis != null) {
                this.kinesis.close();
            }
        } finally {
            if (this.pool != null) {
                this.pool.stop();
            }
        }
    }

    /**
     * Starts a pool on two shards from the checkpoint returned by {@code initialLatestCheckpoint},
     * with three stubbed subscriptions per shard, and checks through {@code PoolAssertion} the
     * records and last checkpoint sequence number expected for each shard. Finally verifies the
     * exact set of subscribe requests recorded by the stubbed client.
     */
    @Test
    public void poolReSubscribesAndReadsRecords() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 7)});
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(10, 3));
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 5)});
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(8, 3));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 0, 10))
                    .withLastCheckpointSequenceNumber(12),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 0, 8))
                    .withLastCheckpointSequenceNumber(10));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeLatest(shard1),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "9"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "7"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "12"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "10")
                    });
    }

    /**
     * Starts a pool from checkpoints built by {@code afterCheckpoint} ("0" for shard-000, "11" for
     * shard-001) and checks the records and last checkpoint sequence numbers expected per shard,
     * then the exact subscribe requests seen: {@code subscribeAtSeqNumber} for the initial
     * positions and {@code subscribeAfterSeqNumber} for the follow-ups.
     */
    @Test
    public void poolReSubscribesAndReadsRecordsAfterCheckPoint() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(11, 3)});

        KinesisReaderCheckpoint initialCheckpoint =
            new KinesisReaderCheckpoint(
                (Iterable)
                    ImmutableList.of(
                        (Object) afterCheckpoint(shard0, "0"),
                        (Object) afterCheckpoint(shard1, "11")));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 1, 2))
                    .withLastCheckpointSequenceNumber(2),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 12, 2))
                    .withLastCheckpointSequenceNumber(13));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeAtSeqNumber(shard0, "0"),
                        TestHelpers.subscribeAtSeqNumber(shard1, "11"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "13")
                    });
    }

    /**
     * Starts a pool from checkpoints built by {@code trimHorizonCheckpoint} for both shards and
     * checks the records and last checkpoint sequence numbers expected per shard, then the exact
     * subscribe requests seen ({@code subscribeTrimHorizon} first, {@code subscribeAfterSeqNumber}
     * afterwards).
     */
    @Test
    public void poolReSubscribesAndReadsRecordsWithTrimHorizon() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(11, 3)});

        KinesisReaderCheckpoint initialCheckpoint =
            new KinesisReaderCheckpoint(
                (Iterable)
                    ImmutableList.of(
                        (Object) trimHorizonCheckpoint(shard0),
                        (Object) trimHorizonCheckpoint(shard1)));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 0, 3))
                    .withLastCheckpointSequenceNumber(2),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 11, 3))
                    .withLastCheckpointSequenceNumber(13));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeTrimHorizon(shard0),
                        TestHelpers.subscribeTrimHorizon(shard1),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "13")
                    });
    }

    /**
     * Starts a pool from checkpoints built by {@code tsCheckpoint} with timestamps one hour before
     * "now" and expects all stubbed records to be delivered for both shards. The subscribe
     * requests seen must be {@code subscribeAtTs} with those timestamps, followed by
     * {@code subscribeAfterSeqNumber}.
     */
    @Test
    public void poolReSubscribesAndReadsRecordsWithAtTimestamp() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(11, 3)});

        // Fully qualified: the file also imports java.time.Instant.
        org.joda.time.Instant shard000ts =
            org.joda.time.Instant.now().minus((ReadableDuration) Duration.standardHours(1L));
        org.joda.time.Instant shard001ts =
            org.joda.time.Instant.now().minus((ReadableDuration) Duration.standardHours(1L));

        KinesisReaderCheckpoint initialCheckpoint =
            new KinesisReaderCheckpoint(
                (Iterable)
                    ImmutableList.of(
                        (Object) tsCheckpoint(shard0, shard000ts),
                        (Object) tsCheckpoint(shard1, shard001ts)));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 0, 3))
                    .withLastCheckpointSequenceNumber(2),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 11, 3))
                    .withLastCheckpointSequenceNumber(13));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeAtTs(shard0, shard000ts),
                        TestHelpers.subscribeAtTs(shard1, shard001ts),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "13")
                    });
    }

    /**
     * Same set-up as the at-timestamp test, but with checkpoint timestamps one hour after "now".
     * No records are expected for either shard, while the last checkpoint sequence numbers (2 and
     * 13) and the subscribe requests seen are still asserted.
     */
    @Test
    public void poolReSubscribesAndSkipsAllRecordsWithAtTimestampGreaterThanRecords()
            throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(11, 3)});

        // Fully qualified: the file also imports java.time.Instant.
        org.joda.time.Instant shard000ts =
            org.joda.time.Instant.now().plus((ReadableDuration) Duration.standardHours(1L));
        org.joda.time.Instant shard001ts =
            org.joda.time.Instant.now().plus((ReadableDuration) Duration.standardHours(1L));

        KinesisReaderCheckpoint initialCheckpoint =
            new KinesisReaderCheckpoint(
                (Iterable)
                    ImmutableList.of(
                        (Object) tsCheckpoint(shard0, shard000ts),
                        (Object) tsCheckpoint(shard1, shard001ts)));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(new KinesisRecordView[0])
                    .withLastCheckpointSequenceNumber(2),
                ShardAssertion.shard(shard1)
                    .gives(new KinesisRecordView[0])
                    .withLastCheckpointSequenceNumber(13));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeAtTs(shard0, shard000ts),
                        TestHelpers.subscribeAtTs(shard1, shard001ts),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "13")
                    });
    }

    /**
     * Variant of the after-checkpoint test in which {@code afterCheckpoint} receives a third
     * argument ({@code 1000L} for shard-000, {@code Long.MAX_VALUE} for shard-001). The expected
     * records, last checkpoint sequence numbers and subscribe requests are the same as in the
     * two-argument variant.
     */
    @Test
    public void poolReSubscribesAndReadsRecordsAfterCheckPointWithPositiveSubSeqNumber()
            throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(11, 3)});

        KinesisReaderCheckpoint initialCheckpoint =
            new KinesisReaderCheckpoint(
                (Iterable)
                    ImmutableList.of(
                        (Object) afterCheckpoint(shard0, "0", 1000L),
                        (Object) afterCheckpoint(shard1, "11", Long.MAX_VALUE)));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 1, 2))
                    .withLastCheckpointSequenceNumber(2),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 12, 2))
                    .withLastCheckpointSequenceNumber(13));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeAtSeqNumber(shard0, "0"),
                        TestHelpers.subscribeAtSeqNumber(shard1, "11"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "13")
                    });
    }

    /**
     * Uses a stubbed client constructed with {@code 1} and event arrays from
     * {@code eventsWithRecords(18, 300)} and {@code eventsWithRecords(75, 200)}. Expects the
     * matching generated records and last checkpoint sequence numbers (320 and 277), plus the
     * exact subscribe requests seen.
     */
    @Test
    public void poolReSubscribesAndReadsManyEvents() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(1);
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithRecords(18, 300));
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(318, 3));
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithRecords(75, 200));
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(275, 3));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 18, 300))
                    .withLastCheckpointSequenceNumber(320),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 75, 200))
                    .withLastCheckpointSequenceNumber(277));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeLatest(shard1),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "317"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "274"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "320"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "277")
                    });
    }

    /**
     * Stubs one {@code eventWithAggRecords} event per shard and expects the records produced by
     * {@code KinesisRecordView.generateAggregated} with last checkpoint sequence numbers 13 and
     * 56, plus the exact subscribe requests seen.
     */
    @Test
    public void handlesAggregatedRecords() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithAggRecords(12, 2)});
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(13, 1));
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithAggRecords(55, 3)});
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(56, 1));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generateAggregated(shard0, 12, 2))
                    .withLastCheckpointSequenceNumber(13),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generateAggregated(shard1, 55, 3))
                    .withLastCheckpointSequenceNumber(56));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeLatest(shard1),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "12"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "55"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "13"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "56")
                    });
    }

    /**
     * Runs in two phases against the same {@code eventWithAggRecords(12, 6)} event.
     *
     * <p>Phase one reads the records with sub-sequence numbers 0..3 and captures the checkpoint
     * mark, which must be AFTER_SEQUENCE_NUMBER "12" with sub-sequence number 3. Phase two stops
     * the pool, replaces client and pool, restarts from that mark, and expects only sub-sequence
     * numbers 4 and 5, after which {@code waitForRecords} must return nothing.
     */
    @Test
    public void doesNotIntroduceDuplicatesWithAggregatedRecordsCheckpoints() throws Exception {
        final String shard0 = "shard-000";
        final String seqNum = "12";

        // Phase one: consume the first part of the aggregated event.
        kinesis = new EFOStubbedKinesisAsyncClient(10);
        SubscribeToShardEvent eventWithAggRecords = TestHelpers.eventWithAggRecords(12, 6);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {eventWithAggRecords});

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        validateAggregatedRecords(seqNum, 0, 3);

        KinesisReaderCheckpoint checkpoint = pool.getCheckpointMark();
        Assertions.assertThat((Iterator) checkpoint.iterator())
            .containsExactlyInAnyOrder(
                (Object[])
                    new ShardCheckpoint[] {
                        new ShardCheckpoint(
                            STREAM,
                            shard0,
                            ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                            seqNum,
                            Long.valueOf(3L))
                    });
        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeAfterSeqNumber(shard0, seqNum)
                    });

        // Phase two: fresh client and pool, resumed from the captured checkpoint.
        pool.stop();
        kinesis.close();
        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {eventWithAggRecords});
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) checkpoint);

        validateAggregatedRecords(seqNum, 4, 5);

        Assertions.assertThat(waitForRecords(pool, 3).size()).isEqualTo(0);
        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeAtSeqNumber(shard0, seqNum),
                        TestHelpers.subscribeAfterSeqNumber(shard0, seqNum)
                    });
    }

    /**
     * Reads one record from the pool for each sub-sequence number in
     * {@code [startSubSeqNum, endSubSeqNum]} and, for each, asserts its sequence number, its
     * sub-sequence number, and that the pool's checkpoint mark is exactly one AFTER_SEQUENCE_NUMBER
     * checkpoint for "shard-000" at that position.
     *
     * @param expectedSeqNum sequence number every record read must carry
     * @param startSubSeqNum first expected sub-sequence number (inclusive)
     * @param endSubSeqNum last expected sub-sequence number (inclusive)
     */
    private void validateAggregatedRecords(String expectedSeqNum, int startSubSeqNum, int endSubSeqNum) throws Exception {
        final int upperExclusive = endSubSeqNum + 1;
        for (int subSeqNum = startSubSeqNum; subSeqNum < upperExclusive; subSeqNum++) {
            KinesisRecord next = waitForRecords(pool, 1).get(0);

            Assertions.assertThat((String) next.getSequenceNumber()).isEqualTo(expectedSeqNum);
            Assertions.assertThat((long) next.getSubSequenceNumber()).isEqualTo((long) subSeqNum);

            ShardCheckpoint expectedCheckpoint =
                new ShardCheckpoint(
                    STREAM,
                    "shard-000",
                    ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                    expectedSeqNum,
                    Long.valueOf(subSeqNum));
            Assertions.assertThat((Iterator) pool.getCheckpointMark().iterator())
                .containsExactlyInAnyOrder((Object[]) new ShardCheckpoint[] {expectedCheckpoint});
        }
    }

    /**
     * Starts from an AFTER_SEQUENCE_NUMBER checkpoint at sequence number "12" with sub-sequence
     * number 5 against an {@code eventWithAggRecords(12, 6)} event. Expects
     * {@code waitForRecords} to return no records and the stub to have seen one
     * {@code subscribeAtSeqNumber} and one {@code subscribeAfterSeqNumber} request for "12".
     */
    @Test
    public void skipsEntireAggregatedBatch() throws Exception {
        final String shard0 = "shard-000";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        SubscribeToShardEvent eventWithAggRecords = TestHelpers.eventWithAggRecords(12, 6);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {eventWithAggRecords});

        KinesisReaderCheckpoint initialCheckpoint =
            new KinesisReaderCheckpoint(
                (Iterable)
                    ImmutableList.of(
                        (Object)
                            new ShardCheckpoint(
                                STREAM,
                                shard0,
                                ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                                "12",
                                Long.valueOf(5L))));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        List<KinesisRecord> actualRecords = waitForRecords(pool, 10);
        Assertions.assertThat(actualRecords.size()).isEqualTo(0);

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeAtSeqNumber(shard0, "12"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "12")
                    });
    }

    /**
     * Stubs only {@code eventsWithoutRecords} arrays for both shards and expects no records,
     * last checkpoint sequence numbers 33 and 10, and a {@code subscribeAfterSeqNumber} request
     * at those positions following the initial {@code subscribeLatest} requests.
     */
    @Test
    public void poolReSubscribesWhenNoRecordsCome() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(31, 3));
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(8, 3));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(new KinesisRecordView[0])
                    .withLastCheckpointSequenceNumber(33),
                ShardAssertion.shard(shard1)
                    .gives(new KinesisRecordView[0])
                    .withLastCheckpointSequenceNumber(10));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeLatest(shard1),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "33"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "10")
                    });
    }

    /**
     * Makes one stubbed subscription per shard fail, with a {@code ReadTimeoutException} on
     * shard-000 and an {@code SdkClientException} wrapping one on shard-001. Expects all generated
     * records to be delivered regardless, with last checkpoint sequence numbers 19 and 15, and
     * verifies the exact subscribe requests seen. The pool is created through the four-argument
     * constructor with {@code 1} as last argument.
     */
    @Test
    public void poolReSubscribesWhenRecoverableErrorOccurs() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
                shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)})
            .failWith((Throwable) new ReadTimeoutException());
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 7)});
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(10, 10));
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
                shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 5)})
            .failWith(
                (Throwable)
                    SdkClientException.create(
                        "this is recoverable", (Throwable) new ReadTimeoutException()));
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(8, 8));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis, 1);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 0, 10))
                    .withLastCheckpointSequenceNumber(19),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 0, 8))
                    .withLastCheckpointSequenceNumber(15));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeLatest(shard1),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "2"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "9"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "7"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "19"),
                        TestHelpers.subscribeAfterSeqNumber(shard1, "15")
                    });
    }

    /**
     * Registers 250 failing subscriptions for shard-000, each stubbed with
     * {@code eventsWithRecords(i, 1)} and a {@code ReadTimeoutException}, and one failing
     * subscription for shard-001. Expects all generated records with last checkpoint sequence
     * numbers 252 and 585, and exactly 255 subscribe requests seen by the stub.
     */
    @Test
    public void poolReSubscribesWhenManyRecoverableErrorsOccur() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(1);
        for (int attempt = 0; attempt < 250; attempt++) {
            kinesis.stubSubscribeToShard(
                    shard0,
                    (SubscribeToShardEventStream[]) TestHelpers.eventsWithRecords(attempt, 1))
                .failWith((Throwable) new ReadTimeoutException());
        }
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(250, 3));
        kinesis.stubSubscribeToShard(
                shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithRecords(333, 250))
            .failWith(
                (Throwable)
                    SdkClientException.create(
                        "this is recoverable", (Throwable) new ReadTimeoutException()));
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(583, 3));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis, 1);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 0, 250))
                    .withLastCheckpointSequenceNumber(252),
                ShardAssertion.shard(shard1)
                    .gives(KinesisRecordView.generate(shard1, 333, 250))
                    .withLastCheckpointSequenceNumber(585));

        Assertions.assertThat((int) kinesis.subscribeRequestsSeen().size()).isEqualTo(255);
    }

    /**
     * The first stubbed subscription for shard-000 carries an empty event array and fails with a
     * {@code ReadTimeoutException}. The test expects {@code subscribeLatest} to be requested
     * twice, the generated records 550.. to be delivered, and the last checkpoint sequence number
     * to be 553.
     */
    @Test
    public void poolReSubscribesFromInitialWhenRecoverableErrorOccursImmediately()
            throws Exception {
        final String shard0 = "shard-000";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(shard0, new SubscribeToShardEventStream[0])
            .failWith((Throwable) new ReadTimeoutException());
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(550, 3)});
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(553, 1));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis, 1);
        pool.start((Iterable) initialCheckpoint);

        PoolAssertion.assertPool(pool)
            .givesCheckPointedRecords(
                ShardAssertion.shard(shard0)
                    .gives(KinesisRecordView.generate(shard0, 550, 3))
                    .withLastCheckpointSequenceNumber(553));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                (Object[])
                    new SubscribeToShardRequest[] {
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeLatest(shard0),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "552"),
                        TestHelpers.subscribeAfterSeqNumber(shard0, "553")
                    });
    }

    /**
     * The second stubbed subscription for shard-000 fails with a plain {@code RuntimeException}.
     * The test expects {@code waitForRecords} to throw an {@code IOException} whose message and
     * cause reflect that exception. The checkpoint mark must then contain the shard-000
     * checkpoint after sequence number "9" and must not contain a LATEST checkpoint for shard-001.
     */
    @Test
    public void poolFailsWhenNonRecoverableErrorOccurs() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(7)});
        kinesis.stubSubscribeToShard(
                shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(7, 3)})
            .failWith(new RuntimeException("Oh..."));
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 10));
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 8));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        Throwable exception =
            Assert.assertThrows(IOException.class, () -> waitForRecords(pool, 20));
        Assertions.assertThat(exception.getMessage())
            .isEqualTo("java.lang.RuntimeException: Oh...");
        Assertions.assertThat(exception.getCause()).isInstanceOf(RuntimeException.class);

        Assertions.assertThat((Iterable) pool.getCheckpointMark())
            .contains(
                (Object[])
                    new ShardCheckpoint[] {
                        new ShardCheckpoint(
                            STREAM,
                            shard0,
                            ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                            "9",
                            Long.valueOf(0L))
                    })
            .doesNotContain(
                (Object[])
                    new ShardCheckpoint[] {
                        new ShardCheckpoint(
                            STREAM, shard1, new StartingPoint(InitialPositionInStream.LATEST))
                    });
    }

    /**
     * The first stubbed subscription for shard-000 fails with a {@code CompletionException}
     * wrapping a {@code ResourceNotFoundException}. The test expects {@code waitForRecords} to
     * throw an {@code IOException} carrying the completion exception's text, with the
     * {@code ResourceNotFoundException} and its formatted message two causes down.
     */
    @Test
    public void poolFailsWhenConsumerDoesNotExist() throws Exception {
        final String shard0 = "shard-000";
        final String shard1 = "shard-001";

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard(
                shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)})
            .failWith(
                new CompletionException(
                    "Err ...",
                    (Throwable)
                        ResourceNotFoundException.builder()
                            .cause(null)
                            .awsErrorDetails(
                                AwsErrorDetails.builder()
                                    .serviceName("Kinesis")
                                    .errorCode("ResourceNotFoundException")
                                    .errorMessage("Consumer consumer-01 not found.")
                                    .build())
                            .build()));
        kinesis.stubSubscribeToShard(
            shard0, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 7)});
        kinesis.stubSubscribeToShard(
            shard0, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 10));
        kinesis.stubSubscribeToShard(
            shard1, new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard(
            shard1, (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 8));

        KinesisReaderCheckpoint initialCheckpoint =
            initialLatestCheckpoint((List<String>) ImmutableList.of(shard0, shard1));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, (KinesisAsyncClient) kinesis);
        pool.start((Iterable) initialCheckpoint);

        Throwable exception =
            Assert.assertThrows(IOException.class, () -> waitForRecords(pool, 10));
        Assertions.assertThat(exception.getMessage())
            .isEqualTo("java.util.concurrent.CompletionException: Err ...");

        // IOException -> CompletionException -> ResourceNotFoundException
        Throwable cause = exception.getCause().getCause();
        Assertions.assertThat(cause).isInstanceOf(ResourceNotFoundException.class);
        Assertions.assertThat(cause.getMessage())
            .isEqualTo(
                "Consumer consumer-01 not found. (Service: Kinesis, Status Code: 0, Request ID: null)");
    }

    /**
     * Starts the pool at LATEST on shard-000 and shard-001, each of which is stubbed to end with a
     * re-shard event (shard-000 into shard-002 and shard-003, shard-001 into shard-004).
     *
     * <p>Expects exactly 34 records although 35 are requested, the listed subscribe requests in any
     * order, and a final checkpoint mark that only contains shard-002, shard-003 and shard-004.
     */
    @Test
    public void poolHandlesShardUp() throws Exception {
        kinesis = new EFOStubbedKinesisAsyncClient(10);

        // Stubbed responses for shard-000; the last one re-shards into shard-002 and shard-003.
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 7)});
        kinesis.stubSubscribeToShard("shard-000", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(1, 10));
        kinesis.stubSubscribeToShard(
            "shard-000",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEvent(ImmutableList.of("shard-000"), ImmutableList.of("shard-002", "shard-003"))
            });

        // Stubbed responses for the two shards named by the shard-000 re-shard event.
        kinesis.stubSubscribeToShard("shard-002", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(5)});
        kinesis.stubSubscribeToShard("shard-002", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 5));
        kinesis.stubSubscribeToShard("shard-003", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(6)});
        kinesis.stubSubscribeToShard("shard-003", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 6));

        // Stubbed responses for shard-001; the last one re-shards into shard-004.
        kinesis.stubSubscribeToShard("shard-001", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard("shard-001", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 5)});
        kinesis.stubSubscribeToShard("shard-001", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(1, 8));
        kinesis.stubSubscribeToShard(
            "shard-001",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEvent(ImmutableList.of("shard-001"), ImmutableList.of("shard-004"))
            });

        // Stubbed responses for the shard named by the shard-001 re-shard event.
        kinesis.stubSubscribeToShard("shard-004", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(5)});
        kinesis.stubSubscribeToShard("shard-004", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(2, 5));

        KinesisReaderCheckpoint startingCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, kinesis);
        pool.start(startingCheckpoint);

        // One record more than expected is requested; only 34 must come back.
        List<KinesisRecord> received = waitForRecords(pool, 35);
        Assertions.assertThat(received.size()).isEqualTo(34);

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                TestHelpers.subscribeLatest("shard-000"),
                TestHelpers.subscribeLatest("shard-001"),
                TestHelpers.subscribeAfterSeqNumber("shard-000", "2"),
                TestHelpers.subscribeAfterSeqNumber("shard-001", "2"),
                TestHelpers.subscribeAfterSeqNumber("shard-000", "9"),
                TestHelpers.subscribeAfterSeqNumber("shard-001", "7"),
                TestHelpers.subscribeAfterSeqNumber("shard-000", "10"),
                TestHelpers.subscribeAfterSeqNumber("shard-001", "8"),
                TestHelpers.subscribeTrimHorizon("shard-002"),
                TestHelpers.subscribeTrimHorizon("shard-003"),
                TestHelpers.subscribeTrimHorizon("shard-004"),
                TestHelpers.subscribeAfterSeqNumber("shard-002", "4"),
                TestHelpers.subscribeAfterSeqNumber("shard-003", "5"),
                TestHelpers.subscribeAfterSeqNumber("shard-004", "4"),
                TestHelpers.subscribeAfterSeqNumber("shard-002", "7"),
                TestHelpers.subscribeAfterSeqNumber("shard-003", "8"),
                TestHelpers.subscribeAfterSeqNumber("shard-004", "6"));

        Assertions.assertThat(pool.getCheckpointMark().iterator())
            .containsExactlyInAnyOrder(
                afterCheckpoint("shard-002", "7"),
                afterCheckpoint("shard-003", "8"),
                afterCheckpoint("shard-004", "6"));
    }

    /**
     * Covers a re-shard event that is stubbed via {@code reShardEventWithRecords}: shard-000 hands
     * over to shard-001 and shard-002.
     *
     * <p>After the first record is read, both new shards must be check-pointed at TRIM_HORIZON. A
     * second read yields nothing, after which the checkpoints must have moved to sequence numbers
     * 101 and 102.
     */
    @Test
    public void poolHandlesShardUpWithRecords() throws Exception {
        kinesis = new EFOStubbedKinesisAsyncClient(1);
        kinesis.stubSubscribeToShard(
            "shard-000",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEventWithRecords(
                    11, 1, ImmutableList.of("shard-000"), ImmutableList.of("shard-001", "shard-002"))
            });
        kinesis.stubSubscribeToShard("shard-001", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(101, 1));
        kinesis.stubSubscribeToShard("shard-002", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(102, 1));

        KinesisReaderCheckpoint startingCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000"));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, kinesis);
        pool.start(startingCheckpoint);

        // First read: the single stubbed record; the new shards are not advanced yet.
        List<KinesisRecord> firstRead = waitForRecords(pool, 1);
        Assertions.assertThat(firstRead.size()).isEqualTo(1);
        Assertions.assertThat(pool.getCheckpointMark().iterator())
            .containsExactlyInAnyOrder(
                new ShardCheckpoint(STREAM, "shard-001", ShardIteratorType.TRIM_HORIZON, null, null),
                new ShardCheckpoint(STREAM, "shard-002", ShardIteratorType.TRIM_HORIZON, null, null));

        // Second read: no records, but the checkpoints of the new shards advance.
        List<KinesisRecord> secondRead = waitForRecords(pool, 1);
        Assertions.assertThat(secondRead.size()).isEqualTo(0);
        Assertions.assertThat(pool.getCheckpointMark().iterator())
            .containsExactlyInAnyOrder(
                afterCheckpoint("shard-001", "101"),
                afterCheckpoint("shard-002", "102"));

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                TestHelpers.subscribeLatest("shard-000"),
                TestHelpers.subscribeTrimHorizon("shard-001"),
                TestHelpers.subscribeTrimHorizon("shard-002"),
                TestHelpers.subscribeAfterSeqNumber("shard-001", "101"),
                TestHelpers.subscribeAfterSeqNumber("shard-002", "102"));
    }

    /**
     * Starts the pool at LATEST on shard-000 to shard-003. shard-000 and shard-001 are both stubbed to
     * end with the same re-shard event naming shard-004; shard-002 and shard-003 likewise end with
     * one naming shard-005.
     *
     * <p>Expects exactly 37 records although 38 are requested, the listed subscribe requests in any
     * order, and a final checkpoint mark that only contains shard-004 and shard-005.
     */
    @Test
    public void poolHandlesShardDown() throws Exception {
        kinesis = new EFOStubbedKinesisAsyncClient(10);

        // shard-000 and shard-001 both finish with a re-shard event that names shard-004.
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3)});
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(3, 7)});
        kinesis.stubSubscribeToShard("shard-000", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(1, 10));
        kinesis.stubSubscribeToShard(
            "shard-000",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEvent(ImmutableList.of("shard-000", "shard-001"), ImmutableList.of("shard-004"))
            });
        kinesis.stubSubscribeToShard("shard-001", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(5)});
        kinesis.stubSubscribeToShard("shard-001", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(2, 5));
        kinesis.stubSubscribeToShard(
            "shard-001",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEvent(ImmutableList.of("shard-000", "shard-001"), ImmutableList.of("shard-004"))
            });

        // shard-002 and shard-003 both finish with a re-shard event that names shard-005.
        kinesis.stubSubscribeToShard("shard-002", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(5)});
        kinesis.stubSubscribeToShard("shard-002", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(2, 5));
        kinesis.stubSubscribeToShard(
            "shard-002",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEvent(ImmutableList.of("shard-002", "shard-003"), ImmutableList.of("shard-005"))
            });
        kinesis.stubSubscribeToShard("shard-003", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(5)});
        kinesis.stubSubscribeToShard("shard-003", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(2, 5));
        kinesis.stubSubscribeToShard(
            "shard-003",
            new SubscribeToShardEventStream[] {
                TestHelpers.reShardEvent(ImmutableList.of("shard-002", "shard-003"), ImmutableList.of("shard-005"))
            });

        // Stubbed responses for the two shards named by the re-shard events.
        kinesis.stubSubscribeToShard("shard-004", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(6)});
        kinesis.stubSubscribeToShard("shard-004", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 6));
        kinesis.stubSubscribeToShard("shard-005", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(6)});
        kinesis.stubSubscribeToShard("shard-005", (SubscribeToShardEventStream[]) TestHelpers.eventsWithoutRecords(3, 6));

        KinesisReaderCheckpoint startingCheckpoint =
            initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001", "shard-002", "shard-003"));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, kinesis);
        pool.start(startingCheckpoint);

        // One record more than expected is requested; only 37 must come back.
        List<KinesisRecord> received = waitForRecords(pool, 38);
        Assertions.assertThat(received.size()).isEqualTo(37);

        Assertions.assertThat(kinesis.subscribeRequestsSeen())
            .containsExactlyInAnyOrder(
                TestHelpers.subscribeLatest("shard-000"),
                TestHelpers.subscribeLatest("shard-001"),
                TestHelpers.subscribeLatest("shard-002"),
                TestHelpers.subscribeLatest("shard-003"),
                TestHelpers.subscribeAfterSeqNumber("shard-000", "2"),
                TestHelpers.subscribeAfterSeqNumber("shard-001", "4"),
                TestHelpers.subscribeAfterSeqNumber("shard-002", "4"),
                TestHelpers.subscribeAfterSeqNumber("shard-003", "4"),
                TestHelpers.subscribeAfterSeqNumber("shard-000", "9"),
                TestHelpers.subscribeAfterSeqNumber("shard-001", "6"),
                TestHelpers.subscribeAfterSeqNumber("shard-002", "6"),
                TestHelpers.subscribeAfterSeqNumber("shard-003", "6"),
                TestHelpers.subscribeAfterSeqNumber("shard-000", "10"),
                TestHelpers.subscribeTrimHorizon("shard-004"),
                TestHelpers.subscribeTrimHorizon("shard-005"),
                TestHelpers.subscribeAfterSeqNumber("shard-004", "5"),
                TestHelpers.subscribeAfterSeqNumber("shard-005", "5"),
                TestHelpers.subscribeAfterSeqNumber("shard-004", "8"),
                TestHelpers.subscribeAfterSeqNumber("shard-005", "8"));

        Assertions.assertThat(pool.getCheckpointMark().iterator())
            .containsExactlyInAnyOrder(
                afterCheckpoint("shard-005", "8"),
                afterCheckpoint("shard-004", "8"));
    }

    /**
     * Starts the pool but never reads a record, then checks that both the initial checkpoint and the
     * pool's checkpoint mark still hold the LATEST starting point for shard-000 and shard-001.
     */
    @Test
    public void checkpointEqualsToInitStateIfNothingIsConsumed() {
        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(1)});
        kinesis.stubSubscribeToShard("shard-001", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(1)});

        KinesisReaderCheckpoint startingCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, kinesis);
        pool.start(startingCheckpoint);

        // The same expectation is applied to the input checkpoint and to the pool's current mark.
        ShardCheckpoint[] untouchedCheckpoints = {
            new ShardCheckpoint(STREAM, "shard-000", new StartingPoint(InitialPositionInStream.LATEST)),
            new ShardCheckpoint(STREAM, "shard-001", new StartingPoint(InitialPositionInStream.LATEST))
        };
        Assertions.assertThat(startingCheckpoint.iterator()).containsExactlyInAnyOrder(untouchedCheckpoints);
        Assertions.assertThat(pool.getCheckpointMark().iterator()).containsExactlyInAnyOrder(untouchedCheckpoints);
    }

    /**
     * Reads three records one at a time and checks the pool watermark after each read.
     *
     * <p>The watermark must be {@code BoundedWindow.TIMESTAMP_MIN_VALUE} before any read and after
     * the first read, equal to the arrival timestamp of the first shard-000 record after the second
     * read, and equal to the arrival timestamp of the shard-001 record after the third read.
     */
    @Test
    public void poolWatermarkReturnsTsOfOldestAcknowledgedRecord() throws Exception {
        List<Record> shard000FirstBatch = TestHelpers.recordWithMinutesAgo(5);
        List<Record> shard001FirstBatch = TestHelpers.recordWithMinutesAgo(4);
        List<Record> shard000SecondBatch = TestHelpers.recordWithMinutesAgo(3);

        kinesis = new EFOStubbedKinesisAsyncClient(10);
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(shard000FirstBatch)});
        kinesis.stubSubscribeToShard("shard-001", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(shard001FirstBatch)});
        kinesis.stubSubscribeToShard("shard-000", new SubscribeToShardEventStream[] {TestHelpers.eventWithRecords(shard000SecondBatch)});

        // Arrival timestamps of the first record of each first batch, converted to Joda instants.
        org.joda.time.Instant shard000FirstTs = TimeUtil.toJoda(shard000FirstBatch.get(0).approximateArrivalTimestamp());
        org.joda.time.Instant shard001FirstTs = TimeUtil.toJoda(shard001FirstBatch.get(0).approximateArrivalTimestamp());

        KinesisReaderCheckpoint startingCheckpoint = initialLatestCheckpoint(ImmutableList.of("shard-000", "shard-001"));
        pool = new EFOShardSubscribersPool(readSpec, consumerArn, kinesis);
        pool.start(startingCheckpoint);

        // Nothing read yet.
        Assertions.assertThat(pool.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);

        // After the first record the watermark has still not moved.
        List<KinesisRecord> firstRead = waitForRecords(pool, 1);
        Assertions.assertThat(firstRead.size()).isEqualTo(1);
        Assertions.assertThat(pool.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);

        // After the second record the watermark is the timestamp of the first shard-000 record.
        List<KinesisRecord> secondRead = waitForRecords(pool, 1);
        Assertions.assertThat(secondRead.size()).isEqualTo(1);
        Assertions.assertThat(pool.getWatermark()).isEqualTo(shard000FirstTs);

        // After the third record the watermark is the timestamp of the shard-001 record.
        List<KinesisRecord> thirdRead = waitForRecords(pool, 1);
        Assertions.assertThat(thirdRead.size()).isEqualTo(1);
        Assertions.assertThat(pool.getWatermark()).isEqualTo(shard001FirstTs);
    }

    /**
     * Polls {@code pool} until {@code recordsToWaitFor} records have been collected or five
     * consecutive polls have returned {@code null}.
     *
     * <p>Each empty poll is followed by a 50 ms sleep; any successful poll resets the empty-poll
     * counter.
     *
     * @param pool pool to read records from
     * @param recordsToWaitFor maximum number of records to collect
     * @return the records collected so far, possibly fewer than requested
     * @throws Exception whatever {@code pool.getNextRecord()} or {@code Thread.sleep} throws
     */
    static List<KinesisRecord> waitForRecords(EFOShardSubscribersPool pool, int recordsToWaitFor) throws Exception {
        List<KinesisRecord> collected = new ArrayList<>(recordsToWaitFor);
        int consecutiveEmptyPolls = 0;
        while (consecutiveEmptyPolls < 5 && collected.size() < recordsToWaitFor) {
            KinesisRecord next = pool.getNextRecord();
            if (next == null) {
                consecutiveEmptyPolls++;
                Thread.sleep(50L);
            } else {
                collected.add(next);
                consecutiveEmptyPolls = 0;
            }
        }
        return collected;
    }

    /**
     * Builds a reader checkpoint holding one {@code LATEST} shard checkpoint per given shard id.
     *
     * @param shardIds ids of the shards to include, in iteration order
     * @return checkpoint wrapping the created shard checkpoints
     */
    private KinesisReaderCheckpoint initialLatestCheckpoint(List<String> shardIds) {
        List<ShardCheckpoint> latestCheckpoints = new ArrayList<>();
        // A fresh StartingPoint is created for every shard, as no instance is shared between them.
        shardIds.forEach(
            id -> latestCheckpoints.add(
                new ShardCheckpoint(STREAM, id, new StartingPoint(InitialPositionInStream.LATEST))));
        return new KinesisReaderCheckpoint(latestCheckpoints);
    }

    /**
     * Shortcut for an {@code AFTER_SEQUENCE_NUMBER} checkpoint whose sub-sequence number is 0.
     *
     * @param shardId shard the checkpoint belongs to
     * @param seqNum sequence number of the checkpoint
     */
    private ShardCheckpoint afterCheckpoint(String shardId, String seqNum) {
        final long noSubSequenceNumber = 0L;
        return afterCheckpoint(shardId, seqNum, noSubSequenceNumber);
    }

    /**
     * Creates an {@code AFTER_SEQUENCE_NUMBER} checkpoint on the test stream.
     *
     * @param shardId shard the checkpoint belongs to
     * @param seqNum sequence number of the checkpoint
     * @param subSeqNum sub-sequence number of the checkpoint
     */
    private ShardCheckpoint afterCheckpoint(String shardId, String seqNum, long subSeqNum) {
        Long boxedSubSeqNum = Long.valueOf(subSeqNum);
        return new ShardCheckpoint(STREAM, shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER, seqNum, boxedSubSeqNum);
    }

    /**
     * Creates a {@code TRIM_HORIZON} checkpoint on the test stream, passing {@code null} as the
     * fourth constructor argument.
     *
     * @param shardId shard the checkpoint belongs to
     */
    private ShardCheckpoint trimHorizonCheckpoint(String shardId) {
        ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
        return new ShardCheckpoint(STREAM, shardId, iteratorType, null);
    }

    /**
     * Creates an {@code AT_TIMESTAMP} checkpoint on the test stream.
     *
     * @param shardId shard the checkpoint belongs to
     * @param ts timestamp handed to the checkpoint
     */
    private ShardCheckpoint tsCheckpoint(String shardId, org.joda.time.Instant ts) {
        ShardIteratorType iteratorType = ShardIteratorType.AT_TIMESTAMP;
        return new ShardCheckpoint(STREAM, shardId, iteratorType, ts);
    }

    /**
     * Asserts that an attempt to read up to five more records from {@code pool} returns none.
     *
     * @throws Exception whatever {@code waitForRecords} throws
     */
    private static void nothingElseShouldBeReceived(EFOShardSubscribersPool pool) throws Exception {
        List<KinesisRecord> unexpectedRecords = waitForRecords(pool, 5);
        Assertions.assertThat(unexpectedRecords).hasSize(0);
    }

    /**
     * Fluent description of what one shard is expected to deliver: the record views and the sequence
     * number of its last checkpoint. Consumed by {@code PoolAssertion#givesCheckPointedRecords}.
     */
    private static class ShardAssertion {
        final String shardId;
        final List<KinesisRecordView> expectedRecords = new ArrayList<>();
        // Stays null until withLastCheckpointSequenceNumber is called.
        String expectedLastSequenceNumber;

        ShardAssertion(String shardId) {
            this.shardId = shardId;
        }

        /** Entry point of the fluent API: starts an expectation for {@code shardId}. */
        static ShardAssertion shard(String shardId) {
            return new ShardAssertion(shardId);
        }

        /** Appends {@code records}, in the given order, to the expected records of this shard. */
        ShardAssertion gives(KinesisRecordView... records) {
            for (KinesisRecordView expected : records) {
                expectedRecords.add(expected);
            }
            return this;
        }

        /** Records the expected final sequence number as its decimal string form. */
        ShardAssertion withLastCheckpointSequenceNumber(int expectedLastSequenceNumber) {
            this.expectedLastSequenceNumber = Integer.toString(expectedLastSequenceNumber);
            return this;
        }
    }

    /**
     * Assertion helper that drains a pool record by record and verifies both the records and the
     * checkpoints against a set of {@code ShardAssertion}s.
     */
    private static class PoolAssertion {
        private final EFOShardSubscribersPool pool;

        PoolAssertion(EFOShardSubscribersPool pool) {
            this.pool = pool;
        }

        /** Entry point of the fluent API: wraps {@code pool} for assertions. */
        static PoolAssertion assertPool(EFOShardSubscribersPool pool) {
            return new PoolAssertion(pool);
        }

        /**
         * Reads as many records as the shard assertions expect in total and verifies them.
         *
         * <p>After every record, the pool's checkpoint mark must contain an
         * {@code AFTER_SEQUENCE_NUMBER} checkpoint for exactly that record. Once all records are read,
         * nothing more may arrive, the received records must match the expected ones in any order,
         * and the checkpoint mark must match the expected final checkpoints in any order.
         *
         * @param shardAssertions per-shard expectations
         * @throws RuntimeException if an expected record cannot be fetched
         * @throws Exception whatever reading from the pool throws
         */
        void givesCheckPointedRecords(ShardAssertion... shardAssertions) throws Exception {
            List<KinesisRecordView> expectedViews = new ArrayList<>();
            List<ShardCheckpoint> expectedFinalCheckpoints = new ArrayList<>();
            for (ShardAssertion assertion : shardAssertions) {
                List<KinesisRecordView> shardViews = assertion.expectedRecords;
                expectedViews.addAll(shardViews);
                // The final sub-sequence number is that of the last expected record, or 0 if there is none.
                long finalSubSequenceNumber =
                    shardViews.isEmpty() ? 0L : shardViews.get(shardViews.size() - 1).subSequenceNumber;
                expectedFinalCheckpoints.add(
                    new ShardCheckpoint(
                        EFOShardSubscribersPoolTest.STREAM,
                        assertion.shardId,
                        ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                        assertion.expectedLastSequenceNumber,
                        Long.valueOf(finalSubSequenceNumber)));
            }

            List<KinesisRecordView> receivedViews = new ArrayList<>();
            int expectedCount = expectedViews.size();
            for (int index = 0; index < expectedCount; index++) {
                List<KinesisRecord> fetched = EFOShardSubscribersPoolTest.waitForRecords(pool, 1);
                if (fetched.isEmpty()) {
                    throw new RuntimeException(String.format("Unable to fetch %s th record", index));
                }
                KinesisRecord received = fetched.get(0);
                receivedViews.add(KinesisRecordView.fromKinesisRecord(received));

                // The checkpoint mark must already reflect the record that was just handed out.
                Iterable<ShardCheckpoint> currentMark = pool.getCheckpointMark();
                Assertions.assertThat(currentMark)
                    .contains(
                        new ShardCheckpoint(
                            EFOShardSubscribersPoolTest.STREAM,
                            received.getShardId(),
                            ShardIteratorType.AFTER_SEQUENCE_NUMBER,
                            received.getSequenceNumber(),
                            Long.valueOf(received.getSubSequenceNumber())));
            }

            EFOShardSubscribersPoolTest.nothingElseShouldBeReceived(pool);
            Assertions.assertThat(receivedViews)
                .containsExactlyInAnyOrder(expectedViews.toArray(new KinesisRecordView[0]));
            Iterable<ShardCheckpoint> finalMark = pool.getCheckpointMark();
            Assertions.assertThat(finalMark)
                .containsExactlyInAnyOrder(expectedFinalCheckpoints.toArray(new ShardCheckpoint[0]));
        }
    }

    /**
     * Immutable view of the fields that identify a record in these tests: shard id, sequence number
     * and sub-sequence number. Two views are equal when all three fields are equal.
     */
    private static class KinesisRecordView {
        private final String shardId;
        private final String sequenceNumber;
        private final long subSequenceNumber;

        KinesisRecordView(String shardId, String sequenceNumber, long subSequenceNumber) {
            this.shardId = shardId;
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
        }

        /** Copies shard id, sequence number and sub-sequence number out of {@code r}. */
        static KinesisRecordView fromKinesisRecord(KinesisRecord r) {
            return new KinesisRecordView(r.getShardId(), r.getSequenceNumber(), r.getSubSequenceNumber());
        }

        /**
         * Creates {@code cnt} views for {@code shardId} with consecutive sequence numbers starting at
         * {@code startingSeqNum}, each with sub-sequence number 0.
         */
        static KinesisRecordView[] generate(String shardId, int startingSeqNum, int cnt) {
            List<KinesisRecordView> result = new ArrayList<>();
            for (int i = startingSeqNum; i < startingSeqNum + cnt; ++i) {
                result.add(new KinesisRecordView(shardId, String.valueOf(i), 0L));
            }
            return result.toArray(new KinesisRecordView[0]);
        }

        /**
         * Creates {@code cnt} views for {@code shardId} that all share sequence number {@code seqNum}
         * and carry sub-sequence numbers 0 to {@code cnt - 1}.
         */
        static KinesisRecordView[] generateAggregated(String shardId, int seqNum, int cnt) {
            List<KinesisRecordView> result = new ArrayList<>();
            for (long i = 0L; i < (long) cnt; ++i) {
                result.add(new KinesisRecordView(shardId, String.valueOf(seqNum), i));
            }
            return result.toArray(new KinesisRecordView[0]);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            KinesisRecordView that = (KinesisRecordView) o;
            // Objects.equals keeps equals null-tolerant, in line with Objects.hash in hashCode.
            return subSequenceNumber == that.subSequenceNumber
                && Objects.equals(shardId, that.shardId)
                && Objects.equals(sequenceNumber, that.sequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(shardId, sequenceNumber, subSequenceNumber);
        }

        @Override
        public String toString() {
            return "KinesisRecordView{shardId='" + shardId + '\'' + ", sequenceNumber='" + sequenceNumber + '\'' + ", subSequenceNumber=" + subSequenceNumber + '}';
        }
    }
}

