package org.bk.aws.messaging;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.bk.aws.model.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/* loaded from: input_file:org/bk/aws/messaging/SqsEventHandler.class */
/**
 * {@link EventHandler} that long-polls an SQS queue and exposes the received messages as a
 * Reactor stream.
 *
 * <p>{@link #init()} provisions the primary queue together with its dead-letter queue and,
 * when an SNS topic name was supplied, creates that topic and bridges it to the queue. When
 * SNS unwrapping is enabled each message body is first read as an {@link SnsNotification}
 * and its {@code Message} property is then deserialized into the target type.
 *
 * @param <T> type the message bodies are deserialized into
 */
public class SqsEventHandler<T> implements EventHandler<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsEventHandler.class);

    /** Long-poll wait time, in seconds, set on every receive request. */
    public static final int WAIT_TIME_SECONDS = 20;

    /** Suffix appended to the primary queue name to build the dead-letter queue name. */
    private static final String DEAD_QUEUE_SUFFIX = "-dead";

    private final SnsAsyncClient snsAsyncClient;
    private final SqsAsyncClient sqsAsyncClient;
    private final ObjectMapper objectMapper;
    private final String queueName;
    private final String snsTopicName;
    private final boolean unwrapSnsMessage;

    /** URL of the primary queue; assigned by {@link #init()}. */
    private String queueUrl;

    /** Polling continues while this is {@code true}; see {@link #setRunning(boolean)}. */
    private final AtomicBoolean running;

    /**
     * Minimal JSON binding used when unwrapping is enabled: only the {@code Message} property
     * is bound, every other property is ignored.
     */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class SnsNotification {
        private final String message;

        @JsonCreator
        SnsNotification(@JsonProperty("Message") String message) {
            this.message = message;
        }

        /**
         * @return the value of the {@code Message} property, or {@code null} if it was absent
         */
        public String getMessage() {
            return this.message;
        }
    }

    /**
     * Creates a handler for a plain SQS queue: no SNS client, no topic, no unwrapping.
     *
     * @param sqsAsyncClient client used to provision, receive from and delete from the queue
     * @param objectMapper   mapper used to deserialize message bodies
     * @param queueName      name of the primary queue
     */
    public SqsEventHandler(SqsAsyncClient sqsAsyncClient, ObjectMapper objectMapper, String queueName) {
        this(null, sqsAsyncClient, objectMapper, queueName, null, false);
    }

    /**
     * Creates a handler whose queue is bridged to an SNS topic; unwrapping is enabled exactly
     * when {@code snsTopicName} is non-null.
     *
     * @param snsAsyncClient client used to create the topic and the bridge
     * @param sqsAsyncClient client used to provision, receive from and delete from the queue
     * @param objectMapper   mapper used to deserialize message bodies
     * @param queueName      name of the primary queue
     * @param snsTopicName   name of the topic to bridge, or {@code null} for none
     */
    public SqsEventHandler(SnsAsyncClient snsAsyncClient, SqsAsyncClient sqsAsyncClient, ObjectMapper objectMapper,
                           String queueName, String snsTopicName) {
        this(snsAsyncClient, sqsAsyncClient, objectMapper, queueName, snsTopicName, snsTopicName != null);
    }

    /**
     * Creates a handler with every option given explicitly. The handler starts in the running
     * state.
     *
     * @param snsAsyncClient   client used to create the topic and the bridge; only used when
     *                         {@code snsTopicName} is non-null
     * @param sqsAsyncClient   client used to provision, receive from and delete from the queue
     * @param objectMapper     mapper used to deserialize message bodies
     * @param queueName        name of the primary queue
     * @param snsTopicName     name of the topic to bridge, or {@code null} for none
     * @param unwrapSnsMessage whether bodies are read as {@link SnsNotification} first
     */
    public SqsEventHandler(SnsAsyncClient snsAsyncClient, SqsAsyncClient sqsAsyncClient, ObjectMapper objectMapper,
                           String queueName, String snsTopicName, boolean unwrapSnsMessage) {
        this.running = new AtomicBoolean(true);
        this.snsAsyncClient = snsAsyncClient;
        this.sqsAsyncClient = sqsAsyncClient;
        this.objectMapper = objectMapper;
        this.queueName = queueName;
        this.snsTopicName = snsTopicName;
        this.unwrapSnsMessage = unwrapSnsMessage;
    }

    /**
     * Polls the queue and emits every message that could be deserialized, paired with a
     * handle that deletes it from the queue.
     *
     * <p>Each poll blocks the subscribing thread on the receive call. A failed poll is logged
     * and retried indefinitely (there is no back-off). Messages that cannot be converted are
     * logged and dropped from the stream; they are not deleted from the queue. The stream
     * completes once the handler is no longer running or the polling thread is interrupted.
     *
     * @param maxMessages maximum number of messages requested per poll
     * @param bodyType    class the message bodies are deserialized into
     * @return stream of deserialized messages with their delete handles
     */
    @Override
    public Flux<MessageWithDeleteHandle<T>> listen(int maxMessages, Class<T> bodyType) {
        return Flux.<List<Message>>generate(sink -> {
                    if (!this.running.get()) {
                        sink.complete();
                        return;
                    }
                    try {
                        ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                                .queueUrl(this.queueUrl)
                                .maxNumberOfMessages(maxMessages)
                                .waitTimeSeconds(WAIT_TIME_SECONDS)
                                .build();
                        List<Message> messages = this.sqsAsyncClient.receiveMessage(request).get().messages();
                        LOGGER.info("Emitting : {}", messages);
                        sink.next(messages);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop polling: signalling an error here would feed
                        // retry() below, which would re-enter the blocking call immediately.
                        Thread.currentThread().interrupt();
                        LOGGER.warn("Interrupted while waiting for messages, stopping the listener", e);
                        sink.complete();
                    } catch (Exception e) {
                        LOGGER.error("Error in retrieving messages", e);
                        sink.error(e);
                    }
                })
                .flatMapIterable(Function.identity())
                .doOnError(error -> LOGGER.error(error.getMessage(), error))
                .retry()
                .map(message -> Result.of(() -> toMessageWithDeleteHandle(message, bodyType)))
                .filter(result -> {
                    if (result.isFailure()) {
                        LOGGER.error("Processing failed on unmarshalling message", result.getCause());
                    }
                    return result.isSuccess();
                })
                .map(result -> result.get());
    }

    /**
     * Runs {@code task} on every received message and emits the message re-wrapped with the
     * task's result. A message is deleted from the queue only after its task completed without
     * error; a task that completes empty yields a message with a {@code null} body. A task whose
     * {@code Mono} fails is logged and skipped, leaving the message on the queue.
     *
     * <p>Polling runs on a dedicated single-threaded scheduler and tasks on an elastic one.
     * Both are created per subscription and disposed when the stream terminates or is
     * cancelled, so their threads do not outlive the pipeline.
     *
     * @param concurrency maximum messages per poll, and maximum tasks in flight
     * @param taskName    name given to the schedulers created for this pipeline
     * @param task        processing function applied to each message body
     * @param bodyType    class the message bodies are deserialized into
     * @param <V>         result type of the task
     * @return stream of processed messages
     */
    @Override
    public <V> Flux<ContentMessage<V>> processWithResultStream(int concurrency, String taskName,
                                                               Function<T, Mono<V>> task, Class<T> bodyType) {
        return Flux.defer(() -> {
            Scheduler workerScheduler = Schedulers.newElastic(taskName);
            Scheduler pollingScheduler = Schedulers.newSingle(taskName);
            return listen(concurrency, bodyType)
                    .subscribeOn(pollingScheduler)
                    .flatMap(handle -> processSingle(handle, task, workerScheduler), concurrency)
                    .doFinally(signal -> {
                        pollingScheduler.dispose();
                        workerScheduler.dispose();
                    });
        });
    }

    /**
     * Subscribes to {@link #processWithResultStream(int, String, Function, Class)} and logs
     * every processed message as well as a terminal pipeline failure.
     *
     * @param concurrency maximum messages per poll, and maximum tasks in flight
     * @param taskName    name given to the schedulers created for this pipeline
     * @param task        processing function applied to each message body
     * @param bodyType    class the message bodies are deserialized into
     * @param <V>         result type of the task
     */
    @Override
    public <V> void processMessage(int concurrency, String taskName, Function<T, Mono<V>> task, Class<T> bodyType) {
        processWithResultStream(concurrency, taskName, task, bodyType).subscribe(
                processed -> LOGGER.info("Completed Processing {}", processed),
                error -> LOGGER.error("Processing Pipeline failed..", error));
    }

    /**
     * Provisions the primary and dead-letter queues, records the primary queue URL and, when a
     * topic name is configured, creates the topic and bridges it to the queue. Blocks until
     * provisioning has finished.
     */
    @PostConstruct
    public void init() {
        QueueProvisioningUtils
                .createPrimaryAndDeadLetterQueues(this.sqsAsyncClient, this.queueName, this.queueName + DEAD_QUEUE_SUFFIX)
                .doOnNext(queueDetails -> {
                    this.queueUrl = queueDetails.getQueueUrl();
                })
                // NOTE(review): if bridgeTopicToSqsQueue returns a Mono/Publisher, the inner map
                // must become flatMap or the bridge is never subscribed - confirm its return type.
                .flatMap(queueDetails -> this.snsTopicName != null
                        ? QueueProvisioningUtils.createTopic(this.snsAsyncClient, this.snsTopicName)
                                .map(topicDetails -> QueueProvisioningUtils.bridgeTopicToSqsQueue(
                                        this.sqsAsyncClient, this.snsAsyncClient, topicDetails.getTopicArn(), queueDetails))
                        : Mono.empty())
                .block();
    }

    /** Stops polling; the stream returned by {@code listen} completes at its next poll. */
    @PreDestroy
    public void preDestroy() {
        setRunning(false);
    }

    /**
     * @param running {@code false} makes {@code listen} streams complete at their next poll
     */
    public void setRunning(boolean running) {
        this.running.set(running);
    }

    /** Converts a raw queue message into its deserialized form plus a delete handle. */
    private MessageWithDeleteHandle<T> toMessageWithDeleteHandle(Message message, Class<T> bodyType) {
        T body = unwrapIfNeeded(message.body(), bodyType, this.unwrapSnsMessage);
        return new MessageWithDeleteHandle<>(new ContentMessage<>(body), deleteQueueMessage(message, this.queueUrl));
    }

    /** Applies the task to one message on the worker scheduler and deletes the message on success. */
    private <V> Mono<ContentMessage<V>> processSingle(MessageWithDeleteHandle<T> handle, Function<T, Mono<V>> task,
                                                      Scheduler workerScheduler) {
        return Mono.defer(() -> {
            ContentMessage<T> message = handle.getMessage();
            Mono<Void> deleteHandle = handle.getDeleteHandle();
            LOGGER.debug("Processing: {}", message);
            return task.apply(message.getBody())
                    .map(message::withNewBody)
                    .defaultIfEmpty(message.withNewBody(null))
                    .flatMap(processed -> deleteHandle.thenReturn(processed))
                    .onErrorResume(error -> {
                        LOGGER.error("Error in processing task", error);
                        return Mono.empty();
                    })
                    .subscribeOn(workerScheduler)
                    .publishOn(workerScheduler);
        });
    }

    /**
     * Builds a lazy handle that deletes {@code message} from the queue when subscribed. A
     * failing delete is logged and the handle completes empty instead of propagating the error.
     */
    private Mono<Void> deleteQueueMessage(Message message, String targetQueueUrl) {
        return Mono.defer(() -> QueueProvisioningUtils.deleteMessage(this.sqsAsyncClient, targetQueueUrl, message))
                .onErrorResume(error -> {
                    // The trailing Throwable is logged by SLF4J as the exception, not as a placeholder value.
                    LOGGER.error("Error when deleting message from queue: {}, message: {}", targetQueueUrl, message, error);
                    return Mono.empty();
                })
                .then();
    }

    /**
     * Deserializes a raw message body into the target type, first extracting the
     * {@code Message} property of the {@link SnsNotification} envelope when {@code unwrap} is set.
     *
     * @throws RuntimeException wrapping the {@link JsonProcessingException} when either JSON
     *                          document cannot be read
     */
    private T unwrapIfNeeded(String rawBody, Class<T> bodyType, boolean unwrap) {
        try {
            String payload = rawBody;
            if (unwrap) {
                payload = this.objectMapper.readValue(rawBody, SnsNotification.class).getMessage();
            }
            return this.objectMapper.readValue(payload, bodyType);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}
