package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.List;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.class */
public class KinesisIOReadTest {
    private static final int SHARDS = 3;

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Mock
    public KinesisClient client;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest$ToRecord.class */
    public static class ToRecord extends DoFn<KinesisRecord, Record> {
        ToRecord() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KinesisRecord kinesisRecord, DoFn.OutputReceiver<Record> outputReceiver) {
            outputReceiver.output(TestHelpers.record(kinesisRecord.getApproximateArrivalTimestamp(), kinesisRecord.getDataAsBytes(), kinesisRecord.getSequenceNumber()));
        }
    }

    @Before
    public void configureClientBuilderFactory() {
        MockClientBuilderFactory.set(this.p, KinesisClientBuilder.class, this.client);
        MockClientBuilderFactory.set(this.p, CloudWatchClientBuilder.class, (CloudWatchClient) Mockito.mock(CloudWatchClient.class));
    }

    @Test
    public void testReadDefaults() {
        KinesisIO.Read withInitialPositionInStream = KinesisIO.read().withStreamName("streamName").withInitialPositionInStream(InitialPositionInStream.LATEST);
        Assertions.assertThat(withInitialPositionInStream.getStreamName()).isEqualTo("streamName");
        Assertions.assertThat(withInitialPositionInStream.getConsumerArn()).isNull();
        Assertions.assertThat(withInitialPositionInStream.getInitialPosition()).isEqualTo(new StartingPoint(InitialPositionInStream.LATEST));
        Assertions.assertThat(withInitialPositionInStream.getWatermarkPolicyFactory()).isEqualTo(WatermarkPolicyFactory.withArrivalTimePolicy());
        Assertions.assertThat(withInitialPositionInStream.getUpToDateThreshold()).isEqualTo(Duration.ZERO);
        Assertions.assertThat(withInitialPositionInStream.getMaxCapacityPerShard()).isEqualTo((Object) null);
        Assertions.assertThat(withInitialPositionInStream.getMaxNumRecords()).isEqualTo(Long.MAX_VALUE);
        Assertions.assertThat(withInitialPositionInStream.getClientConfiguration()).isEqualTo(ClientConfiguration.builder().build());
    }

    @Test
    public void testReadFromShards() {
        List<List<Record>> createRecords = TestHelpers.createRecords(SHARDS, 100);
        TestHelpers.mockShards(this.client, SHARDS);
        TestHelpers.mockShardIterators(this.client, createRecords);
        TestHelpers.mockRecords(this.client, createRecords, 10);
        readFromShards(Function.identity(), Iterables.concat(createRecords));
    }

    @Test
    public void testReadWithEFOFromShards() {
        SubscribeToShardEventStream eventWithRecords = TestHelpers.eventWithRecords(SHARDS);
        SubscribeToShardEventStream eventWithRecords2 = TestHelpers.eventWithRecords(4);
        SubscribeToShardEventStream eventWithRecords3 = TestHelpers.eventWithRecords(5);
        EFOStubbedKinesisAsyncClient eFOStubbedKinesisAsyncClient = new EFOStubbedKinesisAsyncClient(10);
        eFOStubbedKinesisAsyncClient.stubSubscribeToShard("0", eventWithRecords);
        eFOStubbedKinesisAsyncClient.stubSubscribeToShard("1", eventWithRecords2);
        eFOStubbedKinesisAsyncClient.stubSubscribeToShard("2", eventWithRecords3);
        MockClientBuilderFactory.set(this.p, KinesisAsyncClientBuilder.class, eFOStubbedKinesisAsyncClient);
        Iterable concat = Iterables.concat(eventWithRecords.records(), eventWithRecords2.records(), eventWithRecords3.records());
        TestHelpers.mockShards(this.client, SHARDS);
        PAssert.that(this.p.apply(KinesisIO.read().withStreamName("stream").withConsumerArn("consumer").withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON).withArrivalTimeWatermarkPolicy().withMaxNumRecords(12L)).apply(ParDo.of(new ToRecord()))).containsInAnyOrder(concat);
        this.p.run();
    }

    @Test(expected = Pipeline.PipelineExecutionException.class)
    public void testReadWithLimitExceeded() {
        Mockito.when(this.client.listShards((ListShardsRequest) ArgumentMatchers.any(ListShardsRequest.class))).thenThrow(new Throwable[]{(Throwable) LimitExceededException.builder().message("ListShards rate limit exceeded").build()});
        readFromShards(Function.identity(), ImmutableList.of());
    }

    private void readFromShards(Function<KinesisIO.Read, KinesisIO.Read> function, Iterable<Record> iterable) {
        PAssert.that(this.p.apply(function.apply(KinesisIO.read().withStreamName("stream").withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON).withArrivalTimeWatermarkPolicy().withMaxNumRecords(300L))).apply(ParDo.of(new ToRecord()))).containsInAnyOrder(iterable);
        this.p.run();
    }
}
