package org.apache.activemq;

import java.lang.Thread;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/JMSConsumerTest.class */
/**
 * Consumer-side JMS tests: synchronous receive, message listeners, prefetch limits,
 * acknowledgement modes, durable subscriptions, transacted redelivery and message expiry.
 *
 * <p>The public fields below are the per-test parameters named in the
 * {@code initCombosFor...} methods. NOTE(review): they are presumably populated by name
 * through the inherited {@code addCombinationValues} mechanism — confirm in the superclass.
 */
public class JMSConsumerTest extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class);
    // Destination under test; every test method assigns it before using it.
    public ActiveMQDestination destination;
    // Delivery mode handed to MessageProducer.setDeliveryMode(int); combo values used here are 1 and 2.
    public int deliveryMode;
    // Not referenced anywhere in this file.
    public int prefetch;
    // Acknowledge mode passed as the second argument of createSession(boolean, int).
    public int ackMode;
    // Destination kind passed to createDestination(); testAckOfExpired treats 1 as a queue and anything else as a topic.
    public byte destinationType;
    // Not referenced anywhere in this file.
    public boolean durableConsumer;

    /**
     * Builds the test suite for this class by delegating to the one-argument
     * {@code suite(Class)} helper, which is defined outside this file.
     *
     * @return the suite covering {@code JMSConsumerTest}
     */
    public static Test suite() {
        final Test allTests = suite(JMSConsumerTest.class);
        return allTests;
    }

    /**
     * Command-line entry point: runs {@link #suite()} with the JUnit text runner.
     *
     * @param strArr command-line arguments (unused)
     */
    public static void main(String[] strArr) {
        final Test allTests = suite();
        TestRunner.run(allTests);
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testMessageListenerWithConsumerCanBeStopped}.
     */
    public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Verifies that stopping a consumer pauses delivery to its listener and that starting
     * it again delivers the message that was sent while it was stopped.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        final CountDownLatch firstDelivered = new CountDownLatch(1);
        final CountDownLatch secondDelivered = new CountDownLatch(1);
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        // Session.createConsumer is declared to return MessageConsumer; the explicit cast is
        // required to reach the stop()/start() methods used below.
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(this.destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                received.incrementAndGet();
                if (received.get() == 1) {
                    firstDelivered.countDown();
                }
                if (received.get() == 2) {
                    secondDelivered.countDown();
                }
            }
        });

        // Running consumer: the first message must arrive.
        sendMessages(session, (Destination) this.destination, 1);
        assertTrue(firstDelivered.await(1L, TimeUnit.SECONDS));
        assertEquals(1, received.get());

        // Stopped consumer: the second message must not reach the listener.
        consumer.stop();
        sendMessages(session, (Destination) this.destination, 1);
        assertFalse(secondDelivered.await(1L, TimeUnit.SECONDS));
        assertEquals(1, received.get());

        // Restarted consumer: the held-back message is delivered.
        consumer.start();
        assertTrue(secondDelivered.await(1L, TimeUnit.SECONDS));
        assertEquals(2, received.get());
    }

    /**
     * Closes the consumer from a worker thread while other worker threads are still
     * acknowledging messages, and checks that no exception is raised by either operation.
     *
     * <p>Each delivered message is handed to a cached thread pool; the 590th task closes the
     * consumer and every 200th task acknowledges its message. Exceptions caught in the tasks,
     * and any uncaught exception on any thread, are collected and must be absent at the end.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch closeDone = new CountDownLatch(1);
        this.connection.start();
        Session session = this.connection.createSession(false, 2);
        this.destination = createDestination(session, (byte) 1);
        sendMessages(session, (Destination) this.destination, 2000);
        // Session.createConsumer is declared to return MessageConsumer, hence the cast.
        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(this.destination);
        final Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>());

        // The default handler is JVM-wide state: remember it so it can be restored afterwards.
        final Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable th) {
                LOG.error("Uncaught exception:", th);
                exceptions.put(thread, th);
            }
        });
        final ExecutorService executor = Executors.newCachedThreadPool();
        try {
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(final Message message) {
                    // Ack and (eventually) close on a separate thread, not the delivery thread.
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                int count = counter.incrementAndGet();
                                if (count == 590) {
                                    consumer.close();
                                    closeDone.countDown();
                                }
                                if (count % 200 == 0) {
                                    message.acknowledge();
                                }
                            } catch (Exception e) {
                                LOG.error("Exception on close or ack:", e);
                                exceptions.put(Thread.currentThread(), e);
                            }
                        }
                    });
                }
            });
            assertTrue(closeDone.await(20L, TimeUnit.SECONDS));
            // Give in-flight tasks time to surface late exceptions.
            Thread.sleep(1000L);
            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
        } finally {
            // Do not leak pool threads or the global handler into later tests.
            executor.shutdown();
            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
        }
    }

    /**
     * Registers the {@code deliveryMode}, {@code ackMode} and {@code destinationType} values
     * to combine for {@code testMutiReceiveWithPrefetch1}.
     */
    public void initCombosForTestMutiReceiveWithPrefetch1() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        // Registration order of the ack modes is kept as 1, 3, 2.
        final Object[] ackModes = {Integer.valueOf(1), Integer.valueOf(3), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * With every prefetch limit set to 1, receives four messages one after another,
     * checks that nothing else is pending, then acknowledges the last message received.
     *
     * @throws Exception if a JMS call fails
     */
    public void testMutiReceiveWithPrefetch1() throws Exception {
        final int messageCount = 4;
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        Session session = this.connection.createSession(false, this.ackMode);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, messageCount);

        Message last = null;
        int received = 0;
        while (received < messageCount) {
            last = consumer.receive(1000L);
            assertNotNull(last);
            received++;
        }
        assertNull(consumer.receiveNoWait());
        last.acknowledge();
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testDurableConsumerSelectorChange}; only destination type 2 is used.
     */
    public void initCombosForTestDurableConsumerSelectorChange() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {Byte.valueOf((byte) 2)};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Verifies that re-creating a durable subscription under the same name with a different
     * selector makes the new selector take effect: after switching from {@code color='red'}
     * to {@code color='blue'}, only the blue message is received.
     *
     * @throws Exception if a JMS call fails
     */
    public void testDurableConsumerSelectorChange() throws Exception {
        this.connection.setClientID("test");
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        MessageProducer producer = session.createProducer(this.destination);
        producer.setDeliveryMode(this.deliveryMode);

        // createDurableSubscriber takes a javax.jms.Topic, so the destination field must be cast.
        TopicSubscriber redSubscriber =
                session.createDurableSubscriber((Topic) this.destination, "test", "color='red'", false);
        TextMessage first = session.createTextMessage("1st");
        first.setStringProperty("color", "red");
        producer.send(first);
        // receive() is declared to return Message; cast to read the text body.
        TextMessage gotRed = (TextMessage) redSubscriber.receive(1000L);
        assertNotNull(gotRed);
        assertEquals("1st", gotRed.getText());
        redSubscriber.close();

        // Same subscription name, different selector.
        TopicSubscriber blueSubscriber =
                session.createDurableSubscriber((Topic) this.destination, "test", "color='blue'", false);
        TextMessage second = session.createTextMessage("2nd");
        second.setStringProperty("color", "red");
        producer.send(second);
        TextMessage third = session.createTextMessage("3rd");
        third.setStringProperty("color", "blue");
        producer.send(third);

        // The red "2nd" message must be filtered out; only the blue "3rd" arrives.
        TextMessage gotBlue = (TextMessage) blueSubscriber.receive(1000L);
        assertNotNull(gotBlue);
        assertEquals("3rd", gotBlue.getText());
        assertNull(blueSubscriber.receiveNoWait());
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testSendReceiveBytesMessage}.
     */
    public void initCombosForTestSendReceiveBytesMessage() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Sends a {@link BytesMessage} holding {@code true} then {@code false} and checks that
     * the consumer reads the two booleans back in the same order, with nothing left pending.
     *
     * @throws Exception if a JMS call fails
     */
    public void testSendReceiveBytesMessage() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        MessageProducer producer = session.createProducer(this.destination);

        BytesMessage outgoing = session.createBytesMessage();
        outgoing.writeBoolean(true);
        outgoing.writeBoolean(false);
        producer.send(outgoing);

        // MessageConsumer.receive is declared to return Message; cast to read the body.
        BytesMessage incoming = (BytesMessage) consumer.receive(1000L);
        assertNotNull(incoming);
        assertTrue(incoming.readBoolean());
        assertFalse(incoming.readBoolean());
        assertNull(consumer.receiveNoWait());
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testSetMessageListenerAfterStart}.
     */
    public void initCombosForTestSetMessageListenerAfterStart() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Sends four messages before any listener is installed, then sets a listener on the
     * already-started consumer and expects exactly four deliveries.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testSetMessageListenerAfterStart() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        final CountDownLatch allReceived = new CountDownLatch(1);
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, 4);

        MessageListener countingListener = new MessageListener() {
            public void onMessage(Message message) {
                received.incrementAndGet();
                if (received.get() == 4) {
                    allReceived.countDown();
                }
            }
        };
        consumer.setMessageListener(countingListener);

        assertTrue(allReceived.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so that any unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, received.get());
    }

    /**
     * Registers the {@code destinationType} values (1 and 2) to combine for
     * {@code testPassMessageListenerIntoCreateConsumer}.
     */
    public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
        final Object[] destinationTypes = {Byte.valueOf((byte) 1), Byte.valueOf((byte) 2)};
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Creates the consumer through the {@code createConsumer(destination, listener)}
     * overload of {@code ActiveMQSession} and expects the listener to receive all four
     * messages sent afterwards.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testPassMessageListenerIntoCreateConsumer() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        final CountDownLatch allReceived = new CountDownLatch(1);
        this.connection.start();
        // Cast to ActiveMQSession: every other call site in this class assigns createSession's
        // result to javax.jms.Session, and the two-argument createConsumer is not a JMS method.
        ActiveMQSession session = (ActiveMQSession) this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        session.createConsumer(this.destination, new MessageListener() {
            public void onMessage(Message message) {
                received.incrementAndGet();
                if (received.get() == 4) {
                    allReceived.countDown();
                }
            }
        });
        sendMessages((Session) session, (Destination) this.destination, 4);
        assertTrue(allReceived.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so that any unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, received.get());
    }

    /**
     * Registers the {@code deliveryMode}, {@code ackMode} (2 only) and
     * {@code destinationType} (1 only) values to combine for
     * {@code testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue}.
     */
    public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] ackModes = {Integer.valueOf(2)};
        final Object[] destinationTypes = {Byte.valueOf((byte) 1)};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Closes the connection from inside {@code onMessage} while handling the second message,
     * before that message is acknowledged, then consumes from a fresh connection.
     *
     * <p>The shared counter is expected to end at 5: two deliveries on the first connection
     * plus three on the second, i.e. the un-acked second message is delivered again.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch sendDone = new CountDownLatch(1);
        final CountDownLatch closeDone = new CountDownLatch(1);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.setOptimizedMessageDispatch(false);
        this.connection.start();
        Session session = this.connection.createSession(false, this.ackMode);
        this.destination = createDestination(session, this.destinationType);
        session.createConsumer(this.destination).setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    LOG.info("Got in first listener: " + textMessage.getText());
                    TestCase.assertEquals("" + counter.get(), textMessage.getText());
                    counter.incrementAndGet();
                    if (counter.get() == 2) {
                        // Wait until all messages are sent, then close before acking message #2.
                        sendDone.await();
                        JMSConsumerTest.this.connection.close();
                        closeDone.countDown();
                    }
                    // NOTE(review): for the second message this runs after close() and may
                    // throw; any failure is reported by the catch block below.
                    textMessage.acknowledge();
                } catch (Throwable th) {
                    // Use the logger, as the second listener does, instead of printStackTrace().
                    LOG.error("unexpected ex onMessage: ", th);
                }
            }
        });
        sendMessages(session, (Destination) this.destination, 4);
        sendDone.countDown();
        assertTrue(closeDone.await(100000L, TimeUnit.MILLISECONDS));

        // Fresh connection; the cast matches the one used in testPrefetch1MessageNotDispatched.
        this.connection = (ActiveMQConnection) this.factory.createConnection();
        this.connections.add(this.connection);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        final CountDownLatch remainingSeen = new CountDownLatch(1);
        this.connection.createSession(false, this.ackMode).createConsumer(this.destination).setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    LOG.info("Got in second listener: " + ((TextMessage) message).getText());
                    counter.incrementAndGet();
                    if (counter.get() == 4) {
                        remainingSeen.countDown();
                    }
                } catch (Throwable th) {
                    LOG.error("unexpected ex onMessage: ", th);
                }
            }
        });
        assertTrue(remainingSeen.await(1000L, TimeUnit.MILLISECONDS));
        // Leave time for the remaining delivery before checking the final count.
        Thread.sleep(200L);
        assertEquals(5, counter.get());
    }

    /**
     * Registers the {@code deliveryMode}, {@code ackMode} (1 and 2) and
     * {@code destinationType} (1 only) values to combine for
     * {@code testMessageListenerAutoAckOnCloseWithPrefetch1}.
     */
    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] ackModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {Byte.valueOf((byte) 1)};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Closes the connection from inside {@code onMessage} while handling the second message,
     * after that message has already been acknowledged, then consumes from a fresh connection.
     *
     * <p>The shared counter is expected to end at 4: two deliveries on the first connection
     * and the two remaining messages on the second, with no redelivery.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch sendDone = new CountDownLatch(1);
        final CountDownLatch closeDone = new CountDownLatch(1);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.setOptimizedMessageDispatch(false);
        this.connection.start();
        Session session = this.connection.createSession(false, this.ackMode);
        this.destination = createDestination(session, this.destinationType);
        session.createConsumer(this.destination).setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    LOG.info("Got in first listener: " + textMessage.getText());
                    TestCase.assertEquals("" + counter.get(), textMessage.getText());
                    counter.incrementAndGet();
                    // Acknowledge first, so the close below happens with nothing left un-acked.
                    message.acknowledge();
                    if (counter.get() == 2) {
                        sendDone.await();
                        JMSConsumerTest.this.connection.close();
                        closeDone.countDown();
                    }
                } catch (Throwable th) {
                    // Use the logger, as the second listener does, instead of printStackTrace().
                    LOG.error("unexpected ex onMessage: ", th);
                }
            }
        });
        sendMessages(session, (Destination) this.destination, 4);
        sendDone.countDown();
        assertTrue(closeDone.await(100000L, TimeUnit.MILLISECONDS));

        // Fresh connection; the cast matches the one used in testPrefetch1MessageNotDispatched.
        this.connection = (ActiveMQConnection) this.factory.createConnection();
        this.connections.add(this.connection);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        final CountDownLatch remainingSeen = new CountDownLatch(1);
        this.connection.createSession(false, this.ackMode).createConsumer(this.destination).setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    LOG.info("Got in second listener: " + ((TextMessage) message).getText());
                    counter.incrementAndGet();
                    if (counter.get() == 4) {
                        remainingSeen.countDown();
                    }
                } catch (Throwable th) {
                    LOG.error("unexpected ex onMessage: ", th);
                }
            }
        });
        assertTrue(remainingSeen.await(1000L, TimeUnit.MILLISECONDS));
        // Grace period so that an unexpected redelivery would push the count past 4.
        Thread.sleep(200L);
        assertEquals(4, counter.get());
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testMessageListenerWithConsumerWithPrefetch1}.
     */
    public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * With every prefetch limit set to 1, installs a listener, sends four messages and
     * expects exactly four deliveries.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        final CountDownLatch allReceived = new CountDownLatch(1);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);

        MessageListener countingListener = new MessageListener() {
            public void onMessage(Message message) {
                received.incrementAndGet();
                if (received.get() == 4) {
                    allReceived.countDown();
                }
            }
        };
        session.createConsumer(this.destination).setMessageListener(countingListener);

        sendMessages(session, (Destination) this.destination, 4);
        assertTrue(allReceived.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so that any unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, received.get());
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testMessageListenerWithConsumer}.
     */
    public void initCombosForTestMessageListenerWithConsumer() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Installs a listener on a new consumer, sends four messages and expects exactly
     * four deliveries.
     *
     * @throws Exception if a JMS call fails or a wait is interrupted
     */
    public void testMessageListenerWithConsumer() throws Exception {
        final AtomicInteger received = new AtomicInteger(0);
        final CountDownLatch allReceived = new CountDownLatch(1);
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);

        MessageListener countingListener = new MessageListener() {
            public void onMessage(Message message) {
                received.incrementAndGet();
                if (received.get() == 4) {
                    allReceived.countDown();
                }
            }
        };
        session.createConsumer(this.destination).setMessageListener(countingListener);

        sendMessages(session, (Destination) this.destination, 4);
        assertTrue(allReceived.await(1000L, TimeUnit.MILLISECONDS));
        // Short grace period so that any unexpected extra delivery would show up in the count.
        Thread.sleep(200L);
        assertEquals(4, received.get());
    }

    /**
     * Registers the {@code deliveryMode}, {@code ackMode} and {@code destinationType}
     * (1 only) values to combine for {@code testUnackedWithPrefetch1StayInQueue}.
     */
    public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        // Registration order of the ack modes is kept as 1, 3, 2.
        final Object[] ackModes = {Integer.valueOf(1), Integer.valueOf(3), Integer.valueOf(2)};
        final Object[] destinationTypes = {Byte.valueOf((byte) 1)};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("ackMode", ackModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * With every prefetch limit set to 1, consumes and acknowledges two of four messages,
     * closes the connection, and checks that a consumer on a new connection receives
     * exactly the remaining two.
     *
     * @throws Exception if a JMS call fails
     */
    public void testUnackedWithPrefetch1StayInQueue() throws Exception {
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        Session session = this.connection.createSession(false, this.ackMode);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, 4);

        // First half on the original connection.
        Message message = null;
        for (int i = 0; i < 2; i++) {
            message = consumer.receive(1000L);
            assertNotNull(message);
        }
        message.acknowledge();
        this.connection.close();

        // Fresh connection; the cast matches the one used in testPrefetch1MessageNotDispatched.
        this.connection = (ActiveMQConnection) this.factory.createConnection();
        this.connections.add(this.connection);
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        MessageConsumer secondConsumer =
                this.connection.createSession(false, this.ackMode).createConsumer(this.destination);

        // Second half must still be available, and nothing beyond it.
        for (int i = 0; i < 2; i++) {
            message = secondConsumer.receive(1000L);
            assertNotNull(message);
        }
        message.acknowledge();
        assertNull(secondConsumer.receiveNoWait());
    }

    /**
     * Registers the {@code deliveryMode} values (1 and 2) to combine for
     * {@code testPrefetch1MessageNotDispatched}.
     */
    public void initCombosForTestPrefetch1MessageNotDispatched() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        addCombinationValues("deliveryMode", deliveryModes);
    }

    /**
     * With every prefetch limit set to 1 on the first connection, sends two messages to
     * queue {@code TEST} and checks that a consumer on a second connection receives one of
     * them while the first consumer receives the other and nothing more.
     *
     * @throws Exception if a JMS call fails
     */
    public void testPrefetch1MessageNotDispatched() throws Exception {
        this.connection.getPrefetchPolicy().setAll(1);
        this.connection.start();
        Session firstSession = this.connection.createSession(true, 0);
        this.destination = new ActiveMQQueue("TEST");
        MessageConsumer firstConsumer = firstSession.createConsumer(this.destination);
        sendMessages(firstSession, (Destination) this.destination, 2);
        firstSession.commit();

        // Second, independent connection competing for the same queue.
        Connection secondConnection = (ActiveMQConnection) this.factory.createConnection();
        secondConnection.start();
        this.connections.add(secondConnection);
        Session secondSession = secondConnection.createSession(true, 0);
        MessageConsumer secondConsumer = secondSession.createConsumer(this.destination);

        Message fromFirst = firstConsumer.receive(1000L);
        assertNotNull(fromFirst);
        Message fromSecond = secondConsumer.receive(5000L);
        assertNotNull(fromSecond);

        firstSession.commit();
        secondSession.commit();
        assertNull(firstConsumer.receiveNoWait());
    }

    /**
     * Registers the {@code deliveryMode} (1 only) and {@code destinationType} (1 and 2)
     * values to combine for {@code testDontStart}.
     */
    public void initCombosForTestDontStart() {
        final Object[] deliveryModes = {Integer.valueOf(1)};
        final Object[] destinationTypes = {Byte.valueOf((byte) 1), Byte.valueOf((byte) 2)};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Without ever calling {@code start()} on the connection, sends one message and
     * checks that the consumer receives nothing within one second.
     *
     * @throws Exception if a JMS call fails
     */
    public void testDontStart() throws Exception {
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, 1);

        Message unexpected = consumer.receive(1000L);
        assertNull(unexpected);
    }

    /**
     * Registers the {@code deliveryMode} (1 only) and {@code destinationType} (1 and 2)
     * values to combine for {@code testStartAfterSend}.
     */
    public void initCombosForTestStartAfterSend() {
        final Object[] deliveryModes = {Integer.valueOf(1)};
        final Object[] destinationTypes = {Byte.valueOf((byte) 1), Byte.valueOf((byte) 2)};
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Sends one message before the connection is started, then starts it and checks that
     * the consumer receives that message and nothing else.
     *
     * @throws Exception if a JMS call fails
     */
    public void testStartAfterSend() throws Exception {
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, 1);

        // Delivery only begins once the connection is started.
        this.connection.start();
        Message delivered = consumer.receive(1000L);
        assertNotNull(delivered);
        assertNull(consumer.receiveNoWait());
    }

    /**
     * Registers the {@code deliveryMode} and {@code destinationType} values to combine for
     * {@code testReceiveMessageWithConsumer}.
     */
    public void initCombosForTestReceiveMessageWithConsumer() {
        final Object[] deliveryModes = {Integer.valueOf(1), Integer.valueOf(2)};
        final Object[] destinationTypes = {
            Byte.valueOf((byte) 1), Byte.valueOf((byte) 2), Byte.valueOf((byte) 5), Byte.valueOf((byte) 6)
        };
        addCombinationValues("deliveryMode", deliveryModes);
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Sends a single message and checks that a synchronous {@code receive} returns it with
     * text {@code "0"} and that nothing else is pending.
     *
     * @throws Exception if a JMS call fails
     */
    public void testReceiveMessageWithConsumer() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        this.destination = createDestination(session, this.destinationType);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, 1);

        // MessageConsumer.receive is declared to return Message; cast to read the text body.
        TextMessage received = (TextMessage) consumer.receive(1000L);
        assertNotNull(received);
        assertEquals("0", received.getText());
        assertNull(consumer.receiveNoWait());
    }

    /**
     * Using acknowledge mode 3, consumes four messages, closes the consumer and checks
     * that a new consumer on the same session receives nothing.
     *
     * @throws Exception if a JMS call fails
     */
    public void testDupsOkConsumer() throws Exception {
        final int messageCount = 4;
        this.connection.start();
        Session session = this.connection.createSession(false, 3);
        this.destination = createDestination(session, (byte) 1);
        MessageConsumer consumer = session.createConsumer(this.destination);
        sendMessages(session, (Destination) this.destination, messageCount);

        int received = 0;
        while (received < messageCount) {
            assertNotNull(consumer.receive(1000L));
            received++;
        }
        assertNull(consumer.receive(1000L));
        consumer.close();

        // Nothing may be left for a replacement consumer after the close.
        MessageConsumer replacement = session.createConsumer(this.destination);
        assertNull(replacement.receive(1000L));
    }

    /**
     * Receives two messages in a transacted session and closes that session without
     * committing; a second transacted session must then get both messages flagged as
     * redelivered with {@code JMSXDeliveryCount} equal to 2.
     *
     * @throws Exception if a JMS call fails
     */
    public void testRedispatchOfUncommittedTx() throws Exception {
        this.connection.start();
        Session firstSession = this.connection.createSession(true, 0);
        this.destination = createDestination(firstSession, (byte) 1);
        sendMessages((Connection) this.connection, (Destination) this.destination, 2);

        MessageConsumer firstConsumer = firstSession.createConsumer(this.destination);
        assertNotNull(firstConsumer.receive(1000L));
        assertNotNull(firstConsumer.receive(1000L));

        // The second consumer exists before the first session goes away uncommitted.
        Session secondSession = this.connection.createSession(true, 0);
        MessageConsumer secondConsumer = secondSession.createConsumer(this.destination);
        firstSession.close();

        Message redeliveredOne = secondConsumer.receive(1000L);
        assertNotNull(redeliveredOne);
        assertTrue("redelivered flag set", redeliveredOne.getJMSRedelivered());
        assertEquals(2L, redeliveredOne.getLongProperty("JMSXDeliveryCount"));

        Message redeliveredTwo = secondConsumer.receive(1000L);
        assertNotNull(redeliveredTwo);
        assertTrue(redeliveredTwo.getJMSRedelivered());
        assertEquals(2L, redeliveredTwo.getLongProperty("JMSXDeliveryCount"));

        secondSession.commit();
        assertNull(secondConsumer.receive(500L));
        secondSession.close();
    }

    /**
     * Receives two messages in a transacted session, rolls back and closes it; a second
     * transacted session must then get both messages flagged as redelivered with
     * {@code JMSXDeliveryCount} equal to 2.
     *
     * @throws Exception if a JMS call fails
     */
    public void testRedispatchOfRolledbackTx() throws Exception {
        this.connection.start();
        Session firstSession = this.connection.createSession(true, 0);
        this.destination = createDestination(firstSession, (byte) 1);
        sendMessages((Connection) this.connection, (Destination) this.destination, 2);

        MessageConsumer firstConsumer = firstSession.createConsumer(this.destination);
        assertNotNull(firstConsumer.receive(1000L));
        assertNotNull(firstConsumer.receive(1000L));

        // The second consumer exists before the first session is rolled back and closed.
        Session secondSession = this.connection.createSession(true, 0);
        MessageConsumer secondConsumer = secondSession.createConsumer(this.destination);
        firstSession.rollback();
        firstSession.close();

        Message redeliveredOne = secondConsumer.receive(1000L);
        assertNotNull(redeliveredOne);
        assertTrue(redeliveredOne.getJMSRedelivered());
        assertEquals(2L, redeliveredOne.getLongProperty("JMSXDeliveryCount"));

        Message redeliveredTwo = secondConsumer.receive(1000L);
        assertNotNull(redeliveredTwo);
        assertTrue(redeliveredTwo.getJMSRedelivered());
        assertEquals(2L, redeliveredTwo.getLongProperty("JMSXDeliveryCount"));

        secondSession.commit();
        assertNull(secondConsumer.receive(500L));
        secondSession.close();
    }

    /**
     * Registers the {@code destinationType} values (1 and 2) to combine for
     * {@code testAckOfExpired}.
     */
    public void initCombosForTestAckOfExpired() {
        final Object[] destinationTypes = {Byte.valueOf((byte) 1), Byte.valueOf((byte) 2)};
        addCombinationValues("destinationType", destinationTypes);
    }

    /**
     * Sends four messages with a one-second time-to-live, lets them expire, sends four
     * non-expiring messages, and checks that the consumer only receives the latter.
     *
     * <p>Afterwards the consumer statistics must report four expired messages, and the
     * destination's JMX view must show zero in-flight, eight dispatched and eight dequeued.
     *
     * @throws Exception if a JMS or JMX call fails or the sleep is interrupted
     */
    public void testAckOfExpired() throws Exception {
        this.connection = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false").createActiveMQConnection();
        // Track the replacement connection like every other connection opened in this class.
        // NOTE(review): presumably the fixture's teardown closes everything in
        // this.connections — confirm in JmsTestSupport.
        this.connections.add(this.connection);
        this.connection.start();
        Session consumerSession = this.connection.createSession(false, 1);
        this.destination = (ActiveMQDestination) (this.destinationType == 1 ? consumerSession.createQueue("test") : consumerSession.createTopic("test"));
        // Session.createConsumer is declared to return MessageConsumer; acknowledge() and
        // getConsumerStats() below need the concrete type.
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(this.destination);
        this.connection.setStatsEnabled(true);

        Session producerSession = this.connection.createSession(false, 1);
        MessageProducer producer = producerSession.createProducer(this.destination);

        // Four messages that expire after one second ...
        producer.setTimeToLive(1000L);
        for (int i = 0; i < 4; i++) {
            producer.send(producerSession.createTextMessage("" + i));
        }
        // ... which have expired by the time the sleep ends.
        Thread.sleep(2000L);

        // Four messages that never expire (time-to-live 0).
        producer.setTimeToLive(0L);
        for (int i = 0; i < 4; i++) {
            producer.send(producerSession.createTextMessage("no expiry" + i));
        }

        for (int i = 0; i < 4; i++) {
            TextMessage received = (TextMessage) consumer.receive();
            assertNotNull(received);
            assertTrue("message has \"no expiry\" text: " + received.getText(), received.getText().contains("no expiry"));
            consumer.acknowledge();
        }
        assertEquals("consumer has expiredMessages", 4L, consumer.getConsumerStats().getExpiredMessageCount().getCount());

        DestinationViewMBean view = createView(this.destination);
        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0L, view.getInFlightCount());
        assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8L, view.getDispatchCount());
        assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8L, view.getDequeueCount());
    }

    /**
     * Creates a JMX proxy for the destination named {@code test} on broker {@code localhost}.
     *
     * <p>Only {@code isQueue()} of the argument is consulted, to choose between the Queue
     * and the Topic object name; the destination name in the object name is fixed.
     *
     * @param activeMQDestination destination whose kind (queue or topic) selects the MBean
     * @return a {@link DestinationViewMBean} proxy obtained from the broker's management context
     * @throws Exception if the object name is malformed or the proxy cannot be created
     */
    protected DestinationViewMBean createView(ActiveMQDestination activeMQDestination) throws Exception {
        final ObjectName viewName;
        if (activeMQDestination.isQueue()) {
            viewName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
        } else {
            viewName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test");
        }
        return (DestinationViewMBean) this.broker.getManagementContext().newProxyInstance(viewName, DestinationViewMBean.class, true);
    }
}
