package io.camunda.connector.inbound;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/inbound/SqsQueueConsumer.class */
public class SqsQueueConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SqsQueueConsumer.class);
    private final AmazonSQS sqsClient;
    private final SqsInboundProperties properties;
    private final InboundConnectorContext context;
    private final AtomicBoolean isQueueConsumerActive;

    public SqsQueueConsumer(AmazonSQS amazonSQS, SqsInboundProperties sqsInboundProperties, InboundConnectorContext inboundConnectorContext, AtomicBoolean atomicBoolean) {
        this.sqsClient = amazonSQS;
        this.properties = sqsInboundProperties;
        this.context = inboundConnectorContext;
        this.isQueueConsumerActive = atomicBoolean;
    }

    @Override // java.lang.Runnable
    public void run() {
        LOGGER.info("Started SQS consumer for queue {}", this.properties.getQueue().getUrl());
        ReceiveMessageRequest createReceiveMessageRequest = createReceiveMessageRequest();
        do {
            for (Message message : this.sqsClient.receiveMessage(createReceiveMessageRequest).getMessages()) {
                InboundConnectorResult<?> correlate = this.context.correlate(message);
                if (correlate.isActivated()) {
                    this.sqsClient.deleteMessage(this.properties.getQueue().getUrl(), message.getReceiptHandle());
                    LOGGER.debug("Inbound event correlated successfully: {}, and message with ID {} was deleted", correlate.getResponseData(), message.getMessageId());
                } else {
                    LOGGER.debug("Inbound event not correlated: {}", correlate.getErrorData());
                }
            }
        } while (this.isQueueConsumerActive.get());
        LOGGER.info("Stopping SQS consumer for queue {}", this.properties.getQueue().getUrl());
    }

    private ReceiveMessageRequest createReceiveMessageRequest() {
        ReceiveMessageRequest withQueueUrl = new ReceiveMessageRequest().withWaitTimeSeconds(Integer.valueOf(this.properties.getQueue().getPollingWaitTime())).withQueueUrl(this.properties.getQueue().getUrl());
        if (this.properties.getQueue().isContainAttributeNames()) {
            withQueueUrl.withAttributeNames(this.properties.getQueue().getAttributeNames());
        }
        if (this.properties.getQueue().isContainMessageAttributeNames()) {
            withQueueUrl.withMessageAttributeNames(this.properties.getQueue().getMessageAttributeNames());
        }
        return withQueueUrl;
    }
}
