/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import com.amazonaws.regions.Regions;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisTestOptions;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import software.amazon.awssdk.regions.Region;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(value=JUnit4.class)
public class KinesisIOIT
implements Serializable {
    private static int numberOfShards;
    private static int numberOfRows;
    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();
    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();
    private static KinesisTestOptions options;
    private static final Instant now;

    @BeforeClass
    public static void setup() {
        PipelineOptionsFactory.register(KinesisTestOptions.class);
        options = (KinesisTestOptions)TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
        numberOfShards = options.getNumberOfShards();
        numberOfRows = options.getNumberOfRecords();
    }

    @Test
    public void testWriteThenRead() {
        this.runWrite();
        this.runRead();
    }

    private void runWrite() {
        ((PCollection)((PCollection)((PCollection)this.pipelineWrite.apply("Generate Sequence", (PTransform)GenerateSequence.from((long)0L).to((long)numberOfRows))).apply("Prepare TestRows", (PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Prepare Kinesis input records", (PTransform)ParDo.of((DoFn)new ConvertToBytes()))).apply("Write to Kinesis", (PTransform)org.apache.beam.sdk.io.kinesis.KinesisIO.write().withStreamName(options.getAwsKinesisStream()).withPartitioner((KinesisPartitioner)new RandomPartitioner()).withAWSClientsProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), Regions.fromName((String)options.getAwsKinesisRegion())));
        this.pipelineWrite.run().waitUntilFinish();
    }

    private void runRead() {
        PCollection output = (PCollection)this.pipelineRead.apply((PTransform)KinesisIO.read().withStreamName(options.getAwsKinesisStream()).withAWSClientsProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), Region.of((String)options.getAwsKinesisRegion())).withMaxNumRecords((long)numberOfRows).withMaxReadTime(Duration.standardMinutes((long)10L)).withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP).withInitialTimestampInStream(now).withRequestRecordsLimit(1000));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)numberOfRows);
        PCollection consolidatedHashcode = (PCollection)((PCollection)output.apply((PTransform)ParDo.of((DoFn)new ExtractDataValues()))).apply("Hash row contents", (PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)numberOfRows)});
        this.pipelineRead.run().waitUntilFinish();
    }

    static {
        now = Instant.now();
    }

    private static final class RandomPartitioner
    implements KinesisPartitioner {
        private RandomPartitioner() {
        }

        public String getPartitionKey(byte[] value) {
            Random rand = new Random();
            int n = rand.nextInt(numberOfShards) + 1;
            return String.valueOf(n);
        }

        public String getExplicitHashKey(byte[] value) {
            return null;
        }
    }

    private static class ExtractDataValues
    extends DoFn<KinesisRecord, String> {
        private ExtractDataValues() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.output((Object)new String(((KinesisRecord)c.element()).getDataAsBytes(), StandardCharsets.UTF_8));
        }
    }

    private static class ConvertToBytes
    extends DoFn<TestRow, byte[]> {
        private ConvertToBytes() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.output((Object)String.valueOf(((TestRow)c.element()).name()).getBytes(StandardCharsets.UTF_8));
        }
    }
}

