package org.bk.aws.messaging;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.function.TupleUtils;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.SubscribeRequest;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueNameExistsException;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesResponse;

/**
 * Reactive helpers for provisioning SQS queues and SNS topics and for wiring them together.
 *
 * <p>Every method wraps an AWS SDK {@code CompletableFuture} in a {@link Mono}. The first SDK call
 * of each method is started when the method is invoked (the future is created and then handed to
 * {@code Mono.fromFuture}), not when the returned {@code Mono} is subscribed.
 */
public final class QueueProvisioningUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueProvisioningUtils.class);

    /** Visibility timeout, in seconds, applied by {@link #createPrimaryQueue}. */
    public static final int DEFAULT_TIMEOUT = 60;

    /** {@code maxReceiveCount} of the redrive policy applied by {@link #createPrimaryQueue}. */
    public static final int DEFAULT_MAX_RECEIVE_COUNT = 5;

    /** {@code maxReceiveCount} of the redrive policy applied by {@link #updateDeadLetterRedrive}. */
    private static final int DEAD_LETTER_MAX_RECEIVE_COUNT = 1;

    /** Immutable pair of a queue's URL and its ARN. */
    public static class QueueDetails {
        private final String queueUrl;
        private final String queueArn;

        /**
         * @param queueUrl URL of the queue
         * @param queueArn ARN of the queue
         */
        public QueueDetails(String queueUrl, String queueArn) {
            this.queueUrl = queueUrl;
            this.queueArn = queueArn;
        }

        /** @return the queue URL supplied at construction */
        public String getQueueUrl() {
            return this.queueUrl;
        }

        /** @return the queue ARN supplied at construction */
        public String getQueueArn() {
            return this.queueArn;
        }
    }

    /** Immutable pair of a topic's name and its ARN. */
    public static class TopicDetails {
        private final String topicName;
        private final String topicArn;

        /**
         * @param topicName name of the topic
         * @param topicArn  ARN of the topic
         */
        public TopicDetails(String topicName, String topicArn) {
            this.topicName = topicName;
            this.topicArn = topicArn;
        }

        /** @return the topic name supplied at construction */
        public String getTopicName() {
            return this.topicName;
        }

        /** @return the topic ARN supplied at construction */
        public String getTopicArn() {
            return this.topicArn;
        }
    }

    private QueueProvisioningUtils() {
    }

    /**
     * Creates a dead-letter queue and a primary queue and links them in both directions.
     *
     * <p>Steps, in order: create the dead-letter queue; create the primary queue with a redrive
     * policy targeting the dead-letter queue; set a redrive policy on the dead-letter queue that
     * targets the primary queue (see {@link #updateDeadLetterRedrive}).
     *
     * @param sqsAsyncClient      client used for all SQS calls
     * @param queueName           name of the primary queue
     * @param deadLetterQueueName name of the dead-letter queue
     * @return the details of the primary queue
     */
    public static Mono<QueueDetails> createPrimaryAndDeadLetterQueues(
            SqsAsyncClient sqsAsyncClient, String queueName, String deadLetterQueueName) {
        return createDeadLetterQueue(sqsAsyncClient, deadLetterQueueName)
                .flatMap(deadLetterQueue ->
                        createPrimaryQueue(sqsAsyncClient, queueName, deadLetterQueue.getQueueArn())
                                .flatMap(primaryQueue ->
                                        updateDeadLetterRedrive(sqsAsyncClient, primaryQueue, deadLetterQueue)
                                                .thenReturn(primaryQueue)));
    }

    /**
     * Creates a queue with the given name and no explicit attributes.
     *
     * @param sqsAsyncClient client used for the SQS calls
     * @param queueName      name of the queue to create
     * @return URL and ARN of the queue
     */
    public static Mono<QueueDetails> createDeadLetterQueue(SqsAsyncClient sqsAsyncClient, String queueName) {
        return createQueue(sqsAsyncClient, CreateQueueRequest.builder().queueName(queueName).build());
    }

    /**
     * Creates a queue with a visibility timeout of {@link #DEFAULT_TIMEOUT} and a redrive policy
     * that names {@code deadLetterQueueArn} as target with
     * {@link #DEFAULT_MAX_RECEIVE_COUNT} as {@code maxReceiveCount}.
     *
     * @param sqsAsyncClient     client used for the SQS calls
     * @param queueName          name of the queue to create
     * @param deadLetterQueueArn ARN written as {@code deadLetterTargetArn} of the redrive policy
     * @return URL and ARN of the queue
     */
    public static Mono<QueueDetails> createPrimaryQueue(
            SqsAsyncClient sqsAsyncClient, String queueName, String deadLetterQueueArn) {
        CreateQueueRequest request = CreateQueueRequest.builder()
                .queueName(queueName)
                .attributes(Map.of(
                        QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(DEFAULT_TIMEOUT),
                        QueueAttributeName.REDRIVE_POLICY,
                        redrivePolicy(DEFAULT_MAX_RECEIVE_COUNT, deadLetterQueueArn)))
                .build();
        return createQueue(sqsAsyncClient, request);
    }

    /**
     * Sets a redrive policy on {@code sourceQueue} whose {@code deadLetterTargetArn} is the ARN of
     * {@code targetQueue} and whose {@code maxReceiveCount} is 1.
     *
     * <p>{@link #createPrimaryAndDeadLetterQueues} calls this with the primary queue as
     * {@code targetQueue} and the dead-letter queue as {@code sourceQueue}.
     *
     * @param sqsAsyncClient client used for the SQS call
     * @param targetQueue    queue whose ARN becomes the redrive target
     * @param sourceQueue    queue whose attributes are modified
     * @return the SQS response
     */
    public static Mono<SetQueueAttributesResponse> updateDeadLetterRedrive(
            SqsAsyncClient sqsAsyncClient, QueueDetails targetQueue, QueueDetails sourceQueue) {
        SetQueueAttributesRequest request = SetQueueAttributesRequest.builder()
                .queueUrl(sourceQueue.getQueueUrl())
                .attributes(Map.of(
                        QueueAttributeName.REDRIVE_POLICY,
                        redrivePolicy(DEAD_LETTER_MAX_RECEIVE_COUNT, targetQueue.getQueueArn())))
                .build();
        return Mono.fromFuture(sqsAsyncClient.setQueueAttributes(request));
    }

    /**
     * Creates the queue described by {@code createQueueRequest} and resolves its ARN.
     *
     * <p>If creation fails with {@link QueueNameExistsException}, the existing queue's URL is
     * looked up by name and the request's attributes are applied to it instead.
     *
     * @param sqsAsyncClient     client used for the SQS calls
     * @param createQueueRequest name and attributes of the queue
     * @return URL and ARN of the created or already existing queue
     */
    public static Mono<QueueDetails> createQueue(SqsAsyncClient sqsAsyncClient, CreateQueueRequest createQueueRequest) {
        return Mono.fromFuture(sqsAsyncClient.createQueue(createQueueRequest))
                .map(createQueueResponse -> createQueueResponse.queueUrl())
                .onErrorResume(QueueNameExistsException.class, alreadyExists -> {
                    LOGGER.info("Queue name={} already exists, updating attributes...", createQueueRequest.queueName());
                    GetQueueUrlRequest lookup = GetQueueUrlRequest.builder()
                            .queueName(createQueueRequest.queueName())
                            .build();
                    return Mono.fromFuture(sqsAsyncClient.getQueueUrl(lookup)).flatMap(getQueueUrlResponse -> {
                        String queueUrl = getQueueUrlResponse.queueUrl();
                        SetQueueAttributesRequest update = SetQueueAttributesRequest.builder()
                                .queueUrl(queueUrl)
                                .attributes(createQueueRequest.attributes())
                                .build();
                        return Mono.fromFuture(sqsAsyncClient.setQueueAttributes(update)).thenReturn(queueUrl);
                    });
                })
                .flatMap(queueUrl -> getQueueArn(sqsAsyncClient, queueUrl)
                        .map(queueArn -> new QueueDetails(queueUrl, queueArn)));
    }

    /**
     * Creates an SNS topic with the given name.
     *
     * @param snsAsyncClient client used for the SNS call
     * @param topicName      name of the topic
     * @return the topic name paired with the ARN returned by SNS
     */
    public static Mono<TopicDetails> createTopic(SnsAsyncClient snsAsyncClient, String topicName) {
        CreateTopicRequest request = CreateTopicRequest.builder().name(topicName).build();
        return Mono.fromFuture(snsAsyncClient.createTopic(request))
                .map(createTopicResponse -> new TopicDetails(topicName, createTopicResponse.topicArn()));
    }

    /**
     * Fetches the {@code QueueArn} attribute of a queue.
     *
     * @param sqsAsyncClient client used for the SQS call
     * @param queueUrl       URL of the queue
     * @return the queue's ARN
     */
    public static Mono<String> getQueueArn(SqsAsyncClient sqsAsyncClient, String queueUrl) {
        GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
                .queueUrl(queueUrl)
                .attributeNames(QueueAttributeName.QUEUE_ARN)
                .build();
        return Mono.fromFuture(sqsAsyncClient.getQueueAttributes(request))
                .map(getQueueAttributesResponse ->
                        getQueueAttributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN));
    }

    /**
     * Deletes a message from a queue using the message's receipt handle.
     *
     * @param sqsAsyncClient client used for the SQS call
     * @param queueUrl       URL of the queue the message was received from
     * @param message        message to delete
     * @return the SQS response
     */
    public static Mono<DeleteMessageResponse> deleteMessage(SqsAsyncClient sqsAsyncClient, String queueUrl, Message message) {
        DeleteMessageRequest request = DeleteMessageRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(message.receiptHandle())
                .build();
        return Mono.fromFuture(sqsAsyncClient.deleteMessage(request));
    }

    /**
     * Subscribes a queue to a topic and then grants the topic permission to deliver to the queue.
     *
     * <p>After the subscription succeeds, the queue's {@code Policy} attribute is set to a
     * statement allowing SNS to call {@code sqs:SendMessage} on the queue when the source is
     * {@code topicArn}. Setting the attribute replaces any access policy the queue already had.
     *
     * @param sqsAsyncClient client used to update the queue policy
     * @param snsAsyncClient client used to create the subscription
     * @param topicArn       ARN of the topic to subscribe to
     * @param queueDetails   URL and ARN of the queue being subscribed
     * @return the response of the policy update
     */
    public static Mono<SetQueueAttributesResponse> bridgeTopicToSqsQueue(
            SqsAsyncClient sqsAsyncClient, SnsAsyncClient snsAsyncClient, String topicArn, QueueDetails queueDetails) {
        SubscribeRequest subscribeRequest = SubscribeRequest.builder()
                .protocol("sqs")
                .endpoint(queueDetails.getQueueArn())
                .returnSubscriptionArn(true)
                .topicArn(topicArn)
                .build();
        return Mono.fromFuture(snsAsyncClient.subscribe(subscribeRequest))
                .doOnNext(subscribeResponse -> LOGGER.info(
                        "Subscribed queue ARN={} to topic ARN={}, subscription={}",
                        queueDetails.getQueueArn(), topicArn, subscribeResponse.subscriptionArn()))
                // Deferred so the policy update is only issued once the subscription has succeeded.
                .then(Mono.defer(() -> Mono.fromFuture(sqsAsyncClient.setQueueAttributes(
                        SetQueueAttributesRequest.builder()
                                .queueUrl(queueDetails.getQueueUrl())
                                .attributes(Map.of(
                                        QueueAttributeName.POLICY,
                                        snsSendMessagePolicy(queueDetails.getQueueArn(), topicArn)))
                                .build()))))
                .doOnSuccess(setQueueAttributesResponse -> LOGGER.info(
                        "Policy for queue={} updated to receive messages from SNS", queueDetails.getQueueUrl()));
    }

    /** Renders a redrive policy JSON document with the given receive count and target ARN. */
    private static String redrivePolicy(int maxReceiveCount, String deadLetterTargetArn) {
        return JsonNodeFactory.instance.objectNode()
                .put("maxReceiveCount", maxReceiveCount)
                .put("deadLetterTargetArn", deadLetterTargetArn)
                .toString();
    }

    /** Renders a queue access policy that lets the given topic send messages to the given queue. */
    private static String snsSendMessagePolicy(String queueArn, String topicArn) {
        JsonNodeFactory json = JsonNodeFactory.instance;
        // textNode(...) renders each ARN as a quoted, escaped JSON string.
        return "{\"Version\":\"2012-10-17\",\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
                + "\"Action\":\"sqs:SendMessage\","
                + "\"Resource\":" + json.textNode(queueArn) + ","
                + "\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":" + json.textNode(topicArn) + "}}"
                + "}]}";
    }
}
