/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws.sqs;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import java.util.ArrayList;
import org.apache.beam.sdk.io.aws.sqs.SqsIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class SqsIOTest {
    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    @Rule
    public EmbeddedSqsServer embeddedSqsRestServer = new EmbeddedSqsServer();

    @Test
    public void testRead() {
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        String queueUrl = this.embeddedSqsRestServer.getQueueUrl();
        PCollection output = (PCollection)this.pipeline.apply((PTransform)SqsIO.read().withQueueUrl(queueUrl).withMaxNumRecords(100L));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply(Count.globally()))).isEqualTo((Object)100L);
        for (int i = 0; i < 100; ++i) {
            client.sendMessage(queueUrl, "This is a test");
        }
        this.pipeline.run();
    }

    @Test
    public void testWrite() {
        AmazonSQS client = this.embeddedSqsRestServer.getClient();
        String queueUrl = this.embeddedSqsRestServer.getQueueUrl();
        ArrayList<SendMessageRequest> messages = new ArrayList<SendMessageRequest>();
        for (int i = 0; i < 100; ++i) {
            SendMessageRequest request = new SendMessageRequest(queueUrl, "This is a test " + i);
            messages.add(request);
        }
        ((PCollection)this.pipeline.apply((PTransform)Create.of(messages))).apply((PTransform)SqsIO.write());
        this.pipeline.run().waitUntilFinish();
        ArrayList<String> received = new ArrayList<String>();
        while (received.size() < 100) {
            ReceiveMessageResult receiveMessageResult = client.receiveMessage(queueUrl);
            if (receiveMessageResult.getMessages() == null) continue;
            for (Message message : receiveMessageResult.getMessages()) {
                received.add(message.getBody());
            }
        }
        Assert.assertEquals((long)100L, (long)received.size());
        for (int i = 0; i < 100; ++i) {
            received.contains("This is a test " + i);
        }
    }

    private static class EmbeddedSqsServer
    extends ExternalResource {
        private SQSRestServer sqsRestServer;
        private AmazonSQS client;
        private String queueUrl;

        private EmbeddedSqsServer() {
        }

        protected void before() {
            this.sqsRestServer = SQSRestServerBuilder.start();
            String endpoint = "http://localhost:9324";
            String region = "elasticmq";
            String accessKey = "x";
            String secretKey = "x";
            this.client = (AmazonSQS)((AmazonSQSClientBuilder)((AmazonSQSClientBuilder)AmazonSQSClientBuilder.standard().withCredentials((AWSCredentialsProvider)new AWSStaticCredentialsProvider((AWSCredentials)new BasicAWSCredentials(accessKey, secretKey)))).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))).build();
            CreateQueueResult queue = this.client.createQueue("test");
            this.queueUrl = queue.getQueueUrl();
        }

        protected void after() {
            this.sqsRestServer.stopAndWait();
        }

        public AmazonSQS getClient() {
            return this.client;
        }

        public String getQueueUrl() {
            return this.queueUrl;
        }
    }
}

