package org.apache.activemq.usecases;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/SelectorAwareVTThatDropsMessagesWhenNoConsumer.class */
public class SelectorAwareVTThatDropsMessagesWhenNoConsumer {
    protected static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVTThatDropsMessagesWhenNoConsumer.class);
    private static final String QUEUE_NAME = "TestQ";
    private static final String CONSUMER_QUEUE = "Consumer.Orders.VirtualOrders.TestQ";
    private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders.TestQ";
    final AtomicInteger receivedCount = new AtomicInteger(0);
    private BrokerService broker;

    /* loaded from: input_file:org/apache/activemq/usecases/SelectorAwareVTThatDropsMessagesWhenNoConsumer$CountingListener.class */
    class CountingListener implements MessageListener {
        AtomicInteger counter;

        public CountingListener(AtomicInteger atomicInteger) {
            this.counter = atomicInteger;
        }

        public void onMessage(Message message) {
            this.counter.incrementAndGet();
        }
    }

    @Before
    public void setUp() {
        setupBroker("broker://()/localhost?");
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test(timeout = 60000)
    public void verifyNoDispatchDuringDisconnect() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(CONSUMER_QUEUE);
        CountingListener countingListener = new CountingListener(this.receivedCount);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        createConsumer.setMessageListener(countingListener);
        MessageProducer createProducer = createSession.createProducer(createSession.createTopic(PRODUCER_DESTINATION_NAME));
        TextMessage createTextMessage = createSession.createTextMessage("bla");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.SelectorAwareVTThatDropsMessagesWhenNoConsumer.1
            public boolean isSatisified() throws Exception {
                return SelectorAwareVTThatDropsMessagesWhenNoConsumer.this.receivedCount.get() == 2;
            }
        });
        createConsumer.close();
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        Assert.assertEquals(2L, this.receivedCount.get());
        LOG.debug("Restarting consumerA");
        createSession.createConsumer(createQueue).setMessageListener(countingListener);
        createProducer.send(createTextMessage);
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.SelectorAwareVTThatDropsMessagesWhenNoConsumer.2
            public boolean isSatisified() throws Exception {
                return SelectorAwareVTThatDropsMessagesWhenNoConsumer.this.receivedCount.get() == 3;
            }
        });
        Assert.assertEquals(3L, this.receivedCount.get());
        createConnection.close();
    }

    private void setupBroker(String str) {
        try {
            this.broker = BrokerFactory.createBroker(str);
            DestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
            VirtualDestination virtualTopic = new VirtualTopic();
            virtualTopic.setName("VirtualOrders.>");
            virtualTopic.setSelectorAware(true);
            virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
            this.broker.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});
            this.broker.setUseJmx(false);
            this.broker.start();
            this.broker.waitUntilStarted();
        } catch (Exception e) {
            LOG.error("Failed creating broker", e);
        }
    }
}
