package org.apache.beam.sdk.io.aws2.kinesis;

import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.kinesis.common.InitialPositionInStream;

/**
 * Unit tests for {@link KinesisSource}.
 *
 * <p>Covers four areas: the reader type returned by {@code createReader}, the number of sources
 * returned by {@code split}, propagation of shard-listing failures, and the result of
 * {@code KinesisSource.resolveConsumerArn} for different combinations of IO settings and
 * pipeline options.
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class KinesisSourceTest {

    /** Stream name used by the reader-type and consumer-ARN tests. */
    private static final String STREAM_NAME = "stream-xxx";

    /** Pipeline argument that maps {@link #STREAM_NAME} to the consumer ARN {@code arn-01}. */
    private static final String ARN_FOR_STREAM_ARG = "--kinesisIOConsumerArns={\"stream-xxx\": \"arn-01\"}";

    @Mock
    private KinesisClient kinesisClient;

    /** Options shared by all tests; built once per test instance and never reassigned. */
    private final PipelineOptions options = createOptions();

    /**
     * A read without a consumer ARN must yield a {@link KinesisReader}; a read with a consumer ARN
     * must yield an {@link EFOKinesisReader}.
     */
    @Test
    public void testCreateReaderOfCorrectType() throws Exception {
        KinesisIO.Read plainRead = KinesisIO.read()
                .withStreamName(STREAM_NAME)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
        KinesisIO.Read readWithConsumer = KinesisIO.read()
                .withStreamName(STREAM_NAME)
                .withConsumerArn("consumer-aaa")
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
        KinesisReaderCheckpoint initialCheckpoint = new KinesisReaderCheckpoint(ImmutableList.of());
        // Typed null keeps the call unambiguous while passing "no checkpoint" to createReader.
        KinesisReaderCheckpoint noCheckpoint = null;

        Assertions.assertThat(new KinesisSource(plainRead, initialCheckpoint).createReader(this.options, noCheckpoint))
                .isInstanceOf(KinesisReader.class);
        Assertions.assertThat(new KinesisSource(readWithConsumer, initialCheckpoint).createReader(this.options, noCheckpoint))
                .isInstanceOf(EFOKinesisReader.class);
    }

    /**
     * With the client prepared by {@code TestHelpers.mockShards(kinesisClient, 3)}, the number of
     * sources follows the desired split count up to 3 and stays at 3 beyond that.
     */
    @Test
    public void testSplitGeneratesCorrectNumberOfSources() throws Exception {
        TestHelpers.mockShards(this.kinesisClient, 3);
        KinesisSource source = sourceWithMockedKinesisClient(spec());

        Assertions.assertThat(source.split(1, this.options).size()).isEqualTo(1);
        Assertions.assertThat(source.split(2, this.options).size()).isEqualTo(2);
        Assertions.assertThat(source.split(3, this.options).size()).isEqualTo(3);
        Assertions.assertThat(source.split(4, this.options).size()).isEqualTo(3);
    }

    /** A {@link LimitExceededException} raised by {@code listShards} must surface from {@code split}. */
    @Test
    public void shouldThrowLimitExceededExceptionForShardListing() {
        shouldThrowShardListingError(LimitExceededException.builder().build(), LimitExceededException.class);
    }

    /** An {@link SdkServiceException} raised by {@code listShards} must surface from {@code split}. */
    @Test
    public void shouldThrowServiceErrorForShardListing() {
        shouldThrowShardListingError(SdkServiceException.builder().statusCode(504).build(), SdkServiceException.class);
    }

    /**
     * Registers the mocked client on the shared options through {@link MockClientBuilderFactory}
     * and returns a new source for the given read spec.
     *
     * @param read the read configuration to wrap
     * @return a new {@link KinesisSource} for {@code read}
     */
    private KinesisSource sourceWithMockedKinesisClient(KinesisIO.Read read) {
        MockClientBuilderFactory.set(this.options, KinesisClientBuilder.class, this.kinesisClient);
        return new KinesisSource(read);
    }

    /**
     * Builds pipeline options from an empty argument list and sets the AWS region.
     *
     * @return options viewed as {@link AwsOptions} with the region set to {@code AP_EAST_1}
     */
    private PipelineOptions createOptions() {
        AwsOptions awsOptions = PipelineOptionsFactory.fromArgs().as(AwsOptions.class);
        awsOptions.setAwsRegion(Region.AP_EAST_1);
        return awsOptions;
    }

    /**
     * @return a read spec for the stream named {@code "stream"} starting at {@code LATEST}
     */
    private KinesisIO.Read spec() {
        return KinesisIO.read().withStreamName("stream").withInitialPositionInStream(InitialPositionInStream.LATEST);
    }

    /**
     * Stubs {@code listShards} to throw {@code exc} and verifies that {@code split} fails with an
     * exception of exactly type {@code cls}.
     *
     * @param exc the exception the mocked client throws from {@code listShards}
     * @param cls the exact exception class expected to escape from {@code split}
     */
    private void shouldThrowShardListingError(Exception exc, Class<? extends Exception> cls) {
        Mockito.when(this.kinesisClient.listShards(ArgumentMatchers.any(ListShardsRequest.class))).thenThrow(exc);
        try {
            sourceWithMockedKinesisClient(spec()).split(1, this.options);
            // Raises an AssertionError, which the catch below deliberately does not intercept.
            Assertions.failBecauseExceptionWasNotThrown(cls);
        } catch (Exception e) {
            Assertions.assertThat(e).isExactlyInstanceOf(cls);
        } finally {
            // Clear the throwing stub exactly once, whether the assertions passed or failed.
            Mockito.reset(this.kinesisClient);
        }
    }

    /** No consumer ARN on the read and none in the pipeline options resolves to {@code null}. */
    @Test
    public void testConsumerArnNotPassed() {
        KinesisIO.Read read = KinesisIO.read().withStreamName(STREAM_NAME);

        Assertions.assertThat(KinesisSource.resolveConsumerArn(read, TestHelpers.createIOOptions())).isNull();
    }

    /** A consumer ARN set on the read is returned when the pipeline options carry none. */
    @Test
    public void testConsumerArnPassedInIO() {
        KinesisIO.Read read = KinesisIO.read().withStreamName(STREAM_NAME).withConsumerArn("arn::consumer-yyy");

        Assertions.assertThat(KinesisSource.resolveConsumerArn(read, TestHelpers.createIOOptions()))
                .isEqualTo("arn::consumer-yyy");
    }

    /** A consumer ARN mapped to the stream in the pipeline options is returned. */
    @Test
    public void testConsumerArnPassedInPipelineOptions() {
        KinesisIO.Read read = KinesisIO.read().withStreamName(STREAM_NAME);

        Assertions.assertThat(KinesisSource.resolveConsumerArn(read, TestHelpers.createIOOptions(ARN_FOR_STREAM_ARG)))
                .isEqualTo("arn-01");
    }

    /** A pipeline-options mapping for a different stream resolves to {@code null}. */
    @Test
    public void testConsumerArnForSpecificStreamNotPassedInPipelineOptions() {
        KinesisIO.Read read = KinesisIO.read().withStreamName(STREAM_NAME);

        Assertions.assertThat(KinesisSource.resolveConsumerArn(read, TestHelpers.createIOOptions("--kinesisIOConsumerArns={\"stream-01\": \"arn-01\"}")))
                .isNull();
    }

    /** The pipeline-options mapping for the stream wins over the ARN set on the read. */
    @Test
    public void testConsumerArnInPipelineOptionsOverwritesIOSetting() {
        KinesisIO.Read read = KinesisIO.read().withStreamName(STREAM_NAME).withConsumerArn("arn-ignored");

        Assertions.assertThat(KinesisSource.resolveConsumerArn(read, TestHelpers.createIOOptions(ARN_FOR_STREAM_ARG)))
                .isEqualTo("arn-01");
    }

    /** An explicit {@code null} mapping in the pipeline options discards the ARN set on the read. */
    @Test
    public void testConsumerArnInPipelineOptionsDiscardsIOSetting() {
        KinesisIO.Read read = KinesisIO.read().withStreamName(STREAM_NAME).withConsumerArn("arn-ignored");

        Assertions.assertThat(KinesisSource.resolveConsumerArn(read, TestHelpers.createIOOptions("--kinesisIOConsumerArns={\"stream-xxx\": null}")))
                .isNull();
    }
}
