package org.apache.activemq.artemis.core.client.impl;

import io.netty.handler.codec.http.multipart.DiskFileUpload;
import java.io.File;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.jboss.logging.Logger;

/* loaded from: input_file:WEB-INF/lib/artemis-core-client-2.6.1.jar:org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.class */
public final class ClientConsumerImpl implements ClientConsumerInternal {
    /** Timeout in milliseconds; the same value bounds the wait in {@code waitForOnMessageToComplete}. */
    private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
    /** Number of priority levels the message {@link #buffer} is created with. */
    private static final int NUM_PRIORITIES = 10;
    /** Owning session; notified of work done, expiries, acknowledgements and consumer removal. */
    private final ClientSessionInternal session;
    /** Remoting context used to force delivery, send credits and close the consumer. */
    private final SessionContext sessionContext;
    /** Context object handed in at construction and exposed by {@link #getConsumerContext()}. */
    private final ConsumerContext consumerContext;
    private final SimpleString filterString;
    private final SimpleString queueName;
    /** When true, {@link #stop(boolean)} is a no-op for this consumer. */
    private final boolean browseOnly;
    /** Executor on which {@link Runner} instances and completion latches are queued. */
    private final Executor sessionExecutor;
    /** Executor on which credit-sending tasks are queued. */
    private final Executor flowControlExecutor;
    /** Flow-control window; 0 is handled as "slow consumer", a negative value disables credit sending. */
    private final int clientWindowSize;
    /** Number of acknowledged bytes at which {@link #acknowledge(ClientMessage)} sends the ack. */
    private final int ackBatchSize;
    /** Controller of the large message currently being received; continuation packets are added to it. */
    private LargeMessageControllerImpl currentLargeMessageController;
    /** Last large message returned by a blocking receive; its body is discarded on the next receive. */
    private ClientMessageInternal largeMessageReceived;
    /** Optional rate limiter; may be null. */
    private final TokenBucketLimiter rateLimiter;
    /** Thread currently inside the blocking receive, or null. */
    private volatile Thread receiverThread;
    /** Thread currently executing {@code MessageHandler.onMessage}, or null. */
    private volatile Thread onMessageThread;
    private volatile MessageHandler handler;
    /** Set when close/cleanup begins; incoming messages are dropped from then on. */
    private volatile boolean closing;
    private volatile boolean closed;
    /** Credits accumulated by {@link #flowControl(int, boolean)} and not yet sent. */
    private int creditsToSend;
    /** Set by {@link #clearAtFailover()}; cleared by the next receive or handler delivery. */
    private volatile boolean failedOver;
    /** Last exception that escaped a handler delivery in {@link Runner}. */
    private volatile Exception lastException;
    /** Bytes acknowledged since the last ack was sent. */
    private int ackBytes;
    /** Most recent message acknowledged but not yet sent as an ack, or null. */
    private volatile ClientMessageInternal lastAckedMessage;
    private final ClientSession.QueueQuery queueInfo;
    /** Becomes true once a message with a priority other than 4 has been received. */
    private volatile boolean ackIndividually;
    /** Class loader installed as the thread context class loader while the handler runs. */
    private final ClassLoader contextClassLoader;
    private static final Logger logger = Logger.getLogger(ClientConsumerImpl.class);
    /** Property carried by the marker message that answers a forced-delivery request. */
    public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
    /** Counts credit-sending tasks that have been queued but have not finished. */
    private final ReusableLatch pendingFlowControl = new ReusableLatch(0);
    /** Messages received from the server and not yet consumed, ordered by priority. */
    private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(NUM_PRIORITIES);
    /** Single reusable task that delivers one buffered message to the handler. */
    private final Runner runner = new Runner();
    /** While true, neither receive nor the handler takes messages from the buffer. */
    private boolean stopped = false;
    /** Sequence number attached to each forced-delivery request. */
    private final AtomicLong forceDeliveryCount = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/artemis-core-client-2.6.1.jar:org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl$Runner.class */
    /**
     * Task queued on the session executor; each run attempts to deliver one buffered
     * message to the registered handler via {@code callOnMessage()}.
     */
    public class Runner implements Runnable {
        private Runner() {
        }

