package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.class */
/**
 * Concurrency test for {@link Queue}: while several sender threads keep
 * publishing to the queue, every registered subscription is removed, and the
 * test then checks that none of the messages handed to those subscriptions is
 * still locked.
 */
public class SubscriptionAddRemoveQueueTest {
    private BrokerService brokerService;
    private Queue queue;
    private final ConsumerInfo info = new ConsumerInfo();
    private final List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
    private final ConnectionContext context = new ConnectionContext();
    private final ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
    private final ProducerInfo producerInfo = new ProducerInfo();
    private final ProducerState producerState = new ProducerState(this.producerInfo);
    private final ActiveMQDestination destination = new ActiveMQQueue("TEST");
    private final int numSubscriptions = 1000;
    // Written by the test thread and polled by every sender thread; volatile so
    // the senders are guaranteed to observe the stop request.
    private volatile boolean working = true;
    private final int senders = 20;

    /**
     * Minimal {@link Subscription} stub. It locks every message reference it is
     * given (with itself as the lock owner) and remembers it in
     * {@link #dispatched}; all other operations are no-ops or return fixed
     * values.
     */
    private class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
        private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
        // Appended to via add(MessageReference) and read by the test thread,
        // hence a concurrent list implementation.
        final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();

        private SimpleImmediateDispatchSubscription() {
        }

        public void acknowledge(ConnectionContext connectionContext, MessageAck messageAck) throws Exception {
        }

        /**
         * Locks the given reference on behalf of this subscription and records it.
         *
         * @param messageReference the reference being dispatched; must be a
         *                         {@link QueueMessageReference}
         * @throws ClassCastException if the reference is not a {@link QueueMessageReference}
         */
        public void add(MessageReference messageReference) throws Exception {
            QueueMessageReference queueReference = (QueueMessageReference) messageReference;
            queueReference.lock(this);
            this.dispatched.add(queueReference);
        }

        public ConnectionContext getContext() {
            return null;
        }

        public int getCursorMemoryHighWaterMark() {
            return 0;
        }

        public void setCursorMemoryHighWaterMark(int i) {
        }

        public boolean isSlowConsumer() {
            return false;
        }

        public void unmatched(MessageReference messageReference) throws IOException {
        }

        public long getTimeOfLastMessageAck() {
            return 0L;
        }

        public long getConsumedCount() {
            return 0L;
        }

        public void incrementConsumedCount() {
        }

        public void resetConsumedCount() {
        }

        public void add(ConnectionContext connectionContext, Destination destination) throws Exception {
        }

        public void destroy() {
        }

        public void gc() {
        }

        /** @return the single {@link ConsumerInfo} shared by every subscription in this test */
        public ConsumerInfo getConsumerInfo() {
            return SubscriptionAddRemoveQueueTest.this.info;
        }

        public long getDequeueCounter() {
            return 0L;
        }

        public long getDispatchedCounter() {
            return 0L;
        }

        public int getDispatchedQueueSize() {
            return 0;
        }

        public long getEnqueueCounter() {
            return 0L;
        }

        public int getInFlightSize() {
            return 0;
        }

        public int getInFlightUsage() {
            return 0;
        }

        public ObjectName getObjectName() {
            return null;
        }

        public int getPendingQueueSize() {
            return 0;
        }

        public long getPendingMessageSize() {
            return 0L;
        }

        public int getPrefetchSize() {
            return 0;
        }

        public String getSelector() {
            return null;
        }

        public boolean isBrowser() {
            return false;
        }

        public boolean isFull() {
            return false;
        }

        public boolean isHighWaterMark() {
            return false;
        }

        public boolean isLowWaterMark() {
            return false;
        }

        public boolean isRecoveryRequired() {
            return false;
        }

        /** Accepts every message reference, regardless of the evaluation context. */
        public boolean matches(MessageReference messageReference, MessageEvaluationContext messageEvaluationContext) throws IOException {
            return true;
        }

        public boolean matches(ActiveMQDestination activeMQDestination) {
            return false;
        }

