package org.apache.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:activemq-core-4.1.2-G20100308.jar:org/apache/activemq/broker/region/TopicSubscription.class
 */
/* loaded from: input_file:geronimo-activemq-ra-2.1.7.rar:activemq-core-4.1.2-G20100308.jar:org/apache/activemq/broker/region/TopicSubscription.class */
/**
 * Subscription implementation that dispatches topic messages to a single consumer.
 *
 * <p>Messages handed to {@link #add(MessageReference)} are dispatched immediately while the
 * consumer's prefetch window has room. Otherwise they are parked in the {@link #matched} list
 * until an acknowledgement frees capacity. The size of that list can be bounded through
 * {@link #setMaximumPendingMessages(int)}; when the bound is exceeded the configured
 * {@link MessageEvictionStrategy} selects the messages to discard.
 *
 * <p>Locking as visible in this class: {@link #matched} and the discard counter are only touched
 * while holding {@code matchedListMutex}; {@link #acknowledge} is synchronized on the
 * subscription instance.
 */
public class TopicSubscription extends AbstractSubscription {
    private static final Log log = LogFactory.getLog(TopicSubscription.class);

    /** Messages accepted but not yet dispatched; guarded by {@code matchedListMutex}. */
    protected final LinkedList matched;
    protected final ActiveMQDestination dlqDestination;
    protected final UsageManager usageManager;
    /** Number of messages handed to the connection (or accounted as such) so far. */
    protected AtomicLong dispatchedCounter;
    /** Temporary enlargement of the prefetch window granted by delivered/transactional acks. */
    protected AtomicLong prefetchExtension;
    /** Pending-list bound: negative = unbounded, zero = buffer nothing, positive = limit. */
    private int maximumPendingMessages;
    private MessageEvictionStrategy messageEvictionStrategy;
    /** Count of discarded messages; guarded by {@code matchedListMutex}. */
    private int discarded;
    private final Object matchedListMutex;
    private final AtomicLong enqueueCounter;
    private final AtomicLong dequeueCounter;
    /** Stays true while every dispatched message came from the same region destination. */
    boolean singleDestination;
    /** Region destination of the first dispatched message; null until the first dispatch. */
    Destination destination;

    /**
     * Creates a subscription with an unbounded pending list and an
     * {@link OldestMessageEvictionStrategy}.
     *
     * @param broker            passed through to the superclass
     * @param connectionContext passed through to the superclass
     * @param consumerInfo      passed through to the superclass
     * @param usageManager      stored in {@link #usageManager}
     * @throws InvalidSelectorException propagated from the superclass constructor
     */
    public TopicSubscription(Broker broker, ConnectionContext connectionContext, ConsumerInfo consumerInfo, UsageManager usageManager) throws InvalidSelectorException {
        super(broker, connectionContext, consumerInfo);
        this.matched = new LinkedList();
        this.dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
        this.dispatchedCounter = new AtomicLong();
        this.prefetchExtension = new AtomicLong();
        this.maximumPendingMessages = -1;
        this.messageEvictionStrategy = new OldestMessageEvictionStrategy();
        this.discarded = 0;
        this.matchedListMutex = new Object();
        this.enqueueCounter = new AtomicLong(0L);
        this.dequeueCounter = new AtomicLong(0L);
        this.singleDestination = true;
        this.usageManager = usageManager;
    }

