package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.joda.time.Instant;
import org.joda.time.Minutes;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.class */
public class SimplifiedKinesisClientTest {
    private static final String STREAM = "stream";
    private static final String SHARD_1 = "shard-01";
    private static final String SHARD_2 = "shard-02";
    private static final String SHARD_3 = "shard-03";
    private static final String SHARD_ITERATOR = "iterator";
    private static final String SEQUENCE_NUMBER = "abc123";

    @Mock
    private AmazonKinesis kinesis;

    @Mock
    private AmazonCloudWatch cloudWatch;

    @InjectMocks
    private SimplifiedKinesisClient underTest;

    @Test
    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
        Mockito.when(this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(STREAM).withShardId(SHARD_1).withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).withStartingSequenceNumber(SEQUENCE_NUMBER))).thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR));
        Assertions.assertThat(this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, (Instant) null)).isEqualTo(SHARD_ITERATOR);
    }

    @Test
    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
        Instant now = Instant.now();
        Mockito.when(this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(STREAM).withShardId(SHARD_1).withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).withTimestamp(now.toDate()))).thenReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR));
        Assertions.assertThat(this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, (String) null, now)).isEqualTo(SHARD_ITERATOR);
    }

    @Test
    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""), ExpiredIteratorException.class);
    }

    @Test
    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new LimitExceededException(""), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleServiceErrorForGetShardIterator() {
        shouldHandleGetShardIteratorError(newAmazonServiceException(AmazonServiceException.ErrorType.Service), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleClientErrorForGetShardIterator() {
        shouldHandleGetShardIteratorError(newAmazonServiceException(AmazonServiceException.ErrorType.Client), RuntimeException.class);
    }

    @Test
    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new NullPointerException(), RuntimeException.class);
    }

    private void shouldHandleGetShardIteratorError(Exception exc, Class<? extends Exception> cls) {
        Mockito.when(this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(STREAM).withShardId(SHARD_1).withShardIteratorType(ShardIteratorType.LATEST))).thenThrow(new Throwable[]{exc});
        try {
            try {
                this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, (String) null, (Instant) null);
                Assertions.failBecauseExceptionWasNotThrown(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            } catch (Exception e) {
                Assertions.assertThat(e).isExactlyInstanceOf(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            }
        } catch (Throwable th) {
            Mockito.reset(new AmazonKinesis[]{this.kinesis});
            throw th;
        }
    }

    @Test
    public void shouldListAllShards() throws Exception {
        Shard withShardId = new Shard().withShardId(SHARD_1);
        Shard withShardId2 = new Shard().withShardId(SHARD_2);
        Shard withShardId3 = new Shard().withShardId(SHARD_3);
        Mockito.when(this.kinesis.describeStream(STREAM, (String) null)).thenReturn(new DescribeStreamResult().withStreamDescription(new StreamDescription().withShards(new Shard[]{withShardId, withShardId2}).withHasMoreShards(true)));
        Mockito.when(this.kinesis.describeStream(STREAM, SHARD_2)).thenReturn(new DescribeStreamResult().withStreamDescription(new StreamDescription().withShards(new Shard[]{withShardId3}).withHasMoreShards(false)));
        Assertions.assertThat(this.underTest.listShards(STREAM)).containsOnly(new Shard[]{withShardId, withShardId2, withShardId3});
    }

    @Test
    public void shouldHandleExpiredIterationExceptionForShardListing() {
        shouldHandleShardListingError(new ExpiredIteratorException(""), ExpiredIteratorException.class);
    }

    @Test
    public void shouldHandleLimitExceededExceptionForShardListing() {
        shouldHandleShardListingError(new LimitExceededException(""), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleServiceErrorForShardListing() {
        shouldHandleShardListingError(newAmazonServiceException(AmazonServiceException.ErrorType.Service), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleClientErrorForShardListing() {
        shouldHandleShardListingError(newAmazonServiceException(AmazonServiceException.ErrorType.Client), RuntimeException.class);
    }

    @Test
    public void shouldHandleUnexpectedExceptionForShardListing() {
        shouldHandleShardListingError(new NullPointerException(), RuntimeException.class);
    }

    private void shouldHandleShardListingError(Exception exc, Class<? extends Exception> cls) {
        Mockito.when(this.kinesis.describeStream(STREAM, (String) null)).thenThrow(new Throwable[]{exc});
        try {
            try {
                this.underTest.listShards(STREAM);
                Assertions.failBecauseExceptionWasNotThrown(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            } catch (Exception e) {
                Assertions.assertThat(e).isExactlyInstanceOf(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            }
        } catch (Throwable th) {
            Mockito.reset(new AmazonKinesis[]{this.kinesis});
            throw th;
        }
    }

    @Test
    public void shouldCountBytesWhenSingleDataPointReturned() throws Exception {
        Instant instant = new Instant("2017-04-06T10:00:00.000Z");
        Instant instant2 = new Instant("2017-04-06T11:00:00.000Z");
        Mockito.when(this.cloudWatch.getMetricStatistics(this.underTest.createMetricStatisticsRequest(STREAM, instant, instant2, Minutes.minutesBetween(instant, instant2)))).thenReturn(new GetMetricStatisticsResult().withDatapoints(new Datapoint[]{new Datapoint().withSum(Double.valueOf(1.0d))}));
        Assertions.assertThat(this.underTest.getBacklogBytes(STREAM, instant, instant2)).isEqualTo(1L);
    }

    @Test
    public void shouldCountBytesWhenMultipleDataPointsReturned() throws Exception {
        Instant instant = new Instant("2017-04-06T10:00:00.000Z");
        Instant instant2 = new Instant("2017-04-06T11:00:00.000Z");
        Mockito.when(this.cloudWatch.getMetricStatistics(this.underTest.createMetricStatisticsRequest(STREAM, instant, instant2, Minutes.minutesBetween(instant, instant2)))).thenReturn(new GetMetricStatisticsResult().withDatapoints(new Datapoint[]{new Datapoint().withSum(Double.valueOf(1.0d)), new Datapoint().withSum(Double.valueOf(3.0d)), new Datapoint().withSum(Double.valueOf(2.0d))}));
        Assertions.assertThat(this.underTest.getBacklogBytes(STREAM, instant, instant2)).isEqualTo(6L);
    }

    @Test
    public void shouldNotCallCloudWatchWhenSpecifiedPeriodTooShort() throws Exception {
        Assertions.assertThat(this.underTest.getBacklogBytes(STREAM, new Instant("2017-04-06T10:00:00.000Z"), new Instant("2017-04-06T10:00:02.000Z"))).isEqualTo(0L);
        Mockito.verifyZeroInteractions(new Object[]{this.cloudWatch});
    }

    @Test
    public void shouldHandleLimitExceededExceptionForGetBacklogBytes() {
        shouldHandleGetBacklogBytesError(new LimitExceededException(""), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() {
        shouldHandleGetBacklogBytesError(new ProvisionedThroughputExceededException(""), KinesisClientThrottledException.class);
    }

    @Test
    public void shouldHandleServiceErrorForGetBacklogBytes() {
        shouldHandleGetBacklogBytesError(newAmazonServiceException(AmazonServiceException.ErrorType.Service), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleClientErrorForGetBacklogBytes() {
        shouldHandleGetBacklogBytesError(newAmazonServiceException(AmazonServiceException.ErrorType.Client), RuntimeException.class);
    }

    @Test
    public void shouldHandleUnexpectedExceptionForGetBacklogBytes() {
        shouldHandleGetBacklogBytesError(new NullPointerException(), RuntimeException.class);
    }

    private void shouldHandleGetBacklogBytesError(Exception exc, Class<? extends Exception> cls) {
        Instant instant = new Instant("2017-04-06T10:00:00.000Z");
        Instant instant2 = new Instant("2017-04-06T11:00:00.000Z");
        Mockito.when(this.cloudWatch.getMetricStatistics(this.underTest.createMetricStatisticsRequest(STREAM, instant, instant2, Minutes.minutesBetween(instant, instant2)))).thenThrow(new Throwable[]{exc});
        try {
            try {
                this.underTest.getBacklogBytes(STREAM, instant, instant2);
                Assertions.failBecauseExceptionWasNotThrown(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            } catch (Exception e) {
                Assertions.assertThat(e).isExactlyInstanceOf(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            }
        } catch (Throwable th) {
            Mockito.reset(new AmazonKinesis[]{this.kinesis});
            throw th;
        }
    }

    private AmazonServiceException newAmazonServiceException(AmazonServiceException.ErrorType errorType) {
        AmazonServiceException amazonServiceException = new AmazonServiceException("");
        amazonServiceException.setErrorType(errorType);
        return amazonServiceException;
    }

    @Test
    public void shouldReturnLimitedNumberOfRecords() throws Exception {
        ((AmazonKinesis) Mockito.doAnswer(invocationOnMock -> {
            return new GetRecordsResult().withRecords(generateRecords(((GetRecordsRequest) invocationOnMock.getArguments()[0]).getLimit().intValue())).withMillisBehindLatest(1000L);
        }).when(this.kinesis)).getRecords((GetRecordsRequest) Matchers.any(GetRecordsRequest.class));
        Assertions.assertThat(this.underTest.getRecords(SHARD_ITERATOR, STREAM, SHARD_1, 100).getRecords().size()).isEqualTo(100);
    }

    private List<Record> generateRecords(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            byte[] bArr = new byte[1024];
            Arrays.fill(bArr, (byte) i2);
            arrayList.add(new Record().withSequenceNumber(String.valueOf(i2)).withPartitionKey("key").withData(ByteBuffer.wrap(bArr)));
        }
        return arrayList;
    }
}
