/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.sns.testing;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.aws2.ITEnvironment;
import org.apache.beam.sdk.io.aws2.sns.SnsIO;
import org.apache.beam.sdk.io.aws2.sqs.SqsIO;
import org.apache.beam.sdk.io.aws2.sqs.SqsMessage;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sqs.SqsClient;

@RunWith(value=JUnit4.class)
public class SnsIOIT {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    @ClassRule
    public static ITEnvironment<ITOptions> env = new ITEnvironment<ITOptions>(new LocalStackContainer.Service[]{LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS}, ITOptions.class, "SQS_PROVIDER=elasticmq");
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    @Rule
    public TestPipeline pipelineWrite = env.createTestPipeline();
    @Rule
    public TestPipeline pipelineRead = env.createTestPipeline();
    @Rule
    public AwsResources resources = new AwsResources();

    @Test
    public void testWriteThenRead() {
        ITOptions opts = env.options();
        int rows = opts.getNumberOfRows();
        ((PCollection)((PCollection)this.pipelineWrite.apply("Generate Sequence", (PTransform)GenerateSequence.from((long)0L).to((long)rows))).apply("Prepare TestRows", (PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Write to SNS", (PTransform)SnsIO.write().withTopicArn(this.resources.snsTopic).withPublishRequestBuilder((SerializableFunction & Serializable)r -> PublishRequest.builder().message(r.name())));
        PCollection output = (PCollection)((PCollection)this.pipelineRead.apply("Read from SQS", (PTransform)SqsIO.read().withQueueUrl(this.resources.sqsQueue).withMaxNumRecords((long)rows))).apply("Extract message", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via(SnsIOIT::extractMessage));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)rows);
        PAssert.that((PCollection)((PCollection)output.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults()))).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)rows)});
        this.pipelineWrite.run();
        this.pipelineRead.run();
    }

    private static String extractMessage(SqsMessage msg) {
        try {
            return MAPPER.readTree(msg.getBody()).get("Message").asText();
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private static class AwsResources
    extends ExternalResource
    implements Serializable {
        private transient SqsClient sqs = (SqsClient)env.buildClient(SqsClient.builder());
        private transient SnsClient sns = (SnsClient)env.buildClient(SnsClient.builder());
        private String sqsQueue;
        private String snsTopic;
        private String sns2Sqs;

        private AwsResources() {
        }

        protected void before() throws Throwable {
            this.snsTopic = this.sns.createTopic(b -> b.name("beam-snsio-it")).topicArn();
            this.sqsQueue = this.sqs.createQueue(b -> b.queueName("beam-snsio-it")).queueUrl();
            this.sns2Sqs = this.sns.subscribe(b -> b.topicArn(this.snsTopic).endpoint(this.sqsQueue).protocol("sqs")).subscriptionArn();
        }

        protected void after() {
            try {
                IOITHelper.executeWithRetry(() -> this.sns.unsubscribe(b -> b.subscriptionArn(this.sns2Sqs)));
                IOITHelper.executeWithRetry(() -> this.sns.deleteTopic(b -> b.topicArn(this.snsTopic)));
                IOITHelper.executeWithRetry(() -> this.sqs.deleteQueue(b -> b.queueUrl(this.sqsQueue)));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            finally {
                this.sns.close();
                this.sqs.close();
            }
        }
    }

    public static interface ITOptions
    extends ITEnvironment.ITOptions {
    }
}

