package org.apache.beam.sdk.io.aws.sqs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws/sqs/SqsIOTest.class */
public class SqsIOTest {

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public EmbeddedSqsServer embeddedSqsRestServer = new EmbeddedSqsServer();

    /* loaded from: input_file:org/apache/beam/sdk/io/aws/sqs/SqsIOTest$EmbeddedSqsServer.class */
    private static class EmbeddedSqsServer extends ExternalResource {
        private SQSRestServer sqsRestServer;
        private AmazonSQS client;
        private String queueUrl;

        private EmbeddedSqsServer() {
        }

        protected void before() {
            this.sqsRestServer = SQSRestServerBuilder.start();
            this.client = (AmazonSQS) AmazonSQSClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:9324", "elasticmq")).withRegion("elasticmq").build();
            this.queueUrl = this.client.createQueue("test").getQueueUrl();
        }

        protected void after() {
            this.sqsRestServer.stopAndWait();
        }

        public AmazonSQS getClient() {
            return this.client;
        }

        public String getQueueUrl() {
            return this.queueUrl;
        }
    }

    @Test
    public void testRead() {
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        String queueUrl = this.embeddedSqsRestServer.getQueueUrl();
        PAssert.thatSingleton(this.pipeline.apply(SqsIO.read().withQueueUrl(queueUrl).withMaxNumRecords(100L)).apply(Count.globally())).isEqualTo(100L);
        for (int i = 0; i < 100; i++) {
            client.sendMessage(queueUrl, "This is a test");
        }
        this.pipeline.run();
    }

    @Test
    public void testWrite() {
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        String queueUrl = this.embeddedSqsRestServer.getQueueUrl();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add(new SendMessageRequest(queueUrl, "This is a test " + i));
        }
        this.pipeline.apply(Create.of(arrayList)).apply(SqsIO.write());
        this.pipeline.run().waitUntilFinish();
        ArrayList arrayList2 = new ArrayList();
        while (arrayList2.size() < 100) {
            ReceiveMessageResult receiveMessage = client.receiveMessage(queueUrl);
            if (receiveMessage.getMessages() != null) {
                Iterator it = receiveMessage.getMessages().iterator();
                while (it.hasNext()) {
                    arrayList2.add(((Message) it.next()).getBody());
                }
            }
        }
        Assert.assertEquals(100L, arrayList2.size());
        for (int i2 = 0; i2 < 100; i2++) {
            arrayList2.contains("This is a test " + i2);
        }
    }
}
