/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.List;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.EFOStubbedKinesisAsyncClient;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.StartingPoint;
import org.apache.beam.sdk.io.aws2.kinesis.TestHelpers;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.kinesis.common.InitialPositionInStream;

/**
 * Tests for {@link KinesisIO#read()} that run against a mocked {@link KinesisClient} and, for the
 * enhanced fan-out case, a stubbed async client.
 *
 * <p>Each test builds a {@link KinesisIO.Read} spec, applies it to the {@link TestPipeline} rule
 * and compares the records emitted by the pipeline with the records the mocks were set up to
 * serve.
 */
@RunWith(MockitoJUnitRunner.class)
public class KinesisIOReadTest {
    /** Number of shards the mocked stream is set up with. */
    private static final int SHARDS = 3;

    /** Number of records created for every shard in the polling read test. */
    private static final int RECORDS_PER_SHARD = 100;

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Mock
    public KinesisClient client;

    /**
     * Registers the mocked Kinesis client and a mocked CloudWatch client with
     * {@link MockClientBuilderFactory} for the test pipeline before every test.
     */
    @Before
    public void configureClientBuilderFactory() {
        MockClientBuilderFactory.set(p, KinesisClientBuilder.class, client);
        MockClientBuilderFactory.set(p, CloudWatchClientBuilder.class, Mockito.mock(CloudWatchClient.class));
    }

    /** Verifies the values reported by a read spec on which only stream name and position were set. */
    @Test
    public void testReadDefaults() {
        KinesisIO.Read readSpec =
            KinesisIO.read()
                .withStreamName("streamName")
                .withInitialPositionInStream(InitialPositionInStream.LATEST);

        Assertions.assertThat(readSpec.getStreamName()).isEqualTo("streamName");
        Assertions.assertThat(readSpec.getConsumerArn()).isNull();
        Assertions.assertThat(readSpec.getInitialPosition())
            .isEqualTo(new StartingPoint(InitialPositionInStream.LATEST));
        Assertions.assertThat(readSpec.getWatermarkPolicyFactory())
            .isEqualTo(WatermarkPolicyFactory.withArrivalTimePolicy());
        Assertions.assertThat(readSpec.getUpToDateThreshold()).isEqualTo(Duration.ZERO);
        Assertions.assertThat(readSpec.getMaxCapacityPerShard()).isNull();
        Assertions.assertThat(readSpec.getMaxNumRecords()).isEqualTo(Long.MAX_VALUE);
        Assertions.assertThat(readSpec.getClientConfiguration())
            .isEqualTo(ClientConfiguration.builder().build());
    }

    /** Reads from the mocked shards and expects every created record to be emitted. */
    @Test
    public void testReadFromShards() {
        List<List<Record>> records = TestHelpers.createRecords(SHARDS, RECORDS_PER_SHARD);
        TestHelpers.mockShards(client, SHARDS);
        TestHelpers.mockShardIterators(client, records);
        TestHelpers.mockRecords(client, records, 10);

        readFromShards(Function.identity(), Iterables.concat(records));
    }

    /**
     * Reads with a consumer ARN configured, using a stubbed async client that serves one
     * subscribe-to-shard event per shard, and expects the records of all three events.
     */
    @Test
    public void testReadWithEFOFromShards() {
        SubscribeToShardEvent shard0event = TestHelpers.eventWithRecords(3);
        SubscribeToShardEvent shard1event = TestHelpers.eventWithRecords(4);
        SubscribeToShardEvent shard2event = TestHelpers.eventWithRecords(5);

        EFOStubbedKinesisAsyncClient asyncClientStub = new EFOStubbedKinesisAsyncClient(10);
        asyncClientStub.stubSubscribeToShard("0", new SubscribeToShardEventStream[] {shard0event});
        asyncClientStub.stubSubscribeToShard("1", new SubscribeToShardEventStream[] {shard1event});
        asyncClientStub.stubSubscribeToShard("2", new SubscribeToShardEventStream[] {shard2event});
        MockClientBuilderFactory.set(p, KinesisAsyncClientBuilder.class, asyncClientStub);

        Iterable<Record> expectedRecords =
            Iterables.concat(shard0event.records(), shard1event.records(), shard2event.records());

        TestHelpers.mockShards(client, SHARDS);

        KinesisIO.Read read =
            KinesisIO.read()
                .withStreamName("stream")
                .withConsumerArn("consumer")
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                .withArrivalTimeWatermarkPolicy()
                // NOTE(review): presumably 3 + 4 + 5, the sizes of the stubbed events - confirm
                // against TestHelpers.eventWithRecords.
                .withMaxNumRecords(12L);

        PCollection<Record> result = p.apply(read).apply(ParDo.of(new ToRecord()));
        PAssert.that(result).containsInAnyOrder(expectedRecords);
        p.run();
    }

    /**
     * Stubs {@code listShards} to throw a {@link LimitExceededException} and expects the pipeline
     * run to fail with a {@link Pipeline.PipelineExecutionException}.
     */
    @Test(expected = Pipeline.PipelineExecutionException.class)
    public void testReadWithLimitExceeded() {
        Mockito.when(client.listShards(ArgumentMatchers.any(ListShardsRequest.class)))
            .thenThrow(
                LimitExceededException.builder().message("ListShards rate limit exceeded").build());

        readFromShards(Function.identity(), ImmutableList.of());
    }

    /**
     * Builds the base read spec, lets {@code fn} adjust it, runs the pipeline and asserts the
     * emitted records.
     *
     * @param fn customization applied to the base {@link KinesisIO.Read} before it is used
     * @param expected records the pipeline output must contain, in any order
     */
    private void readFromShards(Function<KinesisIO.Read, KinesisIO.Read> fn, Iterable<Record> expected) {
        KinesisIO.Read read =
            KinesisIO.read()
                .withStreamName("stream")
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                .withArrivalTimeWatermarkPolicy()
                // Bounds the read at the total number of records created by testReadFromShards.
                .withMaxNumRecords((long) SHARDS * RECORDS_PER_SHARD);

        PCollection<Record> result = p.apply(fn.apply(read)).apply(ParDo.of(new ToRecord()));
        PAssert.that(result).containsInAnyOrder(expected);
        p.run();
    }

    /**
     * Converts each {@link KinesisRecord} emitted by the read back into an AWS SDK {@link Record}
     * so it can be compared with the records the mocks were set up with.
     */
    static class ToRecord extends DoFn<KinesisRecord, Record> {
        /**
         * Rebuilds a {@link Record} from the arrival timestamp, payload bytes and sequence number
         * of the incoming element and emits it.
         *
         * @param rec the element read from Kinesis
         * @param out receiver for the rebuilt record
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KinesisRecord rec, DoFn.OutputReceiver<Record> out) {
            Instant arrival = rec.getApproximateArrivalTimestamp();
            out.output(TestHelpers.record(arrival, rec.getDataAsBytes(), rec.getSequenceNumber()));
        }
    }
}