        public void processMessageDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
        }

        public Response pullMessage(ConnectionContext connectionContext, MessagePull messagePull) throws Exception {
            return null;
        }

        public boolean isWildcard() {
            return false;
        }

        /**
         * Returns a snapshot copy of every reference recorded by
         * {@link #add(MessageReference)}; the recorded list itself is left untouched.
         */
        public List<MessageReference> remove(ConnectionContext connectionContext, Destination destination) throws Exception {
            // NOTE(review): presumably the queue uses this list to release the
            // references held by the subscription - confirm against Queue.removeSubscription.
            return new ArrayList<MessageReference>(this.dispatched);
        }

        public void setObjectName(ObjectName objectName) {
        }

        public void setSelector(String str) throws InvalidSelectorException, UnsupportedOperationException {
        }

        public void updateConsumerPrefetch(int i) {
        }

        public boolean addRecoveredMessage(ConnectionContext connectionContext, MessageReference messageReference) throws Exception {
            return false;
        }

        public ActiveMQDestination getActiveMQDestination() {
            return null;
        }

        public int getLockPriority() {
            return 0;
        }

        public boolean isLockExclusive() {
            return false;
        }

        public int countBeforeFull() {
            return 10;
        }

        public SubscriptionStatistics getSubscriptionStatistics() {
            return this.subscriptionStatistics;
        }

        public long getInFlightMessageSize() {
            return this.subscriptionStatistics.getInflightMessageSize().getTotalSize();
        }
    }

    /**
     * Starts a broker and builds the {@link Queue} under test (created without a
     * message store) for the "TEST" destination.
     */
    @Before
    public void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.start();

        DestinationStatistics destinationStatistics = new DestinationStatistics();
        destinationStatistics.setEnabled(true);
        TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();

        this.info.setDestination(this.destination);
        this.info.setPrefetchSize(100);

        this.producerBrokerExchange.setProducerState(this.producerState);
        this.producerBrokerExchange.setConnectionContext(this.context);

        this.queue = new Queue(this.brokerService, this.destination, (MessageStore) null, destinationStatistics, taskRunnerFactory);
        this.queue.initialize();
    }

    /** Stops the broker started in {@link #setUp()} and waits for it to finish stopping. */
    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    /**
     * Registers {@code numSubscriptions} subscriptions, lets {@code senders}
     * threads publish until every subscription holds at least one locked
     * message, removes all subscriptions, and then asserts that the consumer
     * count is zero and that no recorded message is still locked.
     */
    @Test(timeout = 120000)
    public void testNoDispatchToRemovedConsumers() throws Exception {
        final AtomicInteger producerCounter = new AtomicInteger();

        // NOTE(review): these tasks are submitted via ExecutorService.submit and
        // their Futures are discarded, so an Assert.fail in here only ends that
        // sender's loop; it does not fail the test by itself.
        Runnable sender = new Runnable() {
            @Override
            public void run() {
                AtomicInteger sequence = new AtomicInteger();
                int producerId = producerCounter.getAndIncrement();
                while (working) {
                    try {
                        ActiveMQMessage message = new ActiveMQMessage();
                        message.setDestination(destination);
                        message.setMessageId(new MessageId(producerId + ":0:" + sequence.getAndIncrement()));
                        queue.send(producerBrokerExchange, message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail("unexpected exception in sendMessage, ex:" + e);
                    }
                }
            }
        };

        Runnable subscriptionRemover = new Runnable() {
            @Override
            public void run() {
                for (Subscription subscription : subs) {
                    try {
                        queue.removeSubscription(context, subscription, 0L);
                    } catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail("unexpected exception in removeSubscription, ex:" + e);
                    }
                }
            }
        };

        for (int i = 0; i < this.numSubscriptions; i++) {
            SimpleImmediateDispatchSubscription subscription = new SimpleImmediateDispatchSubscription();
            this.subs.add(subscription);
            this.queue.addSubscription(this.context, subscription);
        }
        Assert.assertEquals("there are X subscriptions", this.numSubscriptions, this.queue.getDestinationStatistics().getConsumers().getCount());

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < this.senders; i++) {
                executor.submit(sender);
            }

            Assert.assertTrue("All subs should have some locks", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    for (SimpleImmediateDispatchSubscription subscription : subs) {
                        if (!hasSomeLocks(subscription.dispatched)) {
                            return false;
                        }
                    }
                    return true;
                }
            }));

            // get() blocks until every subscription has been removed and
            // rethrows (wrapped) any failure raised by the remover task.
            executor.submit(subscriptionRemover).get();
            this.working = false;

            Assert.assertEquals("there are no subscriptions", 0L, this.queue.getDestinationStatistics().getConsumers().getCount());
            for (SimpleImmediateDispatchSubscription subscription : this.subs) {
                Assert.assertFalse("There are no locked messages in any removed subscriptions", hasSomeLocks(subscription.dispatched));
            }
        } finally {
            // Also reached when an assertion above fails: stop the senders and
            // release the pool so no thread keeps publishing after the test.
            this.working = false;
            executor.shutdown();
        }
    }

    /**
     * Reports whether at least one reference in the list currently has a lock owner.
     *
     * @param list references recorded by a subscription; each element is expected
     *             to be a {@link QueueMessageReference}, which is the only type
     *             {@code SimpleImmediateDispatchSubscription.add} stores
     * @return {@code true} if any reference has a non-null lock owner,
     *         {@code false} otherwise (including for an empty list)
     */
    public boolean hasSomeLocks(List<MessageReference> list) {
        for (MessageReference reference : list) {
            if (((QueueMessageReference) reference).getLockOwner() != null) {
                return true;
            }
        }
        return false;
    }
}
