/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.sqs.testing;

import java.io.Serializable;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.ITEnvironment;
import org.apache.beam.sdk.io.aws2.sqs.SqsIO;
import org.apache.beam.sdk.io.aws2.sqs.SqsMessage;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

@RunWith(value=JUnit4.class)
public class SqsIOIT {
    private static final TypeDescriptor<SendMessageRequest> requestType = TypeDescriptor.of(SendMessageRequest.class);
    @ClassRule
    public static ITEnvironment<SqsITOptions> env = new ITEnvironment<SqsITOptions>(LocalStackContainer.Service.SQS, SqsITOptions.class, "SQS_PROVIDER=elasticmq");
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    @Rule
    public TestPipeline pipelineWrite = env.createTestPipeline();
    @Rule
    public TestPipeline pipelineRead = env.createTestPipeline();
    @Rule
    public SqsQueue sqsQueue = new SqsQueue();

    @Test
    public void testWriteThenRead() {
        int rows = env.options().getNumberOfRows();
        ((PCollection)((PCollection)((PCollection)this.pipelineWrite.apply("Generate Sequence", (PTransform)GenerateSequence.from((long)0L).to((long)rows))).apply("Prepare TestRows", (PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Prepare SQS message", (PTransform)MapElements.into(requestType).via(this.sqsQueue::messageRequest))).apply("Write to SQS", (PTransform)SqsIO.write());
        PCollection output = (PCollection)((PCollection)this.pipelineRead.apply("Read from SQS", (PTransform)SqsIO.read().withQueueUrl(this.sqsQueue.url).withMaxNumRecords((long)rows))).apply("Extract body", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via(SqsMessage::getBody));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)rows);
        PAssert.that((PCollection)((PCollection)output.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults()))).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)rows)});
        this.pipelineWrite.run();
        this.pipelineRead.run();
    }

    @Test
    public void testWriteBatchesThenRead() {
        int rows = env.options().getNumberOfRows();
        ((PCollection)((PCollection)this.pipelineWrite.apply("Generate Sequence", (PTransform)GenerateSequence.from((long)0L).to((long)rows))).apply("Prepare TestRows", (PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Write to SQS", (PTransform)SqsIO.writeBatches().withEntryMapper((SqsIO.WriteBatches.EntryMapperFn.Builder & Serializable)(b, row) -> b.messageBody(row.name())).to(this.sqsQueue.url));
        PCollection output = (PCollection)((PCollection)this.pipelineRead.apply("Read from SQS", (PTransform)SqsIO.read().withQueueUrl(this.sqsQueue.url).withMaxNumRecords((long)rows))).apply("Extract body", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via(SqsMessage::getBody));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)rows);
        PAssert.that((PCollection)((PCollection)output.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults()))).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)rows)});
        this.pipelineWrite.run();
        this.pipelineRead.run();
    }

    private static class SqsQueue
    extends ExternalResource
    implements Serializable {
        private transient SqsClient client = (SqsClient)env.buildClient(SqsClient.builder());
        private String url;

        private SqsQueue() {
        }

        SendMessageRequest messageRequest(TestRow r) {
            return (SendMessageRequest)SendMessageRequest.builder().queueUrl(this.url).messageBody(r.name()).build();
        }

        protected void before() throws Throwable {
            this.url = this.client.createQueue(b -> b.queueName("beam-sqsio-it-" + RandomStringUtils.randomAlphanumeric((int)4))).queueUrl();
        }

        protected void after() {
            this.client.deleteQueue(b -> b.queueUrl(this.url));
            this.client.close();
        }
    }

    public static interface SqsITOptions
    extends ITEnvironment.ITOptions {
    }
}

