/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.kinesis.AWSClientsProvider;
import org.apache.beam.sdk.io.aws2.kinesis.AmazonKinesisMock;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(value=JUnit4.class)
public class KinesisMockReadTest {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();
    private final int noOfShards = 3;
    private final int noOfEventsPerShard = 100;

    @Test
    public void readsDataFromMockKinesis() {
        List<List<AmazonKinesisMock.TestData>> testData = this.defaultTestData();
        this.verifyReadWithProvider(new AmazonKinesisMock.Provider(testData, 10), testData);
    }

    @Test
    public void readsDataFromMockKinesisWithDescribeStreamRateLimit() {
        List<List<AmazonKinesisMock.TestData>> testData = this.defaultTestData();
        this.verifyReadWithProvider(new AmazonKinesisMock.Provider(testData, 10).withRateLimitedDescribeStream(2), testData);
    }

    @Test(expected=Pipeline.PipelineExecutionException.class)
    public void readsDataFromMockKinesisWithDescribeStreamRateLimitFailure() {
        List<List<AmazonKinesisMock.TestData>> testData = this.defaultTestData();
        this.verifyReadWithProvider(new AmazonKinesisMock.Provider(testData, 10).withRateLimitedDescribeStream(11), testData);
    }

    public void verifyReadWithProvider(AmazonKinesisMock.Provider provider, List<List<AmazonKinesisMock.TestData>> testData) {
        PCollection result = (PCollection)((PCollection)this.p.apply((PTransform)KinesisIO.read().withStreamName("stream").withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON).withAWSClientsProvider((AWSClientsProvider)provider).withArrivalTimeWatermarkPolicy().withMaxNumRecords(300L))).apply((PTransform)ParDo.of((DoFn)new KinesisRecordToTestData()));
        PAssert.that((PCollection)result).containsInAnyOrder(Iterables.concat(testData));
        this.p.run();
    }

    private List<List<AmazonKinesisMock.TestData>> defaultTestData() {
        return this.provideTestData(3, 100);
    }

    private List<List<AmazonKinesisMock.TestData>> provideTestData(int noOfShards, int noOfEventsPerShard) {
        int seqNumber = 0;
        ArrayList shardedData = Lists.newArrayList();
        for (int i = 0; i < noOfShards; ++i) {
            ArrayList shardData = Lists.newArrayList();
            shardedData.add(shardData);
            DateTime arrival = DateTime.now();
            for (int j = 0; j < noOfEventsPerShard; ++j) {
                arrival = arrival.plusSeconds(1);
                shardData.add(new AmazonKinesisMock.TestData(Integer.toString(++seqNumber), arrival.toInstant(), Integer.toString(seqNumber)));
            }
        }
        return shardedData;
    }

    static class KinesisRecordToTestData
    extends DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
        KinesisRecordToTestData() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            c.output((Object)new AmazonKinesisMock.TestData((KinesisRecord)c.element()));
        }
    }
}

