package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtilsTest.class */
public class ShardListingUtilsTest {
    private static final String STREAM = "stream";
    private static final String SHARD_1 = "shard-01";
    private static final String SHARD_2 = "shard-02";
    private static final String SHARD_3 = "shard-03";
    private static final Instant CURRENT_TIMESTAMP = Instant.parse("2000-01-01T15:00:00.000Z");

    @Mock
    private KinesisClient kinesis;

    @After
    public void afterEach() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    @Test
    public void shouldListAllShardsForTrimHorizon() throws Exception {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter((ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build()).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2, shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(InitialPositionInStream.TRIM_HORIZON))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldListAllShardsForTrimHorizonWithPagedResults() throws Exception {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        ShardFilter shardFilter = (ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter(shardFilter).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2}).nextToken("testNextToken").build());
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().maxResults(1000).shardFilter(shardFilter).nextToken("testNextToken").build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(InitialPositionInStream.TRIM_HORIZON))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldListAllShardsForTimestampWithinStreamRetentionAfterStreamCreationTimestamp() throws Exception {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        int i = 1 * 3;
        Instant minus = CURRENT_TIMESTAMP.minus(Duration.standardHours(i));
        Instant plus = minus.plus(Duration.standardHours(1));
        DateTimeUtils.setCurrentMillisFixed(CURRENT_TIMESTAMP.getMillis());
        Mockito.when(this.kinesis.describeStreamSummary((DescribeStreamSummaryRequest) DescribeStreamSummaryRequest.builder().streamName(STREAM).build())).thenReturn((DescribeStreamSummaryResponse) DescribeStreamSummaryResponse.builder().streamDescriptionSummary((StreamDescriptionSummary) StreamDescriptionSummary.builder().retentionPeriodHours(Integer.valueOf(i)).streamCreationTimestamp(TimeUtil.toJava(minus)).build()).build());
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter((ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(TimeUtil.toJava(plus)).build()).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2, shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(plus))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldListAllShardsForTimestampWithRetriedDescribeStreamSummaryCallAfterStreamCreationTimestamp() throws IOException, InterruptedException {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        int i = 1 * 3;
        Instant minus = CURRENT_TIMESTAMP.minus(Duration.standardHours(i));
        Instant plus = minus.plus(Duration.standardHours(1));
        DateTimeUtils.setCurrentMillisFixed(CURRENT_TIMESTAMP.getMillis());
        Mockito.when(this.kinesis.describeStreamSummary((DescribeStreamSummaryRequest) DescribeStreamSummaryRequest.builder().streamName(STREAM).build())).thenThrow(new Throwable[]{(Throwable) LimitExceededException.builder().message("Fake Exception: Limit exceeded").build()}).thenReturn((DescribeStreamSummaryResponse) DescribeStreamSummaryResponse.builder().streamDescriptionSummary((StreamDescriptionSummary) StreamDescriptionSummary.builder().retentionPeriodHours(Integer.valueOf(i)).streamCreationTimestamp(TimeUtil.toJava(minus)).build()).build());
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter((ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_TIMESTAMP).timestamp(TimeUtil.toJava(plus)).build()).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2, shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(plus))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldListAllShardsForTimestampOutsideStreamRetentionAfterStreamCreationTimestamp() throws Exception {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        Instant minus = CURRENT_TIMESTAMP.minus(Duration.standardHours(6));
        Instant minus2 = CURRENT_TIMESTAMP.minus(Duration.standardHours(5));
        DateTimeUtils.setCurrentMillisFixed(CURRENT_TIMESTAMP.getMillis());
        Mockito.when(this.kinesis.describeStreamSummary((DescribeStreamSummaryRequest) DescribeStreamSummaryRequest.builder().streamName(STREAM).build())).thenReturn((DescribeStreamSummaryResponse) DescribeStreamSummaryResponse.builder().streamDescriptionSummary((StreamDescriptionSummary) StreamDescriptionSummary.builder().retentionPeriodHours(3).streamCreationTimestamp(TimeUtil.toJava(minus)).build()).build());
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter((ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build()).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2, shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(minus2))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldListAllShardsForTimestampBeforeStreamCreationTimestamp() throws Exception {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        Instant parse = Instant.parse("2000-01-01T15:00:00.000Z");
        Mockito.when(this.kinesis.describeStreamSummary((DescribeStreamSummaryRequest) DescribeStreamSummaryRequest.builder().streamName(STREAM).build())).thenReturn((DescribeStreamSummaryResponse) DescribeStreamSummaryResponse.builder().streamDescriptionSummary((StreamDescriptionSummary) StreamDescriptionSummary.builder().streamCreationTimestamp(TimeUtil.toJava(parse.plus(Duration.standardHours(1L)))).build()).build());
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter((ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build()).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2, shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(parse))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldListAllShardsForLatest() throws Exception {
        Shard shard = (Shard) Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard) Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard) Shard.builder().shardId(SHARD_3).build();
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ListShardsRequest.builder().streamName(STREAM).shardFilter((ShardFilter) ShardFilter.builder().type(ShardFilterType.AT_LATEST).build()).maxResults(1000).build())).thenReturn((ListShardsResponse) ListShardsResponse.builder().shards(new Shard[]{shard, shard2, shard3}).nextToken((String) null).build());
        Assertions.assertThat(ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(InitialPositionInStream.LATEST))).containsOnly(new Shard[]{shard, shard2, shard3});
    }

    @Test
    public void shouldThrowLimitExceededExceptionForShardListing() {
        shouldThrowShardListingError((Exception) LimitExceededException.builder().build(), LimitExceededException.class);
    }

    @Test
    public void shouldThrowProvisionedThroughputExceededExceptionForShardListing() {
        shouldThrowShardListingError((Exception) ProvisionedThroughputExceededException.builder().build(), ProvisionedThroughputExceededException.class);
    }

    @Test
    public void shouldThrowServiceErrorForShardListing() {
        shouldThrowShardListingError(SdkServiceException.builder().statusCode(504).build(), SdkServiceException.class);
    }

    private void shouldThrowShardListingError(Exception exc, Class<? extends Exception> cls) {
        Mockito.when(this.kinesis.listShards((ListShardsRequest) ArgumentMatchers.any(ListShardsRequest.class))).thenThrow(new Throwable[]{exc});
        try {
            try {
                ShardListingUtils.listShardsAtPoint(this.kinesis, STREAM, new StartingPoint(InitialPositionInStream.TRIM_HORIZON));
                Assertions.failBecauseExceptionWasNotThrown(cls);
                Mockito.reset(new KinesisClient[]{this.kinesis});
            } catch (Exception e) {
                Assertions.assertThat(e).isExactlyInstanceOf(cls);
                Mockito.reset(new KinesisClient[]{this.kinesis});
            }
        } catch (Throwable th) {
            Mockito.reset(new KinesisClient[]{this.kinesis});
            throw th;
        }
    }
}