        /**
         * Delivers one message. Any exception is logged and remembered as the
         * consumer's last exception instead of being propagated to the executor.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                callOnMessage();
            } catch (Exception failure) {
                ActiveMQClientLogger.LOGGER.onMessageError(failure);
                lastException = failure;
            }
        }
    }

    /**
     * Creates a consumer bound to the given session.
     *
     * @param clientSessionInternal owning session
     * @param consumerContext       context object exposed by {@link #getConsumerContext()}
     * @param simpleString          queue name
     * @param simpleString2         filter string
     * @param z                     whether this consumer only browses
     * @param i                     client window size
     * @param i2                    ack batch size
     * @param tokenBucketLimiter    rate limiter, may be null
     * @param executor              session executor used for handler delivery
     * @param executor2             executor used for flow-control (credit) tasks
     * @param sessionContext        remoting session context
     * @param queueQuery            queue information
     * @param classLoader           context class loader installed while the handler runs
     */
    public ClientConsumerImpl(ClientSessionInternal clientSessionInternal, ConsumerContext consumerContext, SimpleString simpleString, SimpleString simpleString2, boolean z, int i, int i2, TokenBucketLimiter tokenBucketLimiter, Executor executor, Executor executor2, SessionContext sessionContext, ClientSession.QueueQuery queueQuery, ClassLoader classLoader) {
        // Session wiring.
        this.session = clientSessionInternal;
        this.sessionContext = sessionContext;
        this.consumerContext = consumerContext;

        // Queue identity.
        this.queueName = simpleString;
        this.filterString = simpleString2;
        this.queueInfo = queueQuery;
        this.browseOnly = z;

        // Flow control and acknowledgement tuning.
        this.clientWindowSize = i;
        this.ackBatchSize = i2;
        this.rateLimiter = tokenBucketLimiter;

        // Execution environment.
        this.sessionExecutor = executor;
        this.flowControlExecutor = executor2;
        this.contextClassLoader = classLoader;

        if (logger.isTraceEnabled()) {
            // The exception is only a carrier for the creation stack trace.
            logger.trace(this + ":: being created at", new Exception("trace"));
        }
    }

    /** Returns the consumer context supplied at construction. */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public ConsumerContext getConsumerContext() {
        return consumerContext;
    }

