/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.aws2.kinesis.GetKinesisRecordsResult;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisClientThrottledException;
import org.apache.beam.sdk.io.aws2.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.aws2.kinesis.TimeUtil;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTimeUtils;
import org.joda.time.Instant;
import org.joda.time.Minutes;
import org.joda.time.ReadableInstant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

@RunWith(value=MockitoJUnitRunner.class)
public class SimplifiedKinesisClientTest {
    private static final String STREAM = "stream";
    private static final String SHARD_1 = "shard-01";
    private static final String SHARD_2 = "shard-02";
    private static final String SHARD_3 = "shard-03";
    private static final String SHARD_ITERATOR = "iterator";
    private static final String SEQUENCE_NUMBER = "abc123";
    @Mock
    private KinesisClient kinesis;
    @Mock
    private CloudWatchClient cloudWatch;
    private SimplifiedKinesisClient underTest;

    @Before
    public void init() {
        this.underTest = new SimplifiedKinesisClient(() -> this.kinesis, () -> this.cloudWatch, null);
    }

    @After
    public void afterEach() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    @Test
    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
        Mockito.when((Object)this.kinesis.getShardIterator((GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(STREAM).shardId(SHARD_1).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).startingSequenceNumber(SEQUENCE_NUMBER).build())).thenReturn((Object)((GetShardIteratorResponse)GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build()));
        String stream = this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
        Assertions.assertThat((String)stream).isEqualTo((Object)SHARD_ITERATOR);
        this.underTest.close();
        ((KinesisClient)Mockito.verify((Object)this.kinesis)).close();
    }

    @Test
    public void shouldReturnIteratorStartingtimestamp() throws Exception {
        Instant timestamp = Instant.now();
        Mockito.when((Object)this.kinesis.getShardIterator((GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(STREAM).shardId(SHARD_1).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).timestamp(TimeUtil.toJava((Instant)timestamp)).build())).thenReturn((Object)((GetShardIteratorResponse)GetShardIteratorResponse.builder().shardIterator(SHARD_ITERATOR).build()));
        String stream = this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
        Assertions.assertThat((String)stream).isEqualTo((Object)SHARD_ITERATOR);
        this.underTest.close();
        ((KinesisClient)Mockito.verify((Object)this.kinesis)).close();
    }

    @Test
    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
        this.shouldHandleGetShardIteratorError((Exception)ExpiredIteratorException.builder().build(), ExpiredIteratorException.class);
    }

    @Test
    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
        this.shouldHandleGetShardIteratorError((Exception)LimitExceededException.builder().build(), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
        this.shouldHandleGetShardIteratorError((Exception)ProvisionedThroughputExceededException.builder().build(), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleServiceErrorForGetShardIterator() {
        this.shouldHandleGetShardIteratorError((Exception)SdkServiceException.builder().statusCode(500).build(), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleRetryableClientErrorForGetShardIterator() {
        this.shouldHandleGetShardIteratorError((Exception)ApiCallAttemptTimeoutException.builder().build(), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleNotRetryableClientErrorForGetShardIterator() {
        this.shouldHandleGetShardIteratorError((Exception)SdkClientException.builder().build(), SdkClientException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shouldHandleGetShardIteratorError(Exception thrownException, Class<? extends Exception> expectedExceptionClass) {
        GetShardIteratorRequest request = (GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(STREAM).shardId(SHARD_1).shardIteratorType(ShardIteratorType.LATEST).build();
        Mockito.when((Object)this.kinesis.getShardIterator(request)).thenThrow(new Throwable[]{thrownException});
        try {
            this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
            Assertions.failBecauseExceptionWasNotThrown(expectedExceptionClass);
        }
        catch (Exception e) {
            try {
                Assertions.assertThat((Throwable)e).isExactlyInstanceOf(expectedExceptionClass);
            }
            catch (Throwable throwable) {
                Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
                throw throwable;
            }
            Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
        }
        Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
    }

    @Test
    public void shouldListAllShardsForExclusiveStartShardId() throws Exception {
        Shard shard1 = (Shard)Shard.builder().shardId(SHARD_1).build();
        Shard shard2 = (Shard)Shard.builder().shardId(SHARD_2).build();
        Shard shard3 = (Shard)Shard.builder().shardId(SHARD_3).build();
        String exclusiveStartShardId = "exclusiveStartShardId";
        Mockito.when((Object)this.kinesis.listShards((ListShardsRequest)ListShardsRequest.builder().streamName(STREAM).maxResults(Integer.valueOf(1000)).shardFilter((ShardFilter)ShardFilter.builder().type(ShardFilterType.AFTER_SHARD_ID).shardId(exclusiveStartShardId).build()).build())).thenReturn((Object)((ListShardsResponse)ListShardsResponse.builder().shards(new Shard[]{shard1, shard2, shard3}).nextToken(null).build()));
        List shards = this.underTest.listShardsFollowingClosedShard(STREAM, exclusiveStartShardId);
        Assertions.assertThat((List)shards).containsOnly((Object[])new Shard[]{shard1, shard2, shard3});
    }

    @Test
    public void shouldHandleExpiredIterationExceptionForShardListing() {
        this.shouldHandleShardListingError((Exception)ExpiredIteratorException.builder().build(), ExpiredIteratorException.class);
    }

    @Test
    public void shouldHandleLimitExceededExceptionForShardListing() {
        this.shouldHandleShardListingError((Exception)LimitExceededException.builder().build(), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
        this.shouldHandleShardListingError((Exception)ProvisionedThroughputExceededException.builder().build(), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleServiceErrorForShardListing() {
        this.shouldHandleShardListingError((Exception)SdkServiceException.builder().statusCode(504).build(), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleRetryableClientErrorForShardListing() {
        this.shouldHandleShardListingError((Exception)ApiCallAttemptTimeoutException.builder().build(), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleUnexpectedExceptionForShardListing() {
        this.shouldHandleShardListingError(new NullPointerException(), RuntimeException.class);
    }

    @Test
    public void shouldCountBytesWhenSingleDataPointReturned() throws Exception {
        Instant countSince = new Instant((Object)"2017-04-06T10:00:00.000Z");
        Instant countTo = new Instant((Object)"2017-04-06T11:00:00.000Z");
        Minutes periodTime = Minutes.minutesBetween((ReadableInstant)countSince, (ReadableInstant)countTo);
        GetMetricStatisticsRequest metricStatisticsRequest = this.underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime);
        GetMetricStatisticsResponse result = (GetMetricStatisticsResponse)GetMetricStatisticsResponse.builder().datapoints(new Datapoint[]{(Datapoint)Datapoint.builder().sum(Double.valueOf(1.0)).build()}).build();
        Mockito.when((Object)this.cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn((Object)result);
        long backlogBytes = this.underTest.getBacklogBytes(STREAM, countSince, countTo);
        Assertions.assertThat((long)backlogBytes).isEqualTo(1L);
        this.underTest.close();
        ((CloudWatchClient)Mockito.verify((Object)this.cloudWatch)).close();
    }

    @Test
    public void shouldCountBytesWhenMultipleDataPointsReturned() throws Exception {
        Instant countSince = new Instant((Object)"2017-04-06T10:00:00.000Z");
        Instant countTo = new Instant((Object)"2017-04-06T11:00:00.000Z");
        Minutes periodTime = Minutes.minutesBetween((ReadableInstant)countSince, (ReadableInstant)countTo);
        GetMetricStatisticsRequest metricStatisticsRequest = this.underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime);
        GetMetricStatisticsResponse result = (GetMetricStatisticsResponse)GetMetricStatisticsResponse.builder().datapoints(new Datapoint[]{(Datapoint)Datapoint.builder().sum(Double.valueOf(1.0)).build(), (Datapoint)Datapoint.builder().sum(Double.valueOf(3.0)).build(), (Datapoint)Datapoint.builder().sum(Double.valueOf(2.0)).build()}).build();
        Mockito.when((Object)this.cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenReturn((Object)result);
        long backlogBytes = this.underTest.getBacklogBytes(STREAM, countSince, countTo);
        Assertions.assertThat((long)backlogBytes).isEqualTo(6L);
    }

    @Test
    public void shouldNotCallCloudWatchWhenSpecifiedPeriodTooShort() throws Exception {
        Instant countSince = new Instant((Object)"2017-04-06T10:00:00.000Z");
        Instant countTo = new Instant((Object)"2017-04-06T10:00:02.000Z");
        long backlogBytes = this.underTest.getBacklogBytes(STREAM, countSince, countTo);
        Assertions.assertThat((long)backlogBytes).isEqualTo(0L);
        Mockito.verifyNoInteractions((Object[])new Object[]{this.cloudWatch});
    }

    @Test
    public void shouldHandleLimitExceededExceptionForGetBacklogBytes() {
        this.shouldHandleGetBacklogBytesError((Exception)LimitExceededException.builder().build(), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() {
        this.shouldHandleGetBacklogBytesError((Exception)ProvisionedThroughputExceededException.builder().build(), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleServiceErrorForGetBacklogBytes() {
        this.shouldHandleGetBacklogBytesError((Exception)SdkServiceException.builder().statusCode(503).build(), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleRetryableClientErrorForGetBacklogBytes() {
        this.shouldHandleGetBacklogBytesError((Exception)ApiCallAttemptTimeoutException.builder().build(), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleNotRetryableClientErrorForGetBacklogBytes() {
        this.shouldHandleGetBacklogBytesError((Exception)SdkClientException.builder().build(), SdkClientException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shouldHandleGetBacklogBytesError(Exception thrownException, Class<? extends Exception> expectedExceptionClass) {
        Instant countSince = new Instant((Object)"2017-04-06T10:00:00.000Z");
        Instant countTo = new Instant((Object)"2017-04-06T11:00:00.000Z");
        Minutes periodTime = Minutes.minutesBetween((ReadableInstant)countSince, (ReadableInstant)countTo);
        GetMetricStatisticsRequest metricStatisticsRequest = this.underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime);
        Mockito.when((Object)this.cloudWatch.getMetricStatistics(metricStatisticsRequest)).thenThrow(new Throwable[]{thrownException});
        try {
            this.underTest.getBacklogBytes(STREAM, countSince, countTo);
            Assertions.failBecauseExceptionWasNotThrown(expectedExceptionClass);
        }
        catch (Exception e) {
            try {
                Assertions.assertThat((Throwable)e).isExactlyInstanceOf(expectedExceptionClass);
            }
            catch (Throwable throwable) {
                Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
                throw throwable;
            }
            Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
        }
        Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
    }

    @Test
    public void shouldReturnLimitedNumberOfRecords() throws Exception {
        Integer limit = 100;
        ((KinesisClient)Mockito.doAnswer(invocation -> {
            GetRecordsRequest request = (GetRecordsRequest)invocation.getArguments()[0];
            List<Record> records = this.generateRecords(request.limit());
            return (GetRecordsResponse)GetRecordsResponse.builder().records(records).millisBehindLatest(Long.valueOf(1000L)).build();
        }).when((Object)this.kinesis)).getRecords((GetRecordsRequest)Matchers.any(GetRecordsRequest.class));
        GetKinesisRecordsResult result = this.underTest.getRecords(SHARD_ITERATOR, STREAM, SHARD_1, limit);
        Assertions.assertThat((int)result.getRecords().size()).isEqualTo((Object)limit);
        this.underTest.close();
        ((KinesisClient)Mockito.verify((Object)this.kinesis)).close();
    }

    private List<Record> generateRecords(int num) {
        ArrayList<Record> records = new ArrayList<Record>();
        for (int i = 0; i < num; ++i) {
            byte[] value = new byte[1024];
            Arrays.fill(value, (byte)i);
            records.add((Record)Record.builder().sequenceNumber(String.valueOf(i)).partitionKey("key").data(SdkBytes.fromByteBuffer((ByteBuffer)ByteBuffer.wrap(value))).build());
        }
        return records;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shouldHandleShardListingError(Exception thrownException, Class<? extends Exception> expectedExceptionClass) {
        Mockito.when((Object)this.kinesis.listShards((ListShardsRequest)Matchers.any(ListShardsRequest.class))).thenThrow(new Throwable[]{thrownException});
        try {
            this.underTest.listShardsFollowingClosedShard(STREAM, "some-shard-0123");
            Assertions.failBecauseExceptionWasNotThrown(expectedExceptionClass);
        }
        catch (Exception e) {
            try {
                Assertions.assertThat((Throwable)e).isExactlyInstanceOf(expectedExceptionClass);
            }
            catch (Throwable throwable) {
                Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
                throw throwable;
            }
            Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
        }
        Mockito.reset((Object[])new KinesisClient[]{this.kinesis});
    }
}