    /**
     * Accepts a message for this subscription.
     *
     * <p>The message is dispatched straight away when the prefetch window has room and this is
     * not a slave broker. Otherwise it is appended to the pending list, after which the pending
     * limit (if positive) is enforced. When the limit is zero nothing may be buffered, so the
     * message is discarded and the reference taken on entry is released again.
     *
     * @param messageReference the message to deliver
     * @throws InterruptedException declared by the overridden method
     * @throws IOException          if dispatching or evicting fails
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void add(MessageReference messageReference) throws InterruptedException, IOException {
        this.enqueueCounter.incrementAndGet();
        messageReference.incrementReferenceCount();
        if (!isFull() && !isSlaveBroker()) {
            optimizePrefetch();
            dispatch(messageReference);
            return;
        }
        // Read the limit once so a concurrent setter cannot change it mid-way through.
        final int maxPending = this.maximumPendingMessages;
        synchronized (this.matchedListMutex) {
            if (maxPending == 0) {
                // Neither dispatched nor buffered: give back the reference taken above,
                // otherwise it would be held forever.
                messageReference.decrementReferenceCount();
                this.discarded++;
                if (log.isDebugEnabled()) {
                    log.debug("Discarding message " + messageReference);
                }
                return;
            }
            this.matched.addLast(messageReference);
            if (maxPending > 0) {
                enforcePendingLimit(maxPending);
            }
        }
    }

    /**
     * Trims the pending list down to {@code maxPending} entries. Must be called while holding
     * {@code matchedListMutex}.
     *
     * @param maxPending positive upper bound for the pending list
     * @throws IOException if removing expired messages or evicting fails
     */
    private void enforcePendingLimit(int maxPending) throws IOException {
        final MessageEvictionStrategy strategy = this.messageEvictionStrategy;
        // Expired messages are looked for once the list outgrows the lower of the two thresholds.
        final int expiryWatermark = Math.min(maxPending, strategy.getEvictExpiredMessagesHighWatermark());
        if (this.matched.size() > expiryWatermark) {
            removeExpiredMessages(this.matched);
        }
        while (this.matched.size() > maxPending) {
            MessageReference[] evicted = strategy.evictMessages(this.matched);
            if (evicted == null || evicted.length == 0) {
                // Bail out instead of spinning when the strategy cannot free anything.
                log.warn("No messages to evict returned from eviction strategy: " + strategy);
                break;
            }
            for (int i = 0; i < evicted.length; i++) {
                evicted[i].decrementReferenceCount();
                this.discarded++;
                if (log.isDebugEnabled()) {
                    log.debug("Discarding message " + evicted[i]);
                }
            }
        }
    }

    /**
     * Removes the first expired message found in the given list, if any, releasing its
     * reference. At most one message is removed per call.
     *
     * <p>NOTE(review): the removed message is also counted in {@link #dispatchedCounter}
     * although it is never sent; confirm this accounting is intended.
     *
     * @param linkedList list of {@link MessageReference}s to scan; when {@code null} the
     *                   subscription's own pending list is scanned
     * @throws IOException declared for overriding implementations; not thrown by this code
     */
    protected void removeExpiredMessages(LinkedList linkedList) throws IOException {
        final LinkedList candidates = linkedList != null ? linkedList : this.matched;
        for (Iterator it = candidates.iterator(); it.hasNext();) {
            MessageReference candidate = (MessageReference) it.next();
            if (candidate.isExpired()) {
                it.remove();
                this.dispatchedCounter.incrementAndGet();
                candidate.decrementReferenceCount();
                return;
            }
        }
    }

    /**
     * Removes the pending message whose id matches the notification, counts it as dispatched
     * and releases its reference. Does nothing when no pending message matches.
     *
     * @param messageDispatchNotification notification carrying the id of the dispatched message
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void processMessageDispatchNotification(MessageDispatchNotification messageDispatchNotification) {
        synchronized (this.matchedListMutex) {
            for (Iterator it = this.matched.iterator(); it.hasNext();) {
                MessageReference pending = (MessageReference) it.next();
                if (pending.getMessageId().equals(messageDispatchNotification.getMessageId())) {
                    it.remove();
                    this.dispatchedCounter.incrementAndGet();
                    pending.decrementReferenceCount();
                    break;
                }
            }
        }
    }

    /**
     * Processes a consumer acknowledgement and, if it made room in a previously full prefetch
     * window, dispatches pending messages.
     *
     * <ul>
     *   <li>Standard and poison acks record the dequeue immediately, or - inside a transaction -
     *       extend the prefetch window now and record the dequeue after commit.</li>
     *   <li>Delivered acks only extend the prefetch window.</li>
     * </ul>
     *
     * @param connectionContext context of the acknowledging connection
     * @param messageAck        the acknowledgement
     * @throws JMSException if the ack is not a standard, poison or delivered ack
     * @throws Exception    propagated from transaction handling or dispatch
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public synchronized void acknowledge(ConnectionContext connectionContext, final MessageAck messageAck) throws Exception {
        final boolean wasFull = isFull();
        if (messageAck.isStandardAck() || messageAck.isPoisonAck()) {
            if (connectionContext.isInTransaction()) {
                this.prefetchExtension.addAndGet(messageAck.getMessageCount());
                connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.broker.region.TopicSubscription.1
                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        TopicSubscription.this.recordAcknowledged(messageAck.getMessageCount());
                    }
                });
            } else {
                recordAcknowledged(messageAck.getMessageCount());
            }
        } else if (messageAck.isDeliveredAck()) {
            this.prefetchExtension.addAndGet(messageAck.getMessageCount());
        } else {
            throw new JMSException("Invalid acknowledgment: " + messageAck);
        }
        // Only a full -> not-full transition can unblock pending messages.
        if (wasFull && !isFull()) {
            dispatchMatched();
        }
    }

    /**
     * Books {@code count} messages as dequeued: updates the destination statistics (while only
     * one destination has been seen), the dequeue counter and shrinks the prefetch extension.
     */
    private void recordAcknowledged(int count) {
        // destination/singleDestination are read under the subscription monitor, as acknowledge() does.
        synchronized (this) {
            if (this.singleDestination && this.destination != null) {
                this.destination.getDestinationStatistics().getDequeues().add(count);
            }
        }
        this.dequeueCounter.addAndGet(count);
        shrinkPrefetchExtension(count);
    }

