package io.camunda.connector.inbound;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/inbound/SqsQueueConsumer.class */
public class SqsQueueConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsQueueConsumer.class);
    private static final List<String> ALL_ATTRIBUTES_KEY = List.of("All");
    private final AmazonSQS sqsClient;
    private final SqsInboundProperties properties;
    private final InboundConnectorContext context;
    private final AtomicBoolean queueConsumerActive = new AtomicBoolean(true);

    public SqsQueueConsumer(AmazonSQS amazonSQS, SqsInboundProperties sqsInboundProperties, InboundConnectorContext inboundConnectorContext) {
        this.sqsClient = amazonSQS;
        this.properties = sqsInboundProperties;
        this.context = inboundConnectorContext;
    }

    @Override // java.lang.Runnable
    public void run() {
        LOGGER.info("Started SQS consumer for queue {}", this.properties.getQueue().url());
        ReceiveMessageRequest createReceiveMessageRequest = createReceiveMessageRequest();
        do {
            try {
                for (Message message : this.sqsClient.receiveMessage(createReceiveMessageRequest).getMessages()) {
                    this.context.log(Activity.level(Severity.INFO).tag("Message").message("Received SQS Message with ID " + message.getMessageId()));
                    handleCorrelationResult(message, this.context.correlateWithResult(MessageMapper.toSqsInboundMessage(message)));
                }
            } catch (Exception e) {
                LOGGER.debug("NACK - unhandled exception", e);
                this.context.log(Activity.level(Severity.WARNING).tag("Message").message("NACK - failed to correlate event : " + e.getMessage()));
            }
        } while (this.queueConsumerActive.get());
        LOGGER.info("Stopping SQS consumer for queue {}", this.properties.getQueue().url());
        this.context.reportHealth(Health.down());
    }

    private void handleCorrelationResult(Message message, CorrelationResult correlationResult) {
        Objects.requireNonNull(correlationResult);
        switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), CorrelationResult.Success.class, CorrelationResult.Failure.class).dynamicInvoker().invoke(correlationResult, 0) /* invoke-custom */) {
            case 0:
                LOGGER.debug("ACK - message correlated successfully");
                this.sqsClient.deleteMessage(this.properties.getQueue().url(), message.getReceiptHandle());
                return;
            case 1:
                CorrelationResult.Failure failure = (CorrelationResult.Failure) correlationResult;
                this.context.log(Activity.level(Severity.WARNING).tag("Message").message(failure.message()));
                CorrelationFailureHandlingStrategy.ForwardErrorToUpstream handlingStrategy = failure.handlingStrategy();
                Objects.requireNonNull(handlingStrategy);
                switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), CorrelationFailureHandlingStrategy.ForwardErrorToUpstream.class, CorrelationFailureHandlingStrategy.Ignore.class).dynamicInvoker().invoke(handlingStrategy, 0) /* invoke-custom */) {
                    case 0:
                        LOGGER.debug("NACK (requeue) - message not correlated");
                        return;
                    case 1:
                        LOGGER.debug("ACK - message ignored");
                        this.sqsClient.deleteMessage(this.properties.getQueue().url(), message.getReceiptHandle());
                        return;
                    default:
                        throw new MatchException((String) null, (Throwable) null);
                }
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    private ReceiveMessageRequest createReceiveMessageRequest() {
        return new ReceiveMessageRequest().withWaitTimeSeconds(Integer.valueOf(this.properties.getQueue().pollingWaitTime())).withQueueUrl(this.properties.getQueue().url()).withMessageAttributeNames((Collection) Optional.ofNullable(this.properties.getQueue().messageAttributeNames()).filter(list -> {
            return !list.isEmpty();
        }).orElse(ALL_ATTRIBUTES_KEY)).withAttributeNames((Collection) Optional.ofNullable(this.properties.getQueue().attributeNames()).filter(list2 -> {
            return !list2.isEmpty();
        }).orElse(ALL_ATTRIBUTES_KEY));
    }

    public boolean isQueueConsumerActive() {
        return this.queueConsumerActive.get();
    }

    public void setQueueConsumerActive(boolean z) {
        this.queueConsumerActive.set(z);
    }
}
