package org.apache.camel.component.aws2.sqs;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;

/* loaded from: input_file:org/apache/camel/component/aws2/sqs/Sqs2Consumer.class */
public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Sqs2Consumer.class);
    private ScheduledExecutorService scheduledExecutor;
    private transient String sqsConsumerToString;
    private Collection<String> attributeNames;
    private Collection<String> messageAttributeNames;
    private WritableHealthCheckRepository healthCheckRepository;
    private Sqs2ConsumerHealthCheck consumerHealthCheck;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/aws2/sqs/Sqs2Consumer$TimeoutExtender.class */
    public class TimeoutExtender implements Runnable {
        private final Exchange exchange;
        private final int repeatSeconds;

        TimeoutExtender(Exchange exchange, int i) {
            this.exchange = exchange;
            this.repeatSeconds = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            ChangeMessageVisibilityRequest.Builder receiptHandle = ChangeMessageVisibilityRequest.builder().queueUrl(Sqs2Consumer.this.getQueueUrl()).visibilityTimeout(Integer.valueOf(this.repeatSeconds)).receiptHandle((String) this.exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class));
            try {
                Sqs2Consumer.LOG.trace("Extending visibility window by {} seconds for exchange {}", Integer.valueOf(this.repeatSeconds), this.exchange);
                Sqs2Consumer.this.m1getEndpoint().getClient().changeMessageVisibility((ChangeMessageVisibilityRequest) receiptHandle.build());
                Sqs2Consumer.LOG.debug("Extended visibility window by {} seconds for exchange {}", Integer.valueOf(this.repeatSeconds), this.exchange);
            } catch (MessageNotInflightException | ReceiptHandleIsInvalidException e) {
            } catch (Exception e2) {
                logException(e2);
            } catch (SqsException e3) {
                if (e3.getMessage().contains("Message does not exist or is not available for visibility timeout change")) {
                    return;
                }
                logException(e3);
            }
        }

        private void logException(Exception exc) {
            Sqs2Consumer.LOG.warn("Extending visibility window failed for exchange {}. Will not attempt to extend visibility further. This exception will be ignored.", this.exchange, exc);
        }
    }

    public Sqs2Consumer(Sqs2Endpoint sqs2Endpoint, Processor processor) {
        super(sqs2Endpoint, processor);
        if (getConfiguration().getAttributeNames() != null) {
            this.attributeNames = Arrays.asList(getConfiguration().getAttributeNames().split(","));
        }
        if (getConfiguration().getMessageAttributeNames() != null) {
            this.messageAttributeNames = Arrays.asList(getConfiguration().getMessageAttributeNames().split(","));
        }
    }

    protected int poll() throws Exception {
        ReceiveMessageResponse receiveMessage;
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;
        ReceiveMessageRequest.Builder queueUrl = ReceiveMessageRequest.builder().queueUrl(getQueueUrl());
        queueUrl.maxNumberOfMessages(getMaxMessagesPerPoll() > 0 ? Integer.valueOf(getMaxMessagesPerPoll()) : null);
        queueUrl.visibilityTimeout(getConfiguration().getVisibilityTimeout());
        queueUrl.waitTimeSeconds(getConfiguration().getWaitTimeSeconds());
        if (this.attributeNames != null) {
            queueUrl.attributeNamesWithStrings(this.attributeNames);
        }
        if (this.messageAttributeNames != null) {
            queueUrl.messageAttributeNames(this.messageAttributeNames);
        }
        LOG.trace("Receiving messages with request [{}]...", queueUrl);
        ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) queueUrl.build();
        try {
            receiveMessage = getClient().receiveMessage(receiveMessageRequest);
        } catch (QueueDoesNotExistException e) {
            LOG.info("Queue does not exist....recreating now...");
            reConnectToQueue();
            receiveMessage = getClient().receiveMessage(receiveMessageRequest);
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received {} messages", Integer.valueOf(receiveMessage.messages().size()));
        }
        return processBatch(CastUtils.cast(createExchanges(receiveMessage.messages())));
    }

    public void reConnectToQueue() {
        try {
            if (m1getEndpoint().getConfiguration().isAutoCreateQueue()) {
                m1getEndpoint().createQueue(getClient());
            }
        } catch (QueueDeletedRecentlyException e) {
            LOG.debug("Queue recently deleted, will retry in 30 seconds.");
            try {
                Thread.sleep(30000L);
                m1getEndpoint().createQueue(getClient());
            } catch (Exception e2) {
                LOG.warn("failed to retry queue connection.", e2);
            }
        } catch (Exception e3) {
            LOG.warn("Could not connect to queue in amazon.", e3);
        }
    }

    protected Queue<Exchange> createExchanges(List<Message> list) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received {} messages in this poll", Integer.valueOf(list.size()));
        }
        LinkedList linkedList = new LinkedList();
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            linkedList.add(createExchange(it.next()));
        }
        return linkedList;
    }

    public int processBatch(Queue<Object> queue) throws Exception {
        int size = queue.size();
        int i = 0;
        while (i < size && isBatchAllowed()) {
            Exchange exchange = (Exchange) ObjectHelper.cast(Exchange.class, queue.poll());
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, Integer.valueOf(i));
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, Integer.valueOf(size));
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, Boolean.valueOf(i == size - 1));
            this.pendingExchanges = (size - i) - 1;
            Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
            if (this.scheduledExecutor != null && visibilityTimeout != null && visibilityTimeout.intValue() / 2 > 0) {
                int intValue = visibilityTimeout.intValue() / 2;
                int intValue2 = visibilityTimeout.intValue();
                int doubleValue = (int) (visibilityTimeout.doubleValue() * 1.5d);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}", new Object[]{Integer.valueOf(intValue), Integer.valueOf(intValue2), Integer.valueOf(doubleValue), exchange.getExchangeId()});
                }
                final ScheduledFuture<?> scheduleAtFixedRate = this.scheduledExecutor.scheduleAtFixedRate(new TimeoutExtender(exchange, doubleValue), intValue, intValue2, TimeUnit.SECONDS);
                exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.aws2.sqs.Sqs2Consumer.1
                    public void onComplete(Exchange exchange2) {
                        cancelExtender(exchange2);
                    }

                    public void onFailure(Exchange exchange2) {
                        cancelExtender(exchange2);
                    }

                    private void cancelExtender(Exchange exchange2) {
                        Sqs2Consumer.LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", exchange2.getExchangeId());
                        scheduleAtFixedRate.cancel(false);
                    }
                });
            }
            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.aws2.sqs.Sqs2Consumer.2
                public void onComplete(Exchange exchange2) {
                    Sqs2Consumer.this.processCommit(exchange2);
                }

                public void onFailure(Exchange exchange2) {
                    Sqs2Consumer.this.processRollback(exchange2);
                }

                public String toString() {
                    return "SqsConsumerOnCompletion";
                }
            });
            getAsyncProcessor().process(exchange, defaultConsumerCallback(exchange, true));
            i++;
        }
        return size;
    }

    protected void processCommit(Exchange exchange) {
        try {
            if (shouldDelete(exchange)) {
                String str = (String) exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class);
                DeleteMessageRequest.Builder receiptHandle = DeleteMessageRequest.builder().queueUrl(getQueueUrl()).receiptHandle(str);
                LOG.trace("Deleting message with receipt handle {}...", str);
                getClient().deleteMessage((DeleteMessageRequest) receiptHandle.build());
                LOG.trace("Deleted message with receipt handle {}...", str);
            }
        } catch (AwsServiceException e) {
            getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
        }
    }

    private boolean shouldDelete(Exchange exchange) {
        return getConfiguration().isDeleteAfterRead() || (exchange.getProperty(Sqs2Constants.SQS_DELETE_FILTERED) != null && getConfiguration().isDeleteIfFiltered() && passedThroughFilter(exchange));
    }

    private boolean passedThroughFilter(Exchange exchange) {
        return ((Boolean) exchange.getProperty(Sqs2Constants.SQS_DELETE_FILTERED, false, Boolean.class)).booleanValue();
    }

    protected void processRollback(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, exception);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Sqs2Configuration getConfiguration() {
        return m1getEndpoint().getConfiguration();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SqsClient getClient() {
        return m1getEndpoint().getClient();
    }

    protected String getQueueUrl() {
        return m1getEndpoint().getQueueUrl();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public Sqs2Endpoint m1getEndpoint() {
        return super.getEndpoint();
    }

    public Exchange createExchange(Message message) {
        return createExchange(m1getEndpoint().getExchangePattern(), message);
    }

    private Exchange createExchange(ExchangePattern exchangePattern, Message message) {
        Exchange createExchange = createExchange(true);
        createExchange.setPattern(exchangePattern);
        org.apache.camel.Message in = createExchange.getIn();
        in.setBody(message.body());
        in.setHeaders(new HashMap(message.attributesAsStrings()));
        in.setHeader(Sqs2Constants.MESSAGE_ID, message.messageId());
        in.setHeader(Sqs2Constants.MD5_OF_BODY, message.md5OfBody());
        in.setHeader(Sqs2Constants.RECEIPT_HANDLE, message.receiptHandle());
        in.setHeader(Sqs2Constants.ATTRIBUTES, message.attributes());
        in.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, message.messageAttributes());
        HeaderFilterStrategy headerFilterStrategy = m1getEndpoint().getHeaderFilterStrategy();
        for (Map.Entry entry : message.messageAttributes().entrySet()) {
            String str = (String) entry.getKey();
            Object fromMessageAttributeValue = Sqs2MessageHelper.fromMessageAttributeValue((MessageAttributeValue) entry.getValue());
            if (!headerFilterStrategy.applyFilterToExternalHeaders(str, fromMessageAttributeValue, createExchange)) {
                in.setHeader(str, fromMessageAttributeValue);
            }
        }
        return createExchange;
    }

    public String toString() {
        if (this.sqsConsumerToString == null) {
            this.sqsConsumerToString = "SqsConsumer[" + URISupport.sanitizeUri(m1getEndpoint().getEndpointUri()) + "]";
        }
        return this.sqsConsumerToString;
    }

    protected void afterConfigureScheduler(ScheduledPollConsumerScheduler scheduledPollConsumerScheduler, boolean z) {
        if (z && (scheduledPollConsumerScheduler instanceof DefaultScheduledPollConsumerScheduler)) {
            DefaultScheduledPollConsumerScheduler defaultScheduledPollConsumerScheduler = (DefaultScheduledPollConsumerScheduler) scheduledPollConsumerScheduler;
            defaultScheduledPollConsumerScheduler.setConcurrentConsumers(getConfiguration().getConcurrentConsumers());
            defaultScheduledPollConsumerScheduler.setPoolSize(Math.max(defaultScheduledPollConsumerScheduler.getPoolSize(), getConfiguration().getConcurrentConsumers()));
        }
    }

    protected void doStart() throws Exception {
        if (getConfiguration().isExtendMessageVisibility() && this.scheduledExecutor == null) {
            ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile("SqsTimeoutExtender");
            threadPoolProfile.setPoolSize(1);
            threadPoolProfile.setAllowCoreThreadTimeOut(false);
            threadPoolProfile.setMaxQueueSize(-1);
            this.scheduledExecutor = m1getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, "SqsTimeoutExtender", threadPoolProfile);
        }
        super.doStart();
        this.healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(m1getEndpoint().getCamelContext(), "components", WritableHealthCheckRepository.class);
        if (this.healthCheckRepository != null) {
            this.consumerHealthCheck = new Sqs2ConsumerHealthCheck(this, getRouteId());
            this.healthCheckRepository.addHealthCheck(this.consumerHealthCheck);
        }
    }

    protected void doShutdown() throws Exception {
        if (this.scheduledExecutor != null) {
            m1getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.scheduledExecutor);
            this.scheduledExecutor = null;
        }
        super.doShutdown();
    }
}