    /**
     * Atomically lowers the prefetch extension by {@code count}, never going below zero.
     * A compare-and-set loop is used so concurrent updates are not lost.
     */
    private void shrinkPrefetchExtension(long count) {
        long current;
        long updated;
        do {
            current = this.prefetchExtension.get();
            updated = Math.max(0L, current - count);
        } while (!this.prefetchExtension.compareAndSet(current, updated));
    }

    /**
     * Pull requests are not handled by this subscription.
     *
     * @return always {@code null}
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public Response pullMessage(ConnectionContext connectionContext, MessagePull messagePull) throws Exception {
        return null;
    }

    /** @return the number of messages waiting in the pending list */
    @Override // org.apache.activemq.broker.region.Subscription
    public int getPendingQueueSize() {
        return matched();
    }

    /** @return dispatched-counter minus dequeue-counter, truncated to {@code int} */
    @Override // org.apache.activemq.broker.region.Subscription
    public int getDispatchedQueueSize() {
        return (int) (this.dispatchedCounter.get() - this.dequeueCounter.get());
    }

    /** @return the configured pending-list bound */
    public int getMaximumPendingMessages() {
        return this.maximumPendingMessages;
    }

    /** @return the current value of the dispatched counter */
    @Override // org.apache.activemq.broker.region.Subscription
    public long getDispatchedCounter() {
        return this.dispatchedCounter.get();
    }

    /** @return the number of messages passed to {@link #add(MessageReference)} */
    @Override // org.apache.activemq.broker.region.Subscription
    public long getEnqueueCounter() {
        return this.enqueueCounter.get();
    }

    /** @return the current value of the dequeue counter */
    @Override // org.apache.activemq.broker.region.Subscription
    public long getDequeueCounter() {
        return this.dequeueCounter.get();
    }

    /** @return the number of messages discarded so far */
    public int discarded() {
        synchronized (this.matchedListMutex) {
            return this.discarded;
        }
    }

    /** @return the current size of the pending list */
    public int matched() {
        synchronized (this.matchedListMutex) {
            return this.matched.size();
        }
    }

    /**
     * Sets the pending-list bound used by {@link #add(MessageReference)}.
     *
     * @param i negative for no bound, zero to buffer nothing, positive for a limit
     */
    public void setMaximumPendingMessages(int i) {
        this.maximumPendingMessages = i;
    }

    /** @return the strategy used to pick messages to discard */
    public MessageEvictionStrategy getMessageEvictionStrategy() {
        return this.messageEvictionStrategy;
    }

