package org.apache.activemq.artemis.core.server.impl;

import java.math.BigDecimal;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.jboss.logging.Logger;

/* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.class */
public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    private static final Logger logger = Logger.getLogger((Class<?>) ServerConsumerImpl.class);
    private final long id;
    protected final Queue messageQueue;
    private final Filter filter;
    private final int minLargeMessageSize;
    private final ServerSession session;
    protected final Object lock;
    private final boolean supportLargeMessage;
    private Object protocolData;
    private Object protocolContext;
    private final ActiveMQServer server;
    private SlowConsumerDetectionListener slowConsumerListener;
    private final ReadWriteLock lockDelivery;
    private volatile AtomicInteger availableCredits;
    private boolean started;
    private volatile LargeMessageDeliverer largeMessageDeliverer;
    private final boolean browseOnly;
    protected BrowserDeliverer browserDeliverer;
    private final boolean strictUpdateDeliveryCount;
    private final StorageManager storageManager;
    protected final Deque<MessageReference> deliveringRefs;
    private final SessionCallback callback;
    private final boolean preAcknowledge;
    private final ManagementService managementService;
    private final Binding binding;
    private boolean transferring;
    private final long creationTime;
    private AtomicLong consumerRateCheckTime;
    private AtomicLong messageConsumedSnapshot;
    private long acks;
    private final Runnable resumeLargeMessageRunnable;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl$BrowserDeliverer.class */
    public class BrowserDeliverer implements Runnable {
        protected MessageReference current = null;
        public final LinkedListIterator<MessageReference> iterator;

        public BrowserDeliverer(LinkedListIterator<MessageReference> linkedListIterator) {
            this.iterator = linkedListIterator;
        }

        public synchronized void close() {
            this.iterator.close();
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            if (this.current != null) {
                try {
                    HandleStatus handle = ServerConsumerImpl.this.handle(this.current);
                    if (handle == HandleStatus.BUSY) {
                        return;
                    }
                    if (handle == HandleStatus.HANDLED) {
                        ServerConsumerImpl.this.proceedDeliver(this.current);
                    }
                    this.current = null;
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, this.current);
                    return;
                }
            }
            MessageReference messageReference = null;
            while (true) {
                try {
                    messageReference = null;
                    synchronized (ServerConsumerImpl.this.messageQueue) {
                        if (!this.iterator.hasNext()) {
                            ServerConsumerImpl.this.callback.browserFinished(ServerConsumerImpl.this);
                            return;
                        }
                        messageReference = this.iterator.next();
                        HandleStatus handle2 = ServerConsumerImpl.this.handle(messageReference);
                        if (handle2 == HandleStatus.HANDLED) {
                            ServerConsumerImpl.this.proceedDeliver(messageReference);
                        } else if (handle2 == HandleStatus.BUSY) {
                            this.current = messageReference;
                            return;
                        }
                    }
                } catch (Exception e2) {
                    ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e2, messageReference);
                    return;
                }
            }
        }

        public boolean isBrowsed() {
            ServerConsumerImpl.this.messageQueue.deliverAsync();
            return !this.iterator.hasNext();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.5.4.jar:org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl$LargeMessageDeliverer.class */
    public final class LargeMessageDeliverer {
        private long sizePendingLargeMessage;
        private LargeServerMessage largeMessage;
        private final MessageReference ref;
        private boolean sentInitialPacket;
        private long positionPendingLargeMessage;
        private BodyEncoder context;

        private LargeMessageDeliverer(LargeServerMessage largeServerMessage, MessageReference messageReference) throws Exception {
            this.sentInitialPacket = false;
            this.largeMessage = largeServerMessage;
            this.largeMessage.incrementDelayDeletionCount();
            this.ref = messageReference;
        }

        public boolean deliver() throws Exception {
            ServerConsumerImpl.this.lockDelivery.readLock().lock();
            try {
                if (!ServerConsumerImpl.this.started) {
                    return false;
                }
                LargeServerMessage largeServerMessage = this.largeMessage;
                if (largeServerMessage == null) {
                    ServerConsumerImpl.this.lockDelivery.readLock().unlock();
                    return true;
                }
                if (ServerConsumerImpl.this.availableCredits != null && ServerConsumerImpl.this.availableCredits.get() <= 0) {
                    if (ServerConsumerImpl.logger.isTraceEnabled()) {
                        ServerConsumerImpl.logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + ServerConsumerImpl.this.availableCredits);
                    }
                    ServerConsumerImpl.this.lockDelivery.readLock().unlock();
                    return false;
                }
                if (!this.sentInitialPacket) {
                    this.context = largeServerMessage.getBodyEncoder();
                    this.sizePendingLargeMessage = this.context.getLargeBodySize();
                    this.context.open();
                    this.sentInitialPacket = true;
                    int sendLargeMessage = ServerConsumerImpl.this.callback.sendLargeMessage(this.ref, largeServerMessage, ServerConsumerImpl.this, this.context.getLargeBodySize(), this.ref.getDeliveryCount());
                    if (ServerConsumerImpl.this.availableCredits != null) {
                        ServerConsumerImpl.this.availableCredits.addAndGet(-sendLargeMessage);
                        if (ServerConsumerImpl.logger.isTraceEnabled()) {
                            ServerConsumerImpl.logger.trace(this + "::FlowControl:: deliver initialpackage with " + sendLargeMessage + " delivered, available now = " + ServerConsumerImpl.this.availableCredits);
                        }
                    }
                    ServerConsumerImpl.this.resumeLargeMessage();
                    ServerConsumerImpl.this.lockDelivery.readLock().unlock();
                    return false;
                }
                if (ServerConsumerImpl.this.availableCredits != null && ServerConsumerImpl.this.availableCredits.get() <= 0) {
                    if (ServerConsumerImpl.logger.isTraceEnabled()) {
                        ServerConsumerImpl.logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + ServerConsumerImpl.this.availableCredits);
                    }
                    ServerConsumerImpl.this.lockDelivery.readLock().unlock();
                    return false;
                }
                int min = (int) Math.min(this.sizePendingLargeMessage - this.positionPendingLargeMessage, ServerConsumerImpl.this.minLargeMessageSize);
                ActiveMQBuffer fixedBuffer = ActiveMQBuffers.fixedBuffer(min);
                this.context.encode(fixedBuffer, min);
                byte[] array = fixedBuffer.toByteBuffer().hasArray() ? fixedBuffer.toByteBuffer().array() : new byte[0];
                int sendLargeMessageContinuation = ServerConsumerImpl.this.callback.sendLargeMessageContinuation(ServerConsumerImpl.this, array, this.positionPendingLargeMessage + ((long) min) < this.sizePendingLargeMessage, false);
                int length = array.length;
                if (ServerConsumerImpl.this.availableCredits != null) {
                    ServerConsumerImpl.this.availableCredits.addAndGet(-sendLargeMessageContinuation);
                    if (ServerConsumerImpl.logger.isTraceEnabled()) {
                        ServerConsumerImpl.logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + sendLargeMessageContinuation + " available now=" + ServerConsumerImpl.this.availableCredits);
                    }
                }
                this.positionPendingLargeMessage += length;
                if (this.positionPendingLargeMessage < this.sizePendingLargeMessage) {
                    ServerConsumerImpl.this.resumeLargeMessage();
                    ServerConsumerImpl.this.lockDelivery.readLock().unlock();
                    return false;
                }
                if (ServerConsumerImpl.logger.isTraceEnabled()) {
                    ServerConsumerImpl.logger.trace("Finished deliverLargeMessage");
                }
                finish();
                ServerConsumerImpl.this.lockDelivery.readLock().unlock();
                return true;
            } finally {
                ServerConsumerImpl.this.lockDelivery.readLock().unlock();
            }
        }

        public void finish() throws Exception {
            synchronized (ServerConsumerImpl.this.lock) {
                if (this.largeMessage == null) {
                    return;
                }
                if (this.context != null) {
                    this.context.close();
                }
                this.largeMessage.releaseResources();
                this.largeMessage.decrementDelayDeletionCount();
                if (ServerConsumerImpl.this.preAcknowledge && !ServerConsumerImpl.this.browseOnly) {
                    this.largeMessage.decrementDelayDeletionCount();
                }
                ServerConsumerImpl.this.largeMessageDeliverer = null;
                this.largeMessage = null;
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String debug() {
        return toString() + "::Delivering " + this.deliveringRefs.size();
    }

    public ServerConsumerImpl(long j, ServerSession serverSession, QueueBinding queueBinding, Filter filter, boolean z, boolean z2, StorageManager storageManager, SessionCallback sessionCallback, boolean z3, boolean z4, ManagementService managementService, ActiveMQServer activeMQServer) throws Exception {
        this(j, serverSession, queueBinding, filter, z, z2, storageManager, sessionCallback, z3, z4, managementService, true, null, activeMQServer);
    }

    public ServerConsumerImpl(long j, ServerSession serverSession, QueueBinding queueBinding, Filter filter, boolean z, boolean z2, StorageManager storageManager, SessionCallback sessionCallback, boolean z3, boolean z4, ManagementService managementService, boolean z5, Integer num, ActiveMQServer activeMQServer) throws Exception {
        this.lock = new Object();
        this.lockDelivery = new ReentrantReadWriteLock();
        this.availableCredits = new AtomicInteger(0);
        this.largeMessageDeliverer = null;
        this.deliveringRefs = new ConcurrentLinkedDeque();
        this.transferring = false;
        this.consumerRateCheckTime = new AtomicLong(System.currentTimeMillis());
        this.messageConsumedSnapshot = new AtomicLong(0L);
        this.resumeLargeMessageRunnable = new Runnable() { // from class: org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.3
            @Override // java.lang.Runnable
            public void run() {
                synchronized (ServerConsumerImpl.this.lock) {
                    try {
                        if (ServerConsumerImpl.this.largeMessageDeliverer == null || ServerConsumerImpl.this.largeMessageDeliverer.deliver()) {
                            ServerConsumerImpl.this.forceDelivery();
                        }
                    } catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorRunningLargeMessageDeliverer(e);
                    }
                }
            }
        };
        this.id = j;
        this.filter = filter;
        this.session = serverSession;
        this.binding = queueBinding;
        this.messageQueue = queueBinding.getQueue();
        this.started = z2 || z;
        this.browseOnly = z2;
        this.storageManager = storageManager;
        this.callback = sessionCallback;
        this.preAcknowledge = z3;
        this.managementService = managementService;
        this.minLargeMessageSize = serverSession.getMinLargeMessageSize();
        this.strictUpdateDeliveryCount = z4;
        this.creationTime = System.currentTimeMillis();
        if (z2) {
            this.browserDeliverer = new BrowserDeliverer(this.messageQueue.browserIterator());
        } else {
            this.messageQueue.addConsumer(this);
        }
        this.supportLargeMessage = z5;
        if (num != null) {
            if (num.intValue() == -1) {
                this.availableCredits = null;
            } else {
                this.availableCredits.set(num.intValue());
            }
        }
        this.server = activeMQServer;
    }

    @Override // org.apache.activemq.artemis.spi.core.remoting.ReadyListener
    public void readyForWriting() {
        promptDelivery();
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public Object getProtocolData() {
        return this.protocolData;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void setProtocolData(Object obj) {
        this.protocolData = obj;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void setlowConsumerDetection(SlowConsumerDetectionListener slowConsumerDetectionListener) {
        this.slowConsumerListener = slowConsumerDetectionListener;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public SlowConsumerDetectionListener getSlowConsumerDetecion() {
        return this.slowConsumerListener;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void fireSlowConsumer() {
        if (this.slowConsumerListener != null) {
            this.slowConsumerListener.onSlowConsumer(this);
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public Object getProtocolContext() {
        return this.protocolContext;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void setProtocolContext(Object obj) {
        this.protocolContext = obj;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public long getID() {
        return this.id;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public boolean isBrowseOnly() {
        return this.browseOnly;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public long getCreationTime() {
        return this.creationTime;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public String getConnectionID() {
        return this.session.getConnectionID().toString();
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public String getSessionID() {
        return this.session.getName();
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public List<MessageReference> getDeliveringMessages() {
        LinkedList linkedList = new LinkedList();
        synchronized (this.lock) {
            List<MessageReference> inTXMessagesForConsumer = this.session.getInTXMessagesForConsumer(this.id);
            if (inTXMessagesForConsumer != null) {
                linkedList.addAll(inTXMessagesForConsumer);
            }
            linkedList.addAll(this.deliveringRefs);
        }
        return linkedList;
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        if ((this.callback != null && !this.callback.hasCredits(this)) || (this.availableCredits != null && this.availableCredits.get() <= 0)) {
            if (logger.isDebugEnabled()) {
                logger.debug(this + " is busy for the lack of credits. Current credits = " + this.availableCredits + " Can't receive reference " + messageReference);
            }
            return HandleStatus.BUSY;
        }
        synchronized (this.lock) {
            if (!this.callback.isWritable(this) || !this.started || this.transferring) {
                return HandleStatus.BUSY;
            }
            if (this.largeMessageDeliverer != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug(this + " is busy delivering large message " + this.largeMessageDeliverer + ", can't deliver reference " + messageReference);
                }
                return HandleStatus.BUSY;
            }
            ServerMessage message = messageReference.getMessage();
            if (this.filter != null && !this.filter.match(message)) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Reference " + messageReference + " is a noMatch on consumer " + this);
                }
                return HandleStatus.NO_MATCH;
            }
            if (logger.isTraceEnabled()) {
                logger.trace("ServerConsumerImpl::" + this + " Handling reference " + messageReference);
            }
            if (!this.browseOnly) {
                if (!this.preAcknowledge) {
                    this.deliveringRefs.add(messageReference);
                }
                messageReference.handled();
                messageReference.setConsumerId(Long.valueOf(this.id));
                messageReference.incrementDeliveryCount();
                if (this.strictUpdateDeliveryCount && !messageReference.isPaged() && messageReference.getMessage().isDurable() && messageReference.getQueue().isDurable() && !messageReference.getQueue().isInternalQueue() && !messageReference.isPaged()) {
                    this.storageManager.updateDeliveryCount(messageReference);
                }
                if (this.preAcknowledge) {
                    if (message.isLargeMessage()) {
                        ((LargeServerMessage) message).incrementDelayDeletionCount();
                    }
                    messageReference.getQueue().acknowledge(messageReference);
                    this.acks++;
                }
                if (message.isLargeMessage() && this.supportLargeMessage) {
                    this.largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, messageReference);
                }
            }
            this.lockDelivery.readLock().lock();
            return HandleStatus.HANDLED;
        }
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void proceedDeliver(MessageReference messageReference) throws Exception {
        try {
            ServerMessage message = messageReference.getMessage();
            if (message.isLargeMessage() && this.supportLargeMessage) {
                if (this.largeMessageDeliverer == null) {
                    this.largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, messageReference);
                }
                this.largeMessageDeliverer.deliver();
            } else {
                deliverStandardMessage(messageReference, message);
            }
        } finally {
            this.lockDelivery.readLock().unlock();
            this.callback.afterDelivery();
        }
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public Filter getFilter() {
        return this.filter;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void close(boolean z) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + z, new Exception("trace"));
        }
        setStarted(false);
        LargeMessageDeliverer largeMessageDeliverer = this.largeMessageDeliverer;
        if (largeMessageDeliverer != null) {
            largeMessageDeliverer.finish();
        }
        removeItself();
        Iterator<MessageReference> it = cancelRefs(z, false, (Transaction) null).iterator();
        TransactionImpl transactionImpl = new TransactionImpl(this.storageManager);
        while (it.hasNext()) {
            MessageReference next = it.next();
            if (logger.isTraceEnabled()) {
                logger.trace("ServerConsumerImpl::" + this + " cancelling reference " + next);
            }
            next.getQueue().cancel(transactionImpl, next, true);
        }
        transactionImpl.rollback();
        if (this.browseOnly) {
            return;
        }
        TypedProperties typedProperties = new TypedProperties();
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, this.binding.getAddress());
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, this.binding.getClusterName());
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, this.binding.getRoutingName());
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, this.filter == null ? null : this.filter.getFilterString());
        typedProperties.putIntProperty(ManagementHelper.HDR_DISTANCE, this.binding.getDistance());
        typedProperties.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, this.messageQueue.getConsumerCount());
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(this.session.getUsername()));
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(((ServerSessionImpl) this.session).getRemotingConnection().getRemoteAddress()));
        typedProperties.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(this.session.getName()));
        this.managementService.sendNotification(new Notification(null, CoreNotificationType.CONSUMER_CLOSED, typedProperties));
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void removeItself() throws Exception {
        if (this.browseOnly) {
            this.browserDeliverer.close();
        } else {
            this.messageQueue.removeConsumer(this);
        }
        this.session.removeConsumer(this.id);
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void forceDelivery(final long j) {
        forceDelivery(j, new Runnable() { // from class: org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.1
            @Override // java.lang.Runnable
            public void run() {
                ServerMessageImpl serverMessageImpl = new ServerMessageImpl(ServerConsumerImpl.this.storageManager.generateID(), 50);
                serverMessageImpl.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, j);
                serverMessageImpl.setAddress(ServerConsumerImpl.this.messageQueue.getName());
                ServerConsumerImpl.this.callback.sendMessage(null, serverMessageImpl, ServerConsumerImpl.this, 0);
            }
        });
    }

    public synchronized void forceDelivery(final long j, final Runnable runnable) {
        promptDelivery();
        this.messageQueue.getExecutor().execute(new Runnable() { // from class: org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    synchronized (ServerConsumerImpl.this.lock) {
                        if (ServerConsumerImpl.this.transferring) {
                            ServerConsumerImpl.this.messageQueue.getExecutor().execute(new Runnable() { // from class: org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.2.1
                                @Override // java.lang.Runnable
                                public void run() {
                                    ServerConsumerImpl.this.forceDelivery(j, runnable);
                                }
                            });
                        } else {
                            runnable.run();
                        }
                    }
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e);
                }
            }
        });
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public LinkedList<MessageReference> cancelRefs(boolean z, boolean z2, Transaction transaction) throws Exception {
        boolean z3 = z2;
        try {
            try {
                if (this.largeMessageDeliverer != null) {
                    this.largeMessageDeliverer.finish();
                }
                this.largeMessageDeliverer = null;
            } catch (Throwable th) {
                ActiveMQServerLogger.LOGGER.errorResttingLargeMessage(th, this.largeMessageDeliverer);
                this.largeMessageDeliverer = null;
            }
            LinkedList<MessageReference> linkedList = new LinkedList<>();
            synchronized (this.lock) {
                if (!this.deliveringRefs.isEmpty()) {
                    for (MessageReference messageReference : this.deliveringRefs) {
                        if (z3) {
                            messageReference.acknowledge(transaction);
                            z3 = false;
                        } else {
                            linkedList.add(messageReference);
                            updateDeliveryCountForCanceledRef(messageReference, z);
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace("ServerConsumerImpl::" + this + " Preparing Cancelling list for messageID = " + messageReference.getMessage().getMessageID() + ", ref = " + messageReference);
                        }
                    }
                    this.deliveringRefs.clear();
                }
            }
            return linkedList;
        } catch (Throwable th2) {
            this.largeMessageDeliverer = null;
            throw th2;
        }
    }

    protected void updateDeliveryCountForCanceledRef(MessageReference messageReference, boolean z) {
        if (this.callback.updateDeliveryCountAfterCancel(this, messageReference, z) || z) {
            return;
        }
        messageReference.decrementDeliveryCount();
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void setStarted(boolean z) {
        synchronized (this.lock) {
            boolean lockDelivery = lockDelivery();
            try {
                this.started = this.browseOnly || z;
                if (lockDelivery) {
                    this.lockDelivery.writeLock().unlock();
                }
            } catch (Throwable th) {
                if (lockDelivery) {
                    this.lockDelivery.writeLock().unlock();
                }
                throw th;
            }
        }
        if (z) {
            promptDelivery();
        }
    }

    private boolean lockDelivery() {
        try {
            if (this.lockDelivery.writeLock().tryLock(30L, TimeUnit.SECONDS)) {
                return true;
            }
            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
            if (this.server == null) {
                return false;
            }
            this.server.threadDump();
            return false;
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
            return false;
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void setTransferring(boolean z) {
        synchronized (this.lock) {
            boolean lockDelivery = lockDelivery();
            try {
                this.transferring = z;
                if (lockDelivery) {
                    this.lockDelivery.writeLock().unlock();
                }
            } catch (Throwable th) {
                if (lockDelivery) {
                    this.lockDelivery.writeLock().unlock();
                }
                throw th;
            }
        }
        if (z) {
            FutureLatch futureLatch = new FutureLatch();
            this.messageQueue.getExecutor().execute(futureLatch);
            if (!futureLatch.await(10000L)) {
                ActiveMQServerLogger.LOGGER.errorTransferringConsumer();
            }
        }
        if (z) {
            return;
        }
        promptDelivery();
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void receiveCredits(int i) {
        if (i == -1) {
            if (logger.isDebugEnabled()) {
                logger.debug(this + ":: FlowControl::Received disable flow control message");
            }
            this.availableCredits = null;
            promptDelivery();
            return;
        }
        if (i == 0) {
            logger.debug(this + ":: FlowControl::Received reset flow control message");
            this.availableCredits.set(0);
            return;
        }
        int andAdd = this.availableCredits.getAndAdd(i);
        if (logger.isDebugEnabled()) {
            logger.debug(this + "::FlowControl::Received " + i + " credits, previous value = " + andAdd + " currentValue = " + this.availableCredits.get());
        }
        if (andAdd > 0 || andAdd + i <= 0) {
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::calling promptDelivery from receiving credits");
        }
        promptDelivery();
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public Queue getQueue() {
        return this.messageQueue;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean z, Object obj, Object obj2) {
        LinkedList linkedList = new LinkedList();
        boolean z2 = false;
        synchronized (this.lock) {
            Iterator<MessageReference> it = this.deliveringRefs.iterator();
            while (it.hasNext()) {
                MessageReference next = it.next();
                if (!z2) {
                    z2 = next.getProtocolData() != null && next.getProtocolData().equals(obj);
                }
                if (z2) {
                    if (z) {
                        it.remove();
                    }
                    linkedList.add(next);
                    if (next.getProtocolData() != null && next.getProtocolData().equals(obj2)) {
                        break;
                    }
                }
            }
        }
        return linkedList;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void acknowledge(Transaction transaction, long j) throws Exception {
        MessageReference poll;
        if (this.browseOnly) {
            return;
        }
        boolean z = false;
        if (transaction == null) {
            z = true;
            transaction = new TransactionImpl(this.storageManager);
        }
        do {
            try {
                synchronized (this.lock) {
                    poll = this.deliveringRefs.poll();
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("ACKing ref " + poll + " on tx= " + transaction + ", consumer=" + this);
                }
                if (poll == null) {
                    ActiveMQIllegalStateException consumerNoReference = ActiveMQMessageBundle.BUNDLE.consumerNoReference(Long.valueOf(this.id), Long.valueOf(j), this.messageQueue.getName());
                    transaction.markAsRollbackOnly(consumerNoReference);
                    throw consumerNoReference;
                }
                poll.acknowledge(transaction);
                this.acks++;
            } catch (ActiveMQException e) {
                if (z) {
                    transaction.rollback();
                } else {
                    transaction.markAsRollbackOnly(e);
                }
                throw e;
            } catch (Throwable th) {
                ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) th);
                ActiveMQIllegalStateException activeMQIllegalStateException = new ActiveMQIllegalStateException(th.getMessage());
                if (z) {
                    transaction.rollback();
                } else {
                    transaction.markAsRollbackOnly(activeMQIllegalStateException);
                }
                throw activeMQIllegalStateException;
            }
        } while (poll.getMessage().getMessageID() != j);
        if (z) {
            transaction.commit();
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void individualAcknowledge(Transaction transaction, long j) throws Exception {
        if (this.browseOnly) {
            return;
        }
        boolean z = false;
        if (logger.isTraceEnabled()) {
            logger.trace("individualACK messageID=" + j);
        }
        if (transaction == null) {
            if (logger.isTraceEnabled()) {
                logger.trace("individualACK starting new TX");
            }
            z = true;
            transaction = new TransactionImpl(this.storageManager);
        }
        try {
            MessageReference removeReferenceByID = removeReferenceByID(j);
            if (logger.isTraceEnabled()) {
                logger.trace("ACKing ref " + removeReferenceByID + " on tx= " + transaction + ", consumer=" + this);
            }
            if (removeReferenceByID == null) {
                ActiveMQIllegalStateException activeMQIllegalStateException = new ActiveMQIllegalStateException("Cannot find ref to ack " + j);
                transaction.markAsRollbackOnly(activeMQIllegalStateException);
                throw activeMQIllegalStateException;
            }
            removeReferenceByID.acknowledge(transaction);
            if (z) {
                transaction.commit();
            }
        } catch (ActiveMQException e) {
            if (z) {
                transaction.rollback();
            } else {
                transaction.markAsRollbackOnly(e);
            }
            throw e;
        } catch (Throwable th) {
            ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) th);
            ActiveMQIllegalStateException activeMQIllegalStateException2 = new ActiveMQIllegalStateException(th.getMessage());
            if (z) {
                transaction.rollback();
            } else {
                transaction.markAsRollbackOnly(activeMQIllegalStateException2);
            }
            throw activeMQIllegalStateException2;
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void individualCancel(long j, boolean z) throws Exception {
        if (this.browseOnly) {
            return;
        }
        MessageReference removeReferenceByID = removeReferenceByID(j);
        if (removeReferenceByID == null) {
            throw new IllegalStateException("Cannot find ref to ack " + j);
        }
        if (!z) {
            removeReferenceByID.decrementDeliveryCount();
        }
        removeReferenceByID.getQueue().cancel(removeReferenceByID, System.currentTimeMillis());
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void backToDelivering(MessageReference messageReference) {
        this.deliveringRefs.addFirst(messageReference);
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public MessageReference removeReferenceByID(long j) throws Exception {
        if (this.browseOnly) {
            return null;
        }
        synchronized (this.lock) {
            if (this.deliveringRefs.isEmpty()) {
                return null;
            }
            if (this.deliveringRefs.peek().getMessage().getMessageID() == j) {
                return this.deliveringRefs.poll();
            }
            Iterator<MessageReference> it = this.deliveringRefs.iterator();
            MessageReference messageReference = null;
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                MessageReference next = it.next();
                if (next.getMessage().getMessageID() == j) {
                    it.remove();
                    messageReference = next;
                    break;
                }
            }
            return messageReference;
        }
    }

    public AtomicInteger getAvailableCredits() {
        return this.availableCredits;
    }

    public String toString() {
        return "ServerConsumerImpl [id=" + this.id + ", filter=" + this.filter + ", binding=" + this.binding + "]";
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String toManagementString() {
        return "ServerConsumer [id=" + getConnectionID() + ":" + getSessionID() + ":" + this.id + ", filter=" + this.filter + ", binding=" + this.binding.toManagementString() + "]";
    }

    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void disconnect() {
        this.callback.disconnect(this, getQueue().getName().toString());
    }

    public float getRate() {
        if (((float) (System.currentTimeMillis() - this.consumerRateCheckTime.getAndSet(System.currentTimeMillis()))) / 1000.0f != 0.0f) {
            return BigDecimal.valueOf(((float) (this.acks - this.messageConsumedSnapshot.getAndSet(this.acks))) / r0).setScale(2, 0).floatValue();
        }
        this.messageConsumedSnapshot.getAndSet(this.acks);
        return 0.0f;
    }

    @Override // org.apache.activemq.artemis.core.server.ServerConsumer
    public void promptDelivery() {
        if (this.largeMessageDeliverer != null) {
            resumeLargeMessage();
        } else {
            forceDelivery();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void forceDelivery() {
        if (this.browseOnly) {
            this.messageQueue.getExecutor().execute(this.browserDeliverer);
        } else {
            this.messageQueue.deliverAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resumeLargeMessage() {
        this.messageQueue.getExecutor().execute(this.resumeLargeMessageRunnable);
    }

    private void deliverStandardMessage(MessageReference messageReference, ServerMessage serverMessage) {
        int sendMessage = this.callback.sendMessage(messageReference, serverMessage, this, messageReference.getDeliveryCount());
        if (this.availableCredits != null) {
            this.availableCredits.addAndGet(-sendMessage);
            if (logger.isTraceEnabled()) {
                logger.trace(this + "::FlowControl::delivery standard taking " + sendMessage + " from credits, available now is " + this.availableCredits);
            }
        }
    }
}
