/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.kinesis.EFOKinesisReader;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIOOptions;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReader;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisReaderCheckpoint;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisSource;
import org.apache.beam.sdk.io.aws2.kinesis.TestHelpers;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(value=MockitoJUnitRunner.Silent.class)
public class KinesisSourceTest {
    @Mock
    private KinesisClient kinesisClient;
    private PipelineOptions options = this.createOptions();

    @Test
    public void testCreateReaderOfCorrectType() throws Exception {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx").withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
        KinesisIO.Read readSpecEFO = KinesisIO.read().withStreamName("stream-xxx").withConsumerArn("consumer-aaa").withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
        KinesisReaderCheckpoint initCheckpoint = new KinesisReaderCheckpoint((Iterable)ImmutableList.of());
        UnboundedSource.UnboundedReader reader = new KinesisSource(readSpec, initCheckpoint).createReader(this.options, null);
        Assertions.assertThat((Object)reader).isInstanceOf(KinesisReader.class);
        UnboundedSource.UnboundedReader efoReader = new KinesisSource(readSpecEFO, initCheckpoint).createReader(this.options, null);
        Assertions.assertThat((Object)efoReader).isInstanceOf(EFOKinesisReader.class);
    }

    @Test
    public void testSplitGeneratesCorrectNumberOfSources() throws Exception {
        TestHelpers.mockShards(this.kinesisClient, 3);
        KinesisSource source = this.sourceWithMockedKinesisClient(this.spec());
        Assertions.assertThat((int)source.split(1, this.options).size()).isEqualTo(1);
        Assertions.assertThat((int)source.split(2, this.options).size()).isEqualTo(2);
        Assertions.assertThat((int)source.split(3, this.options).size()).isEqualTo(3);
        Assertions.assertThat((int)source.split(4, this.options).size()).isEqualTo(3);
    }

    @Test
    public void shouldThrowLimitExceededExceptionForShardListing() {
        this.shouldThrowShardListingError((Exception)LimitExceededException.builder().build(), LimitExceededException.class);
    }

    @Test
    public void shouldThrowServiceErrorForShardListing() {
        this.shouldThrowShardListingError((Exception)SdkServiceException.builder().statusCode(504).build(), SdkServiceException.class);
    }

    private KinesisSource sourceWithMockedKinesisClient(KinesisIO.Read read) {
        MockClientBuilderFactory.set(this.options, KinesisClientBuilder.class, this.kinesisClient);
        return new KinesisSource(read);
    }

    private PipelineOptions createOptions() {
        AwsOptions options = (AwsOptions)PipelineOptionsFactory.fromArgs((String[])new String[0]).as(AwsOptions.class);
        options.setAwsRegion(Region.AP_EAST_1);
        return options;
    }

    private KinesisIO.Read spec() {
        return KinesisIO.read().withStreamName("stream").withInitialPositionInStream(InitialPositionInStream.LATEST);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shouldThrowShardListingError(Exception thrownException, Class<? extends Exception> expectedExceptionClass) {
        Mockito.when((Object)this.kinesisClient.listShards((ListShardsRequest)ArgumentMatchers.any(ListShardsRequest.class))).thenThrow(new Throwable[]{thrownException});
        try {
            KinesisSource source = this.sourceWithMockedKinesisClient(this.spec());
            source.split(1, this.options);
            Assertions.failBecauseExceptionWasNotThrown(expectedExceptionClass);
        }
        catch (Exception e) {
            try {
                Assertions.assertThat((Throwable)e).isExactlyInstanceOf(expectedExceptionClass);
            }
            catch (Throwable throwable) {
                Mockito.reset((Object[])new KinesisClient[]{this.kinesisClient});
                throw throwable;
            }
            Mockito.reset((Object[])new KinesisClient[]{this.kinesisClient});
        }
        Mockito.reset((Object[])new KinesisClient[]{this.kinesisClient});
    }

    @Test
    public void testConsumerArnNotPassed() {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx");
        KinesisIOOptions options = TestHelpers.createIOOptions(new String[0]);
        Assertions.assertThat((String)KinesisSource.resolveConsumerArn((KinesisIO.Read)readSpec, (PipelineOptions)options)).isNull();
    }

    @Test
    public void testConsumerArnPassedInIO() {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx").withConsumerArn("arn::consumer-yyy");
        KinesisIOOptions options = TestHelpers.createIOOptions(new String[0]);
        Assertions.assertThat((String)KinesisSource.resolveConsumerArn((KinesisIO.Read)readSpec, (PipelineOptions)options)).isEqualTo((Object)"arn::consumer-yyy");
    }

    @Test
    public void testConsumerArnPassedInPipelineOptions() {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx");
        KinesisIOOptions options = TestHelpers.createIOOptions("--kinesisIOConsumerArns={\"stream-xxx\": \"arn-01\"}");
        Assertions.assertThat((String)KinesisSource.resolveConsumerArn((KinesisIO.Read)readSpec, (PipelineOptions)options)).isEqualTo((Object)"arn-01");
    }

    @Test
    public void testConsumerArnForSpecificStreamNotPassedInPipelineOptions() {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx");
        KinesisIOOptions options = TestHelpers.createIOOptions("--kinesisIOConsumerArns={\"stream-01\": \"arn-01\"}");
        Assertions.assertThat((String)KinesisSource.resolveConsumerArn((KinesisIO.Read)readSpec, (PipelineOptions)options)).isNull();
    }

    @Test
    public void testConsumerArnInPipelineOptionsOverwritesIOSetting() {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx").withConsumerArn("arn-ignored");
        KinesisIOOptions options = TestHelpers.createIOOptions("--kinesisIOConsumerArns={\"stream-xxx\": \"arn-01\"}");
        Assertions.assertThat((String)KinesisSource.resolveConsumerArn((KinesisIO.Read)readSpec, (PipelineOptions)options)).isEqualTo((Object)"arn-01");
    }

    @Test
    public void testConsumerArnInPipelineOptionsDiscardsIOSetting() {
        KinesisIO.Read readSpec = KinesisIO.read().withStreamName("stream-xxx").withConsumerArn("arn-ignored");
        KinesisIOOptions options = TestHelpers.createIOOptions("--kinesisIOConsumerArns={\"stream-xxx\": null}");
        Assertions.assertThat((String)KinesisSource.resolveConsumerArn((KinesisIO.Read)readSpec, (PipelineOptions)options)).isNull();
    }
}