    /** @param messageEvictionStrategy the strategy used to pick messages to discard */
    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
        this.messageEvictionStrategy = messageEvictionStrategy;
    }

    /** @return true when in-flight messages, less the prefetch extension, reach the prefetch size */
    private boolean isFull() {
        return ((long) getDispatchedQueueSize()) - this.prefetchExtension.get() >= ((long) this.info.getPrefetchSize());
    }

    /** @return true when the in-flight count is at or below 40% of the prefetch size */
    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isLowWaterMark() {
        return ((double) (((long) getDispatchedQueueSize()) - this.prefetchExtension.get())) <= ((double) this.info.getPrefetchSize()) * 0.4d;
    }

    /** @return true when the in-flight count is at or above 90% of the prefetch size */
    @Override // org.apache.activemq.broker.region.Subscription
    public boolean isHighWaterMark() {
        return ((double) (((long) getDispatchedQueueSize()) - this.prefetchExtension.get())) >= ((double) this.info.getPrefetchSize()) * 0.9d;
    }

    /**
     * Sends a {@link ConsumerControl} with the new prefetch value to the consumer, provided the
     * connection exists and reports itself as manageable; otherwise does nothing.
     *
     * @param i the prefetch value to send
     */
    @Override // org.apache.activemq.broker.region.Subscription
    public void updateConsumerPrefetch(int i) {
        if (this.context == null || this.context.getConnection() == null || !this.context.getConnection().isManageable()) {
            return;
        }
        ConsumerControl consumerControl = new ConsumerControl();
        consumerControl.setConsumerId(this.info.getConsumerId());
        consumerControl.setPrefetch(i);
        this.context.getConnection().dispatchAsync(consumerControl);
    }

    /** Intentionally a no-op in this implementation. */
    @Override // org.apache.activemq.broker.region.Subscription
    public void optimizePrefetch() {
    }

    /**
     * Dispatches pending messages in list order until the list is empty or the prefetch window
     * is full again.
     */
    private void dispatchMatched() throws IOException {
        synchronized (this.matchedListMutex) {
            Iterator it = this.matched.iterator();
            while (it.hasNext() && !isFull()) {
                MessageReference next = (MessageReference) it.next();
                it.remove();
                dispatch(next);
            }
        }
    }

    /**
     * Wraps the message in a {@link MessageDispatch} and hands it to the connection, either
     * asynchronously or synchronously depending on the consumer's settings. Once the message has
     * been transmitted the destination's dispatched statistic is incremented and the message
     * reference is released.
     */
    private void dispatch(final MessageReference messageReference) throws IOException {
        final Destination regionDestination = messageReference.getRegionDestination();
        MessageDispatch messageDispatch = new MessageDispatch();
        messageDispatch.setMessage((Message) messageReference);
        messageDispatch.setConsumerId(this.info.getConsumerId());
        messageDispatch.setDestination(regionDestination.getActiveMQDestination());
        this.dispatchedCounter.incrementAndGet();
        // Track whether all messages so far originate from one destination; acknowledge()
        // only updates per-destination dequeue statistics while that holds.
        if (this.singleDestination) {
            if (this.destination == null) {
                this.destination = regionDestination;
            } else if (this.destination != regionDestination) {
                this.singleDestination = false;
            }
        }
        if (this.info.isDispatchAsync()) {
            messageDispatch.setTransmitCallback(new Runnable() { // from class: org.apache.activemq.broker.region.TopicSubscription.2
                @Override // java.lang.Runnable
                public void run() {
                    regionDestination.getDestinationStatistics().getDispatched().increment();
                    messageReference.decrementReferenceCount();
                }
            });
            this.context.getConnection().dispatchAsync(messageDispatch);
        } else {
            this.context.getConnection().dispatchSync(messageDispatch);
            regionDestination.getDestinationStatistics().getDispatched().increment();
            messageReference.decrementReferenceCount();
        }
    }

    /** @return a one-line summary of the consumer id and the subscription's counters */
    @Override
    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
    }

    /** Releases the reference of every pending message and empties the pending list. */
    @Override // org.apache.activemq.broker.region.Subscription
    public void destroy() {
        synchronized (this.matchedListMutex) {
            for (Iterator it = this.matched.iterator(); it.hasNext();) {
                ((MessageReference) it.next()).decrementReferenceCount();
            }
            this.matched.clear();
        }
    }

    /** @return the consumer's prefetch size plus the current prefetch extension */
    @Override // org.apache.activemq.broker.region.AbstractSubscription, org.apache.activemq.broker.region.Subscription
    public int getPrefetchSize() {
        return (int) (this.info.getPrefetchSize() + this.prefetchExtension.get());
    }
}
