/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.kinesis.AmazonKinesisMock;
import org.apache.beam.sdk.io.kinesis.KinesisClientProvider;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.Test;

public class KinesisMockReadTest {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void readsDataFromMockKinesis() {
        int noOfShards = 3;
        int noOfEventsPerShard = 100;
        List<List<AmazonKinesisMock.TestData>> testData = this.provideTestData(noOfShards, noOfEventsPerShard);
        PCollection result = (PCollection)((PCollection)this.p.apply((PTransform)KinesisIO.Read.from((String)"stream", (InitialPositionInStream)InitialPositionInStream.TRIM_HORIZON).using((KinesisClientProvider)new AmazonKinesisMock.Provider(testData, 10)).withMaxNumRecords((long)(noOfShards * noOfEventsPerShard)))).apply((PTransform)ParDo.of((DoFn)new KinesisRecordToTestData()));
        PAssert.that((PCollection)result).containsInAnyOrder(Iterables.concat(testData));
        this.p.run();
    }

    private List<List<AmazonKinesisMock.TestData>> provideTestData(int noOfShards, int noOfEventsPerShard) {
        int seqNumber = 0;
        ArrayList shardedData = Lists.newArrayList();
        for (int i = 0; i < noOfShards; ++i) {
            ArrayList shardData = Lists.newArrayList();
            shardedData.add(shardData);
            DateTime arrival = DateTime.now();
            for (int j = 0; j < noOfEventsPerShard; ++j) {
                arrival = arrival.plusSeconds(1);
                shardData.add(new AmazonKinesisMock.TestData(Integer.toString(++seqNumber), arrival.toInstant(), Integer.toString(seqNumber)));
            }
        }
        return shardedData;
    }

    private static class KinesisRecordToTestData
    extends DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
        private KinesisRecordToTestData() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            c.output((Object)new AmazonKinesisMock.TestData((KinesisRecord)c.element()));
        }
    }
}

