package org.apache.activemq.usecases;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms2.ActiveMQJMS2TestBase;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.class */
/**
 * Message-expiry tests run against an embedded {@link BrokerService}.
 *
 * <p>Each test starts its own broker, sends messages with a time-to-live, and then
 * checks the destination statistics exposed through {@link DestinationViewMBean}.
 * The scenarios covered are: a queue with no consumer, a queue whose only consumer
 * is blocked inside its listener, and a topic with an offline durable subscriber.
 *
 * <p>{@code optimizedDispatch} and {@code pendingQueuePolicy} are bean properties so
 * that the combination support in the superclass can vary them (see
 * {@link #initCombosForTestExpiredMessagesWithNoConsumer()}).
 */
public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class);

    /** Expiry period passed to the destination policy when a test does not specify one. */
    private static final int DEFAULT_EXPIRE_MESSAGES_PERIOD = 800;

    private final ActiveMQDestination destination = new ActiveMQQueue("test");
    private boolean optimizedDispatch = true;
    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
    private BrokerService broker;
    private String connectionUri;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public static Test suite() {
        return suite(ExpiredMessagesWithNoConsumerTest.class);
    }

    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }

    /** Starts a memory-limited broker using the default expiry period. */
    protected void createBrokerWithMemoryLimit() throws Exception {
        createBrokerWithMemoryLimit(DEFAULT_EXPIRE_MESSAGES_PERIOD);
    }

    /**
     * Starts a memory-limited broker.
     *
     * @param i value handed to {@code PolicyEntry.setExpireMessagesPeriod}
     */
    protected void createBrokerWithMemoryLimit(int i) throws Exception {
        doCreateBroker(true, i);
    }

    /** Starts a broker without a destination memory limit, using the default expiry period. */
    protected void createBroker() throws Exception {
        doCreateBroker(false, DEFAULT_EXPIRE_MESSAGES_PERIOD);
    }

    /**
     * Builds and starts the embedded broker and records the URI clients should connect to.
     *
     * @param z when {@code true}, the default policy gets a 200000 memory limit and a
     *          {@code null} dead-letter strategy
     * @param i value handed to {@code PolicyEntry.setExpireMessagesPeriod}
     */
    private void doCreateBroker(boolean z, int i) throws Exception {
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setOptimizedDispatch(this.optimizedDispatch);
        defaultPolicy.setExpireMessagesPeriod(i);
        defaultPolicy.setMaxExpirePageSize(800);
        defaultPolicy.setPendingQueuePolicy(this.pendingQueuePolicy);
        if (z) {
            defaultPolicy.setDeadLetterStrategy((DeadLetterStrategy) null);
            defaultPolicy.setMemoryLimit(200000L);
        }
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);

        this.broker = new BrokerService();
        this.broker.setBrokerName(ActiveMQJMS2TestBase.DEFAULT_JMX_BROKER_NAME);
        this.broker.setUseJmx(true);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();

        TransportConnector connector = (TransportConnector) this.broker.getTransportConnectors().get(0);
        this.connectionUri = connector.getPublishableConnectString();
    }

    /**
     * Logs the five queue statistics the tests reason about, each under its own label.
     */
    private void logDestinationStats(DestinationViewMBean view) {
        LOG.info("enqueue=" + view.getEnqueueCount()
                + ", dequeue=" + view.getDequeueCount()
                + ", inflight=" + view.getInFlightCount()
                + ", expired= " + view.getExpiredCount()
                + ", size= " + view.getQueueSize());
    }

    /** Logs the expired and enqueue counts of {@code view}. */
    private void logExpiredAndEnqueued(DestinationViewMBean view) {
        LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
    }

    /**
     * Creates (but does not start) a thread that sends text messages through the shared
     * {@link #producer}.
     *
     * @param messageCount    number of messages to send
     * @param pauseAfterCount after sending this many messages the thread sleeps for five
     *                        seconds; pass {@code 0} for no pause
     */
    private Thread newProducingThread(final int messageCount, final int pauseAfterCount) {
        return new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    long batchStart = System.currentTimeMillis();
                    for (int sent = 1; sent <= messageCount; sent++) {
                        ExpiredMessagesWithNoConsumerTest.this.producer.send(
                                ExpiredMessagesWithNoConsumerTest.this.session.createTextMessage("test"));
                        if (sent % 100 == 0) {
                            LOG.info("sent: " + sent + " @ " + ((System.currentTimeMillis() - batchStart) / 100) + "m/ms");
                            batchStart = System.currentTimeMillis();
                        }
                        if (sent == pauseAfterCount) {
                            TimeUnit.SECONDS.sleep(5L);
                        }
                    }
                } catch (Throwable th) {
                    // Log with the cause; the test itself fails later when the counters never match.
                    LOG.error("producing thread failed", th);
                }
            }
        };
    }

    /**
     * Condition that joins {@code thread} for at most {@code joinMillis} per evaluation
     * and is satisfied once the thread has terminated.
     */
    private Wait.Condition producerFinished(final Thread thread, final long joinMillis) {
        return new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                thread.join(joinMillis);
                return !thread.isAlive();
            }
        };
    }

    /**
     * Condition satisfied when at least one message has expired and every enqueued
     * message that is not in flight is accounted for as expired. Logs the statistics
     * on each evaluation.
     */
    private Wait.Condition nonInFlightExpired(final DestinationViewMBean view) {
        return new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                logDestinationStats(view);
                return view.getExpiredCount() > 0
                        && view.getEnqueueCount() - view.getInFlightCount() == view.getExpiredCount();
            }
        };
    }

    /** Waits (result ignored; callers assert afterwards) for the in-flight count to reach zero. */
    private void waitForNoInFlight(final DestinationViewMBean view) throws Exception {
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 0 == view.getInFlightCount();
            }
        });
    }

    /**
     * Sends 2000 non-persistent messages with a one second TTL to a queue that has no
     * consumer, then expects every message to be expired and memory usage to drop to zero.
     */
    public void testExpiredNonPersistentMessagesWithNoConsumer() throws Exception {
        createBrokerWithMemoryLimit(2000);
        this.connection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.producer.setTimeToLive(1000L);
        this.producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
        this.connection.start();

        final Thread producingThread = newProducingThread(2000, 135);
        producingThread.start();
        // NOTE(review): a single condition evaluation may block in join() for up to 3000 seconds.
        assertTrue("producer failed to complete within allocated time",
                Wait.waitFor(producerFinished(producingThread, TimeUnit.SECONDS.toMillis(3000L))));

        TimeUnit.SECONDS.sleep(5L);
        final DestinationViewMBean view = createView(this.destination);
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                try {
                    logDestinationStats(view);
                    if (view.getDequeueCount() != 0
                            && view.getDequeueCount() == view.getExpiredCount()
                            && view.getDequeueCount() == view.getEnqueueCount()) {
                        return view.getQueueSize() == 0;
                    }
                    return false;
                } catch (Exception e) {
                    LOG.info(e.toString());
                    return false;
                }
            }
        }, 300000L);
        logDestinationStats(view);
        assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
        assertEquals("0 queue", 0L, view.getQueueSize());
    }

    /** Registers the property combinations for {@link #testExpiredMessagesWithNoConsumer()}. */
    public void initCombosForTestExpiredMessagesWithNoConsumer() {
        addCombinationValues("optimizedDispatch", new Object[]{Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("pendingQueuePolicy", new Object[]{null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()});
    }

    /**
     * Sends 2000 messages with a one second TTL to a queue that has no consumer and
     * expects all 2000 to be counted as expired with memory usage back at zero.
     */
    public void testExpiredMessagesWithNoConsumer() throws Exception {
        createBrokerWithMemoryLimit();
        this.connection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.producer.setTimeToLive(1000L);
        this.connection.start();

        final Thread producingThread = newProducingThread(2000, 0);
        producingThread.start();
        // NOTE(review): a single condition evaluation may block in join() for up to 3000 seconds.
        assertTrue("producer failed to complete within allocated time",
                Wait.waitFor(producerFinished(producingThread, TimeUnit.SECONDS.toMillis(3000L))));

        final DestinationViewMBean view = createView(this.destination);
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                logDestinationStats(view);
                return 2000 == view.getExpiredCount();
            }
        }, 300000L);
        logDestinationStats(view);
        assertEquals("Not all sent messages have expired", 2000L, view.getExpiredCount());
        assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage());
    }

    /**
     * Sends 10 messages (TTL four seconds) to a queue whose consumer has a prefetch of 5
     * and blocks inside its listener. Expects the non-dispatched messages to expire, and
     * once the listener is released, in-flight and queue size to reach zero with 10 dequeues.
     */
    public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
        createBroker();
        this.connection = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.queuePrefetch=5").createConnection();
        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.producer.setTimeToLive(4000L);

        final CountDownLatch firstReceived = new CountDownLatch(1);
        final CountDownLatch releaseConsumer = new CountDownLatch(1);
        MessageConsumer consumer = this.session.createConsumer(this.destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    LOG.info("Got my message: " + message);
                    firstReceived.countDown();
                    // Hold the message un-acked until the test releases the latch.
                    releaseConsumer.await(6L, TimeUnit.MINUTES);
                    LOG.info("acking message: " + message);
                    message.acknowledge();
                } catch (Exception e) {
                    LOG.error("listener failed", e);
                    TestCase.fail(e.toString());
                }
            }
        });
        this.connection.start();

        final Thread producingThread = newProducingThread(10, 0);
        producingThread.start();
        assertTrue("got one message", firstReceived.await(20L, TimeUnit.SECONDS));
        assertTrue("producer failed to complete within allocated time",
                Wait.waitFor(producerFinished(producingThread, 1000L), 300000L));

        final DestinationViewMBean view = createView(this.destination);
        assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 5 == view.getDispatchCount();
            }
        }));
        assertTrue("all non inflight have expired ", Wait.waitFor(nonInFlightExpired(view)));
        logDestinationStats(view);

        releaseConsumer.countDown();
        waitForNoInFlight(view);
        logDestinationStats(view);
        assertEquals("inflight reduced to duck", 0L, view.getInFlightCount());
        assertEquals("size didn't get back to 0 ", 0L, view.getQueueSize());
        assertEquals("dequeues didn't match sent/expired ", 10L, view.getDequeueCount());

        consumer.close();
        waitForNoInFlight(view);
        assertEquals("inflight goes to zero on close", 0L, view.getInFlightCount());
        LOG.info("done: " + getName());
    }

    /**
     * Same shape as the slow-consumer test but with 1500 messages and a prefetch of 600,
     * followed by a second batch of 1500 messages sent with no TTL to check that the
     * consumer keeps receiving after the expiry phase.
     */
    public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception {
        createBroker();
        this.connection = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.queuePrefetch=600").createConnection();
        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.producer.setTimeToLive(4000L);

        final CountDownLatch firstReceived = new CountDownLatch(1);
        final CountDownLatch releaseConsumer = new CountDownLatch(1);
        final AtomicLong received = new AtomicLong();
        MessageConsumer consumer = this.session.createConsumer(this.destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got my message: " + message);
                    }
                    firstReceived.countDown();
                    received.incrementAndGet();
                    // Blocks only until the latch is released; afterwards await returns at once.
                    releaseConsumer.await(5L, TimeUnit.MINUTES);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("acking message: " + message);
                    }
                    message.acknowledge();
                } catch (Exception e) {
                    LOG.error("listener failed", e);
                    TestCase.fail(e.toString());
                }
            }
        });
        this.connection.start();

        final Thread producingThread = newProducingThread(1500, 0);
        producingThread.start();
        assertTrue("got one message", firstReceived.await(20L, TimeUnit.SECONDS));
        assertTrue("producer failed to complete within allocated time",
                Wait.waitFor(producerFinished(producingThread, 1000L), 300000L));

        final DestinationViewMBean view = createView(this.destination);
        assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 600 == view.getDispatchCount();
            }
        }));
        assertTrue("all non inflight have expired ", Wait.waitFor(nonInFlightExpired(view)));
        logDestinationStats(view);

        releaseConsumer.countDown();
        waitForNoInFlight(view);
        logDestinationStats(view);
        assertEquals("inflight didn't reduce to duck", 0L, view.getInFlightCount());
        assertEquals("size doesn't get back to 0 ", 0L, view.getQueueSize());
        assertEquals("dequeues don't match sent/expired ", 1500L, view.getDequeueCount());

        // Second phase: messages that never expire must still reach the consumer.
        this.producer.setTimeToLive(0L);
        long batchStart = System.currentTimeMillis();
        for (int i = 0; i < 1500; i++) {
            this.producer.send(this.session.createTextMessage("test-" + i));
            if (i % 100 == 0) {
                LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - batchStart) / 100) + "m/ms");
                batchStart = System.currentTimeMillis();
            }
        }
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return received.get() >= 1500;
            }
        });

        consumer.close();
        waitForNoInFlight(view);
        assertEquals("inflight did not go to zero on close", 0L, view.getInFlightCount());
        LOG.info("done: " + getName());
    }

    /**
     * Sends 10 messages (TTL one second) to a topic while its durable subscriber is
     * closed, waits five seconds, and expects all 10 to be counted as expired and none
     * to be delivered when the subscriber comes back.
     */
    public void testExpireMessagesForDurableSubscriber() throws Exception {
        createBroker();
        this.connection = new ActiveMQConnectionFactory(this.connectionUri).createConnection();
        this.connection.setClientID("myConnection");
        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        this.connection.start();

        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.session.createTopic("test");
        this.producer = this.session.createProducer(topic);
        this.producer.setTimeToLive(1000L);
        // Register the durable subscription, then go offline straight away.
        this.session.createDurableSubscriber(topic, "mySub").close();
        for (int i = 0; i < 10; i++) {
            this.producer.send(this.session.createTextMessage("test"));
        }

        DestinationViewMBean view = createView(topic);
        LOG.info("messages sent");
        logExpiredAndEnqueued(view);
        assertEquals(0L, view.getExpiredCount());
        assertEquals(10L, view.getEnqueueCount());

        Thread.sleep(5000L);
        logExpiredAndEnqueued(view);
        assertEquals(10L, view.getExpiredCount());
        assertEquals(10L, view.getEnqueueCount());

        final AtomicLong received = new AtomicLong();
        this.session.createDurableSubscriber(topic, "mySub").setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                received.incrementAndGet();
            }
        });
        LOG.info("Waiting for messages to arrive");
        // Expected to time out: nothing should be delivered, which is asserted below.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return received.get() >= 10;
            }
        }, 1000L);
        LOG.info("received=" + received.get());
        logExpiredAndEnqueued(view);
        assertEquals(0L, received.get());
        assertEquals(10L, view.getExpiredCount());
        assertEquals(10L, view.getEnqueueCount());
    }

    /**
     * Returns a JMX proxy for the destination named {@code test} on the broker named
     * {@code localhost}.
     *
     * <p>Only {@code activeMQDestination.isQueue()} is consulted, to choose between the
     * Queue and Topic object name; the destination's own name is not used.
     */
    protected DestinationViewMBean createView(ActiveMQDestination activeMQDestination) throws Exception {
        String destinationType = activeMQDestination.isQueue() ? "Queue" : "Topic";
        ObjectName objectName = new ObjectName(ActiveMQJMS2TestBase.DEFAULT_JMX_DOMAIN_NAME
                + ":type=Broker,brokerName=localhost,destinationType=" + destinationType + ",destinationName=test");
        return (DestinationViewMBean) this.broker.getManagementContext().newProxyInstance(objectName, DestinationViewMBean.class, true);
    }

    /**
     * Stops the connection and the broker. Either may be {@code null} if the test failed
     * during setup, and the broker is stopped even when stopping the connection throws.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (this.connection != null) {
                this.connection.stop();
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }

    public boolean getOptimizedDispatch() {
        return this.optimizedDispatch;
    }

    public void setOptimizedDispatch(boolean z) {
        this.optimizedDispatch = z;
    }

    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
        return this.pendingQueuePolicy;
    }

    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy) {
        this.pendingQueuePolicy = pendingQueueMessageStoragePolicy;
    }
}
