package com.amazon.sqs.javamessaging;

import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.message.SQSBytesMessage;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import com.amazon.sqs.javamessaging.util.ExponentialBackoffStrategy;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:lib/amazon-sqs-java-messaging-lib-1.0.0.jar:com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.class */
public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
    // Class logger; assigned in the static initializer.
    private static final Log LOG;
    // Wait time in seconds; getMessages() passes the same value (20) to the receive request.
    protected static final int WAIT_TIME_SECONDS = 20;
    // Name passed for both attribute names and message attribute names on the receive request.
    protected static final String ALL = "All";
    // Client wrapper used by getMessages() to receive messages.
    private final AmazonSQSMessagingClientWrapper amazonSQSClient;
    // Queue URL taken from the destination in the constructor.
    private final String queueUrl;
    // Threshold for messagesPrefetched at which waitForPrefetch() blocks the prefetch loop.
    private final int numberOfMessagesToPrefetch;
    // Destination stamped onto every converted JMS message (setJMSDestination).
    private final SQSQueueDestination sqsDestination;
    // Messages received while no listener is set; every access in this class happens under stateLock.
    protected final ArrayDeque<MessageManager> messageQueue;
    // Handed to each created SQS message and notified when a message is returned by receive*().
    private final Acknowledger acknowledger;
    // Used to nack messages that fail conversion and messages still queued when the loop shuts down.
    private final NegativeAcknowledger negativeAcknowledger;
    // Current asynchronous listener, or null; volatile because it is read without holding stateLock.
    private volatile MessageListener messageListener;
    // Owning consumer; set after construction through setMessageConsumer().
    private SQSMessageConsumer messageConsumer;
    // Scheduler that receives (listener, message) callbacks when a listener is set.
    private final SQSSessionCallbackScheduler sqsSessionRunnable;
    // Count of messages handed out by the prefetch loop and not yet reported via messageDispatched();
    // modified only under stateLock.
    protected int messagesPrefetched = 0;
    // Set to true by close(); never reset.
    protected volatile boolean closed = false;
    // Toggled by start()/stop(); the prefetch loop blocks in waitForStart() while false.
    protected volatile boolean running = false;
    // Consecutive failed receive attempts; reset to 0 after a successful receive.
    protected int retriesAttempted = 0;
    // Monitor guarding the queue and counters above; also the wait/notify condition object.
    private final Object stateLock = new Object();
    // Supplies the sleep delay after a failed receive.
    // NOTE(review): the meaning of the three constructor arguments is not visible here — confirm
    // against ExponentialBackoffStrategy.
    protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy(25, 25, 2000);
    // Decompiled form of the compiler-generated assertion flag; checked in getMessages().
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:lib/amazon-sqs-java-messaging-lib-1.0.0.jar:com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch$MessageManager.class */
    /**
     * Immutable pairing of a JMS message with the {@link PrefetchManager} it is associated with.
     */
    public static class MessageManager {
        private final PrefetchManager prefetchManager;
        private final Message message;

        /**
         * Creates a holder for the given manager/message pair.
         *
         * @param prefetchManager the prefetch manager to associate with the message
         * @param message         the JMS message being held
         */
        public MessageManager(PrefetchManager prefetchManager, Message message) {
            this.message = message;
            this.prefetchManager = prefetchManager;
        }

        /**
         * @return the JMS message supplied at construction
         */
        public Message getMessage() {
            return message;
        }

        /**
         * @return the prefetch manager supplied at construction
         */
        public PrefetchManager getPrefetchManager() {
            return prefetchManager;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires up the prefetcher with its collaborators and sizes the local message queue.
     *
     * @param sQSSessionCallbackScheduler     scheduler that receives listener callbacks
     * @param acknowledger                    acknowledger handed to converted messages
     * @param negativeAcknowledger            acknowledger used for nacking messages
     * @param sQSQueueDestination             destination whose queue URL is polled
     * @param amazonSQSMessagingClientWrapper client wrapper used to receive messages
     * @param i                               prefetch threshold; also the initial capacity of the queue
     */
    public SQSMessageConsumerPrefetch(SQSSessionCallbackScheduler sQSSessionCallbackScheduler, Acknowledger acknowledger, NegativeAcknowledger negativeAcknowledger, SQSQueueDestination sQSQueueDestination, AmazonSQSMessagingClientWrapper amazonSQSMessagingClientWrapper, int i) {
        // Collaborators.
        this.sqsSessionRunnable = sQSSessionCallbackScheduler;
        this.acknowledger = acknowledger;
        this.negativeAcknowledger = negativeAcknowledger;
        this.amazonSQSClient = amazonSQSMessagingClientWrapper;

        // Destination details.
        this.sqsDestination = sQSQueueDestination;
        this.queueUrl = sQSQueueDestination.getQueueUrl();

        // Prefetch sizing.
        this.numberOfMessagesToPrefetch = i;
        this.messageQueue = new ArrayDeque<>(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the listener currently set on this prefetcher, or {@code null} when none is set
     */
    public MessageListener getMessageListener() {
        final MessageListener current = messageListener;
        return current;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records the consumer that owns this prefetcher.
     *
     * @param sQSMessageConsumer the owning consumer
     */
    public void setMessageConsumer(SQSMessageConsumer sQSMessageConsumer) {
        messageConsumer = sQSMessageConsumer;
    }

    /**
     * @return the consumer previously stored through {@code setMessageConsumer}, possibly {@code null}
     */
    @Override // com.amazon.sqs.javamessaging.PrefetchManager
    public SQSMessageConsumer getMessageConsumer() {
        return messageConsumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Installs (or clears, when {@code null}) the asynchronous listener.
     *
     * <p>When a non-null listener is installed on a running, open prefetcher, every message already
     * sitting in the local queue is drained and scheduled for callback delivery to that listener.
     *
     * @param messageListener the new listener, or {@code null} to remove the current one
     */
    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
        if (messageListener != null && !isClosed()) {
            synchronized (this.stateLock) {
                if (this.running && !isClosed()) {
                    // pollFirst() yields null exactly when the deque is empty.
                    MessageManager pending;
                    while ((pending = this.messageQueue.pollFirst()) != null) {
                        this.sqsSessionRunnable.scheduleCallBack(messageListener, pending);
                    }
                }
            }
        }
    }

    /**
     * Prefetch loop: waits until the prefetcher is started and has spare prefetch capacity, receives
     * a batch (at most 10 messages per request) and hands it to {@link #processReceivedMessages}.
     *
     * <p>The loop ends when the prefetcher is closed, when the thread is interrupted, or when an
     * unexpected throwable escapes (which is logged and rethrown). Messages still sitting in the
     * local queue are nacked exactly once per iteration, in the {@code finally} block, whenever the
     * prefetcher is closed or the loop is terminating abnormally.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            // Becomes true when this iteration ends the loop because of an interrupt or an
            // unexpected failure; queued messages must then be nacked even if not closed.
            boolean terminating = false;
            try {
                if (isClosed()) {
                    break;
                }
                int batchSize;
                synchronized (this.stateLock) {
                    waitForStart();
                    waitForPrefetch();
                    batchSize = Math.min(this.numberOfMessagesToPrefetch - this.messagesPrefetched, 10);
                }
                List<com.amazonaws.services.sqs.model.Message> received = null;
                // close() may have been called while waiting; skip the receive in that case.
                if (!isClosed()) {
                    received = getMessages(batchSize);
                }
                if (received != null && !received.isEmpty()) {
                    processReceivedMessages(received);
                }
            } catch (InterruptedException e) {
                terminating = true;
                return;
            } catch (Throwable th) {
                LOG.error("Unexpected exception when prefetch messages:", th);
                terminating = true;
                throw th;
            } finally {
                if (terminating || isClosed()) {
                    nackQueueMessages();
                }
            }
        }
    }

    /**
     * Issues one receive request for up to {@code i} messages, asking for all attributes and all
     * message attributes.
     *
     * <p>On success the retry counter is reset. When the client throws a {@link JMSException}, the
     * failure is logged, the retry counter is incremented, and the calling thread sleeps for the
     * delay given by the backoff strategy before {@code null} is returned.
     *
     * @param i maximum number of messages to request; asserted to be positive
     * @return the received messages, or {@code null} when the receive call failed
     * @throws InterruptedException if the thread is interrupted during the backoff sleep
     */
    protected List<com.amazonaws.services.sqs.model.Message> getMessages(int i) throws InterruptedException {
        if (!$assertionsDisabled && i <= 0) {
            throw new AssertionError();
        }
        try {
            ReceiveMessageRequest request = new ReceiveMessageRequest(this.queueUrl)
                    .withMaxNumberOfMessages(Integer.valueOf(i))
                    .withAttributeNames(ALL)
                    .withMessageAttributeNames(ALL)
                    .withWaitTimeSeconds(WAIT_TIME_SECONDS);
            List<com.amazonaws.services.sqs.model.Message> received = this.amazonSQSClient.receiveMessage(request).getMessages();
            this.retriesAttempted = 0;
            return received;
        } catch (JMSException e) {
            LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", e);
        }

        // Receive failed: back off before the caller tries again.
        try {
            sleep(this.backoffStrategy.delayBeforeNextRetry(this.retriesAttempted++));
        } catch (InterruptedException e2) {
            LOG.warn("Interrupted while retrying on receive", e2);
            throw e2;
        }
        return null;
    }

    /**
     * Converts each received SQS message to a JMS message and delivers it.
     *
     * <p>When a listener is set, the message is scheduled for callback delivery; otherwise it is
     * appended to the local queue for synchronous receive. Either way the prefetched counter is
     * incremented and waiters are notified. Messages that cannot be converted are skipped and their
     * receipt handles are passed to the negative acknowledger once the batch has been processed.
     *
     * @param list the SQS messages returned by one receive call
     */
    protected void processReceivedMessages(List<com.amazonaws.services.sqs.model.Message> list) {
        List<String> unconvertibleReceiptHandles = new ArrayList<>();
        for (com.amazonaws.services.sqs.model.Message sqsMessage : list) {
            Message jmsMessage;
            try {
                jmsMessage = convertToJMSMessage(sqsMessage);
            } catch (JMSException e) {
                LOG.warn("Could not convert received message to a JMS message, it will be nacked", e);
                unconvertibleReceiptHandles.add(sqsMessage.getReceiptHandle());
                // Nothing to deliver for this message; move on to the next one.
                continue;
            }

            MessageManager entry = new MessageManager(this, jmsMessage);
            // Read the volatile listener once so that a concurrent close() cannot null it out
            // between the check and the use.
            MessageListener listener = this.messageListener;
            if (listener != null) {
                this.sqsSessionRunnable.scheduleCallBack(listener, entry);
                synchronized (this.stateLock) {
                    this.messagesPrefetched++;
                    notifyStateChange();
                }
            } else {
                synchronized (this.stateLock) {
                    this.messageQueue.addLast(entry);
                    this.messagesPrefetched++;
                    notifyStateChange();
                }
            }
        }
        try {
            this.negativeAcknowledger.action(this.queueUrl, unconvertibleReceiptHandles);
        } catch (JMSException e2) {
            LOG.warn("Caught exception while nacking received messages", e2);
        }
    }

    /**
     * Blocks until the number of prefetched messages drops below the prefetch threshold or the
     * prefetcher is closed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting (logged, then rethrown)
     */
    protected void waitForPrefetch() throws InterruptedException {
        synchronized (this.stateLock) {
            try {
                while (!(this.messagesPrefetched < this.numberOfMessagesToPrefetch || isClosed())) {
                    this.stateLock.wait();
                }
            } catch (InterruptedException interrupted) {
                LOG.warn("Interrupted while waiting on prefetch", interrupted);
                throw interrupted;
            }
        }
    }

    /**
     * Builds the JMS message matching the type recorded in the SQS message's
     * {@code SQSMessage.JMS_SQS_MESSAGE_TYPE} attribute and stamps it with this prefetcher's
     * destination.
     *
     * <p>A message without the type attribute is treated as a text message.
     *
     * @param message the SQS message to convert
     * @return the JMS message wrapping {@code message}
     * @throws JMSException if the type attribute holds an unsupported value, or if constructing the
     *                      JMS message fails
     */
    protected Message convertToJMSMessage(com.amazonaws.services.sqs.model.Message message) throws JMSException {
        MessageAttributeValue typeAttribute = message.getMessageAttributes().get(SQSMessage.JMS_SQS_MESSAGE_TYPE);
        SQSMessage jmsMessage;
        if (typeAttribute == null) {
            jmsMessage = new SQSTextMessage(this.acknowledger, this.queueUrl, message);
        } else {
            String messageType = typeAttribute.getStringValue();
            if (SQSMessage.BYTE_MESSAGE_TYPE.equals(messageType)) {
                try {
                    jmsMessage = new SQSBytesMessage(this.acknowledger, this.queueUrl, message);
                } catch (JMSException e) {
                    // Space added before "cannot" so the receipt handle is not fused with the text.
                    LOG.warn("MessageReceiptHandle - " + message.getReceiptHandle() + " cannot be serialized to BytesMessage", e);
                    throw e;
                }
            } else if (SQSMessage.OBJECT_MESSAGE_TYPE.equals(messageType)) {
                jmsMessage = new SQSObjectMessage(this.acknowledger, this.queueUrl, message);
            } else if ("text".equals(messageType)) {
                jmsMessage = new SQSTextMessage(this.acknowledger, this.queueUrl, message);
            } else {
                throw new JMSException("Not a supported JMS message type");
            }
        }
        jmsMessage.setJMSDestination(this.sqsDestination);
        return jmsMessage;
    }

    /**
     * Hands the whole local queue to the negative acknowledger's bulk action and then wakes up all
     * threads waiting on the state lock, whether or not the bulk action succeeded.
     *
     * <p>A {@link JMSException} from the bulk action is logged and suppressed.
     */
    protected void nackQueueMessages() {
        synchronized (this.stateLock) {
            try {
                this.negativeAcknowledger.bulkAction(this.messageQueue, this.queueUrl);
            } catch (JMSException e) {
                LOG.warn("Caught exception while nacking queued messages", e);
            } finally {
                notifyStateChange();
            }
        }
    }

    /**
     * Blocks until the prefetcher is running or has been closed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting (logged, then rethrown)
     */
    protected void waitForStart() throws InterruptedException {
        synchronized (this.stateLock) {
            try {
                while (!(this.running || isClosed())) {
                    this.stateLock.wait();
                }
            } catch (InterruptedException interrupted) {
                LOG.warn("Interrupted while waiting on consumer start", interrupted);
                throw interrupted;
            }
        }
    }

    /**
     * Records that one prefetched message has been dispatched: decrements the prefetched counter
     * and, when the counter is below the prefetch threshold, wakes up threads waiting on the state
     * lock.
     */
    @Override // com.amazon.sqs.javamessaging.PrefetchManager
    public void messageDispatched() {
        synchronized (this.stateLock) {
            this.messagesPrefetched -= 1;
            final boolean belowThreshold = this.messagesPrefetched < this.numberOfMessagesToPrefetch;
            if (belowThreshold) {
                notifyStateChange();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Receives the next message, delegating to {@link #receive(long)} with a timeout of zero.
     *
     * @return whatever {@code receive(0)} returns
     * @throws JMSException if {@code receive(0)} throws
     */
    public Message receive() throws JMSException {
        final long noTimeout = 0L;
        return receive(noTimeout);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the next locally queued message, waiting up to {@code j} milliseconds for one to
     * arrive.
     *
     * @param j timeout in milliseconds; zero or a negative value waits without a time limit
     * @return the next message, or {@code null} when delivery is currently not possible, the timeout
     *         elapsed, the prefetcher was closed while waiting, or the calling thread was
     *         interrupted (the interrupt flag is restored in that case)
     * @throws JMSException if a message listener is set, or if notifying the acknowledger fails
     */
    public Message receive(long j) throws JMSException {
        if (cannotDeliver()) {
            return null;
        }
        final long timeout = Math.max(j, 0L);

        MessageManager next;
        synchronized (this.stateLock) {
            if (this.messageQueue.isEmpty()) {
                final long startTime = System.currentTimeMillis();
                while (this.messageQueue.isEmpty() && !isClosed()) {
                    // 0 tells Object.wait to block until notified.
                    long remaining = 0L;
                    if (timeout != 0) {
                        remaining = getWaitTime(timeout, startTime);
                        // Stop once the timeout is used up; wait(0) would block forever and a
                        // negative argument would throw IllegalArgumentException.
                        if (remaining <= 0) {
                            break;
                        }
                    }
                    try {
                        this.stateLock.wait(remaining);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return null;
                    }
                }
                if (this.messageQueue.isEmpty() || isClosed()) {
                    return null;
                }
            }
            next = this.messageQueue.pollFirst();
        }
        // Bookkeeping and the acknowledger callback run outside the monitor, as in receiveNoWait().
        return messageHandler(next);
    }

    /**
     * Computes how much of a timeout is left.
     *
     * @param j  total timeout in milliseconds
     * @param j2 timestamp (from {@code System.currentTimeMillis()}) at which waiting began
     * @return {@code j} minus the time elapsed since {@code j2}; zero or negative once the timeout
     *         has been used up
     */
    private long getWaitTime(long j, long j2) {
        final long elapsed = System.currentTimeMillis() - j2;
        return j - elapsed;
    }

    /**
     * Wakes up every thread currently waiting on the state lock.
     */
    protected void notifyStateChange() {
        final Object lock = this.stateLock;
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the next locally queued message without waiting.
     *
     * @return the next message, or {@code null} when delivery is currently not possible or the
     *         local queue is empty
     * @throws JMSException if a message listener is set, or if notifying the acknowledger fails
     */
    public Message receiveNoWait() throws JMSException {
        if (!cannotDeliver()) {
            MessageManager head;
            synchronized (this.stateLock) {
                head = this.messageQueue.pollFirst();
            }
            return messageHandler(head);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the prefetcher as running and wakes up waiting threads. Does nothing when the
     * prefetcher is closed or already running.
     */
    public void start() {
        if (!isClosed() && !this.running) {
            synchronized (this.stateLock) {
                this.running = true;
                notifyStateChange();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the prefetcher as not running and wakes up waiting threads. Does nothing when the
     * prefetcher is closed or already stopped.
     */
    public void stop() {
        if (!isClosed() && this.running) {
            synchronized (this.stateLock) {
                this.running = false;
                notifyStateChange();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the prefetcher: sets the closed flag, wakes up waiting threads and clears the message
     * listener. Calling it on an already closed prefetcher has no effect.
     */
    public void close() {
        if (!isClosed()) {
            synchronized (this.stateLock) {
                this.closed = true;
                notifyStateChange();
                this.messageListener = null;
            }
        }
    }

    /**
     * Finishes a synchronous receive: updates the prefetch bookkeeping and tells the acknowledger
     * that the message has been received.
     *
     * @param messageManager the queue entry that was polled, or {@code null} if there was none
     * @return the JMS message held by {@code messageManager}, or {@code null} when it is {@code null}
     * @throws JMSException if the acknowledger notification fails
     */
    private Message messageHandler(MessageManager messageManager) throws JMSException {
        if (messageManager != null) {
            final Message delivered = messageManager.getMessage();
            messageDispatched();
            this.acknowledger.notifyMessageReceived((SQSMessage) delivered);
            return delivered;
        }
        return null;
    }

    /**
     * Decides whether a synchronous receive may proceed.
     *
     * @return {@code true} when the prefetcher is closed or not running; {@code false} when
     *         synchronous delivery is possible
     * @throws JMSException if the prefetcher is open and running but a message listener is set
     */
    private boolean cannotDeliver() throws JMSException {
        final boolean active = !isClosed() && this.running;
        if (!active) {
            return true;
        }
        if (this.messageListener == null) {
            return false;
        }
        throw new JMSException("Cannot receive messages synchronously after a message listener is set");
    }

    /**
     * Puts the current thread to sleep; a thin overridable wrapper around {@link Thread#sleep(long)}.
     *
     * @param j sleep duration in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void sleep(long j) throws InterruptedException {
        Thread.sleep(j);
    }

    /**
     * @return {@code true} once {@code close()} has set the closed flag
     */
    protected boolean isClosed() {
        return closed;
    }

    static {
        $assertionsDisabled = !SQSMessageConsumerPrefetch.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(SQSMessageConsumerPrefetch.class);
    }
}
