package org.apache.rocketmq.client.java.impl.consumer;

import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Status;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.BadRequestException;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.rpc.RpcInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.class */
public class ProcessQueueImpl implements ProcessQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);

    // Fixed delays applied before an operation is re-scheduled on the consumer's scheduler.
    private static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    private static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    private static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    // Chosen instead of RECEIVING_FAILURE_BACKOFF_DELAY when reception failed with TooManyRequestsException.
    private static final Duration RECEIVING_FLOW_CONTROL_BACKOFF_DELAY = Duration.ofMillis(20);
    private static final Duration RECEIVING_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
    private static final Duration RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL = Duration.ofSeconds(1);

    private final PushConsumerImpl consumer;
    private final MessageQueueImpl mq;
    private final FilterExpression filterExpression;

    // Refreshed with System.nanoTime() each time a receive request is issued; read by expired().
    private volatile long activityNanoTime = System.nanoTime();
    // Set once by drop(); checked before receiving and before scheduling retries.
    private volatile boolean dropped = false;

    // Messages received but not yet taken for consumption.
    @GuardedBy("pendingMessagesLock")
    private final List<MessageViewImpl> pendingMessages = new ArrayList<>();
    private final ReadWriteLock pendingMessagesLock = new ReentrantReadWriteLock();

    // Messages taken for consumption but not yet erased.
    @GuardedBy("inflightMessagesLock")
    private final List<MessageViewImpl> inflightMessages = new ArrayList<>();
    private final ReadWriteLock inflightMessagesLock = new ReentrantReadWriteLock();

    // Sum of the remaining body bytes of the cached messages (increased on caching, decreased on erasure).
    private final AtomicLong cachedMessagesBytes = new AtomicLong();
    // Counters reported and reset to zero by doStats().
    private final AtomicLong receptionTimes = new AtomicLong(0);
    private final AtomicLong receivedMessagesQuantity = new AtomicLong(0);

    /**
     * Creates a process queue bound to one message queue of a push consumer.
     *
     * @param pushConsumerImpl the owning consumer, used for RPCs, settings, scheduling and statistics.
     * @param messageQueueImpl the message queue this instance receives from.
     * @param filterExpression the filter expression passed along with every receive request.
     */
    public ProcessQueueImpl(PushConsumerImpl pushConsumerImpl, MessageQueueImpl messageQueueImpl,
        FilterExpression filterExpression) {
        consumer = pushConsumerImpl;
        mq = messageQueueImpl;
        this.filterExpression = filterExpression;
    }

    /**
     * @return the message queue this process queue was created for.
     */
    @Override
    public MessageQueueImpl getMessageQueue() {
        return mq;
    }

    /**
     * Marks this process queue as dropped. The flag is checked before further receive requests are issued
     * and before ack / change-invisible-duration / dead-letter retries are scheduled.
     */
    @Override
    public void drop() {
        dropped = true;
    }

    /**
     * Tells whether this process queue has been idle for too long.
     *
     * <p>The idle time is measured from the last issued receive request. The limit is twice the sum of the
     * long-polling timeout and the request timeout.
     *
     * @return {@code true} if the idle duration has reached the limit (a warning is logged), otherwise
     *     {@code false}.
     */
    @Override
    public boolean expired() {
        final Duration maxIdleDuration = Duration.ofNanos(
            2 * (consumer.getPushConsumerSettings().getLongPollingTimeout().toNanos()
                + consumer.getClientConfiguration().getRequestTimeout().toNanos()));
        final Duration idleDuration = Duration.ofNanos(System.nanoTime() - activityNanoTime);
        if (idleDuration.compareTo(maxIdleDuration) >= 0) {
            LOGGER.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, mq={}, clientId={}",
                idleDuration, maxIdleDuration, mq, consumer.clientId());
            return true;
        }
        return false;
    }

    /**
     * Appends the received messages to the pending list and accounts for their body size.
     *
     * <p>Non-corrupted messages are chained through {@code setNext} in list order, so each one points at
     * the following non-corrupted message of the same batch. Corrupted messages are not cached: once the
     * lock is released they are forwarded to the dead letter queue in FIFO mode, or nacked otherwise.
     *
     * @param list messages of a single receive result.
     */
    void cacheMessages(List<MessageViewImpl> list) {
        // Must be typed: with a raw list the lambda parameter below would be an Object.
        final List<MessageViewImpl> corruptedMessages = new ArrayList<>();
        pendingMessagesLock.writeLock().lock();
        try {
            MessageViewImpl previous = null;
            for (MessageViewImpl message : list) {
                if (message.isCorrupted()) {
                    corruptedMessages.add(message);
                    continue;
                }
                if (null != previous) {
                    previous.setNext(message);
                }
                previous = message;
                pendingMessages.add(message);
                cachedMessagesBytes.addAndGet(message.getBody().remaining());
            }
        } finally {
            pendingMessagesLock.writeLock().unlock();
            // Handled after unlocking so that no RPC is issued while the write lock is held.
            corruptedMessages.forEach(corrupted -> {
                final MessageId messageId = corrupted.getMessageId();
                if (consumer.getPushConsumerSettings().isFifo()) {
                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, messageId={}, clientId={}",
                        mq, messageId, consumer.clientId());
                    forwardToDeadLetterQueue(corrupted);
                } else {
                    LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}",
                        mq, messageId, consumer.clientId());
                    nackMessage(corrupted);
                }
            });
        }
    }

    /**
     * Computes the batch size of the next receive request: the remaining cache capacity (at least 1),
     * capped by the configured receive batch size.
     */
    private int getReceptionBatchSize() {
        final int remainingCapacity = consumer.cacheMessageCountThresholdPerQueue() - cachedMessagesCount();
        final int atLeastOne = Math.max(remainingCapacity, 1);
        return Math.min(atLeastOne, consumer.getPushConsumerSettings().getReceiveBatchSize());
    }

    /**
     * Issues a receive request right away, skipping the dropped / cache-full checks done by
     * {@link #receiveMessage()}.
     */
    @Override
    public void fetchMessageImmediately() {
        this.receiveMessageImmediately();
    }

    /**
     * Schedules the next receive attempt after a failure.
     *
     * @param th the failure; a {@link TooManyRequestsException} selects the short flow-control delay, any
     *     other throwable selects the regular failure delay.
     */
    public void onReceiveMessageException(Throwable th) {
        final Duration delay;
        if (th instanceof TooManyRequestsException) {
            delay = RECEIVING_FLOW_CONTROL_BACKOFF_DELAY;
        } else {
            delay = RECEIVING_FAILURE_BACKOFF_DELAY;
        }
        receiveMessageLater(delay);
    }

    /**
     * Schedules {@link #receiveMessage()} on the consumer's scheduler after the given delay.
     *
     * <p>If scheduling fails and the scheduler is not shut down, the failure is logged and handled like a
     * reception failure.
     *
     * @param duration delay before the next receive attempt.
     */
    private void receiveMessageLater(Duration duration) {
        final String clientId = consumer.clientId();
        final ScheduledExecutorService scheduler = consumer.getScheduler();
        try {
            LOGGER.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, duration, clientId);
            scheduler.schedule(this::receiveMessage, duration.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            // A shut-down scheduler is expected to reject tasks; nothing to do in that case.
            if (!scheduler.isShutdown()) {
                LOGGER.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId,
                    th);
                onReceiveMessageException(th);
            }
        }
    }

    /**
     * Receives messages unless the queue has been dropped; if the cache is full the attempt is postponed.
     */
    public void receiveMessage() {
        if (dropped) {
            LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq,
                consumer.clientId());
            return;
        }
        if (isCacheFull()) {
            LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
                consumer.clientId());
            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
            return;
        }
        receiveMessageImmediately();
    }

    /**
     * Sends one receive request for this message queue, if the consumer is running.
     *
     * <p>The {@code RECEIVE} hook is invoked before the request and again when the request completes. A
     * successful result is passed to {@link #onReceiveMessageResult}; any failure, including one raised
     * while handling the result, goes to {@link #onReceiveMessageException}. Reception counters are
     * incremented once the request has been issued.
     */
    private void receiveMessageImmediately() {
        if (!consumer.isRunning()) {
            LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq,
                consumer.clientId());
            return;
        }
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final ReceiveMessageRequest request =
                consumer.wrapReceiveMessageRequest(getReceptionBatchSize(), mq, filterExpression);
            // Record the activity so that expired() measures idleness from the latest request.
            activityNanoTime = System.nanoTime();
            consumer.doBefore(MessageHookPoints.RECEIVE, Collections.emptyList());
            final Stopwatch stopwatch = Stopwatch.createStarted();
            Futures.addCallback(
                consumer.receiveMessage(request, mq, consumer.getPushConsumerSettings().getLongPollingTimeout()),
                new FutureCallback<ReceiveMessageResult>() {
                    @Override
                    public void onSuccess(ReceiveMessageResult receiveMessageResult) {
                        consumer.doAfter(MessageHookPoints.RECEIVE,
                            receiveMessageResult.getMessageViewImpls().stream()
                                .map(MessageViewImpl::getMessageCommon)
                                .collect(Collectors.toList()),
                            stopwatch.elapsed(), MessageHookPointsStatus.OK);
                        try {
                            onReceiveMessageResult(receiveMessageResult);
                        } catch (Throwable th) {
                            LOGGER.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, clientId={}",
                                mq, endpoints, consumer.clientId(), th);
                            onReceiveMessageException(th);
                        }
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), stopwatch.elapsed(),
                            MessageHookPointsStatus.ERROR);
                        LOGGER.error("Exception raised during message reception, mq={}, endpoints={}, clientId={}",
                            mq, endpoints, consumer.clientId(), th);
                        onReceiveMessageException(th);
                    }
                }, MoreExecutors.directExecutor());
            receptionTimes.getAndIncrement();
            consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable th) {
            LOGGER.error("Exception raised during message reception, mq={}, clientId={}", mq, consumer.clientId(),
                th);
            onReceiveMessageException(th);
        }
    }

    /**
     * Checks the cache against the per-queue count threshold and then the per-queue bytes threshold.
     *
     * @return {@code true} (after logging a warning) if either threshold is reached, otherwise
     *     {@code false}.
     */
    public boolean isCacheFull() {
        final int countThreshold = consumer.cacheMessageCountThresholdPerQueue();
        final long actualCount = cachedMessagesCount();
        if (actualCount >= countThreshold) {
            LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, mq={}, clientId={}",
                countThreshold, actualCount, mq, consumer.clientId());
            return true;
        }
        final int bytesThreshold = consumer.cacheMessageBytesThresholdPerQueue();
        final long actualBytes = cachedMessageBytes();
        if (actualBytes >= bytesThreshold) {
            LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, mq={}, clientId={}",
                bytesThreshold, actualBytes, mq, consumer.clientId());
            return true;
        }
        return false;
    }

    /**
     * @return the number of pending plus in-flight messages, read while holding both read locks.
     */
    public int cachedMessagesCount() {
        // Acquisition order (pending, then in-flight) matches the order used by the take methods.
        pendingMessagesLock.readLock().lock();
        inflightMessagesLock.readLock().lock();
        try {
            final int pending = pendingMessages.size();
            final int inflight = inflightMessages.size();
            return pending + inflight;
        } finally {
            inflightMessagesLock.readLock().unlock();
            pendingMessagesLock.readLock().unlock();
        }
    }

    /**
     * @return the number of in-flight messages, read under the in-flight read lock.
     */
    public int inflightMessagesCount() {
        inflightMessagesLock.readLock().lock();
        try {
            final int inflight = inflightMessages.size();
            return inflight;
        } finally {
            inflightMessagesLock.readLock().unlock();
        }
    }

    /**
     * @return the current value of the cached-bytes counter.
     */
    public long cachedMessageBytes() {
        final long bytes = cachedMessagesBytes.get();
        return bytes;
    }

    /**
     * Handles a successful receive result: caches the messages, updates the received-quantity counters,
     * signals the consume service, and then continues the receive loop.
     *
     * <p>Kept {@code public} because the decompiler widened the original private access.
     *
     * @param receiveMessageResult the result of one receive request.
     */
    public void onReceiveMessageResult(ReceiveMessageResult receiveMessageResult) {
        final List<MessageViewImpl> received = receiveMessageResult.getMessageViewImpls();
        final boolean hasMessages = !received.isEmpty();
        if (hasMessages) {
            cacheMessages(received);
            final int quantity = received.size();
            receivedMessagesQuantity.getAndAdd(quantity);
            consumer.getReceivedMessagesQuantity().getAndAdd(quantity);
            consumer.getConsumeService().signal();
        }
        // Keep receiving regardless of whether this batch was empty.
        receiveMessage();
    }

    /**
     * Moves the first pending message, if any, to the in-flight list.
     *
     * <p>Both write locks are held while the message is moved and are released exactly once, in the
     * {@code finally} block. Releasing them a second time on the success path would make
     * {@code unlock()} throw {@link IllegalMonitorStateException}.
     *
     * @return the taken message, or an empty optional when nothing is pending.
     */
    @Override
    public Optional<MessageViewImpl> tryTakeMessage() {
        pendingMessagesLock.writeLock().lock();
        inflightMessagesLock.writeLock().lock();
        try {
            if (pendingMessages.isEmpty()) {
                return Optional.empty();
            }
            final MessageViewImpl head = pendingMessages.remove(0);
            inflightMessages.add(head);
            return Optional.of(head);
        } finally {
            inflightMessagesLock.writeLock().unlock();
            pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Removes the message from the in-flight list and, if it was present, subtracts its remaining body
     * bytes from the cached-bytes counter.
     */
    private void eraseMessage(MessageViewImpl messageViewImpl) {
        inflightMessagesLock.writeLock().lock();
        try {
            final boolean removed = inflightMessages.remove(messageViewImpl);
            if (!removed) {
                return;
            }
            cachedMessagesBytes.addAndGet(-messageViewImpl.getBody().remaining());
        } finally {
            inflightMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Increments the consumer's OK counter for {@code SUCCESS}, and its error counter for any other result.
     */
    private void statsConsumptionResult(ConsumeResult consumeResult) {
        final boolean succeeded = ConsumeResult.SUCCESS.equals(consumeResult);
        if (!succeeded) {
            consumer.consumptionErrorQuantity.incrementAndGet();
            return;
        }
        consumer.consumptionOkQuantity.incrementAndGet();
    }

    /**
     * Records the consumption result, removes the message from the in-flight list, then acks it on
     * {@code SUCCESS} or nacks it otherwise.
     *
     * @param messageViewImpl the consumed message.
     * @param consumeResult the outcome of the consumption.
     */
    @Override
    public void eraseMessage(MessageViewImpl messageViewImpl, ConsumeResult consumeResult) {
        statsConsumptionResult(consumeResult);
        eraseMessage(messageViewImpl);
        final boolean succeeded = ConsumeResult.SUCCESS.equals(consumeResult);
        if (!succeeded) {
            nackMessage(messageViewImpl);
            return;
        }
        ackMessage(messageViewImpl);
    }

    /**
     * Nacks the message by changing its invisible duration to the retry policy's next-attempt delay for
     * the message's current delivery attempt.
     */
    private void nackMessage(MessageViewImpl messageViewImpl) {
        final int deliveryAttempt = messageViewImpl.getDeliveryAttempt();
        final Duration nextAttemptDelay = consumer.getRetryPolicy().getNextAttemptDelay(deliveryAttempt);
        changeInvisibleDuration(messageViewImpl, nextAttemptDelay, 1);
    }

    /**
     * Asks the server to change the invisible duration of the message and reacts to the outcome.
     *
     * <p>An {@code OK} response is only logged. {@code INVALID_RECEIPT_HANDLE} is logged and not retried.
     * Any other code, or an exception, schedules a retry with the attempt number increased by one.
     *
     * @param messageViewImpl the message whose invisible duration is changed.
     * @param duration the new invisible duration.
     * @param i the attempt number, starting at 1.
     */
    private void changeInvisibleDuration(final MessageViewImpl messageViewImpl, final Duration duration, final int i) {
        final String clientId = consumer.clientId();
        final String consumerGroup = consumer.getConsumerGroup();
        final MessageId messageId = messageViewImpl.getMessageId();
        final Endpoints endpoints = messageViewImpl.getEndpoints();
        Futures.addCallback(consumer.changeInvisibleDuration(messageViewImpl, duration),
            new FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
                @Override
                public void onSuccess(RpcInvocation<ChangeInvisibleDurationResponse> rpcInvocation) {
                    final ChangeInvisibleDurationResponse response = rpcInvocation.getResponse();
                    final String requestId = rpcInvocation.getContext().getRequestId();
                    final Status status = response.getStatus();
                    final Code code = status.getCode();
                    if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
                        LOGGER.error("Failed to change invisible duration due to the invalid receipt handle, forgive to retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, status message=[{}]",
                            clientId, consumerGroup, messageId, i, mq, endpoints, requestId, status.getMessage());
                        return;
                    }
                    if (Code.OK.equals(code)) {
                        if (i > 1) {
                            LOGGER.info("Finally, change invisible duration successfully, clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, requestId={}",
                                clientId, consumerGroup, messageId, i, mq, endpoints, requestId);
                        } else {
                            LOGGER.debug("Change invisible duration successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}",
                                clientId, consumerGroup, messageId, mq, endpoints, requestId);
                        }
                        return;
                    }
                    LOGGER.error("Failed to change invisible duration, would retry later, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, status message=[{}]",
                        clientId, consumerGroup, messageId, i, mq, endpoints, requestId, status.getMessage());
                    changeInvisibleDurationLater(messageViewImpl, duration, 1 + i);
                }

                @Override
                public void onFailure(Throwable th) {
                    LOGGER.error("Exception raised while changing invisible duration, would retry later, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}",
                        clientId, consumerGroup, messageId, mq, endpoints, th);
                    changeInvisibleDurationLater(messageViewImpl, duration, 1 + i);
                }
            }, MoreExecutors.directExecutor());
    }

    /**
     * Schedules another change-invisible-duration attempt after the fixed back-off delay.
     *
     * <p>Nothing is scheduled if the process queue has been dropped. If scheduling fails and the scheduler
     * is not shut down, the failure is logged together with its cause and scheduling is tried again.
     *
     * <p>Kept {@code public} because the decompiler widened the original private access.
     *
     * @param messageViewImpl the message whose invisible duration is changed.
     * @param duration the new invisible duration.
     * @param i the attempt number to use for the scheduled call.
     */
    public void changeInvisibleDurationLater(MessageViewImpl messageViewImpl, Duration duration, int i) {
        final MessageId messageId = messageViewImpl.getMessageId();
        final String clientId = consumer.clientId();
        if (dropped) {
            LOGGER.info("Process queue was dropped, give up to change invisible duration, mq={}, messageId={}, clientId={}",
                mq, messageId, clientId);
            return;
        }
        final ScheduledExecutorService scheduler = consumer.getScheduler();
        try {
            scheduler.schedule(() -> changeInvisibleDuration(messageViewImpl, duration, i),
                CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            // A shut-down scheduler is expected to reject tasks; nothing to do in that case.
            if (scheduler.isShutdown()) {
                return;
            }
            // Pass the throwable as the last argument so the cause and stack trace are logged.
            LOGGER.error("[Bug] Failed to schedule message change invisible duration request, mq={}, messageId={}, clientId={}",
                mq, messageId, clientId, th);
            changeInvisibleDurationLater(messageViewImpl, duration, 1 + i);
        }
    }

    /**
     * Takes the first pending message together with every message reachable through its iterator, and
     * moves them all from the pending list to the in-flight list.
     *
     * @return a fresh iterator starting at the first taken message, or an empty iterator when nothing is
     *     pending.
     */
    @Override
    public Iterator<MessageViewImpl> tryTakeFifoMessages() {
        pendingMessagesLock.writeLock().lock();
        inflightMessagesLock.writeLock().lock();
        try {
            if (pendingMessages.isEmpty()) {
                return Collections.emptyIterator();
            }
            final MessageViewImpl head = pendingMessages.get(0);
            head.iterator().forEachRemaining(message -> {
                pendingMessages.remove(message);
                inflightMessages.add(message);
            });
            // The iterator above is exhausted; hand a new one to the caller.
            return head.iterator();
        } finally {
            inflightMessagesLock.writeLock().unlock();
            pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Finishes the consumption of a FIFO message.
     *
     * <p>On {@code FAILURE} with attempts left, the delivery attempt is incremented and the message is
     * consumed again after the retry policy's delay; the returned future then follows that retry.
     * Otherwise the message is acked on {@code SUCCESS}, or forwarded to the dead letter queue, and it is
     * removed from the in-flight list when that future completes.
     *
     * @param messageViewImpl the consumed message.
     * @param consumeResult the outcome of the consumption.
     * @return a future tracking the ack / dead-letter forwarding, possibly after retries.
     */
    @Override
    public ListenableFuture<Void> eraseFifoMessage(MessageViewImpl messageViewImpl, ConsumeResult consumeResult) {
        statsConsumptionResult(consumeResult);
        final RetryPolicy retryPolicy = consumer.getRetryPolicy();
        final int maxAttempts = retryPolicy.getMaxAttempts();
        final int deliveryAttempt = messageViewImpl.getDeliveryAttempt();
        final MessageId messageId = messageViewImpl.getMessageId();
        final ConsumeService consumeService = consumer.getConsumeService();
        final String clientId = consumer.clientId();

        final boolean shouldRedeliver = ConsumeResult.FAILURE.equals(consumeResult) && deliveryAttempt < maxAttempts;
        if (shouldRedeliver) {
            // The delay is derived from the attempt number before it is incremented.
            final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(deliveryAttempt);
            final int nextAttempt = messageViewImpl.incrementAndGetDeliveryAttempt();
            LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}, attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
                maxAttempts, nextAttempt, mq, messageId, nextAttemptDelay, clientId);
            return Futures.transformAsync(consumeService.consume(messageViewImpl, nextAttemptDelay),
                retryResult -> eraseFifoMessage(messageViewImpl, retryResult), MoreExecutors.directExecutor());
        }

        final ListenableFuture<Void> future;
        if (ConsumeResult.SUCCESS.equals(consumeResult)) {
            future = ackMessage(messageViewImpl);
        } else {
            LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, mq={}, messageId={}, clientId={}",
                maxAttempts, deliveryAttempt, mq, messageId, clientId);
            future = forwardToDeadLetterQueue(messageViewImpl);
        }
        future.addListener(() -> eraseMessage(messageViewImpl), consumer.getConsumptionExecutor());
        return future;
    }

    /**
     * Starts forwarding the message to the dead letter queue with attempt number 1.
     *
     * @return the future passed to the forwarding attempts.
     */
    private ListenableFuture<Void> forwardToDeadLetterQueue(MessageViewImpl messageViewImpl) {
        final SettableFuture<Void> future = SettableFuture.create();
        this.forwardToDeadLetterQueue(messageViewImpl, 1, future);
        return future;
    }

    /**
     * Forwards the message to the dead letter queue and completes the future on an {@code OK} response.
     *
     * <p>A non-OK response or an exception schedules a retry after the back-off delay, with the attempt
     * number increased by one. Retrying a non-OK response immediately would issue requests back to back
     * for as long as the server keeps rejecting them.
     *
     * <p>Kept {@code public} because the decompiler widened the original private access.
     *
     * @param messageViewImpl the message to forward.
     * @param i the attempt number, starting at 1.
     * @param settableFuture completed once the message has been forwarded.
     */
    public void forwardToDeadLetterQueue(final MessageViewImpl messageViewImpl, final int i, final SettableFuture<Void> settableFuture) {
        final ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> future =
            consumer.forwardMessageToDeadLetterQueue(messageViewImpl);
        final String clientId = consumer.clientId();
        final String consumerGroup = consumer.getConsumerGroup();
        final MessageId messageId = messageViewImpl.getMessageId();
        final Endpoints endpoints = messageViewImpl.getEndpoints();
        Futures.addCallback(future, new FutureCallback<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>() {
            @Override
            public void onSuccess(RpcInvocation<ForwardMessageToDeadLetterQueueResponse> rpcInvocation) {
                final ForwardMessageToDeadLetterQueueResponse response = rpcInvocation.getResponse();
                final String requestId = rpcInvocation.getContext().getRequestId();
                final Status status = response.getStatus();
                final Code code = status.getCode();
                if (!Code.OK.equals(code)) {
                    LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, code={}, status message={}",
                        clientId, consumerGroup, messageId, i, mq, endpoints, requestId, code, status.getMessage());
                    // Back off before retrying, as the log message says and as the ack path does.
                    forwardToDeadLetterQueueLater(messageViewImpl, 1 + i, settableFuture);
                    return;
                }
                settableFuture.setFuture(Futures.immediateVoidFuture());
                if (1 < i) {
                    LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}, endpoints={}, requestId={}",
                        clientId, consumerGroup, i, messageId, mq, endpoints, requestId);
                } else {
                    LOGGER.info("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}",
                        clientId, consumerGroup, messageId, mq, endpoints, requestId);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                LOGGER.error("Exception raised while forward message to DLQ, would attempt to re-forward later, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}",
                    clientId, consumerGroup, i, messageId, mq, th);
                forwardToDeadLetterQueueLater(messageViewImpl, 1 + i, settableFuture);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Schedules another dead-letter forwarding attempt after the fixed back-off delay.
     *
     * <p>Nothing is scheduled if the process queue has been dropped. If scheduling fails and the scheduler
     * is not shut down, the failure is logged together with its cause and scheduling is tried again.
     *
     * <p>Kept {@code public} because the decompiler widened the original private access.
     *
     * @param messageViewImpl the message to forward.
     * @param i the attempt number to use for the scheduled call.
     * @param settableFuture the future handed to the scheduled attempt.
     */
    public void forwardToDeadLetterQueueLater(MessageViewImpl messageViewImpl, int i, SettableFuture<Void> settableFuture) {
        final MessageId messageId = messageViewImpl.getMessageId();
        final String clientId = consumer.clientId();
        if (dropped) {
            LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}, messageId={}, clientId={}",
                mq, messageId, clientId);
            return;
        }
        final ScheduledExecutorService scheduler = consumer.getScheduler();
        try {
            scheduler.schedule(() -> forwardToDeadLetterQueue(messageViewImpl, i, settableFuture),
                FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            // A shut-down scheduler is expected to reject tasks; nothing to do in that case.
            if (scheduler.isShutdown()) {
                return;
            }
            // Pass the throwable as the last argument so the cause and stack trace are logged.
            LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}",
                mq, messageId, clientId, th);
            forwardToDeadLetterQueueLater(messageViewImpl, 1 + i, settableFuture);
        }
    }

    /**
     * Starts acknowledging the message with attempt number 1.
     *
     * @return the future passed to the ack attempts.
     */
    private ListenableFuture<Void> ackMessage(MessageViewImpl messageViewImpl) {
        final SettableFuture<Void> future = SettableFuture.create();
        this.ackMessage(messageViewImpl, 1, future);
        return future;
    }

    /**
     * Acknowledges the message and settles the future according to the response.
     *
     * <p>{@code OK} completes the future. {@code INVALID_RECEIPT_HANDLE} fails it with a
     * {@link BadRequestException} and is not retried. Any other code, or an exception, schedules a retry
     * with the attempt number increased by one.
     *
     * @param messageViewImpl the message to ack.
     * @param i the attempt number, starting at 1.
     * @param settableFuture the future to settle.
     */
    private void ackMessage(final MessageViewImpl messageViewImpl, final int i, final SettableFuture<Void> settableFuture) {
        final String clientId = consumer.clientId();
        final String consumerGroup = consumer.getConsumerGroup();
        final MessageId messageId = messageViewImpl.getMessageId();
        final Endpoints endpoints = messageViewImpl.getEndpoints();
        Futures.addCallback(consumer.ackMessage(messageViewImpl),
            new FutureCallback<RpcInvocation<AckMessageResponse>>() {
                @Override
                public void onSuccess(RpcInvocation<AckMessageResponse> rpcInvocation) {
                    final AckMessageResponse response = rpcInvocation.getResponse();
                    final String requestId = rpcInvocation.getContext().getRequestId();
                    final Status status = response.getStatus();
                    final Code code = status.getCode();
                    if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
                        LOGGER.error("Failed to ack message due to the invalid receipt handle, forgive to retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, status message=[{}]",
                            clientId, consumerGroup, messageId, i, mq, endpoints, requestId, status.getMessage());
                        settableFuture.setException(
                            new BadRequestException(code.getNumber(), requestId, status.getMessage()));
                        return;
                    }
                    if (Code.OK.equals(code)) {
                        settableFuture.setFuture(Futures.immediateVoidFuture());
                        if (i > 1) {
                            LOGGER.info("Finally, ack message successfully, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}, endpoints={}, requestId={}",
                                clientId, consumerGroup, i, messageId, mq, endpoints, requestId);
                        } else {
                            LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}",
                                clientId, consumerGroup, messageId, mq, endpoints, requestId);
                        }
                        return;
                    }
                    LOGGER.error("Failed to ack message, would attempt to re-ack later, clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, status message=[{}]",
                        clientId, consumerGroup, i, messageId, mq, code, requestId, endpoints, status.getMessage());
                    ackMessageLater(messageViewImpl, 1 + i, settableFuture);
                }

                @Override
                public void onFailure(Throwable th) {
                    LOGGER.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}",
                        clientId, consumerGroup, i, messageId, mq, endpoints, th);
                    ackMessageLater(messageViewImpl, 1 + i, settableFuture);
                }
            }, MoreExecutors.directExecutor());
    }

    /**
     * Schedules another ack attempt after the fixed back-off delay.
     *
     * <p>Nothing is scheduled if the process queue has been dropped. If scheduling fails and the scheduler
     * is not shut down, the failure is logged together with its cause and scheduling is tried again.
     *
     * <p>Kept {@code public} because the decompiler widened the original private access.
     *
     * @param messageViewImpl the message to ack.
     * @param i the attempt number to use for the scheduled call.
     * @param settableFuture the future handed to the scheduled attempt.
     */
    public void ackMessageLater(MessageViewImpl messageViewImpl, int i, SettableFuture<Void> settableFuture) {
        final MessageId messageId = messageViewImpl.getMessageId();
        final String clientId = consumer.clientId();
        if (dropped) {
            LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}",
                mq, messageId, clientId);
            return;
        }
        final ScheduledExecutorService scheduler = consumer.getScheduler();
        try {
            scheduler.schedule(() -> ackMessage(messageViewImpl, i, settableFuture),
                ACK_MESSAGE_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable th) {
            // A shut-down scheduler is expected to reject tasks; nothing to do in that case.
            if (scheduler.isShutdown()) {
                return;
            }
            // Pass the throwable as the last argument so the cause and stack trace are logged.
            LOGGER.error("[Bug] Failed to schedule message ack request, mq={}, messageId={}, clientId={}",
                mq, messageId, clientId, th);
            ackMessageLater(messageViewImpl, 1 + i, settableFuture);
        }
    }

    /**
     * @return the number of pending messages, read under the pending read lock.
     */
    @Override
    public long getPendingMessageCount() {
        pendingMessagesLock.readLock().lock();
        try {
            final long pending = pendingMessages.size();
            return pending;
        } finally {
            pendingMessagesLock.readLock().unlock();
        }
    }

    /**
     * @return the number of in-flight messages, read under the in-flight read lock.
     */
    @Override
    public long getInflightMessageCount() {
        inflightMessagesLock.readLock().lock();
        try {
            final long inflight = inflightMessages.size();
            return inflight;
        } finally {
            inflightMessagesLock.readLock().unlock();
        }
    }

    /**
     * @return the current value of the cached-bytes counter.
     */
    @Override
    public long getCachedMessageBytes() {
        final long bytes = cachedMessagesBytes.get();
        return bytes;
    }

    /**
     * Logs the statistics of this process queue. The reception and received-quantity counters are reset
     * to zero as they are read; the pending, in-flight and cached-bytes figures are only read.
     */
    @Override
    public void doStats() {
        final String clientId = consumer.clientId();
        final long receptions = receptionTimes.getAndSet(0L);
        final long received = receivedMessagesQuantity.getAndSet(0L);
        final long pending = getPendingMessageCount();
        final long inflight = getInflightMessageCount();
        final long cachedBytes = getCachedMessageBytes();
        LOGGER.info("Process queue stats: clientId={}, mq={}, receptionTimes={}, receivedMessageQuantity={}, pendingMessageCount={}, inflightMessageCount={}, cachedMessageBytes={}",
            clientId, mq, receptions, received, pending, inflight, cachedBytes);
    }
}
