package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.repackaged.beam_sdks_java_io_kinesis.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.joda.time.Instant;
import org.junit.Test;
import org.mockito.BDDMockito;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/StartingPointShardsFinderTest.class */
public class StartingPointShardsFinderTest {
    private static final String STREAM_NAME = "streamName";
    private SimplifiedKinesisClient kinesis = (SimplifiedKinesisClient) Mockito.mock(SimplifiedKinesisClient.class);
    private final Shard shard00 = createClosedShard("0000");
    private final Shard shard01 = createClosedShard("0001");
    private final Shard shard02 = createClosedShard("0002").withParentShardId("0000");
    private final Shard shard03 = createClosedShard("0003").withParentShardId("0000");
    private final Shard shard04 = createClosedShard("0004").withParentShardId("0001");
    private final Shard shard05 = createClosedShard("0005").withParentShardId("0001");
    private final Shard shard06 = createClosedShard("0006").withParentShardId("0003").withAdjacentParentShardId("0004");
    private final Shard shard07 = createClosedShard("0007").withParentShardId("0006");
    private final Shard shard08 = createClosedShard("0008").withParentShardId("0006");
    private final Shard shard09 = createOpenShard("0009").withParentShardId("0002").withAdjacentParentShardId("0007");
    private final Shard shard10 = createOpenShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005");
    private final List<Shard> allShards = ImmutableList.of(this.shard00, this.shard01, this.shard02, this.shard03, this.shard04, this.shard05, this.shard06, this.shard07, this.shard08, this.shard09, this.shard10);
    private StartingPointShardsFinder underTest = new StartingPointShardsFinder();

    @Test
    public void shouldFindFirstShardsWhenAllShardsAreValid() throws Exception {
        Instant instant = new Instant();
        StartingPoint startingPoint = new StartingPoint(instant);
        Iterator<Shard> it = this.allShards.iterator();
        while (it.hasNext()) {
            activeAtTimestamp(it.next(), instant);
        }
        BDDMockito.given(this.kinesis.listShards(STREAM_NAME)).willReturn(this.allShards);
        Assertions.assertThat(this.underTest.findShardsAtStartingPoint(this.kinesis, STREAM_NAME, startingPoint)).containsExactlyInAnyOrder(new Shard[]{this.shard00, this.shard01});
    }

    @Test
    public void shouldFind3StartingShardsInTheMiddle() throws Exception {
        Instant instant = new Instant();
        StartingPoint startingPoint = new StartingPoint(instant);
        expiredAtTimestamp(this.shard00, instant);
        expiredAtTimestamp(this.shard01, instant);
        activeAtTimestamp(this.shard02, instant);
        expiredAtTimestamp(this.shard03, instant);
        expiredAtTimestamp(this.shard04, instant);
        activeAtTimestamp(this.shard05, instant);
        activeAtTimestamp(this.shard06, instant);
        activeAtTimestamp(this.shard07, instant);
        activeAtTimestamp(this.shard08, instant);
        activeAtTimestamp(this.shard09, instant);
        activeAtTimestamp(this.shard10, instant);
        BDDMockito.given(this.kinesis.listShards(STREAM_NAME)).willReturn(this.allShards);
        Assertions.assertThat(this.underTest.findShardsAtStartingPoint(this.kinesis, STREAM_NAME, startingPoint)).containsExactlyInAnyOrder(new Shard[]{this.shard02, this.shard05, this.shard06});
    }

    @Test
    public void shouldFindLastShardWhenAllPreviousExpired() throws Exception {
        Instant instant = new Instant();
        StartingPoint startingPoint = new StartingPoint(instant);
        expiredAtTimestamp(this.shard00, instant);
        expiredAtTimestamp(this.shard01, instant);
        expiredAtTimestamp(this.shard02, instant);
        expiredAtTimestamp(this.shard03, instant);
        expiredAtTimestamp(this.shard04, instant);
        expiredAtTimestamp(this.shard05, instant);
        expiredAtTimestamp(this.shard06, instant);
        expiredAtTimestamp(this.shard07, instant);
        expiredAtTimestamp(this.shard08, instant);
        activeAtTimestamp(this.shard09, instant);
        activeAtTimestamp(this.shard10, instant);
        BDDMockito.given(this.kinesis.listShards(STREAM_NAME)).willReturn(this.allShards);
        Assertions.assertThat(this.underTest.findShardsAtStartingPoint(this.kinesis, STREAM_NAME, startingPoint)).containsExactlyInAnyOrder(new Shard[]{this.shard09, this.shard10});
    }

