package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TopicSubscriber;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriptionOfflineTest.class */
public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);

    /**
     * Creates the connection factory used by the tests in this class.
     *
     * <p>The factory targets the URI {@code "vm://" + getName(true)} and has topic advisory
     * watching disabled.
     *
     * @return a newly configured connection factory
     * @throws Exception declared by the overridden method
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        final String brokerUri = "vm://" + getName(true);
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Checks that a durable subscriber with a selector receives, after reconnecting, every
     * matching message that was published while it was disconnected.
     *
     * <p>Steps: create the durable subscription and disconnect; publish ten messages whose
     * {@code filter} property is {@code "true"}; reconnect with a message listener and, after a
     * fixed wait, assert that the listener's count equals the number of messages sent.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testConsumeAllMatchedMessages() throws Exception {
        // Register the durable subscription, then go offline.
        Connection con = createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish while the subscriber is offline.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Reconnect and consume. The listener is kept in a local so its count can be asserted.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true).setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals(sent, listener.count);
    }

    /**
     * Checks that the messages pending for an inactive durable subscription can be browsed
     * through its management view.
     *
     * <p>Steps: create a durable subscription (no selector) and disconnect; publish ten
     * messages; look up the single inactive durable subscriber reported by the broker's admin
     * view; assert that both {@code browse()} and {@code browseAsTable()} return ten entries.
     *
     * @throws Exception if a JMS or management operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testBrowseOfflineSub() throws Exception {
        // Register the durable subscription, then go offline.
        Connection con = createConnection();
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId");
        session.close();
        con.close();

        // Publish while the subscriber is offline.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        for (int i = 0; i < 10; i++) {
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Browse the offline subscription through its MBean proxy.
        ObjectName[] inactiveSubs = this.broker.getAdminView().getInactiveDurableTopicSubscribers();
        Assert.assertEquals(1, inactiveSubs.length);
        DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) this.broker.getManagementContext()
                .newProxyInstance(inactiveSubs[0], DurableSubscriptionViewMBean.class, true);

        // Keep each browse result so that its size is asserted on the same snapshot that was
        // null-checked.
        CompositeData[] data = sub.browse();
        Assert.assertNotNull(data);
        Assert.assertEquals(10, data.length);

        TabularData table = sub.browseAsTable();
        Assert.assertNotNull(table);
        Assert.assertEquals(10, table.size());
    }

    /**
     * Checks that two durable subscriptions sharing a subscription name but using different
     * client ids each receive all published messages, one while connected and one after
     * reconnecting.
     *
     * <p>Steps: {@code cliId1} subscribes and disconnects; {@code cliId2} subscribes with a
     * listener and stays connected; ten matching messages are published; {@code cliId2}'s
     * listener count is asserted; {@code cliId1} then reconnects with its own listener and its
     * count is asserted as well.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testTwoOfflineSubscriptionCanConsume() throws Exception {
        // cliId1: register the durable subscription, then go offline.
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // cliId2: subscribe and stay connected while messages are published.
        Connection onlineCon = createConnection("cliId2");
        Session onlineSession = onlineCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener onlineListener = new DurableSubscriptionOfflineTestListener();
        onlineSession.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true)
                .setMessageListener(onlineListener);

        // Publish.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int sent = 0;
        for (int i = 0; i < 10; i++) {
            sent++;
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Give the connected subscriber time to receive, then check it.
        Thread.sleep(3000L);
        onlineSession.close();
        onlineCon.close();
        Assert.assertEquals(sent, onlineListener.count);

        // cliId1: reconnect and consume what accumulated while offline.
        con = createConnection("cliId1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener offlineListener = new DurableSubscriptionOfflineTestListener();
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true)
                .setMessageListener(offlineListener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals("offline consumer got all", sent, offlineListener.count);
    }

    /**
     * Checks that a new subscriber sees no messages after the only durable subscription that
     * was holding them has been unsubscribed.
     *
     * <p>Steps: {@code cliId1} subscribes and disconnects; ten matching messages are published;
     * {@code cliId1} reconnects and unsubscribes; the topic is replaced by one with the
     * {@code consumer.retroactive=true} option; a different client ({@code offCli2}) subscribes
     * with a listener and the session is closed straight away; the listener count must be zero.
     *
     * <p>Side effect: reassigns {@code this.topic}.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testRemovedDurableSubDeletes() throws Exception {
        // Register the durable subscription, then go offline.
        Connection con = createConnection("cliId1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish while offline. The producer connection stays open across the unsubscribe
        // and is closed at the end of the test.
        Connection producerCon = createConnection();
        Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer((Destination) null);
        for (int i = 0; i < 10; i++) {
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);

        // Remove the durable subscription.
        con = createConnection("cliId1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // See whether a retroactive consumer can still get any of the messages.
        this.topic = new ActiveMQTopic(this.topic.getPhysicalName() + "?consumer.retroactive=true");
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        session.createDurableSubscriber(this.topic, "SubsId",
                "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))", true).setMessageListener(listener);
        session.close();
        con.close();

        Assert.assertEquals(0, listener.count);

        producerSession.close();
        producerCon.close();
    }

    /**
     * Checks that unsubscribing a durable subscription releases its index pages, so that
     * repeated subscribe / publish / unsubscribe rounds use the same number of pages.
     *
     * <p>Runs only when the broker's persistence adapter is a {@link KahaDBPersistenceAdapter};
     * otherwise returns without asserting anything. Each of the two rounds creates a durable
     * subscription under a fresh client id, publishes 2750 matching messages, unsubscribes, and
     * records {@code pageCount - freePageCount}. From the second round on, that value must equal
     * the value from the previous round.
     *
     * @throws Exception if a JMS operation fails
     */
    @Test(timeout = 60000)
    public void testRemovedDurableSubDeletesFromIndex() throws Exception {
        if (!(this.broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) {
            return;
        }

        // getStore() is declared on the KahaDB adapter, so the cast is required.
        KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        PageFile pageFile = kahaDb.getStore().getPageFile();
        LOG.info("PageCount {} f:{}, fileSize:{}",
                pageFile.getPageCount(), pageFile.getFreePageCount(), pageFile.getFile().length());

        long lastDiff = 0;
        for (int repeats = 0; repeats < 2; repeats++) {
            LOG.info("Iteration: {} Count:{} f:{}", repeats, pageFile.getPageCount(), pageFile.getFreePageCount());

            // Register the durable subscription, then go offline.
            Connection con = createConnection("cliId1-" + repeats);
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
            session.close();
            con.close();

            // Publish while offline.
            con = createConnection();
            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer((Destination) null);
            for (int i = 0; i < 2750; i++) {
                Message message = session.createMessage();
                message.setStringProperty("filter", "true");
                producer.send(this.topic, message);
            }
            con.close();

            // Remove the subscription again.
            con = createConnection("cliId1-" + repeats);
            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.unsubscribe("SubsId");
            session.close();
            con.close();

            long newDiff = pageFile.getPageCount() - pageFile.getFreePageCount();
            LOG.info("PageCount {} f:{} diff: {} fileSize:{}",
                    pageFile.getPageCount(), pageFile.getFreePageCount(), newDiff, pageFile.getFile().length());

            // The first round only establishes the baseline.
            if (lastDiff != 0) {
                Assert.assertEquals("Only use X pages per iteration: " + repeats, lastDiff, newDiff);
            }
            lastDiff = newDiff;
        }
    }

    /**
     * Checks that an offline durable subscription without a selector still receives every
     * message after a second, selector-based subscription on the same topic has been
     * unsubscribed.
     *
     * <p>Steps: {@code offCli1} subscribes with a selector and disconnects; {@code offCli2}
     * subscribes without a selector and disconnects; ten messages with a randomly chosen
     * {@code filter} value are published; {@code offCli1} unsubscribes; {@code offCli2}
     * reconnects with a listener, whose count must equal the number of messages sent.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
        // offCli1: selector-based subscription, then offline.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // offCli2: subscription without selector, then offline.
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", null, true);
        session.close();
        con.close();

        // Publish while both are offline. The producer connection stays open across the
        // unsubscribe and is closed at the end of the test.
        Connection producerCon = createConnection();
        Session producerSession = producerCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = producerSession.createProducer((Destination) null);
        int sent = 0;
        for (int i = 0; i < 10; i++) {
            boolean filter = ((int) (Math.random() * 2.0d)) >= 1;
            // offCli2 has no selector, so every message counts regardless of the filter value.
            sent++;
            Message message = producerSession.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(this.topic, message);
        }
        Thread.sleep(1000L);

        // offCli1: remove its subscription.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // offCli2: reconnect and consume.
        con = createConnection("offCli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("SubsId");
        session.createDurableSubscriber(this.topic, "SubsId", null, true).setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals("offline consumer got all", sent, listener.count);

        producerSession.close();
        producerCon.close();
    }

    /**
     * Checks that durable subscribers activated concurrently with a transactional commit of
     * 1000 messages never receive the same message twice.
     *
     * <p>Steps: register durable subscriptions for {@code cli0}..{@code cli10} and disconnect
     * them; send 1000 text messages in an uncommitted transacted session; then run concurrently
     * one task that commits the transaction and ten tasks (for {@code cli0}..{@code cli9}) that
     * each attach twice, receive 500 messages per attachment, and finally verify that nothing
     * further is delivered. Each task records every producer sequence id it sees and fails on a
     * repeat. Failures on worker threads are collected in {@code exceptions}, which must be
     * empty at the end.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception {
        // One more subscription is registered (cli10) than is later consumed from.
        for (int i = 0; i <= 10; i++) {
            Connection subCon = createConnection("cli" + i);
            Session subSession = subCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession.createDurableSubscriber(this.topic, "SubsId", null, true);
            subSession.close();
            subCon.close();
        }

        String payload = new String(new byte[1000]);
        Connection producerCon = createConnection();
        final Session producerSession = producerCon.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = producerSession.createProducer(this.topic);
        for (int i = 0; i < 1000; i++) {
            producer.send(producerSession.createTextMessage(payload));
        }

        ExecutorService executor = Executors.newCachedThreadPool();

        // Commit on a separate thread so that it races with the subscriber activations below.
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    producerSession.commit();
                } catch (JMSException e) {
                    LOG.error("commit failed", e);
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        });

        for (int i = 0; i < 10; i++) {
            // Anonymous classes can only capture effectively final locals.
            final int clientId = i;
            executor.execute(new Runnable() {
                private final HashSet<Long> ids = new HashSet<>();

                @Override
                public void run() {
                    try {
                        Connection con = DurableSubscriptionOfflineTest.this.createConnection("cli" + clientId);
                        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

                        // Two attachments of 500 messages each cover all 1000 messages.
                        for (int attach = 0; attach < 2; attach++) {
                            TopicSubscriber consumer = session.createDurableSubscriber(
                                    DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                            for (int received = 0; received < 500; received++) {
                                Message message = consumer.receive(4000L);
                                Assert.assertNotNull(message);
                                assertNotSeenBefore(message);
                            }
                            consumer.close();
                        }

                        // Nothing should be left; if something is, report a duplicate first
                        // because that is the more specific failure.
                        TopicSubscriber consumer = session.createDurableSubscriber(
                                DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);
                        Message extra = consumer.receive(4000L);
                        if (extra != null) {
                            assertNotSeenBefore(extra);
                        }
                        Assert.assertNull(extra);

                        session.close();
                        con.close();
                    } catch (Throwable t) {
                        LOG.error("duplicate check failed for cli" + clientId, t);
                        DurableSubscriptionOfflineTest.this.exceptions.add(t);
                    }
                }

                /** Fails if the message's producer sequence id was already recorded by this client. */
                private void assertNotSeenBefore(Message message) throws JMSException {
                    long sequenceId = new MessageId(message.getJMSMessageID()).getProducerSequenceId();
                    Assert.assertTrue("ID=" + clientId + " not a duplicate: " + sequenceId,
                            this.ids.add(Long.valueOf(sequenceId)));
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(5L, TimeUnit.MINUTES);
        producerCon.close();

        Assert.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Runs {@link #doTestOrderOnActivateDeactivate()} ten times, stopping the broker and
     * creating a new one after every round.
     *
     * @throws Exception if a round fails or the broker cannot be stopped or recreated
     */
    @Test(timeout = 120000)
    public void testOrderOnActivateDeactivate() throws Exception {
        int iteration = 0;
        while (iteration < 10) {
            LOG.info("Iteration: " + iteration);
            doTestOrderOnActivateDeactivate();

            // Replace the broker before the next round.
            // NOTE(review): the 'true' flag presumably wipes persisted state; confirm in the base class.
            this.broker.stop();
            this.broker.waitUntilStopped();
            createBroker(true);

            iteration++;
        }
    }

    /**
     * Performs one round of the activate/deactivate ordering check.
     *
     * <p>Steps: register durable subscriptions for {@code cli0}..{@code cli4} and disconnect
     * them; then, on a thread pool, run 100 activations for each of {@code cli0}..{@code cli3}
     * alongside a single producer task that sends and commits 1000 messages in one transaction.
     * Every activation connects over the failover/TCP transport with client acknowledgement,
     * reads up to 500 messages with {@code receiveNoWait()} without acknowledging them, and
     * asserts that the producer sequence ids start at 1 and increase by one. Failures on worker
     * threads are collected in {@code exceptions}, which must be empty at the end.
     *
     * <p>Unlike earlier revisions, this method does not close the last registration connection
     * a second time after the pool terminates; it is already closed inside the registration
     * loop.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    public void doTestOrderOnActivateDeactivate() throws Exception {
        // One more subscription is registered (cli4) than is later consumed from.
        for (int i = 0; i <= 4; i++) {
            Connection con = createConnection("cli" + i);
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createDurableSubscriber(this.topic, "SubsId", null, true);
            session.close();
            con.close();
        }

        final int tcpPort = ((TransportConnector) this.broker.getTransportConnectors().get(1)).getConnectUri().getPort();
        final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + tcpPort + "?wireFormat.maxInactivityDuration=0)?jms.watchTopicAdvisories=false&jms.alwaysSyncSend=true&jms.dispatchAsync=true&jms.sendAcksAsync=true&initialReconnectDelay=100&maxReconnectDelay=30000&useExponentialBackOff=true");

        // Sends 1000 messages in a single transaction and commits them.
        Runnable producerTask = new Runnable() {
            final String payLoad = new String(new byte[DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE]);

            @Override
            public void run() {
                try {
                    Connection con = DurableSubscriptionOfflineTest.this.createConnection();
                    Session session = con.createSession(true, Session.SESSION_TRANSACTED);
                    MessageProducer producer = session.createProducer(DurableSubscriptionOfflineTest.this.topic);
                    for (int i = 0; i < 1000; i++) {
                        producer.send(session.createTextMessage(this.payLoad));
                    }
                    LOG.info("About to commit: 1000");
                    session.commit();
                    LOG.info("committed: 1000");
                    con.close();
                } catch (Exception e) {
                    LOG.error("producer failed", e);
                    DurableSubscriptionOfflineTest.this.exceptions.add(e);
                }
            }
        };

        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; i++) {
            // Anonymous classes can only capture effectively final locals.
            final int clientId = i;
            Runnable checkOrderClient = new Runnable() {
                int runCount = 0;

                @Override
                public void run() {
                    try {
                        // The same instance is submitted many times; the lock makes activations
                        // of one client id run one at a time.
                        synchronized (this) {
                            Connection con = clientFactory.createConnection();
                            try {
                                con.setClientID("cli" + clientId);
                                con.start();
                                // Client acknowledgement with no acknowledge() call: nothing is
                                // ever acknowledged by this client.
                                Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                TopicSubscriber consumer = session.createDurableSubscriber(
                                        DurableSubscriptionOfflineTest.this.topic, "SubsId", null, true);

                                int lastReceived = 0;
                                this.runCount++;
                                int count = 0;
                                for (; count < 500; count++) {
                                    Message message = consumer.receiveNoWait();
                                    if (message == null) {
                                        break;
                                    }
                                    lastReceived++;
                                    Assert.assertEquals(clientId + " expected order: runCount: " + this.runCount + " id: " + message.getJMSMessageID(),
                                            lastReceived, new MessageId(message.getJMSMessageID()).getProducerSequenceId());
                                }
                                LOG.info(con.getClientID() + " peeked " + count);
                                session.close();
                            } finally {
                                // Always release the client id, otherwise one failed assertion
                                // makes every later activation of this client fail as well.
                                con.close();
                            }
                        }
                    } catch (Throwable t) {
                        LOG.error("order check failed for cli" + clientId, t);
                        DurableSubscriptionOfflineTest.this.exceptions.add(t);
                    }
                }
            };
            for (int run = 0; run < 100; run++) {
                executor.execute(checkOrderClient);
            }
        }
        executor.execute(producerTask);

        executor.shutdown();
        executor.awaitTermination(5L, TimeUnit.MINUTES);

        Assert.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Checks that unsubscribing a durable subscription discards everything held for it, so a
     * later subscription with the same client id, name and selector starts empty.
     *
     * <p>Steps: {@code offCli1} subscribes with a selector and disconnects; ten messages are
     * published, every second one matching the selector; {@code offCli1} reconnects and
     * unsubscribes; {@code offCli1} subscribes again with a listener, whose count must be zero
     * after a fixed wait.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
        // Register the durable subscription, then go offline.
        Connection con = createConnection("offCli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        session.close();
        con.close();

        // Publish a mix of matching and non-matching messages.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int sent = 0;
        for (int i = 0; i < 10; i++) {
            boolean filter = i % 2 == 0;
            if (filter) {
                // Only matching messages are counted.
                sent++;
            }
            Message message = session.createMessage();
            message.setStringProperty("filter", filter ? "true" : "false");
            producer.send(this.topic, message);
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // Remove the subscription.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.unsubscribe("SubsId");
        session.close();
        con.close();

        // Subscribe again; nothing from before the unsubscribe may arrive.
        con = createConnection("offCli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        session.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true).setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals(0, listener.count);
    }

    /**
     * Checks that a durable subscriber which has consumed its whole backlog receives only
     * newly published matching messages on its next activation.
     *
     * <p>Steps: {@code cli1} and {@code cli2} subscribe with the same selector and disconnect;
     * ten matching messages are published; {@code cli1} reconnects with a listener and must
     * count all ten; {@code cli2} reconnects and receives two messages; two more messages are
     * published, only the second of which matches; {@code cli1} reconnects with a fresh listener
     * and must count exactly one.
     *
     * @throws Exception if a JMS operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testAllConsumed() throws Exception {
        final String filter = "filter = 'true'";

        // Register both durable subscriptions, then go offline.
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        session.close();
        con.close();

        // Publish ten matching messages.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);
        int sent = 0;
        for (int i = 0; i < 10; i++) {
            Message message = session.createMessage();
            message.setStringProperty("filter", "true");
            producer.send(this.topic, message);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 consumes its whole backlog.
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
        session.createDurableSubscriber(this.topic, "SubsId", filter, true).setMessageListener(listener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals(sent, listener.count);

        // cli2 consumes only part of its backlog.
        LOG.info("cli2 pull 2");
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber consumer = session.createDurableSubscriber(this.topic, "SubsId", filter, true);
        Assert.assertNotNull("got message", consumer.receive(2000L));
        Assert.assertNotNull("got message", consumer.receive(2000L));
        session.close();
        con.close();

        // Publish two more messages: the first does not match, the second does.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer((Destination) null);
        sent = 0;
        for (int i = 0; i < 2; i++) {
            Message message = session.createMessage();
            message.setStringProperty("filter", i == 1 ? "true" : "false");
            producer.send(this.topic, message);
            sent++;
        }
        LOG.info("sent: " + sent);
        Thread.sleep(1000L);
        session.close();
        con.close();

        // cli1 must see only the single new matching message.
        LOG.info("cli1 again, should get 1 new ones");
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        DurableSubscriptionOfflineTestListener secondListener = new DurableSubscriptionOfflineTestListener();
        session.createDurableSubscriber(this.topic, "SubsId", filter, true).setMessageListener(secondListener);
        Thread.sleep(3000L);
        session.close();
        con.close();

        Assert.assertEquals(1, secondListener.count);
    }

    /**
     * Exercises two selector-based durable subscriptions across a broker restart and checks
     * which message each one receives first afterwards.
     *
     * <p>Steps: {@code cli1} subscribes and disconnects; message ID 0 (matching) and IDs 1..9
     * (non-matching) are published; {@code cli2} subscribes and disconnects; the broker is
     * destroyed and recreated; IDs 10..29 (matching) are published; {@code cli2} must receive
     * ID 10 first, and {@code cli1} must receive ID 0 followed by ID 10.
     *
     * @throws Exception if a JMS or broker operation fails
     */
    @Test(timeout = 60000)
    public void testNoMissOnMatchingSubAfterRestart() throws Exception {
        final String selector = "filter = 'true'";

        // cli1: register, then go offline.
        Connection con = createConnection("cli1");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer((Destination) null);

        // The one matching message that cli1 has to keep.
        Message keeper = session.createMessage();
        keeper.setStringProperty("filter", "true");
        keeper.setIntProperty("ID", 0);
        producer.send(this.topic, keeper);
        int sent = 1;

        // IDs 1..9 do not match the selector.
        while (sent < 10) {
            Message unmatched = session.createMessage();
            unmatched.setStringProperty("filter", "false");
            unmatched.setIntProperty("ID", sent);
            producer.send(this.topic, unmatched);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // cli2: register only now, after the first batch, then go offline.
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        session.close();
        con.close();

        // Restart the broker.
        // NOTE(review): createBroker(false) presumably keeps the persisted store; confirm in the base class.
        destroyBroker();
        createBroker(false);

        // IDs 10..29 all match the selector.
        con = createConnection();
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer((Destination) null);
        while (sent < 30) {
            Message matched = session.createMessage();
            matched.setStringProperty("filter", "true");
            matched.setIntProperty("ID", sent);
            producer.send(this.topic, matched);
            sent++;
        }
        con.close();
        LOG.info("sent: " + sent);

        // cli2 starts at the first message published after it subscribed that matches.
        con = createConnection("cli2");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber cli2Consumer = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Message cli2First = cli2Consumer.receive(3000L);
        Assert.assertEquals("is message 10", 10, cli2First.getIntProperty("ID"));
        session.close();
        con.close();

        // cli1 still gets its kept message first, then the post-restart batch.
        con = createConnection("cli1");
        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber cli1Consumer = session.createDurableSubscriber(this.topic, "SubsId", selector, true);
        Message cli1First = cli1Consumer.receive(3000L);
        Assert.assertEquals("is message 0", 0, cli1First.getIntProperty("ID"));
        Message cli1Second = cli1Consumer.receive(3000L);
        Assert.assertEquals("is message 10", 10, cli1Second.getIntProperty("ID"));
        session.close();
        con.close();
    }

    /**
     * Checks that a durable subscriber which went inactive part-way through a run of sends
     * receives the remainder after the broker has been restarted.
     *
     * <p>Two durable subscribers ({@code subs1} and {@code subs2}) on {@code TEST.FOO} share one
     * counting listener. Twenty persistent messages are sent; {@code subs2} is closed just
     * before the eleventh. After the broker is destroyed and recreated, {@code subs2}
     * re-attaches with the same listener and the test waits until the shared count reaches 40.
     *
     * @throws Exception if a JMS or broker operation fails
     */
    @Test(timeout = 640000)
    public void testInactiveSubscribeAfterBrokerRestart() throws Exception {
        final ActiveMQTopic destination = new ActiveMQTopic("TEST.FOO");

        Connection alwaysOnCon = createConnection("subs1");
        Connection tearDownCon = createConnection("subs2");
        Session alwaysOnSession = alwaysOnCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session tearDownSession = tearDownCon.createSession(false, Session.AUTO_ACKNOWLEDGE);

        TopicSubscriber alwaysOnSub = alwaysOnSession.createDurableSubscriber(destination, "consumerName");
        TopicSubscriber tearDownSub = tearDownSession.createDurableSubscriber(destination, "tearDownconsumerName");

        // Both subscribers feed the same listener, so its count is the combined total.
        final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("listener");
        alwaysOnSub.setMessageListener(listener);
        tearDownSub.setMessageListener(listener);

        MessageProducer producer = alwaysOnSession.createProducer(destination);
        producer.setDeliveryMode(2); // 2 is the value of jakarta.jms.DeliveryMode.PERSISTENT

        int index = 0;
        while (index < 20) {
            if (index == 10) {
                // Take subs2 offline half-way through.
                tearDownSub.close();
                tearDownCon.close();
            }
            producer.send(alwaysOnSession.createTextMessage("test-" + index));
            index++;
        }

        // Restart the broker.
        // NOTE(review): createBroker(false) presumably keeps the persisted store; confirm in the base class.
        destroyBroker();
        createBroker(false);

        // Bring subs2 back with the same listener.
        Connection reconnectCon = createConnection("subs2");
        Session reconnectSession = reconnectCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber reconnectSub = reconnectSession.createDurableSubscriber(destination, "tearDownconsumerName");
        reconnectSub.setMessageListener(listener);

        LOG.info("waiting for messages to flow");
        Wait.waitFor(new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return listener.count >= 40;
            }
        });

        boolean receivedAll = listener.count >= 40;
        Assert.assertTrue("At least message 40 must be received, count=" + listener.count, receivedAll);

        alwaysOnSession.close();
        reconnectCon.close();
    }
}
