package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import org.assertj.core.api.Assertions;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.BDDMockito;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.class */
public class SimplifiedKinesisClientTest {
    private static final String STREAM = "stream";
    private static final String SHARD_1 = "shard-01";
    private static final String SHARD_2 = "shard-02";
    private static final String SHARD_3 = "shard-03";
    private static final String SHARD_ITERATOR = "iterator";
    private static final String SEQUENCE_NUMBER = "abc123";

    @Mock
    private AmazonKinesis kinesis;

    @InjectMocks
    private SimplifiedKinesisClient underTest;

    @Test
    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
        BDDMockito.given(this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(STREAM).withShardId(SHARD_1).withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).withStartingSequenceNumber(SEQUENCE_NUMBER))).willReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR));
        Assertions.assertThat(this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, (Instant) null)).isEqualTo(SHARD_ITERATOR);
    }

    @Test
    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
        Instant now = Instant.now();
        BDDMockito.given(this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(STREAM).withShardId(SHARD_1).withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER).withTimestamp(now.toDate()))).willReturn(new GetShardIteratorResult().withShardIterator(SHARD_ITERATOR));
        Assertions.assertThat(this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, (String) null, now)).isEqualTo(SHARD_ITERATOR);
    }

    @Test
    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""), ExpiredIteratorException.class);
    }

    @Test
    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new LimitExceededException(""), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleServiceErrorForGetShardIterator() {
        shouldHandleGetShardIteratorError(newAmazonServiceException(AmazonServiceException.ErrorType.Service), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleClientErrorForGetShardIterator() {
        shouldHandleGetShardIteratorError(newAmazonServiceException(AmazonServiceException.ErrorType.Client), RuntimeException.class);
    }

    @Test
    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
        shouldHandleGetShardIteratorError(new NullPointerException(), RuntimeException.class);
    }

    private void shouldHandleGetShardIteratorError(Exception exc, Class<? extends Exception> cls) {
        BDDMockito.given(this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(STREAM).withShardId(SHARD_1).withShardIteratorType(ShardIteratorType.LATEST))).willThrow(new Throwable[]{exc});
        try {
            try {
                this.underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, (String) null, (Instant) null);
                Assertions.failBecauseExceptionWasNotThrown(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            } catch (Exception e) {
                Assertions.assertThat(e).isExactlyInstanceOf(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            }
        } catch (Throwable th) {
            Mockito.reset(new AmazonKinesis[]{this.kinesis});
            throw th;
        }
    }

    @Test
    public void shouldListAllShards() throws Exception {
        Shard withShardId = new Shard().withShardId(SHARD_1);
        Shard withShardId2 = new Shard().withShardId(SHARD_2);
        Shard withShardId3 = new Shard().withShardId(SHARD_3);
        BDDMockito.given(this.kinesis.describeStream(STREAM, (String) null)).willReturn(new DescribeStreamResult().withStreamDescription(new StreamDescription().withShards(new Shard[]{withShardId, withShardId2}).withHasMoreShards(true)));
        BDDMockito.given(this.kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult().withStreamDescription(new StreamDescription().withShards(new Shard[]{withShardId3}).withHasMoreShards(false)));
        Assertions.assertThat(this.underTest.listShards(STREAM)).containsOnly(new Shard[]{withShardId, withShardId2, withShardId3});
    }

    @Test
    public void shouldHandleExpiredIterationExceptionForShardListing() {
        shouldHandleShardListingError(new ExpiredIteratorException(""), ExpiredIteratorException.class);
    }

    @Test
    public void shouldHandleLimitExceededExceptionForShardListing() {
        shouldHandleShardListingError(new LimitExceededException(""), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleServiceErrorForShardListing() {
        shouldHandleShardListingError(newAmazonServiceException(AmazonServiceException.ErrorType.Service), TransientKinesisException.class);
    }

    @Test
    public void shouldHandleClientErrorForShardListing() {
        shouldHandleShardListingError(newAmazonServiceException(AmazonServiceException.ErrorType.Client), RuntimeException.class);
    }

    @Test
    public void shouldHandleUnexpectedExceptionForShardListing() {
        shouldHandleShardListingError(new NullPointerException(), RuntimeException.class);
    }

    private void shouldHandleShardListingError(Exception exc, Class<? extends Exception> cls) {
        BDDMockito.given(this.kinesis.describeStream(STREAM, (String) null)).willThrow(new Throwable[]{exc});
        try {
            try {
                this.underTest.listShards(STREAM);
                Assertions.failBecauseExceptionWasNotThrown(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            } catch (Exception e) {
                Assertions.assertThat(e).isExactlyInstanceOf(cls);
                Mockito.reset(new AmazonKinesis[]{this.kinesis});
            }
        } catch (Throwable th) {
            Mockito.reset(new AmazonKinesis[]{this.kinesis});
            throw th;
        }
    }

    private AmazonServiceException newAmazonServiceException(AmazonServiceException.ErrorType errorType) {
        AmazonServiceException amazonServiceException = new AmazonServiceException("");
        amazonServiceException.setErrorType(errorType);
        return amazonServiceException;
    }
}