    @Test
    public void shouldFindLastShardsWhenLatestStartingPointRequested() throws Exception {
        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
        BDDMockito.given(this.kinesis.listShards(STREAM_NAME)).willReturn(this.allShards);
        Assertions.assertThat(this.underTest.findShardsAtStartingPoint(this.kinesis, STREAM_NAME, startingPoint)).containsExactlyInAnyOrder(new Shard[]{this.shard09, this.shard10});
    }

    @Test
    public void shouldFindEarliestShardsWhenTrimHorizonStartingPointRequested() throws Exception {
        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.TRIM_HORIZON);
        BDDMockito.given(this.kinesis.listShards(STREAM_NAME)).willReturn(this.allShards);
        Assertions.assertThat(this.underTest.findShardsAtStartingPoint(this.kinesis, STREAM_NAME, startingPoint)).containsExactlyInAnyOrder(new Shard[]{this.shard00, this.shard01});
    }

    @Test(expected = IllegalStateException.class)
    public void shouldThrowExceptionWhenSuccessorsNotFoundForExpiredShard() throws Exception {
        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
        BDDMockito.given(this.kinesis.listShards(STREAM_NAME)).willReturn(ImmutableList.of(this.shard00, this.shard01, this.shard02, this.shard03, this.shard04, this.shard05, this.shard06, this.shard07, this.shard08, this.shard09, createClosedShard("0010").withParentShardId("0008").withAdjacentParentShardId("0005")));
        this.underTest.findShardsAtStartingPoint(this.kinesis, STREAM_NAME, startingPoint);
    }

    private Shard createClosedShard(String str) {
        Shard withShardId = new Shard().withShardId(str);
        activeAtPoint(withShardId, ShardIteratorType.TRIM_HORIZON);
        expiredAtPoint(withShardId, ShardIteratorType.LATEST);
        return withShardId;
    }

    private Shard createOpenShard(String str) {
        Shard withShardId = new Shard().withShardId(str);
        activeAtPoint(withShardId, ShardIteratorType.TRIM_HORIZON);
        activeAtPoint(withShardId, ShardIteratorType.LATEST);
        return withShardId;
    }

    private void expiredAtTimestamp(Shard shard, Instant instant) {
        prepareShard(shard, null, ShardIteratorType.AT_TIMESTAMP, instant);
    }

    private void expiredAtPoint(Shard shard, ShardIteratorType shardIteratorType) {
        prepareShard(shard, null, shardIteratorType, null);
    }

    private void activeAtTimestamp(Shard shard, Instant instant) {
        prepareShard(shard, "timestampIterator-" + shard.getShardId(), ShardIteratorType.AT_TIMESTAMP, instant);
    }

    private void activeAtPoint(Shard shard, ShardIteratorType shardIteratorType) {
        prepareShard(shard, shardIteratorType.toString() + shard.getShardId(), shardIteratorType, null);
    }

    private void prepareShard(Shard shard, String str, ShardIteratorType shardIteratorType, Instant instant) {
        try {
            String str2 = shardIteratorType + shard.getShardId() + "-current";
            if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) {
                BDDMockito.given(this.kinesis.getShardIterator(STREAM_NAME, shard.getShardId(), ShardIteratorType.AT_TIMESTAMP, (String) null, instant)).willReturn(str2);
            } else {
                BDDMockito.given(this.kinesis.getShardIterator(STREAM_NAME, shard.getShardId(), shardIteratorType, (String) null, (Instant) null)).willReturn(str2);
            }
            BDDMockito.given(this.kinesis.getRecords(str2, STREAM_NAME, shard.getShardId())).willReturn(new GetKinesisRecordsResult(Collections.emptyList(), str, 0L, STREAM_NAME, shard.getShardId()));
        } catch (TransientKinesisException e) {
            throw new RuntimeException((Throwable) e);
        }
    }
}
