package org.apache.beam.sdk.io.aws2.sqs.testing;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Objects;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.ITEnvironment;
import org.apache.beam.sdk.io.aws2.sqs.SqsIO;
import org.apache.beam.sdk.io.aws2.sqs.SqsMessage;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/sqs/testing/SqsIOIT.class */
public class SqsIOIT {
    private static final TypeDescriptor<SendMessageRequest> requestType = TypeDescriptor.of(SendMessageRequest.class);

    @ClassRule
    public static ITEnvironment<SqsITOptions> env = new ITEnvironment<>(LocalStackContainer.Service.SQS, SqsITOptions.class, "SQS_PROVIDER=elasticmq");

    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    @Rule
    public TestPipeline pipelineWrite = env.createTestPipeline();

    @Rule
    public TestPipeline pipelineRead = env.createTestPipeline();

    @Rule
    public SqsQueue sqsQueue = new SqsQueue();

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sqs/testing/SqsIOIT$SqsITOptions.class */
    public interface SqsITOptions extends ITEnvironment.ITOptions {
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/sqs/testing/SqsIOIT$SqsQueue.class */
    private static class SqsQueue extends ExternalResource implements Serializable {
        private transient SqsClient client;
        private String url;

        private SqsQueue() {
            this.client = (SqsClient) SqsIOIT.env.buildClient(SqsClient.builder());
        }

        SendMessageRequest messageRequest(TestRow testRow) {
            return (SendMessageRequest) SendMessageRequest.builder().queueUrl(this.url).messageBody(testRow.name()).build();
        }

        protected void before() throws Throwable {
            this.url = this.client.createQueue(builder -> {
                builder.queueName("beam-sqsio-it");
            }).queueUrl();
        }

        protected void after() {
            this.client.deleteQueue(builder -> {
                builder.queueUrl(this.url);
            });
            this.client.close();
        }
    }

    @Test
    public void testWriteThenRead() {
        int intValue = env.options().getNumberOfRows().intValue();
        PCollection apply = this.pipelineWrite.apply("Generate Sequence", GenerateSequence.from(0L).to(intValue)).apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()));
        MapElements into = MapElements.into(requestType);
        SqsQueue sqsQueue = this.sqsQueue;
        Objects.requireNonNull(sqsQueue);
        apply.apply("Prepare SQS message", into.via(sqsQueue::messageRequest)).apply("Write to SQS", SqsIO.write());
        PCollection apply2 = this.pipelineRead.apply("Read from SQS", SqsIO.read().withQueueUrl(this.sqsQueue.url).withMaxNumRecords(intValue)).apply("Extract body", MapElements.into(TypeDescriptors.strings()).via((v0) -> {
            return v0.getBody();
        }));
        PAssert.thatSingleton(apply2.apply("Count All", Count.globally())).isEqualTo(Long.valueOf(intValue));
        PAssert.that(apply2.apply(Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{TestRow.getExpectedHashForRowCount(intValue)});
        this.pipelineWrite.run();
        this.pipelineRead.run();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -75652584:
                if (implMethodName.equals("getBody")) {
                    z = false;
                    break;
                }
                break;
            case 1952760968:
                if (implMethodName.equals("messageRequest")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/sqs/SqsMessage") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getBody();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/sqs/testing/SqsIOIT$SqsQueue") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/common/TestRow;)Lsoftware/amazon/awssdk/services/sqs/model/SendMessageRequest;")) {
                    SqsQueue sqsQueue = (SqsQueue) serializedLambda.getCapturedArg(0);
                    return sqsQueue::messageRequest;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
