package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisIOIT.class */
public class KinesisIOIT implements Serializable {
    private static int numberOfShards;
    private static int numberOfRows;

    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();

    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();
    private static KinesisTestOptions options;
    private static final Instant now = Instant.now();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisIOIT$ConvertToBytes.class */
    public static class ConvertToBytes extends DoFn<TestRow, byte[]> {
        private ConvertToBytes() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<TestRow, byte[]>.ProcessContext processContext) {
            processContext.output(String.valueOf(((TestRow) processContext.element()).name()).getBytes(StandardCharsets.UTF_8));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisIOIT$ExtractDataValues.class */
    public static class ExtractDataValues extends DoFn<KinesisRecord, String> {
        private ExtractDataValues() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KinesisRecord, String>.ProcessContext processContext) {
            processContext.output(new String(((KinesisRecord) processContext.element()).getDataAsBytes(), StandardCharsets.UTF_8));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisIOIT$RandomPartitioner.class */
    public static final class RandomPartitioner implements KinesisPartitioner {
        private RandomPartitioner() {
        }

        public String getPartitionKey(byte[] bArr) {
            return String.valueOf(new Random().nextInt(KinesisIOIT.numberOfShards) + 1);
        }

        public String getExplicitHashKey(byte[] bArr) {
            return null;
        }
    }

    @BeforeClass
    public static void setup() {
        PipelineOptionsFactory.register(KinesisTestOptions.class);
        options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
        numberOfShards = options.getNumberOfShards().intValue();
        numberOfRows = options.getNumberOfRecords().intValue();
    }

    @Test
    public void testWriteThenRead() {
        runWrite();
        runRead();
    }

    private void runWrite() {
        this.pipelineWrite.apply("Generate Sequence", GenerateSequence.from(0L).to(numberOfRows)).apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())).apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes())).apply("Write to Kinesis", KinesisIO.write().withStreamName(options.getAwsKinesisStream()).withPartitioner(new RandomPartitioner()).withAWSClientsProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), Regions.fromName(options.getAwsKinesisRegion())));
        this.pipelineWrite.run().waitUntilFinish();
    }

    private void runRead() {
        PCollection apply = this.pipelineRead.apply(KinesisIO.read().withStreamName(options.getAwsKinesisStream()).withAWSClientsProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), Regions.fromName(options.getAwsKinesisRegion())).withMaxNumRecords(numberOfRows).withMaxReadTime(Duration.standardMinutes(10L)).withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP).withInitialTimestampInStream(now).withRequestRecordsLimit(1000));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(Long.valueOf(numberOfRows));
        PAssert.that(apply.apply(ParDo.of(new ExtractDataValues())).apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{TestRow.getExpectedHashForRowCount(numberOfRows)});
        this.pipelineRead.run().waitUntilFinish();
    }
}
