package org.apache.beam.sdk.io.aws2.kinesis.testing;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.ITEnvironment;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.common.InitialPositionInStream;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT.class */
public class KinesisIOIT implements Serializable {

    @Rule
    public TestPipeline writePipeline = env.createTestPipeline();

    @Rule
    public TestPipeline readPipeline = env.createTestPipeline();

    @Rule
    public ExternalResource kinesisStream = CreateStream.optionally(env.options());

    @ClassRule
    public static ITEnvironment<ITOptions> env = new ITEnvironment<>(LocalStackContainer.Service.KINESIS, ITOptions.class, "KINESIS_ERROR_PROBABILITY=0.01", "USE_SSL=true");
    private static Instant now = Instant.now();
    private static final SerializableFunction<TestRow, byte[]> testRowToBytes = testRow -> {
        return testRow.name().getBytes(StandardCharsets.UTF_8);
    };

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT$CreateStream.class */
    static class CreateStream extends ExternalResource {
        private final String name = KinesisIOIT.env.options().getKinesisStream();
        private final int shards = KinesisIOIT.env.options().getKinesisShards().intValue();
        private final KinesisClient client = (KinesisClient) KinesisIOIT.env.buildClient(KinesisClient.builder());

        CreateStream() {
        }

        static ExternalResource optionally(ITOptions iTOptions) {
            return iTOptions.getCreateStream().booleanValue() || iTOptions.getUseLocalstack().booleanValue() ? new CreateStream() : new ExternalResource() { // from class: org.apache.beam.sdk.io.aws2.kinesis.testing.KinesisIOIT.CreateStream.1
            };
        }

        protected void before() throws Exception {
            this.client.createStream(builder -> {
                builder.streamName(this.name).shardCount(Integer.valueOf(this.shards));
            });
            for (int i = 0; i <= 10; i++) {
                if (StreamStatus.ACTIVE == this.client.describeStream(builder2 -> {
                    builder2.streamName(this.name);
                }).streamDescription().streamStatus()) {
                    return;
                }
                Thread.sleep(1000L);
            }
            throw new RuntimeException("Unable to initialize stream");
        }

        protected void after() {
            this.client.deleteStream(builder -> {
                builder.streamName(this.name).build();
            });
            this.client.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT$ExtractDataValues.class */
    public static class ExtractDataValues extends DoFn<KinesisRecord, String> {
        private ExtractDataValues() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KinesisRecord, String>.ProcessContext processContext) {
            processContext.output(new String(((KinesisRecord) processContext.element()).getDataAsBytes(), StandardCharsets.UTF_8));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT$ITOptions.class */
    public interface ITOptions extends ITEnvironment.ITOptions {
        @Default.String("beam-kinesisio-it")
        @Description("Kinesis stream name")
        String getKinesisStream();

        void setKinesisStream(String str);

        @Description("Number of shards of stream")
        @Default.Integer(8)
        Integer getKinesisShards();

        void setKinesisShards(Integer num);

        @Description("Use record aggregation when writing to Kinesis")
        @Default.Boolean(true)
        Boolean getUseRecordAggregation();

        void setUseRecordAggregation(Boolean bool);

        @Description("Create stream")
        @Default.Boolean(false)
        Boolean getCreateStream();

        void setCreateStream(Boolean bool);
    }

    @Test
    public void testWriteThenRead() {
        if (env.options().getUseLocalstack().booleanValue()) {
            now = Instant.ofEpochMilli(Long.divideUnsigned(now.getMillis(), 1000L));
        }
        runWrite();
        runRead();
    }

    private void runWrite() {
        ITOptions options = env.options();
        KinesisIO.Write withSerializer = KinesisIO.write().withStreamName(env.options().getKinesisStream()).withPartitioner(testRow -> {
            return testRow.name();
        }).withSerializer(testRowToBytes);
        if (!options.getUseRecordAggregation().booleanValue()) {
            withSerializer = withSerializer.withRecordAggregationDisabled();
        }
        this.writePipeline.apply("Generate Sequence", GenerateSequence.from(0L).to(options.getNumberOfRows().intValue())).apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())).apply("Write to Kinesis", withSerializer);
        this.writePipeline.run().waitUntilFinish();
    }

    private void runRead() {
        ITOptions options = env.options();
        int intValue = env.options().getNumberOfRows().intValue();
        PCollection apply = this.readPipeline.apply(KinesisIO.read().withStreamName(options.getKinesisStream()).withMaxNumRecords(intValue).withMaxReadTime(Duration.standardMinutes(5L)).withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP).withInitialTimestampInStream(now));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(Long.valueOf(intValue));
        PAssert.that(apply.apply(ParDo.of(new ExtractDataValues())).apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{TestRow.getExpectedHashForRowCount(intValue)});
        this.readPipeline.run().waitUntilFinish();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1565544829:
                if (implMethodName.equals("lambda$runWrite$a5baa056$1")) {
                    z = false;
                    break;
                }
                break;
            case -1001959318:
                if (implMethodName.equals("lambda$static$967b6431$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Ljava/lang/String;")) {
                    return testRow -> {
                        return testRow.name();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/testing/KinesisIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)[B")) {
                    return testRow2 -> {
                        return testRow2.name().getBytes(StandardCharsets.UTF_8);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
