/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis.testing;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.ITEnvironment;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(value=JUnit4.class)
public class KinesisIOIT
implements Serializable {
    @ClassRule
    public static ITEnvironment<ITOptions> env = new ITEnvironment<ITOptions>(LocalStackContainer.Service.KINESIS, ITOptions.class, "KINESIS_ERROR_PROBABILITY=0.01", "USE_SSL=true");
    private static Instant now = Instant.now();
    private final StringBuilder createdConsumerArn = new StringBuilder();
    @Rule
    public TestPipeline writePipeline = env.createTestPipeline();
    @Rule
    public TestPipeline readPipeline = env.createTestPipeline();
    @Rule
    public ExternalResource kinesisStream = CreateStream.optionally(env.options());
    @Rule
    public ExternalResource kinesisConsumer = CreateConsumer.optionally(env.options(), this.createdConsumerArn);
    private static final SerializableFunction<TestRow, byte[]> testRowToBytes = (SerializableFunction & Serializable)row -> row.name().getBytes(StandardCharsets.UTF_8);

    @Test
    public void testWriteThenRead() {
        if (env.options().getUseLocalstack().booleanValue()) {
            now = Instant.ofEpochMilli((long)Long.divideUnsigned(now.getMillis(), 1000L));
        }
        this.runWrite();
        this.runRead();
    }

    private void runWrite() {
        ITOptions options = env.options();
        KinesisIO.Write write = KinesisIO.write().withStreamName(env.options().getKinesisStream()).withPartitioner((KinesisPartitioner & Serializable)row -> row.name()).withSerializer(testRowToBytes);
        if (!options.getUseRecordAggregation().booleanValue()) {
            write = write.withRecordAggregationDisabled();
        }
        ((PCollection)((PCollection)this.writePipeline.apply("Generate Sequence", (PTransform)GenerateSequence.from((long)0L).to((long)options.getNumberOfRows().intValue()))).apply("Prepare TestRows", (PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Write to Kinesis", (PTransform)write);
        this.writePipeline.run().waitUntilFinish();
    }

    private void runRead() {
        ITOptions options = env.options();
        int records = env.options().getNumberOfRows();
        String finalConsumerArn = this.createdConsumerArn.length() != 0 ? this.createdConsumerArn.toString() : options.getConsumerArn();
        PCollection output = (PCollection)this.readPipeline.apply((PTransform)KinesisIO.read().withStreamName(options.getKinesisStream()).withConsumerArn(finalConsumerArn).withMaxNumRecords((long)records).withMaxReadTime(Duration.standardMinutes((long)5L)).withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP).withInitialTimestampInStream(now));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)records);
        PCollection consolidatedHashcode = (PCollection)((PCollection)output.apply((PTransform)ParDo.of((DoFn)new ExtractDataValues()))).apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)records)});
        this.readPipeline.run().waitUntilFinish();
    }

    private static DescribeStreamResponse pollStreamInfo(KinesisClient client, String streamName, int attempts) throws InterruptedException {
        for (int i = 0; i <= attempts; ++i) {
            DescribeStreamResponse streamInfo = client.describeStream(b -> b.streamName(streamName));
            if (StreamStatus.ACTIVE == streamInfo.streamDescription().streamStatus()) {
                return streamInfo;
            }
            Thread.sleep(1000L);
        }
        throw new RuntimeException("Stream did not become active");
    }

    private static DescribeStreamConsumerResponse pollConsumerInfo(KinesisClient client, DescribeStreamResponse streamInfo, RegisterStreamConsumerResponse consumerInfo, int attempts) throws InterruptedException {
        for (int i = 0; i <= attempts; ++i) {
            DescribeStreamConsumerRequest request = (DescribeStreamConsumerRequest)DescribeStreamConsumerRequest.builder().streamARN(streamInfo.streamDescription().streamARN()).consumerARN(consumerInfo.consumer().consumerARN()).build();
            DescribeStreamConsumerResponse newConsumerInfo = client.describeStreamConsumer(request);
            if (newConsumerInfo.consumerDescription().consumerStatus() == ConsumerStatus.ACTIVE) {
                return newConsumerInfo;
            }
            Thread.sleep(1000L);
        }
        throw new RuntimeException("Consumer did not become active");
    }

    private static class ExtractDataValues
    extends DoFn<KinesisRecord, String> {
        private ExtractDataValues() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.output((Object)new String(((KinesisRecord)c.element()).getDataAsBytes(), StandardCharsets.UTF_8));
        }
    }

    static class CreateConsumer
    extends ExternalResource {
        private StringBuilder createdConsumerArn;
        private final String name = env.options().getKinesisStream();
        private final KinesisClient client = (KinesisClient)env.buildClient(KinesisClient.builder());

        CreateConsumer(StringBuilder createdConsumerArn) {
            this.createdConsumerArn = createdConsumerArn;
        }

        static ExternalResource optionally(ITOptions opts, StringBuilder createdConsumerArn) {
            return opts.getCreateConsumerWithName() != null ? new CreateConsumer(createdConsumerArn) : new ExternalResource(){};
        }

        protected void before() throws Exception {
            DescribeStreamResponse streamInfo = KinesisIOIT.pollStreamInfo(this.client, this.name, 15);
            String consumerName = env.options().getCreateConsumerWithName();
            RegisterStreamConsumerResponse consumerInfo = this.client.registerStreamConsumer((RegisterStreamConsumerRequest)RegisterStreamConsumerRequest.builder().streamARN(streamInfo.streamDescription().streamARN()).consumerName(consumerName).build());
            DescribeStreamConsumerResponse newConsumerInfo = KinesisIOIT.pollConsumerInfo(this.client, streamInfo, consumerInfo, 10);
            this.createdConsumerArn.append(newConsumerInfo.consumerDescription().consumerARN());
        }

        protected void after() {
            String arn = this.createdConsumerArn.toString();
            if (!arn.isEmpty()) {
                this.client.deregisterStreamConsumer(b -> {
                    DeregisterStreamConsumerRequest cfr_ignored_0 = (DeregisterStreamConsumerRequest)b.consumerARN(arn).build();
                });
            }
            this.client.close();
        }
    }

    static class CreateStream
    extends ExternalResource {
        private final String name = env.options().getKinesisStream();
        private final int shards = env.options().getKinesisShards();
        private final KinesisClient client = (KinesisClient)env.buildClient(KinesisClient.builder());

        CreateStream() {
        }

        static ExternalResource optionally(ITOptions opts) {
            boolean create = opts.getCreateStream() != false || opts.getUseLocalstack() != false;
            return create ? new CreateStream() : new ExternalResource(){};
        }

        protected void before() throws Exception {
            this.client.createStream(b -> b.streamName(this.name).shardCount(Integer.valueOf(this.shards)));
            KinesisIOIT.pollStreamInfo(this.client, this.name, 10);
        }

        protected void after() {
            this.client.deleteStream(b -> {
                DeleteStreamRequest cfr_ignored_0 = (DeleteStreamRequest)b.streamName(this.name).build();
            });
            this.client.close();
        }
    }

    public static interface ITOptions
    extends ITEnvironment.ITOptions {
        @Description(value="Kinesis stream name")
        @Default.String(value="beam-kinesisio-it")
        public String getKinesisStream();

        public void setKinesisStream(String var1);

        @Description(value="Kinesis consumer ARN - set if want to test EFO")
        public @Nullable String getConsumerArn();

        public void setConsumerArn(@Nullable String var1);

        @Description(value="Number of shards of stream")
        @Default.Integer(value=8)
        public Integer getKinesisShards();

        public void setKinesisShards(Integer var1);

        @Description(value="Use record aggregation when writing to Kinesis")
        @Default.Boolean(value=true)
        public Boolean getUseRecordAggregation();

        public void setUseRecordAggregation(Boolean var1);

        @Description(value="Create stream")
        @Default.Boolean(value=false)
        public Boolean getCreateStream();

        public void setCreateStream(Boolean var1);

        @Description(value="Create EFO consumer with the given name. If set, consumer ARN conf will be ignored.")
        public @Nullable String getCreateConsumerWithName();

        public void setCreateConsumerWithName(@Nullable String var1);
    }
}