    /**
     * Core blocking receive shared by the public receive variants.
     *
     * <p>Polls the local buffer under the consumer monitor and waits for a message to arrive.
     * When {@code z} is set and nothing is buffered, the server is asked once to force delivery
     * and the loop then waits for either a real message or the forced-delivery marker message.
     * Expired messages are handed to the session for expiry and skipped.
     *
     * @param j timeout in milliseconds; 0 is treated as "wait without limit"
     * @param z whether to request a forced delivery when the buffer is empty
     * @return the received message, or null if none was obtained (timeout, consumer closed,
     *         or the forced-delivery marker of this call came back)
     * @throws ActiveMQException if the consumer is closed, a message handler is set, or the
     *         waiting thread is interrupted ({@link ActiveMQInterruptedException})
     */
    private ClientMessage receive(long j, boolean z) throws ActiveMQException {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::receive(" + j + ", " + z + PasswordMaskingUtil.END_ENC);
        }
        checkClosed();
        // A large message handed out by the previous receive is no longer needed: drop its body.
        if (this.largeMessageReceived != null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::receive(" + j + ", " + z + ") -> discard LargeMessage body for " + this.largeMessageReceived);
            }
            this.largeMessageReceived.discardBody();
            this.largeMessageReceived = null;
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limit();
        }
        // Blocking receive and handler-based delivery are mutually exclusive.
        if (this.handler != null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::receive(" + j + ", " + z + ") -> throwing messageHandlerSet");
            }
            throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
        }
        // Window size 0: request a single credit so that one message gets delivered.
        if (this.clientWindowSize == 0) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::receive(" + j + ", " + z + ") -> start slowConsumer");
            }
            startSlowConsumer();
        }
        this.receiverThread = Thread.currentThread();
        // z2: a forced delivery has already been requested during this call.
        boolean z2 = false;
        // z3: a forced delivery must be requested once the monitor has been released.
        boolean z3 = false;
        // j2: timestamp of the previous wait bookkeeping point (-1 until the first wait).
        long j2 = -1;
        // j3: remaining time to wait, in milliseconds.
        long j3 = j == 0 ? Long.MAX_VALUE : j;
        while (true) {
            ClientMessageInternal clientMessageInternal = null;
            try {
                synchronized (this) {
                    while (true) {
                        // Only take from the buffer while the consumer is not stopped.
                        if (!this.stopped) {
                            ClientMessageInternal poll = this.buffer.poll();
                            clientMessageInternal = poll;
                            if (poll != null) {
                                break;
                            }
                        }
                        if (this.closed || j3 <= 0) {
                            break;
                        }
                        if (j2 == -1) {
                            j2 = System.currentTimeMillis();
                        }
                        if (clientMessageInternal == null && z) {
                            if (!this.stopped) {
                                // Force delivery at most once per call.
                                if (!z2) {
                                    z3 = true;
                                    break;
                                }
                            } else {
                                break;
                            }
                        }
                        try {
                            wait(j3);
                            if (clientMessageInternal != null || this.closed) {
                                break;
                            }
                            // Deduct the time spent waiting from the remaining budget.
                            long currentTimeMillis = System.currentTimeMillis();
                            j3 -= currentTimeMillis - j2;
                            j2 = currentTimeMillis;
                        } catch (InterruptedException e) {
                            throw new ActiveMQInterruptedException(e);
                        }
                    }
                }
                if (this.failedOver) {
                    if (clientMessageInternal == null) {
                        if (logger.isTraceEnabled()) {
                            logger.trace(this + "::receive(" + j + ", " + z + ") -> m == null and failover");
                        }
                        // Failover with nothing received: start over with the full timeout
                        // and allow a new forced delivery.
                        this.failedOver = false;
                        z2 = false;
                        j3 = j == 0 ? Long.MAX_VALUE : j;
                    } else {
                        if (logger.isTraceEnabled()) {
                            logger.trace(this + "::receive(" + j + ", " + z + ") -> failedOver, but m != null, being " + clientMessageInternal);
                        }
                        this.failedOver = false;
                    }
                }
                if (z3) {
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Forcing delivery");
                    }
                    this.sessionContext.forceDelivery(this, this.forceDeliveryCount.getAndIncrement());
                    z3 = false;
                    z2 = true;
                } else {
                    if (clientMessageInternal == null) {
                        if (logger.isTraceEnabled()) {
                            logger.trace(this + "::Returning null");
                        }
                        resetIfSlowConsumer();
                        this.receiverThread = null;
                        return null;
                    }
                    this.session.workDone();
                    if (clientMessageInternal.containsProperty(FORCED_DELIVERY_MESSAGE)) {
                        long longValue = clientMessageInternal.getLongProperty(FORCED_DELIVERY_MESSAGE).longValue();
                        // The marker answers this call's request only if it carries the sequence
                        // number handed out by the most recent forceDelivery invocation.
                        if (z && z2 && longValue == this.forceDeliveryCount.get() - 1) {
                            resetIfSlowConsumer();
                            if (logger.isTraceEnabled()) {
                                logger.trace(this + "::There was nothing on the queue, leaving it now:: returning null");
                            }
                            // receiverThread is cleared by the finally block below.
                            return null;
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace(this + "::Ignored force delivery answer as it belonged to another call");
                        }
                    } else {
                        // Capture expiry before flow control is accounted for this message.
                        boolean isExpired = clientMessageInternal.isExpired();
                        flowControlBeforeConsumption(clientMessageInternal);
                        if (!isExpired) {
                            if (clientMessageInternal.isLargeMessage()) {
                                // Remembered so the body can be discarded on the next receive.
                                this.largeMessageReceived = clientMessageInternal;
                            }
                            if (logger.isTraceEnabled()) {
                                logger.trace(this + "::Returning " + clientMessageInternal);
                            }
                            ClientMessageInternal clientMessageInternal2 = clientMessageInternal;
                            this.receiverThread = null;
                            return clientMessageInternal2;
                        }
                        // Expired: drop the body, report the expiry, and try for another message.
                        clientMessageInternal.discardBody();
                        this.session.expire(this, clientMessageInternal);
                        if (this.clientWindowSize == 0) {
                            startSlowConsumer();
                        }
                        if (j3 <= 0) {
                            this.receiverThread = null;
                            return null;
                        }
                    }
                }
            } finally {
                // Runs on every loop iteration and on every exit path (return or exception).
                this.receiverThread = null;
            }
        }
    }

    /**
     * Receives a message, waiting up to the given timeout. If nothing arrives and the
     * consumer is still open, a second attempt is made that forces delivery from the server.
     *
     * @param j timeout in milliseconds
     * @return the received message, or null if none was obtained
     * @throws ActiveMQException if the underlying receive fails
     */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public ClientMessage receive(long j) throws ActiveMQException {
        if (logger.isTraceEnabled()) {
            logger.trace(this + ":: receive(" + j + PasswordMaskingUtil.END_ENC);
        }

        ClientMessage message = receive(j, false);

        final boolean retryWithForcedDelivery = message == null && !closed;
        if (retryWithForcedDelivery) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + ":: receive(" + j + ") -> null, trying again with receive(0)");
            }
            message = receive(0L, true);
        }

        if (logger.isTraceEnabled()) {
            logger.trace(this + ":: returning " + message);
        }
        return message;
    }

    /**
     * Receives a message without a time limit (timeout 0) and without forcing delivery.
     *
     * @return the received message, or null if the internal receive gave up
     * @throws ActiveMQException if the underlying receive fails
     */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public ClientMessage receive() throws ActiveMQException {
        final long noTimeLimit = 0L;
        return receive(noTimeLimit, false);
    }

    /**
     * Receives a message with forced delivery enabled: if nothing is buffered, the server
     * is asked to force delivery before giving up.
     *
     * @return the received message, or null if none was obtained
     * @throws ActiveMQException if the underlying receive fails
     */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public ClientMessage receiveImmediate() throws ActiveMQException {
        final boolean forceDelivery = true;
        return receive(0L, forceDelivery);
    }

    /**
     * Returns the currently registered message handler.
     *
     * @return the handler, or null if none is set
     * @throws ActiveMQException if the consumer is closed
     */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public MessageHandler getMessageHandler() throws ActiveMQException {
        checkClosed();
        return handler;
    }

    /**
     * Installs, replaces or removes the message handler.
     *
     * <p>When a handler is installed where none was set before, one delivery task is queued
     * per buffered message.
     *
     * @param messageHandler the new handler, or null to remove the current one
     * @return this consumer
     * @throws ActiveMQException if the consumer is closed or a thread is inside a blocking receive
     */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public synchronized ClientConsumerImpl setMessageHandler(MessageHandler messageHandler) throws ActiveMQException {
        checkClosed();
        if (receiverThread != null) {
            throw ActiveMQClientMessageBundle.BUNDLE.inReceive();
        }

        final boolean hadNoHandler = handler == null;

        // Window size 0: a changed handler needs one credit to get a message flowing.
        if (clientWindowSize == 0 && handler != messageHandler) {
            startSlowConsumer();
        }

        handler = messageHandler;

        if (messageHandler != null) {
            if (hadNoHandler) {
                requeueExecutors();
            }
        } else if (!hadNoHandler) {
            // NOTE(review): the handler field is already null here, and
            // waitForOnMessageToComplete returns immediately for a null handler.
            waitForOnMessageToComplete(true);
        }
        return this;
    }

    /**
     * Closes this consumer and tells the session context to close it as well.
     *
     * @throws ActiveMQException if the cleanup fails
     */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer, java.lang.AutoCloseable
    public void close() throws ActiveMQException {
        final boolean sendCloseMessage = true;
        doCleanUp(sendCloseMessage);
    }

    /**
     * Marks the consumer as closing, cancels any large message in progress and queues a task
     * on the session executor that runs the given latch.
     *
     * @param futureLatch latch to run on the session executor
     * @return the thread currently executing the message handler, or null if there is none
     * @throws ActiveMQException declared by the interface
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public Thread prepareForClose(final FutureLatch futureLatch) throws ActiveMQException {
        closing = true;
        resetLargeMessageController();

        final Runnable latchTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                futureLatch.run();
            }
        };
        sessionExecutor.execute(latchTask);

        return onMessageThread;
    }

    /**
     * Cleans up this consumer locally without telling the session context to close it.
     * Failures are logged and never propagated.
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void cleanUp() {
        try {
            doCleanUp(false);
        } catch (ActiveMQException e) {
            ActiveMQClientLogger.LOGGER.failedCleaningUp(toString());
            // The warning above carries no cause; keep the exception available for diagnosis.
            logger.debug("Failure while cleaning up " + this, e);
        }
    }

    /** Returns whether this consumer has been closed. */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public boolean isClosed() {
        return closed;
    }

    /**
     * Stops delivery from the local buffer. Has no effect on a browse-only consumer or on a
     * consumer that is already stopped.
     *
     * @param z whether to wait for an in-flight handler invocation to complete
     * @throws ActiveMQException declared by the interface
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void stop(boolean z) throws ActiveMQException {
        if (this.browseOnly) {
            return;
        }
        synchronized (this) {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
        }
        // Wait outside the monitor: the wait can last up to ten seconds for a latch queued on
        // the session executor, while delivery on that executor (callOnMessage) and incoming
        // messages (handleMessage) need this same monitor to make progress.
        waitForOnMessageToComplete(z);
    }

    /**
     * Resets the consumer's local state after a failover: empties the buffer, stops local
     * delivery, cancels any large message in progress and clears pending ack and credit
     * bookkeeping.
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void clearAtFailover() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::ClearAtFailover");
        }
        clearBuffer();
        // Delivery stays stopped until start() is called again.
        this.stopped = true;
        resetLargeMessageController();
        // Pending batched ack and unsent credits are dropped.
        this.lastAckedMessage = null;
        this.creditsToSend = 0;
        // Observed (and reset) by the blocking receive loop and by handler delivery.
        this.failedOver = true;
        this.ackIndividually = false;
    }

    /**
     * Re-enables delivery and queues one delivery task per message currently buffered.
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public synchronized void start() {
        stopped = false;
        requeueExecutors();
    }

    /** Returns the last exception recorded from a handler delivery, or null if none was recorded. */
    @Override // org.apache.activemq.artemis.api.core.client.ClientConsumer
    public Exception getLastException() {
        return lastException;
    }

    /** Returns the queue query result supplied at construction. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public ClientSession.QueueQuery getQueueInfo() {
        return queueInfo;
    }

    /** Returns the current value of the forced-delivery sequence counter. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public long getForceDeliveryCount() {
        return forceDeliveryCount.get();
    }

    /** Returns the filter string supplied at construction. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public SimpleString getFilterString() {
        return filterString;
    }

    /** Returns the queue name supplied at construction. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public SimpleString getQueueName() {
        return queueName;
    }

    /** Returns whether this consumer was created as browse-only. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public boolean isBrowseOnly() {
        return browseOnly;
    }

    /**
     * Accepts a message arriving for this consumer. Messages are dropped once the consumer
     * is closing; compressed messages are routed through the large-message path.
     *
     * @param clientMessageInternal the incoming message
     * @throws Exception if handling the compressed message fails
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public synchronized void handleMessage(ClientMessageInternal clientMessageInternal) throws Exception {
        if (closing) {
            return;
        }
        final boolean compressed = clientMessageInternal.getBooleanProperty(Message.HDR_LARGE_COMPRESSED).booleanValue();
        if (!compressed) {
            handleRegularMessage(clientMessageInternal);
            return;
        }
        handleCompressedMessage(clientMessageInternal);
    }

    /**
     * Buffers an incoming message and wakes up whoever consumes it: a thread blocked in
     * receive when no handler is set, otherwise a delivery task on the session executor.
     * Callers hold the consumer monitor (required by {@code notify()}).
     *
     * @param clientMessageInternal the message to buffer
     */
    private void handleRegularMessage(ClientMessageInternal clientMessageInternal) {
        // Fall back to the queue's address when the message carries none.
        if (clientMessageInternal.getAddress() == null) {
            clientMessageInternal.setAddress(queueInfo.getAddress());
        }
        clientMessageInternal.onReceipt(this);

        // The first real message with a priority other than 4 switches acks to individual mode.
        if (!ackIndividually && clientMessageInternal.getPriority() != 4 && !clientMessageInternal.containsProperty(FORCED_DELIVERY_MESSAGE)) {
            ackIndividually = true;
        }

        buffer.addTail(clientMessageInternal, clientMessageInternal.getPriority());

        if (handler == null) {
            notify();
            return;
        }
        if (!stopped) {
            queueExecutor();
        }
    }

    /**
     * Wraps a compressed regular message in a large-message shell so its body is read through
     * a decompressing controller, then buffers it like any other message.
     *
     * @param clientMessageInternal the compressed message as received
     * @throws Exception if the temporary cache file cannot be created or the body cannot be added
     */
    private void handleCompressedMessage(ClientMessageInternal clientMessageInternal) throws Exception {
        final ClientLargeMessageImpl wrapper = new ClientLargeMessageImpl();
        wrapper.retrieveExistingData(clientMessageInternal);

        // Optional on-disk cache for the body, removed when the JVM exits.
        File cacheFile = null;
        if (session.isCacheLargeMessageClient()) {
            final String prefix = "tmp-large-message-" + wrapper.getMessageID() + "-";
            cacheFile = File.createTempFile(prefix, DiskFileUpload.postfix);
            cacheFile.deleteOnExit();
        }

        currentLargeMessageController = new LargeMessageControllerImpl(this, wrapper.getLargeMessageSize(), session.getSessionFactory().getServerLocator().getCallTimeout(), cacheFile);
        currentLargeMessageController.setLocal(true);

        // Copy the readable part of the body out of the original message.
        final ActiveMQBuffer body = clientMessageInternal.toCore().getBodyBuffer();
        final int readable = body.writerIndex() - body.readerIndex();
        final byte[] compressedBytes = ByteUtil.getActiveArray(body.readBytes(readable).toByteBuffer());

        wrapper.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
        // The whole body is available locally, so it is added as a single, final packet.
        currentLargeMessageController.addPacket(compressedBytes, compressedBytes.length, false);

        handleRegularMessage(wrapper);
    }

    /**
     * Accepts the first packet of a large message: creates the controller that will collect
     * the continuation packets, attaches it to the message and buffers the message.
     * Ignored once the consumer is closing.
     *
     * @param clientLargeMessageInternal the large message header
     * @param j                          total size passed to the large message controller
     * @throws Exception if the temporary cache file cannot be created
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public synchronized void handleLargeMessage(ClientLargeMessageInternal clientLargeMessageInternal, long j) throws Exception {
        if (closing) {
            return;
        }

        // Optional on-disk cache for the body, removed when the JVM exits.
        File cacheFile = null;
        if (session.isCacheLargeMessageClient()) {
            final String prefix = "tmp-large-message-" + clientLargeMessageInternal.getMessageID() + "-";
            cacheFile = File.createTempFile(prefix, DiskFileUpload.postfix);
            cacheFile.deleteOnExit();
        }

        currentLargeMessageController = new LargeMessageControllerImpl(this, j, session.getSessionFactory().getServerLocator().getCallTimeout(), cacheFile);

        if (!clientLargeMessageInternal.isCompressed()) {
            clientLargeMessageInternal.setLargeMessageController(currentLargeMessageController);
        } else {
            // Compressed bodies are read through a decompressing wrapper.
            clientLargeMessageInternal.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
        }

        handleRegularMessage(clientLargeMessageInternal);
    }

    /**
     * Accepts a continuation packet of the large message in progress. When no large message
     * is in progress, the packet's size is handed back to flow control instead.
     * Ignored once the consumer is closing.
     *
     * @param bArr packet payload
     * @param i    flow-control size of the packet
     * @param z    flag forwarded to the controller together with the packet
     * @throws Exception if the controller rejects the packet or flow control fails
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public synchronized void handleLargeMessageContinuation(byte[] bArr, int i, boolean z) throws Exception {
        if (closing) {
            return;
        }
        if (currentLargeMessageController == null) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::Sending back credits for largeController = null " + i);
            }
            flowControl(i, false);
            return;
        }
        currentLargeMessageController.addPacket(bArr, i, z);
    }

    /**
     * Discards every buffered message: cancels large messages, accounts flow control for each
     * message, empties the buffer and cancels the large message in progress. Afterwards it
     * optionally waits for an in-flight handler invocation.
     *
     * @param z whether to wait for the message handler to finish
     * @throws ActiveMQException declared by the interface
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void clear(boolean z) throws ActiveMQException {
        synchronized (this) {
            LinkedListIterator<ClientMessageInternal> it = this.buffer.iterator();
            try {
                while (it.hasNext()) {
                    try {
                        ClientMessageInternal next = it.next();
                        if (next.isLargeMessage()) {
                            ((ClientLargeMessageInternal) next).getLargeMessageController().cancel();
                        }
                        // Credits for messages that will never be consumed still have to be accounted.
                        flowControlBeforeConsumption(next);
                    } catch (Exception e) {
                        // Best effort: a failure on one message must not stop the cleanup.
                        ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
                    }
                }
            } finally {
                // Release the iterator deterministically instead of leaving it open.
                it.close();
            }
            clearBuffer();
            try {
                resetLargeMessageController();
            } catch (Throwable th) {
                ActiveMQClientLogger.LOGGER.errorClearingMessages(th);
            }
        }
        // Waiting happens outside the monitor.
        waitForOnMessageToComplete(z);
    }

    /** Cancels the large message currently being received, if any, and forgets its controller. */
    private void resetLargeMessageController() {
        final LargeMessageControllerImpl inProgress = currentLargeMessageController;
        if (inProgress == null) {
            return;
        }
        inProgress.cancel();
        currentLargeMessageController = null;
    }

    /** Returns the client window size supplied at construction. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public int getClientWindowSize() {
        return clientWindowSize;
    }

    /** Returns the number of messages currently held in the local buffer. */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public int getBufferSize() {
        return buffer.size();
    }

    /**
     * Acknowledges a message. In individual mode the message is acknowledged on its own;
     * otherwise its encoded size is added to the batch, and the ack is sent once the batch
     * reaches the configured size.
     *
     * @param clientMessage the message to acknowledge
     * @throws ActiveMQException if sending the acknowledgement fails
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void acknowledge(ClientMessage clientMessage) throws ActiveMQException {
        final ClientMessageInternal internalMessage = (ClientMessageInternal) clientMessage;

        if (ackIndividually) {
            individualAcknowledge(clientMessage);
            return;
        }

        ackBytes += clientMessage.getEncodeSize();
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + clientMessage.getEncodeSize());
        }

        if (ackBytes < ackBatchSize) {
            // Batch not full yet: remember the message so that a later flush can ack it.
            if (logger.isTraceEnabled()) {
                logger.trace(this + ":: acknowledge setting lastAckedMessage = " + internalMessage);
            }
            lastAckedMessage = internalMessage;
            return;
        }

        if (logger.isTraceEnabled()) {
            logger.trace(this + ":: acknowledge acking " + internalMessage);
        }
        doAck(internalMessage);
    }

    /**
     * Acknowledges a single message through the session, after flushing any batched
     * acknowledgement that is still pending.
     *
     * @param clientMessage the message to acknowledge
     * @throws ActiveMQException if flushing or acknowledging fails
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void individualAcknowledge(ClientMessage clientMessage) throws ActiveMQException {
        final boolean batchedAckPending = lastAckedMessage != null;
        if (batchedAckPending) {
            flushAcks();
        }
        session.individualAcknowledge(this, clientMessage);
    }

    /**
     * Sends the pending batched acknowledgement, if there is one.
     *
     * @throws ActiveMQException if sending the acknowledgement fails
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void flushAcks() throws ActiveMQException {
        if (lastAckedMessage == null) {
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
        }
        doAck(lastAckedMessage);
    }

    /**
     * Accumulates consumed bytes and, once the accumulated amount reaches the window size,
     * sends it to the server as credits. Does nothing when the window size is negative.
     *
     * @param i number of bytes to account for
     * @param z when true and the window size is 0, one credit is held back from the amount sent
     * @throws ActiveMQException declared by the interface
     */
    @Override // org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
    public void flowControl(int i, boolean z) throws ActiveMQException {
        if (this.clientWindowSize < 0) {
            return;
        }
        this.creditsToSend += i;
        if (this.creditsToSend < this.clientWindowSize) {
            return;
        }

        if (this.clientWindowSize == 0 && z) {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::FlowControl::Sending " + this.creditsToSend + " -1, for slow consumer");
            }
            // Window size 0: send everything except one credit.
            int slowConsumerCredits = this.creditsToSend - 1;
            this.creditsToSend = 0;
            if (slowConsumerCredits > 0) {
                sendCredits(slowConsumerCredits);
            }
            return;
        }

        int credits = this.creditsToSend;
        this.creditsToSend = 0;
        if (credits > 0) {
            if (logger.isDebugEnabled()) {
                // Report the amount actually sent, not just the size of the latest message.
                logger.debug("Sending " + credits + " from flow-control");
            }
            sendCredits(credits);
        }
    }

    /**
     * Sends a single credit so that exactly one message gets delivered, then waits up to
     * ten seconds for the pending credit tasks to finish. An interrupt during the wait is
     * preserved on the thread.
     */
    private void startSlowConsumer() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer");
        }
        sendCredits(1);
        try {
            pendingFlowControl.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * For a window size of 0 only: sends a zero-credit message and then waits up to ten
     * seconds until a marker task queued on the flow-control executor has run.
     *
     * @throws ActiveMQInterruptedException if the wait is interrupted
     */
    private void resetIfSlowConsumer() {
        if (clientWindowSize != 0) {
            return;
        }
        sendCredits(0);

        // Marker task: counts the latch down once the flow-control executor has run it.
        final CountDownLatch drained = new CountDownLatch(1);
        final Runnable marker = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                drained.countDown();
            }
        };
        flowControlExecutor.execute(marker);

        try {
            drained.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            throw new ActiveMQInterruptedException(interrupted);
        }
    }

    /**
     * Queues one delivery task per buffered message. The buffer size is re-read before each
     * submission.
     */
    private void requeueExecutors() {
        int queued = 0;
        while (queued < buffer.size()) {
            queueExecutor();
            queued++;
        }
    }

    /** Submits the shared delivery task to the session executor. */
    private void queueExecutor() {
        final boolean tracing = logger.isTraceEnabled();
        if (tracing) {
            logger.trace(this + "::Adding Runner on Executor for delivery");
        }
        sessionExecutor.execute(runner);
    }

    /**
     * Queues a task on the flow-control executor that sends the given number of credits to
     * the server. The pending-flow-control latch is counted up here and counted down when
     * the task finishes, whether or not sending succeeded.
     *
     * @param i number of credits to send
     */
    private void sendCredits(final int i) {
        pendingFlowControl.countUp();

        final Runnable creditTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    sessionContext.sendConsumerCredits(ClientConsumerImpl.this, i);
                } finally {
                    pendingFlowControl.countDown();
                }
            }
        };
        flowControlExecutor.execute(creditTask);
    }

    /**
     * Waits until the tasks queued on the session executor ahead of a freshly submitted
     * latch have run, for at most {@code CLOSE_TIMEOUT_MILLISECONDS}; logs a warning on
     * timeout. Returns immediately when no handler is set, when waiting was not requested,
     * or when called from the thread that is running the handler.
     *
     * @param z whether to wait at all
     */
    private void waitForOnMessageToComplete(boolean z) {
        if (handler == null || !z || Thread.currentThread() == onMessageThread) {
            return;
        }

        final FutureLatch completion = new FutureLatch();
        sessionExecutor.execute(completion);

        final boolean completed = completion.await(CLOSE_TIMEOUT_MILLISECONDS);
        if (!completed) {
            ActiveMQClientLogger.LOGGER.timeOutWaitingForProcessing();
        }
    }

    /**
     * Guards operations that require an open consumer.
     *
     * @throws ActiveMQException if this consumer has been closed
     */
    private void checkClosed() throws ActiveMQException {
        if (!closed) {
            return;
        }
        throw ActiveMQClientMessageBundle.BUNDLE.consumerClosed();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Delivers at most one buffered message to the registered handler.
     *
     * <p>Does nothing while the consumer is closing or stopped, when no handler is set, when
     * the buffer is empty, or when the polled message is a forced-delivery marker. Expired
     * messages are reported to the session instead of being delivered. The handler runs with
     * the consumer's context class loader installed; the previous loader is restored
     * afterwards, whether or not the handler threw.
     *
     * @throws Exception whatever the handler, flow control or the session throws
     */
    public void callOnMessage() throws Exception {
        if (this.closing || this.stopped) {
            return;
        }
        this.session.workDone();

        // Work on a snapshot so a concurrent handler change cannot affect this delivery.
        MessageHandler messageHandler = this.handler;
        if (messageHandler == null) {
            return;
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limit();
        }
        this.failedOver = false;

        ClientMessageInternal poll;
        synchronized (this) {
            poll = this.buffer.poll();
        }
        if (poll == null || poll.containsProperty(FORCED_DELIVERY_MESSAGE)) {
            return;
        }

        // Capture expiry before flow control is accounted for this message.
        boolean isExpired = poll.isExpired();
        flowControlBeforeConsumption(poll);
        if (isExpired) {
            this.session.expire(this, poll);
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::Calling handler.onMessage");
            }
            // Install the consumer's class loader and remember the one it replaces.
            final ClassLoader previousLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                @Override // java.security.PrivilegedAction
                public ClassLoader run() {
                    ClassLoader current = Thread.currentThread().getContextClassLoader();
                    Thread.currentThread().setContextClassLoader(ClientConsumerImpl.this.contextClassLoader);
                    return current;
                }
            });
            this.onMessageThread = Thread.currentThread();
            try {
                try {
                    messageHandler.onMessage(poll);
                } finally {
                    // Single restore point; a failure here is logged rather than silently dropped.
                    restoreContextClassLoader(previousLoader);
                }
                if (logger.isTraceEnabled()) {
                    logger.trace(this + "::Handler.onMessage done");
                }
                if (poll.isLargeMessage()) {
                    poll.discardBody();
                }
            } finally {
                this.onMessageThread = null;
            }
        }
        // Window size 0: request the next single message.
        if (this.clientWindowSize == 0) {
            startSlowConsumer();
        }
    }

    /** Puts the given class loader back as the thread context class loader, logging any failure. */
    private void restoreContextClassLoader(final ClassLoader classLoader) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override // java.security.PrivilegedAction
                public Object run() {
                    Thread.currentThread().setContextClassLoader(classLoader);
                    return null;
                }
            });
        } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
        }
    }

    /**
     * Accounts a message's flow-control size before it is consumed. Messages with a size of
     * zero are skipped; for large messages the "discount one credit" flag is not set.
     *
     * @param clientMessageInternal the message about to be consumed
     * @throws ActiveMQException if flow control fails
     */
    private void flowControlBeforeConsumption(ClientMessageInternal clientMessageInternal) throws ActiveMQException {
        if (clientMessageInternal.getFlowControlSize() == 0) {
            return;
        }
        flowControl(clientMessageInternal.getFlowControlSize(), !clientMessageInternal.isLargeMessage());
    }

    /**
     * Closes the consumer: waits for an in-flight handler invocation, cancels the large
     * message in progress, wakes up a blocked receiver, flushes pending acks, empties the
     * buffer and optionally tells the session context to close the consumer. Does nothing
     * if the consumer is already closed.
     *
     * <p>The consumer is always removed from its session once cleanup has started, even when
     * one of the steps fails; otherwise a failure would leave a closed consumer registered
     * for good, because later calls return early on the closed flag.
     *
     * @param z whether to send the close request through the session context
     * @throws ActiveMQException if flushing acks or closing on the session context fails
     */
    private void doCleanUp(boolean z) throws ActiveMQException {
        if (this.closed) {
            return;
        }
        try {
            // "closing" stops new messages from being queued while acks from an in-flight
            // onMessage are still accepted; "closed" is only set after the wait below.
            this.closing = true;
            waitForOnMessageToComplete(true);
            resetLargeMessageController();
            this.closed = true;
            synchronized (this) {
                if (this.receiverThread != null) {
                    // Wake up a thread blocked in receive().
                    notify();
                }
                this.handler = null;
                this.receiverThread = null;
            }
            flushAcks();
            clearBuffer();
            if (z) {
                this.sessionContext.closeConsumer(this);
            }
        } finally {
            this.session.removeConsumer(this);
        }
    }

    /** Removes every message from the local buffer. */
    private void clearBuffer() {
        buffer.clear();
    }

    /**
     * Resets the ack batch bookkeeping and acknowledges the given message through the session.
     *
     * @param clientMessageInternal the message to acknowledge
     * @throws ActiveMQException if the session fails to acknowledge
     */
    private void doAck(ClientMessageInternal clientMessageInternal) throws ActiveMQException {
        // Start a new batch before sending the acknowledgement.
        ackBytes = 0;
        lastAckedMessage = null;

        final boolean tracing = logger.isTraceEnabled();
        if (tracing) {
            logger.trace(this + "::Acking message " + clientMessageInternal);
        }
        session.acknowledge(this, clientMessageInternal);
    }

    /** Returns the default object description followed by the consumer context and queue name. */
    public String toString() {
        final StringBuilder text = new StringBuilder(super.toString());
        text.append("{consumerContext=").append((Object) consumerContext);
        text.append(", queueName=").append((Object) queueName);
        text.append('}');
        return text.toString();
    }
}
