package io.camunda.connector.e2e;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.sns.outbound.model.TopicType;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import java.io.File;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;

/* loaded from: input_file:io/camunda/connector/e2e/AwsSnsTest.class */
public class AwsSnsTest extends BaseAwsTest {
    private static final String ELEMENT_TEMPLATE_PATH = "../../../connectors/aws/aws-sns/element-templates/aws-sns-outbound-connector.json";
    private static final String TEST_TOPIC_NAME = "test-sns-topic";
    private static final String TEST_QUEUE_NAME = "test-sqs-sqs-queue";
    private static final String MESSAGE_GROUP_ID = "messageGroupId";
    private static final String MESSAGE_DEDUPLICATION_ID = "messageDeduplicationId";
    private static final String EXPECTED_SNS_MESSAGE = "{\"message\":\"Hello, AWS SNS e2e testing world!\"}";
    private String topicArn;
    private String topicFifoArn;
    private AmazonSQS sqsClient;
    private AmazonSNS snsClient;

    @BeforeEach
    public void init() {
        this.snsClient = (AmazonSNS) AmazonSNSClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(localstack.getAccessKey(), localstack.getSecretKey()))).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localstack.getEndpointOverride(LocalStackContainer.Service.SNS).toString(), localstack.getRegion())).build();
        CreateTopicResult createTopic = this.snsClient.createTopic(new CreateTopicRequest(TEST_TOPIC_NAME));
        CreateTopicResult createTopic2 = this.snsClient.createTopic(new CreateTopicRequest("test-sns-topic.fifo").addAttributesEntry("FifoTopic", "true").addAttributesEntry("ContentBasedDeduplication", "true"));
        this.topicArn = createTopic.getTopicArn();
        this.topicArn = createTopic.getTopicArn();
        this.topicFifoArn = createTopic2.getTopicArn();
        this.sqsClient = AwsTestHelper.initSqsClient(localstack);
    }

    @Test
    public void testSnsFunction() throws JsonProcessingException {
        String createQueue = AwsTestHelper.createQueue(this.sqsClient, TEST_QUEUE_NAME, false);
        this.snsClient.subscribe(this.topicArn, LocalStackContainer.Service.SQS.getLocalStackName(), AwsTestHelper.getQueueArn(this.sqsClient, createQueue));
        BpmnModelInstance done = Bpmn.createProcess().executable().startEvent().serviceTask("sns").endEvent().done();
        BpmnAssert.assertThat(ZeebeTest.with(this.zeebeClient).deploy(new BpmnFile(done).writeToFile(new File(this.tempDir, "testSns.bpmn")).apply(ElementTemplate.from(ELEMENT_TEMPLATE_PATH).property("authentication.type", "credentials").property("authentication.accessKey", localstack.getAccessKey()).property("authentication.secretKey", localstack.getSecretKey()).property("configuration.region", localstack.getRegion()).property("topic.topicArn", this.topicArn).property("topic.subject", "Sample Subject").property("topic.message", "=".concat(EXPECTED_SNS_MESSAGE)).property("topic.messageAttributes", "={\"AttributeKey1\":{\"DataType\":\"String\",\"StringValue\":\"AttributeValue1\"}, \"AttributeKey2\":{\"DataType\":\"Number\",\"StringValue\":\"123\"}}").property("retryCount", "0").property("resultVariable", "result").property("configuration.endpoint", localstack.getEndpoint().toString()).writeTo(new File(this.tempDir, "template.json")), "sns", new File(this.tempDir, "resultSns.bpmn"))).createInstance().waitForProcessCompletion().getProcessInstanceEvent()).hasVariable("result");
        List receiveMessages = AwsTestHelper.receiveMessages(this.sqsClient, createQueue);
        Assertions.assertFalse(receiveMessages.isEmpty(), "The SQS queue should have received a message");
        Assertions.assertEquals(EXPECTED_SNS_MESSAGE, new ObjectMapper().readTree(((Message) receiveMessages.getFirst()).getBody()).get("Message").asText(), "The received message content does not match the expected content");
    }

    @Test
    public void testSnsFifoFunction() throws JsonProcessingException {
        String createQueue = AwsTestHelper.createQueue(this.sqsClient, TEST_QUEUE_NAME.concat(".fifo"), true);
        this.snsClient.subscribe(this.topicFifoArn, LocalStackContainer.Service.SQS.getLocalStackName(), AwsTestHelper.getQueueArn(this.sqsClient, createQueue));
        BpmnModelInstance done = Bpmn.createProcess().executable().startEvent().serviceTask("sns").endEvent().done();
        BpmnAssert.assertThat(ZeebeTest.with(this.zeebeClient).deploy(new BpmnFile(done).writeToFile(new File(this.tempDir, "testSns.bpmn")).apply(ElementTemplate.from(ELEMENT_TEMPLATE_PATH).property("authentication.type", "credentials").property("authentication.accessKey", localstack.getAccessKey()).property("authentication.secretKey", localstack.getSecretKey()).property("configuration.region", localstack.getRegion()).property("topic.topicArn", this.topicFifoArn).property("topic.type", TopicType.fifo.name()).property("topic.messageGroupId", MESSAGE_GROUP_ID).property("topic.messageDeduplicationId", MESSAGE_DEDUPLICATION_ID).property("topic.subject", "Sample Subject").property("topic.message", "=".concat(EXPECTED_SNS_MESSAGE)).property("topic.messageAttributes", "={\"AttributeKey1\":{\"DataType\":\"String\",\"StringValue\":\"AttributeValue1\"}, \"AttributeKey2\":{\"DataType\":\"Number\",\"StringValue\":\"123\"}}").property("retryCount", "0").property("resultVariable", "result").property("configuration.endpoint", localstack.getEndpoint().toString()).writeTo(new File(this.tempDir, "template.json")), "sns", new File(this.tempDir, "resultSns.bpmn"))).createInstance().waitForProcessCompletion().getProcessInstanceEvent()).hasVariable("result");
        List receiveMessages = AwsTestHelper.receiveMessages(this.sqsClient, createQueue);
        Assertions.assertFalse(receiveMessages.isEmpty(), "The SQS queue should have received a message");
        Assertions.assertEquals(EXPECTED_SNS_MESSAGE, new ObjectMapper().readTree(((Message) receiveMessages.getFirst()).getBody()).get("Message").asText(), "The received message content does not match the expected content");
        String str = (String) ((Message) receiveMessages.getFirst()).getAttributes().get("MessageGroupId");
        String str2 = (String) ((Message) receiveMessages.getFirst()).getAttributes().get("MessageDeduplicationId");
        Assertions.assertEquals(MESSAGE_GROUP_ID, str);
        Assertions.assertEquals(MESSAGE_DEDUPLICATION_ID, str2);
    }
}
