package io.camunda.connector.inbound;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/inbound/SqsQueueConsumer.class */
/**
 * Polls an SQS queue and forwards every received message to the inbound connector context.
 *
 * <p>The consumer is a {@link Runnable}: {@link #run()} keeps issuing receive requests until the
 * active flag is cleared through {@link #setQueueConsumerActive(boolean)}. A message is deleted
 * from the queue only after {@code context.correlate(...)} returned without throwing; otherwise
 * it is left on the queue.
 */
public class SqsQueueConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsQueueConsumer.class);
    private static final List<String> ALL_ATTRIBUTES_KEY = List.of("All");

    /**
     * Pause applied after an unexpected failure so that a persistently failing receive call
     * does not turn the polling loop into a tight retry loop.
     */
    private static final long FAILURE_BACKOFF_MILLIS = 1000L;

    private final AmazonSQS sqsClient;
    private final SqsInboundProperties properties;
    private final InboundConnectorContext context;
    private final AtomicBoolean queueConsumerActive = new AtomicBoolean(true);

    /**
     * Creates a consumer that starts in the active state.
     *
     * @param amazonSQS client used to receive and delete messages
     * @param sqsInboundProperties connector properties; the queue URL, polling wait time and
     *     requested attribute names are read from {@code getQueue()}
     * @param inboundConnectorContext context used to correlate received messages
     */
    public SqsQueueConsumer(AmazonSQS amazonSQS, SqsInboundProperties sqsInboundProperties, InboundConnectorContext inboundConnectorContext) {
        this.sqsClient = amazonSQS;
        this.properties = sqsInboundProperties;
        this.context = inboundConnectorContext;
    }

    /**
     * Runs the polling loop. The queue is polled at least once, then repeatedly while the
     * consumer is active.
     *
     * <p>Unexpected failures are logged at ERROR level with their stack trace and followed by a
     * short pause before the next poll. Messages whose body cannot be parsed
     * ({@link ConnectorInputException}) are logged at WARN level and skipped without deletion.
     */
    @Override
    public void run() {
        LOGGER.info("Started SQS consumer for queue {}", this.properties.getQueue().getUrl());
        final ReceiveMessageRequest request = createReceiveMessageRequest();
        do {
            try {
                pollOnce(request);
            } catch (Exception e) {
                // This can originate from the receive call, the correlation or the delete call,
                // so the message must not claim that correlation specifically failed.
                LOGGER.error("NACK - failed to receive or correlate SQS messages", e);
                pauseAfterFailure();
            }
        } while (this.queueConsumerActive.get());
        LOGGER.info("Stopping SQS consumer for queue {}", this.properties.getQueue().getUrl());
    }

    /** Receives one batch of messages and correlates and deletes each of them in order. */
    private void pollOnce(ReceiveMessageRequest request) {
        for (Message message : this.sqsClient.receiveMessage(request).getMessages()) {
            try {
                correlate(message);
                // Reached only when correlation did not throw.
                this.sqsClient.deleteMessage(this.properties.getQueue().getUrl(), message.getReceiptHandle());
            } catch (ConnectorInputException e) {
                LOGGER.warn("NACK - failed to parse SQS message body: {}", e.getMessage());
            }
        }
    }

    /**
     * Sleeps for {@link #FAILURE_BACKOFF_MILLIS} unless the consumer has already been
     * deactivated. If the thread is interrupted while waiting, the interrupt flag is restored
     * and the consumer is deactivated so that {@link #run()} terminates.
     */
    private void pauseAfterFailure() {
        if (!this.queueConsumerActive.get()) {
            return;
        }
        try {
            Thread.sleep(FAILURE_BACKOFF_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.queueConsumerActive.set(false);
        }
    }

    /** Correlates a single message through the connector context and logs the outcome. */
    private void correlate(Message message) {
        InboundConnectorResult<?> result = this.context.correlate(MessageMapper.toSqsInboundMessage(message));
        if (result.isActivated()) {
            LOGGER.debug("Inbound event correlated successfully: {}", result.getResponseData());
        } else {
            LOGGER.debug("Inbound event was correlated but not activated: {}", result.getErrorData());
        }
    }

    /**
     * Builds the receive request that is reused for every poll: wait time and queue URL come
     * from the queue properties; message attribute names and attribute names fall back to
     * {@code "All"} when none are configured.
     */
    private ReceiveMessageRequest createReceiveMessageRequest() {
        return new ReceiveMessageRequest()
                .withWaitTimeSeconds(Integer.valueOf(this.properties.getQueue().getPollingWaitTime()))
                .withQueueUrl(this.properties.getQueue().getUrl())
                .withMessageAttributeNames(namesOrAll(this.properties.getQueue().getMessageAttributeNames()))
                .withAttributeNames(namesOrAll(this.properties.getQueue().getAttributeNames()));
    }

    /** Returns {@code names} unless it is {@code null} or empty, in which case {@code ["All"]}. */
    private static Collection<String> namesOrAll(Collection<String> names) {
        return (names == null || names.isEmpty()) ? ALL_ATTRIBUTES_KEY : names;
    }

    /**
     * @return {@code true} while the polling loop is allowed to continue
     */
    public boolean isQueueConsumerActive() {
        return this.queueConsumerActive.get();
    }

    /**
     * Enables or disables the polling loop.
     *
     * @param z {@code false} lets {@link #run()} return once the poll in progress has finished
     */
    public void setQueueConsumerActive(boolean z) {
        this.queueConsumerActive.set(z);
    }
}
