package org.apache.activemq.network;

import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicSubscriber;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/network/VirtualConsumerDemandTest.class */
public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
    // Message-count constant; not referenced by any of the tests visible in this part of the file.
    protected static final int MESSAGE_COUNT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(VirtualConsumerDemandTest.class);
    // Runtime configuration broker the tests call to install or replace virtual destinations
    // (setVirtualDestinations) and to add destinations (addNewDestination) while running.
    protected JavaRuntimeConfigurationBroker runtimeBroker;
    // NOTE(review): presumably a durable subscriber name; its use is outside the visible tests.
    protected String consumerName = "durableSubs";
    // Queue name used by testSourceQueue as the composite-queue source and producer destination.
    protected String testQueueName = "include.test.foo";
    // First constructor (parameterized) argument; where it is consumed is not visible here.
    private final boolean isDuplex;
    // Second constructor (parameterized) argument; tests use it to skip themselves via Assume
    // or to choose which advisory-broker counts to expect.
    private final boolean isUseVirtualDestSubsOnCreation;
    // Assigned outside the visible portion of the class.
    protected NetworkConnector connector;
    // Assigned outside the visible portion of the class.
    protected AdvisoryBroker remoteAdvisoryBroker;

    /**
     * Supplies the constructor argument rows for the parameterized runner.
     *
     * @return four {@code {boolean, boolean}} rows, one for every combination of the two
     *         constructor flags
     */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        final Object[][] combinations = {
            {false, true},
            {true, false},
            {true, true},
            {false, false},
        };
        return Arrays.asList(combinations);
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param z  stored as the {@code isDuplex} flag
     * @param z2 stored as the {@code isUseVirtualDestSubsOnCreation} flag, which individual
     *           tests consult to skip themselves or to pick the expected advisory counts
     */
    public VirtualConsumerDemandTest(boolean z, boolean z2) {
        isDuplex = z;
        isUseVirtualDestSubsOnCreation = z2;
    }

    /**
     * Publishes one message to each of three virtual topics on the local broker. Only the
     * first two topics have a {@code Consumer.cons1.*} queue looked up on the remote broker;
     * the test expects one dispatch for each of those and none for the third.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testVirtualTopics() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");

        MessageProducer barProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
        MessageProducer bar2Producer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar2"));
        MessageProducer bar3Producer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar3"));
        Thread.sleep(2000L);
        TextMessage barMessage = localSession.createTextMessage("test");

        // Statistics for the three local topics.
        DestinationStatistics localBarStats = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
        DestinationStatistics localBar2Stats = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar2")).getDestinationStatistics();
        DestinationStatistics localBar3Stats = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar3")).getDestinationStatistics();

        // Statistics for the two consumer queues on the remote broker (none for bar3).
        DestinationStatistics remoteBarQueueStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();
        DestinationStatistics remoteBar2QueueStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar2")).getDestinationStatistics();

        waitForConsumerCount(localBarStats, 1);
        waitForConsumerCount(localBar2Stats, 1);

        barProducer.send(barMessage);
        bar2Producer.send(localSession.createTextMessage("test2"));
        bar3Producer.send(localSession.createTextMessage("test3"));

        waitForDispatchFromLocalBroker(localBarStats, 1);
        waitForDispatchFromLocalBroker(localBar2Stats, 1);
        assertLocalBrokerStatistics(localBarStats, 1);
        assertLocalBrokerStatistics(localBar2Stats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBarQueueStats.getMessages().getCount());
        Assert.assertEquals("remote dest messages", 1L, remoteBar2QueueStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 2);
        assertAdvisoryBrokerCounts(1, 2, 2);

        // Give any unexpected dispatch for the third topic time to show up before checking.
        Thread.sleep(1000L);
        Assert.assertEquals("local broker dest stat dispatched", 0L, localBar3Stats.getDispatched().getCount());
    }

    /**
     * Attaches a live consumer to the {@code Consumer.cons1.*} queue on the remote broker,
     * publishes one message to the matching virtual topic on the local broker and expects the
     * consumer to receive it. The expected advisory-broker counts depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     */
    @Test(timeout = 60000)
    public void testVirtualTopicWithConsumer() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
        MessageProducer topicProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localTopicStats = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
        MessageConsumer remoteQueueConsumer = remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar"));

        waitForConsumerCount(localTopicStats, 1);
        topicProducer.send(payload);
        Assert.assertNotNull(remoteQueueConsumer.receive(5000L));

        waitForDispatchFromLocalBroker(localTopicStats, 1);
        assertLocalBrokerStatistics(localTopicStats, 1);
        assertRemoteAdvisoryCount(advisoryConsumer, 2, 1);

        // Expected counts differ by one in the last two positions when the flag is enabled.
        final int secondCount = isUseVirtualDestSubsOnCreation ? 2 : 1;
        final int thirdCount = isUseVirtualDestSubsOnCreation ? 1 : 0;
        assertAdvisoryBrokerCounts(1, secondCount, thirdCount);
    }

    /**
     * Receives one message through a remote queue consumer, closes that consumer, publishes a
     * second message while no consumer is attached, and then expects a freshly created
     * consumer on the same queue to receive it.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testVirtualTopicWithConsumerGoOffline() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
        MessageProducer topicProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar"));
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localTopicStats = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics();
        MessageConsumer firstConsumer = remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar"));

        // Phase 1: consumer online.
        waitForConsumerCount(localTopicStats, 1);
        topicProducer.send(payload);
        Assert.assertNotNull(firstConsumer.receive(5000L));
        waitForDispatchFromLocalBroker(localTopicStats, 1);
        assertLocalBrokerStatistics(localTopicStats, 1);

        // Phase 2: consumer closed; the message must still be dispatched by the local broker.
        firstConsumer.close();
        Thread.sleep(2000L);
        topicProducer.send(payload);
        waitForDispatchFromLocalBroker(localTopicStats, 2);
        assertLocalBrokerStatistics(localTopicStats, 2);

        // Phase 3: a new consumer on the same queue picks up the second message.
        MessageConsumer secondConsumer = remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar"));
        Assert.assertNotNull(secondConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 4);
        assertAdvisoryBrokerCounts(1, 2, 1);
    }

    /** Runs the shared dynamic-flow scenario with its boolean argument set to {@code false}. */
    @Test(timeout = 60000)
    public void testDynamicFlow() throws Exception {
        final boolean flag = false;
        testDynamicFlow(flag);
    }

    /** Runs the shared dynamic-flow scenario with its boolean argument set to {@code true}. */
    @Test(timeout = 60000)
    public void testDynamicFlowForceDurable() throws Exception {
        final boolean flag = true;
        testDynamicFlow(flag);
    }

    /**
     * Shared scenario: installs a composite topic at runtime that forwards the included topic
     * to the {@code include.test.bar.bridge} queue, sends one message and checks that it shows
     * up on the remote queue.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     *
     * @param z passed as the last argument of {@code doSetUp}; when {@code true} one network
     *          durable subscription is expected on the included destination, otherwise none
     * @throws Exception if set-up, messaging or any assertion fails
     */
    protected void testDynamicFlow(boolean z) throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null, true, z);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        VirtualDestination compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        final int expectedDurableSubs = z ? 1 : 0;
        assertNCDurableSubsCount(localBroker, included, expectedDurableSubs);

        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Creates two advisory consumers one after the other and checks that the second one gets
     * its own single advisory while the first one, already drained, receives nothing further.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 30000)
    public void testAdvisoryReplayMultipleAdvisoryConsumers() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        ActiveMQQueue bridgeQueue = new ActiveMQQueue("include.test.bar.bridge");
        VirtualDestination compositeTopic = createCompositeTopic(testTopicName, bridgeQueue);
        doSetUp(true, new VirtualDestination[]{compositeTopic}, true, true);
        runtimeBroker.addNewDestination(bridgeQueue);

        MessageConsumer firstAdvisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        assertRemoteAdvisoryCount(firstAdvisoryConsumer, 1);

        MessageConsumer secondAdvisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        // The first consumer must not see a duplicate; the second gets exactly one advisory.
        assertRemoteAdvisoryCount(firstAdvisoryConsumer, 0);
        assertRemoteAdvisoryCount(secondAdvisoryConsumer, 1);
    }

    /**
     * Installs a composite topic named {@code include.test.bar2} first, then adds a second
     * composite topic for the test topic; both forward to the same
     * {@code include.test.bar.bridge} queue. One message sent to the included destination is
     * expected to arrive on that remote queue.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testSecondNonIncludedCompositeTopicForwardSameQueue() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // Step 1: only the bar2 composite topic.
        VirtualDestination bar2Composite = createCompositeTopic("include.test.bar2", new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{bar2Composite}, true);
        Thread.sleep(2000L);

        // Step 2: both composite topics, sharing the same target queue.
        VirtualDestination testTopicComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{bar2Composite, testTopicComposite}, true);
        Thread.sleep(2000L);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(2, 2, 2);
    }

    /**
     * Installs a composite topic named {@code include.test.bar2} that forwards to
     * {@code include.test.bar.bridge2}, then adds a second composite topic for the test topic
     * that forwards to {@code include.test.bar.bridge}. One message sent to the included
     * destination is expected to arrive on the {@code bridge} queue of the remote broker.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testSecondNonIncludedCompositeTopic() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // Step 1: only the bar2 composite topic, targeting bridge2.
        VirtualDestination bar2Composite = createCompositeTopic("include.test.bar2", new ActiveMQQueue("include.test.bar.bridge2"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{bar2Composite}, true);
        Thread.sleep(2000L);

        // Step 2: add the composite topic for the test topic, targeting bridge.
        VirtualDestination testTopicComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{bar2Composite, testTopicComposite}, true);
        Thread.sleep(2000L);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(2, 1, 1);
    }

    /**
     * With {@code isUseVirtualDestSubsOnCreation} disabled, installs a composite topic and
     * sends one message; nothing is expected to be dispatched by the local broker, no message
     * is expected on the remote bridge queue, and no advisory is expected.
     *
     * <p>Skipped when {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testNoUseVirtualDestinationSubscriptionsOnCreation() throws Exception {
        Assume.assumeFalse(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        VirtualDestination compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();

        includedProducer.send(payload);
        // Leave time for an (unexpected) dispatch to happen before asserting zero.
        Thread.sleep(2000L);
        waitForDispatchFromLocalBroker(localStats, 0);
        assertLocalBrokerStatistics(localStats, 0);
        Assert.assertEquals("remote dest messages", 0L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 0);
        assertAdvisoryBrokerCounts(1, 0, 0);
    }

    /**
     * Installs a composite topic with two target queues, verifies one message reaches both,
     * then replaces it with a composite topic that keeps only the first target and verifies
     * that a second message reaches the first queue only.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testTwoTargetsRemove1() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // Phase 1: composite topic forwarding to both bridge and bridge2.
        VirtualDestination twoTargetComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"), new ActiveMQQueue("include.test.bar.bridge2"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{twoTargetComposite}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        DestinationStatistics remoteBridge2Stats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge2")).getDestinationStatistics();
        Thread.sleep(2000L);

        assertRemoteAdvisoryCount(advisoryConsumer, 2);
        assertAdvisoryBrokerCounts(1, 2, 2);
        waitForConsumerCount(localStats, 1);

        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, remoteBridge2Stats.getMessages().getCount());

        // Phase 2: composite topic reduced to the single bridge target.
        VirtualDestination oneTargetComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{oneTargetComposite}, true);
        Thread.sleep(2000L);

        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        // bridge grows to two messages; bridge2 stays at one.
        Assert.assertEquals("remote dest messages", 2L, remoteBridgeStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, remoteBridge2Stats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 3);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Installs a composite topic with two target queues, verifies one message reaches both,
     * removes the {@code bridge2} queue from the remote broker, and verifies that a second
     * message is still dispatched and reaches the {@code bridge} queue.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testTwoTargetsRemove1Destination() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        VirtualDestination twoTargetComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"), new ActiveMQQueue("include.test.bar.bridge2"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{twoTargetComposite}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        DestinationStatistics remoteBridge2Stats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge2")).getDestinationStatistics();

        // Phase 1: both targets receive the first message.
        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, remoteBridge2Stats.getMessages().getCount());

        // Phase 2: drop one target queue on the remote broker.
        remoteBroker.removeDestination(new ActiveMQQueue("include.test.bar.bridge2"));
        Thread.sleep(2000L);
        assertRemoteAdvisoryCount(advisoryConsumer, 3);
        assertAdvisoryBrokerCounts(1, 1, 1);

        // Phase 3: a further send is still dispatched and reaches the remaining queue.
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        Assert.assertEquals("remote dest messages", 2L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 2, 2);
    }

    /**
     * Installs two composite topics (one for the test topic, one for the test topic name with
     * a {@code "2"} suffix), verifies one message reaches the {@code bridge} queue, then
     * removes the second composite topic and verifies a second message still arrives.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testTwoCompositeTopicsRemove1() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);

        VirtualDestination primaryComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        VirtualDestination secondaryComposite = createCompositeTopic(testTopicName + "2", new ActiveMQQueue("include.test.bar.bridge2"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{primaryComposite, secondaryComposite}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();

        // Phase 1: both composite topics installed.
        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertAdvisoryBrokerCounts(2, 1, 1);

        // Phase 2: only the primary composite topic remains.
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{primaryComposite}, true);
        Thread.sleep(2000L);
        assertAdvisoryBrokerCounts(1, 1, 1);

        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        Assert.assertEquals("remote dest messages", 2L, remoteBridgeStats.getMessages().getCount());
    }

    /**
     * Installs a composite topic with two target queues, verifies one message reaches both,
     * then clears all virtual destinations and verifies that a second message changes neither
     * the local broker statistics nor the remote queue message counts.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testTwoTargetsRemoveBoth() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        VirtualDestination twoTargetComposite = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"), new ActiveMQQueue("include.test.bar.bridge2"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{twoTargetComposite}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        DestinationStatistics remoteBridge2Stats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge2")).getDestinationStatistics();

        // Phase 1: both targets receive the first message.
        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, remoteBridge2Stats.getMessages().getCount());

        // Phase 2: remove every virtual destination, then send again.
        runtimeBroker.setVirtualDestinations(new VirtualDestination[0], true);
        Thread.sleep(2000L);
        includedProducer.send(payload);
        Thread.sleep(2000L);

        // All counters are expected to be unchanged from phase 1.
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, remoteBridge2Stats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 4);
        assertAdvisoryBrokerCounts(0, 0, 0);
    }

    /**
     * Adds the {@code include.test.bar.bridge} queue to the remote broker before the composite
     * topic that targets it is installed, then verifies one message sent to the included
     * destination arrives on that queue.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testDestinationAddedFirst() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // The target queue exists on the remote broker before any virtual destination does.
        remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), new ActiveMQQueue("include.test.bar.bridge"), false);
        Thread.sleep(2000L);

        VirtualDestination compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Installs a composite topic forwarding to {@code include.test.bar.bridge}, attaches a
     * live consumer to that queue on the remote broker, sends one message to the included
     * destination and expects the consumer to receive it. The expected advisory-broker counts
     * depend on {@code isUseVirtualDestSubsOnCreation}.
     */
    @Test(timeout = 60000)
    public void testWithConsumer() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{createCompositeTopic(this.testTopicName, new ActiveMQQueue("include.test.bar.bridge"))}, true);
        MessageProducer includedProducer = this.localSession.createProducer(this.included);
        TextMessage payload = this.localSession.createTextMessage("test");
        Thread.sleep(1000L);
        final DestinationStatistics localStats = this.localBroker.getDestination(this.included).getDestinationStatistics();
        MessageConsumer bridgeConsumer = this.remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));

        // Fail here, with a clear message, if the consumer count never reaches 1; previously the
        // wait result was discarded and a timeout only surfaced later as an unrelated failure.
        Assert.assertTrue("local broker consumer count did not reach 1", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 1 == localStats.getConsumers().getCount();
            }
        }));

        includedProducer.send(payload);
        Assert.assertNotNull(bridgeConsumer.receive(5000L));
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        assertRemoteAdvisoryCount(advisoryConsumer, 2, 1);
        if (this.isUseVirtualDestSubsOnCreation) {
            assertAdvisoryBrokerCounts(1, 2, 1);
        } else {
            assertAdvisoryBrokerCounts(1, 1, 0);
        }
    }

    /**
     * Attaches two consumers to the bridge queue on the remote broker, checks that one of them
     * receives the first message, closes the second consumer and checks that the first one
     * receives a second message. The expected advisory-broker counts depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     */
    @Test(timeout = 60000)
    public void testWith2ConsumersRemove1() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        VirtualDestination compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        MessageConsumer consumerA = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));
        MessageConsumer consumerB = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));

        waitForConsumerCount(localStats, 1);
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        // Either consumer may get the message; B is only polled when A came back empty.
        Assert.assertTrue(consumerA.receive(5000L) != null || consumerB.receive(5000L) != null);
        assertLocalBrokerStatistics(localStats, 1);

        consumerB.close();
        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        Assert.assertNotNull(consumerA.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 4, 3);

        if (!isUseVirtualDestSubsOnCreation) {
            assertAdvisoryBrokerCounts(1, 1, 0);
        } else {
            assertAdvisoryBrokerCounts(1, 2, 1);
        }
    }

    /**
     * Attaches two consumers to the bridge queue on the remote broker, checks that one of them
     * receives the first message, closes both consumers and checks that a second message
     * leaves the local broker statistics unchanged.
     *
     * <p>Skipped when {@code isUseVirtualDestSubsOnCreation} is set.
     */
    @Test(timeout = 60000)
    public void testWith2ConsumersRemoveBoth() throws Exception {
        Assume.assumeTrue(!isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);

        VirtualDestination compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        MessageConsumer consumerA = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));
        MessageConsumer consumerB = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));

        waitForConsumerCount(localStats, 1);
        assertAdvisoryBrokerCounts(1, 2, 0);

        includedProducer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        // Either consumer may get the message; B is only polled when A came back empty.
        Assert.assertTrue(consumerA.receive(5000L) != null || consumerB.receive(5000L) != null);
        assertLocalBrokerStatistics(localStats, 1);

        // With both consumers gone, the next send must not change the local statistics.
        consumerA.close();
        consumerB.close();
        Thread.sleep(2000L);
        includedProducer.send(payload);
        Thread.sleep(2000L);
        assertLocalBrokerStatistics(localStats, 1);
        assertRemoteAdvisoryCount(advisoryConsumer, 4);
        assertAdvisoryBrokerCounts(1, 0, 0);
    }

    /**
     * Checks that a composite topic on an excluded destination produces no demand.
     *
     * <p>A composite topic named {@code exclude.test.bar} forwarding to the queue
     * {@code exclude.test.bar.bridge} is installed on the remote broker, a remote consumer
     * is attached to that queue and one message is sent to {@code excluded} locally. The
     * consumer must receive nothing, the local destination must report zero consumers and
     * the advisory consumer (subscribed for {@code testTopicName}) must see no advisories.
     * The expected advisory broker counts depend on {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testExcluded() throws Exception {
        doSetUp(true, null);
        final String excludedBridgeName = "exclude.test.bar.bridge";

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        CompositeTopic excludedComposite = createCompositeTopic("exclude.test.bar", new ActiveMQQueue(excludedBridgeName));
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{excludedComposite}, true);

        MessageProducer producer = this.localSession.createProducer(this.excluded);
        TextMessage message = this.localSession.createTextMessage("test");
        Thread.sleep(1000L);

        MessageConsumer bridgeConsumer = this.remoteSession.createConsumer(new ActiveMQQueue(excludedBridgeName));
        Thread.sleep(2000L);
        producer.send(message);
        Assert.assertNull(bridgeConsumer.receive(5000L));

        DestinationStatistics localStats = this.localBroker.getDestination(this.excluded).getDestinationStatistics();
        Assert.assertEquals("broker consumer count", 0L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 0);
        assertRemoteAdvisoryCount(advisoryConsumer, 0);

        if (!this.isUseVirtualDestSubsOnCreation) {
            assertAdvisoryBrokerCounts(1, 1, 0);
        } else {
            assertAdvisoryBrokerCounts(1, 2, 1);
        }
    }

    /**
     * Exercises a composite <em>queue</em> as the source of the virtual destination.
     *
     * <p>A composite queue named {@code testQueueName} forwarding to the queue
     * {@code include.test.foo.bridge} is installed on the remote broker. With a remote
     * consumer on the forward-to queue, one message sent to the local queue must reach
     * that consumer, the local destination must report one consumer, and the remote
     * destination for {@code testQueueName} must hold no messages.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testSourceQueue() throws Exception {
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.foo.bridge";

        MessageConsumer advisoryConsumer = getQueueVirtualDestinationAdvisoryConsumer(this.testQueueName);
        CompositeQueue forwardingQueue = createCompositeQueue(this.testQueueName, new ActiveMQQueue(bridgeQueueName));
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwardingQueue}, true);

        MessageProducer producer = this.localSession.createProducer(new ActiveMQQueue(this.testQueueName));
        Thread.sleep(2000L);
        TextMessage message = this.localSession.createTextMessage("test");

        DestinationStatistics localStats =
                this.localBroker.getDestination(new ActiveMQQueue(this.testQueueName)).getDestinationStatistics();
        MessageConsumer bridgeConsumer = this.remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));
        waitForConsumerCount(localStats, 1);

        producer.send(message);
        Assert.assertNotNull(bridgeConsumer.receive(5000L));

        DestinationStatistics remoteStats =
                this.remoteBroker.getDestination(new ActiveMQQueue(this.testQueueName)).getDestinationStatistics();
        waitForDispatchFromLocalBroker(localStats, 1);
        Assert.assertEquals("broker consumer count", 1L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("message count", 0L, remoteStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 2, 1);

        if (!this.isUseVirtualDestSubsOnCreation) {
            assertAdvisoryBrokerCounts(1, 1, 0);
        } else {
            assertAdvisoryBrokerCounts(1, 2, 1);
        }
    }

    /**
     * Checks that removing the virtual destination stops message flow.
     *
     * <p>The remote broker starts with a composite topic forwarding to the queue
     * {@code include.test.bar.bridge}; that queue is then added explicitly and the virtual
     * destinations are replaced with an empty array. A remote consumer created afterwards
     * must not receive the message sent to {@code included}, and the advisory broker must
     * end up tracking nothing.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testFlowRemoved() throws Exception {
        final String bridgeQueueName = "include.test.bar.bridge";
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQQueue(bridgeQueueName));
        doSetUp(true, new VirtualDestination[]{forwardingTopic});

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        Thread.sleep(2000L);
        this.remoteBroker.getBroker().addDestination(
                this.remoteBroker.getAdminConnectionContext(), new ActiveMQQueue(bridgeQueueName), false);
        Thread.sleep(2000L);

        // Drop every virtual destination from the running remote broker.
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[0], true);

        MessageProducer producer = this.localSession.createProducer(this.included);
        Thread.sleep(2000L);
        TextMessage message = this.localSession.createTextMessage("test");
        MessageConsumer bridgeConsumer = this.remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));
        Thread.sleep(2000L);

        producer.send(message);
        Assert.assertNull(bridgeConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 2, 0);
        assertAdvisoryBrokerCounts(0, 0, 0);
    }

    /**
     * Attaches the network connector only after the remote virtual destination and its
     * forward-to queue already exist.
     *
     * <p>The local broker is created without the connector; the connector is added and
     * started later. The advisory consumer is then expected to have seen exactly one
     * advisory and the advisory broker to hold one entry in each of its three tracked
     * collections. Runs only when {@code isUseVirtualDestSubsOnCreation} is set.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testReplay() throws Exception {
        Assume.assumeTrue(this.isUseVirtualDestSubsOnCreation);
        final String bridgeQueueName = "include.test.bar.bridge";
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQQueue(bridgeQueueName));
        doSetUp(true, new VirtualDestination[]{forwardingTopic}, false, false);

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        Thread.sleep(2000L);
        this.remoteBroker.getBroker().addDestination(
                this.remoteBroker.getAdminConnectionContext(), new ActiveMQQueue(bridgeQueueName), false);
        Thread.sleep(2000L);

        // Only now is the bridge attached to the local broker.
        this.localBroker.addNetworkConnector(this.connector);
        this.connector.start();
        Thread.sleep(2000L);

        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Same late-connector scenario as a replay test, but with a remote consumer already
     * attached to the forward-to queue before the connector is started.
     *
     * <p>After the connector is added and started, one message sent to {@code included}
     * must be received by the remote consumer. Expected advisory and advisory-broker
     * counts depend on {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testReplayWithConsumer() throws Exception {
        final String bridgeQueueName = "include.test.bar.bridge";
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQQueue(bridgeQueueName));
        doSetUp(true, new VirtualDestination[]{forwardingTopic}, false, false);

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        Thread.sleep(2000L);
        this.remoteBroker.getBroker().addDestination(
                this.remoteBroker.getAdminConnectionContext(), new ActiveMQQueue(bridgeQueueName), false);
        Thread.sleep(2000L);

        MessageProducer producer = this.localSession.createProducer(this.included);
        TextMessage message = this.localSession.createTextMessage("test");
        MessageConsumer bridgeConsumer = this.remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));
        Thread.sleep(2000L);

        // The bridge is attached after the consumer exists.
        this.localBroker.addNetworkConnector(this.connector);
        this.connector.start();
        Thread.sleep(2000L);

        producer.send(message);
        Assert.assertNotNull(bridgeConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 2, 1);

        if (!this.isUseVirtualDestSubsOnCreation) {
            assertAdvisoryBrokerCounts(1, 1, 0);
        } else {
            assertAdvisoryBrokerCounts(1, 2, 1);
        }
    }

    /**
     * Checks that demand disappears when the virtual destination is removed and no
     * consumer exists.
     *
     * <p>The remote broker starts with a composite topic forwarding to the queue
     * {@code include.test.bar.bridge}. After confirming the advisory broker tracks one
     * entry in each collection, the virtual destinations are cleared and one message is
     * sent to {@code included}. The remote forward-to queue must hold no messages and the
     * advisory broker must track nothing. Runs only when
     * {@code isUseVirtualDestSubsOnCreation} is set.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testRemovedIfNoConsumer() throws Exception {
        Assume.assumeTrue(this.isUseVirtualDestSubsOnCreation);
        final String bridgeQueueName = "include.test.bar.bridge";
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQQueue(bridgeQueueName));
        doSetUp(true, new VirtualDestination[]{forwardingTopic});

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        Thread.sleep(2000L);

        // NOTE(review): local statistics are taken from the queue named testQueueName although the
        // message below is sent to the topic `included` -- confirm this is the intended destination.
        DestinationStatistics localStats =
                this.localBroker.getDestination(new ActiveMQQueue(this.testQueueName)).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats =
                this.remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();
        Thread.sleep(2000L);
        assertAdvisoryBrokerCounts(1, 1, 1);

        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[0], true);
        MessageProducer producer = this.localSession.createProducer(this.included);
        Thread.sleep(2000L);
        producer.send(this.localSession.createTextMessage("test"));

        Assert.assertEquals("broker consumer count", 0L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 0);
        Assert.assertEquals("remote dest messages", 0L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 2);
        assertAdvisoryBrokerCounts(0, 0, 0);
    }

    /**
     * Uses a <em>topic</em> as the forward-to destination of the composite topic.
     *
     * <p>With a remote consumer on the topic {@code include.test.bar.bridge}, one message
     * sent to {@code included} must be received remotely; one advisory is expected and
     * the advisory broker is expected to track one virtual destination and one consumer.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testToTopic() throws Exception {
        doSetUp(true, null);
        final String bridgeTopicName = "include.test.bar.bridge";

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQTopic(bridgeTopicName));
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwardingTopic}, true);

        MessageProducer producer = this.localSession.createProducer(this.included);
        Thread.sleep(2000L);
        TextMessage message = this.localSession.createTextMessage("test");
        MessageConsumer bridgeConsumer = this.remoteSession.createConsumer(new ActiveMQTopic(bridgeTopicName));
        Thread.sleep(2000L);

        producer.send(message);
        Assert.assertNotNull(bridgeConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 0);
    }

    /**
     * Topic forward-to destination with no remote consumer at all.
     *
     * <p>After installing the composite topic and sending one message to {@code included},
     * the inspected local destination must report zero consumers, no advisory may arrive,
     * and the advisory broker must track one virtual destination and nothing else.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testToTopicNoConsumer() throws Exception {
        doSetUp(true, null);

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        CompositeTopic forwardingTopic =
                createCompositeTopic(this.testTopicName, new ActiveMQTopic("include.test.bar.bridge"));
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwardingTopic}, true);

        MessageProducer producer = this.localSession.createProducer(this.included);
        Thread.sleep(2000L);
        producer.send(this.localSession.createTextMessage("test"));

        // NOTE(review): statistics are read from `excluded` while the message was sent to
        // `included` -- confirm this is intentional.
        DestinationStatistics localStats = this.localBroker.getDestination(this.excluded).getDestinationStatistics();
        Assert.assertEquals("broker consumer count", 0L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 0);
        assertRemoteAdvisoryCount(advisoryConsumer, 0);
        assertAdvisoryBrokerCounts(1, 0, 0);
    }

    /**
     * Topic forward-to destination consumed through an online durable subscriber.
     *
     * <p>A durable subscriber named {@code sub1} is created on the remote topic
     * {@code include.test.bar.bridge}. One message sent to {@code included} must be
     * received by it, after which the local destination is expected to show exactly one
     * dispatched and one dequeued message.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testToTopicWithDurable() throws Exception {
        doSetUp(true, null);
        final String bridgeTopicName = "include.test.bar.bridge";

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQTopic(bridgeTopicName));
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwardingTopic}, true);

        MessageProducer producer = this.localSession.createProducer(this.included);
        Thread.sleep(2000L);
        TextMessage message = this.localSession.createTextMessage("test");
        final DestinationStatistics localStats = this.localBroker.getDestination(this.included).getDestinationStatistics();
        TopicSubscriber durableSubscriber =
                this.remoteSession.createDurableSubscriber(new ActiveMQTopic(bridgeTopicName), "sub1");
        Thread.sleep(2000L);

        producer.send(message);
        Assert.assertNotNull(durableSubscriber.receive(5000L));

        // Poll until the dequeue is recorded; the result of the wait is checked by the asserts below.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 1 == localStats.getDequeues().getCount();
            }
        });

        Assert.assertEquals("broker dest stat dispatched", 1L, localStats.getDispatched().getCount());
        Assert.assertEquals("broker dest stat dequeues", 1L, localStats.getDequeues().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 0);
    }

    /**
     * Topic forward-to destination with a durable subscriber that is offline at send time.
     *
     * <p>The durable subscriber {@code sub1} is created on the remote topic
     * {@code include.test.bar.bridge} and closed immediately. One message is then sent to
     * {@code included}; the local destination is expected to reach one dispatched and one
     * dequeued message. Re-creating the durable subscriber must yield the message, and the
     * local counters must still read one afterwards.
     *
     * @throws Exception on any broker, JMS or assertion failure
     */
    @Test(timeout = 60000)
    public void testToTopicWithDurableOffline() throws Exception {
        doSetUp(true, null);
        final String bridgeTopicName = "include.test.bar.bridge";

        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(this.testTopicName);
        CompositeTopic forwardingTopic = createCompositeTopic(this.testTopicName, new ActiveMQTopic(bridgeTopicName));
        this.runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwardingTopic}, true);

        MessageProducer producer = this.localSession.createProducer(this.included);
        Thread.sleep(2000L);
        TextMessage message = this.localSession.createTextMessage("test");
        final DestinationStatistics localStats = this.localBroker.getDestination(this.included).getDestinationStatistics();

        // Register the durable subscription, then take it offline straight away.
        TopicSubscriber offlineSubscriber =
                this.remoteSession.createDurableSubscriber(new ActiveMQTopic(bridgeTopicName), "sub1");
        offlineSubscriber.close();
        Thread.sleep(2000L);

        producer.send(message);
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 1 == localStats.getDequeues().getCount() && localStats.getDispatched().getCount() == 1;
            }
        });
        Assert.assertEquals("broker dest stat dispatched", 1L, localStats.getDispatched().getCount());
        Assert.assertEquals("broker dest stat dequeues", 1L, localStats.getDequeues().getCount());

        // Bring the durable subscription back online and fetch the message.
        TopicSubscriber resumedSubscriber =
                this.remoteSession.createDurableSubscriber(new ActiveMQTopic(bridgeTopicName), "sub1");
        Assert.assertNotNull(resumedSubscriber.receive(5000L));
        Thread.sleep(2000L);

        Assert.assertEquals("broker dest stat dispatched", 1L, localStats.getDispatched().getCount());
        Assert.assertEquals("broker dest stat dequeues", 1L, localStats.getDequeues().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 3);
        assertAdvisoryBrokerCounts(1, 1, 0);
    }

    /**
     * JUnit {@code @Before} hook; intentionally empty. The tests in this class call
     * {@code doSetUp(...)} themselves with per-test arguments.
     */
    @Before
    public void setUp() throws Exception {
    }

    /**
     * JUnit {@code @After} hook; delegates all cleanup to {@code doTearDown()}.
     *
     * @throws Exception if {@code doTearDown()} fails
     */
    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    /**
     * Convenience overload of {@link #doSetUp(boolean, VirtualDestination[], boolean, boolean)}
     * that passes {@code true} and {@code false} for the last two arguments.
     *
     * @param z                     forwarded as the first argument of the four-argument overload
     * @param virtualDestinationArr forwarded as the second argument; may be {@code null}
     * @throws Exception if the four-argument overload fails
     */
    protected void doSetUp(boolean z, VirtualDestination[] virtualDestinationArr) throws Exception {
        doSetUp(z, virtualDestinationArr, true, false);
    }

    /**
     * Starts the remote and local brokers and opens one JMS connection and session on each.
     *
     * <p>The remote broker is created and started first, because the local broker's
     * network connector is built from the remote broker's first transport connector.
     * Both connections use the client id {@code "clientId"}.
     *
     * @param z                     value passed to {@code setDeleteAllMessagesOnStartup} on both brokers
     * @param virtualDestinationArr virtual destinations handed to {@code createRemoteBroker}; may be {@code null}
     * @param z2                    first argument of {@code createLocalBroker}
     * @param z3                    second argument of {@code createLocalBroker}
     * @throws Exception if a broker or connection cannot be created or started
     */
    protected void doSetUp(boolean z, VirtualDestination[] virtualDestinationArr, boolean z2, boolean z3) throws Exception {
        // Remote side first: createLocalBroker reads this.remoteBroker.
        this.remoteBroker = createRemoteBroker(this.isUseVirtualDestSubsOnCreation, virtualDestinationArr);
        this.remoteBroker.setDeleteAllMessagesOnStartup(z);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();

        this.localBroker = createLocalBroker(z2, z3);
        this.localBroker.setDeleteAllMessagesOnStartup(z);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();

        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        localFactory.setAlwaysSyncSend(true);
        localFactory.setDispatchAsync(false);
        this.localConnection = localFactory.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();

        ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI());
        this.remoteConnection = remoteFactory.createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();

        this.included = new ActiveMQTopic(this.testTopicName);
        this.excluded = new ActiveMQTopic("exclude.test.bar");

        // Non-transacted sessions; acknowledge mode 1 is the JMS AUTO_ACKNOWLEDGE constant.
        this.localSession = this.localConnection.createSession(false, 1);
        this.remoteSession = this.remoteConnection.createSession(false, 1);
    }

    /**
     * Builds (but does not start) the local broker and the network connector that points
     * at the remote broker's first transport connector.
     *
     * <p>The connector is always created and stored in {@code this.connector}; it is only
     * registered with the returned broker when {@code z} is {@code true}. It dynamically
     * includes the test queue, the test topic and {@code VirtualTopic.>}, and excludes
     * {@code exclude.test.foo} (queue) and {@code exclude.test.bar} (topic).
     *
     * @param z  whether to add the network connector to the broker
     * @param z2 whether to append {@code ?forceDurable=true} to the included test topic name
     * @return the configured, not yet started, local broker
     * @throws Exception if the broker or connector cannot be configured
     */
    protected BrokerService createLocalBroker(boolean z, boolean z2) throws Exception {
        BrokerService localService = new BrokerService();
        localService.setMonitorConnectionSplits(true);
        localService.setDataDirectoryFile(this.tempFolder.newFolder());
        localService.setBrokerName("localBroker");

        Object remoteConnectUri = ((TransportConnector) this.remoteBroker.getTransportConnectors().get(0)).getConnectUri();
        this.connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteConnectUri + ")"));
        this.connector.setName("networkConnector");
        this.connector.setDynamicOnly(false);
        this.connector.setDecreaseNetworkConsumerPriority(false);
        this.connector.setConduitSubscriptions(true);
        this.connector.setDuplex(this.isDuplex);
        this.connector.setUseVirtualDestSubs(true);

        String topicOptions = z2 ? "?forceDurable=true" : "";
        ArrayList<ActiveMQDestination> includedDestinations = new ArrayList<>();
        includedDestinations.add(new ActiveMQQueue(this.testQueueName));
        includedDestinations.add(new ActiveMQTopic(this.testTopicName + topicOptions));
        includedDestinations.add(new ActiveMQTopic("VirtualTopic.>"));
        this.connector.setDynamicallyIncludedDestinations(includedDestinations);

        ArrayList<ActiveMQDestination> excludedDestinations = new ArrayList<>();
        excludedDestinations.add(new ActiveMQQueue("exclude.test.foo"));
        excludedDestinations.add(new ActiveMQTopic("exclude.test.bar"));
        this.connector.setExcludedDestinations(excludedDestinations);

        if (z) {
            localService.addNetworkConnector(this.connector);
        }
        localService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        return localService;
    }

    /**
     * Builds (but does not start) the remote broker with the Java runtime configuration
     * plugin installed and virtual-destination subscriptions enabled.
     *
     * <p>As a side effect this stores the broker-stack adaptors for
     * {@link JavaRuntimeConfigurationBroker} and {@link AdvisoryBroker} in
     * {@code this.runtimeBroker} and {@code this.remoteAdvisoryBroker}.
     *
     * @param z                     value passed to {@code setUseVirtualDestSubsOnCreation}
     * @param virtualDestinationArr virtual destinations to install through a
     *                              {@link VirtualDestinationInterceptor}; {@code null} installs none
     * @return the configured, not yet started, remote broker
     * @throws Exception if the broker cannot be configured
     */
    protected BrokerService createRemoteBroker(boolean z, VirtualDestination[] virtualDestinationArr) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(this.tempFolder.newFolder());
        brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
        brokerService.setUseVirtualDestSubs(true);
        brokerService.setUseVirtualDestSubsOnCreation(z);
        if (virtualDestinationArr != null) {
            // Must be declared with the concrete type: setVirtualDestinations is not part of
            // the DestinationInterceptor interface.
            VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
            virtualDestinationInterceptor.setVirtualDestinations(virtualDestinationArr);
            brokerService.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});
        }
        // getAdaptor is requested with the concrete class, so the result is narrowed explicitly
        // before it is stored in the typed fields.
        this.runtimeBroker = (JavaRuntimeConfigurationBroker)
                brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
        this.remoteAdvisoryBroker = (AdvisoryBroker)
                brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        return brokerService;
    }

    /**
     * Creates a forward-only {@link CompositeTopic} with the given name.
     *
     * @param str                     name of the composite topic
     * @param activeMQDestinationArr  destinations the composite topic forwards to, in order
     * @return the new composite topic
     */
    protected CompositeTopic createCompositeTopic(String str, ActiveMQDestination... activeMQDestinationArr) {
        // Copy into a fresh mutable list so the composite does not share the varargs array.
        ArrayList<ActiveMQDestination> forwardTargets = new ArrayList<>(Arrays.asList(activeMQDestinationArr));

        CompositeTopic topic = new CompositeTopic();
        topic.setName(str);
        topic.setForwardOnly(true);
        topic.setForwardTo(forwardTargets);
        return topic;
    }

    /**
     * Creates a forward-only {@link CompositeQueue} with the given name.
     *
     * @param str                     name of the composite queue
     * @param activeMQDestinationArr  destinations the composite queue forwards to, in order
     * @return the new composite queue
     */
    protected CompositeQueue createCompositeQueue(String str, ActiveMQDestination... activeMQDestinationArr) {
        // Copy into a fresh mutable list so the composite does not share the varargs array.
        ArrayList<ActiveMQDestination> forwardTargets = new ArrayList<>(Arrays.asList(activeMQDestinationArr));

        CompositeQueue queue = new CompositeQueue();
        queue.setName(str);
        queue.setForwardOnly(true);
        queue.setForwardTo(forwardTargets);
        return queue;
    }

    /**
     * Creates a consumer on the remote session for the virtual-destination consumer
     * advisory topic of the <em>topic</em> with the given name.
     *
     * @param str name of the topic whose advisory topic is subscribed to
     * @return a consumer on the remote session
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer getVirtualDestinationAdvisoryConsumer(String str) throws JMSException {
        return this.remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(new ActiveMQTopic(str)));
    }

    /**
     * Creates a consumer on the remote session for the virtual-destination consumer
     * advisory topic of the <em>queue</em> with the given name.
     *
     * @param str name of the queue whose advisory topic is subscribed to
     * @return a consumer on the remote session
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer getQueueVirtualDestinationAdvisoryConsumer(String str) throws JMSException {
        return this.remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(new ActiveMQQueue(str)));
    }

    /**
     * Drains the given advisory consumer and asserts how many advisories it delivered.
     *
     * <p>Messages are received with a one-second timeout each; draining stops at the first
     * timeout. Every received advisory's data structure is logged.
     *
     * @param messageConsumer consumer to drain
     * @param i               expected number of advisory messages
     * @throws JMSException if receiving from the consumer fails
     */
    protected void assertRemoteAdvisoryCount(MessageConsumer messageConsumer, int i) throws JMSException {
        int received = 0;
        ActiveMQMessage advisory;
        // receive() is declared to return jakarta.jms.Message, so the narrowing cast is required
        // to reach getDataStructure().
        while ((advisory = (ActiveMQMessage) messageConsumer.receive(1000L)) != null) {
            received++;
            LOG.info("advisory data structure: {}", advisory.getDataStructure());
        }
        Assert.assertEquals(i, received);
    }

    /**
     * Asserts the advisory count using an expectation that depends on
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @param messageConsumer consumer to drain
     * @param i               expected count when {@code isUseVirtualDestSubsOnCreation} is set
     * @param i2              expected count otherwise
     * @throws JMSException if receiving from the consumer fails
     */
    protected void assertRemoteAdvisoryCount(MessageConsumer messageConsumer, int i, int i2) throws JMSException {
        int expected = this.isUseVirtualDestSubsOnCreation ? i : i2;
        assertRemoteAdvisoryCount(messageConsumer, expected);
    }

    /**
     * Asserts the sizes of three private collections of the remote {@link AdvisoryBroker},
     * which are read through reflection.
     *
     * @param i  expected size of the {@code virtualDestinations} set
     * @param i2 expected size of the {@code virtualDestinationConsumers} map
     * @param i3 expected size of the {@code brokerConsumerDests} map
     * @throws Exception if a field cannot be found or read, or an assertion fails
     */
    protected void assertAdvisoryBrokerCounts(int i, int i2, int i3) throws Exception {
        // Look up all three fields before touching any of them.
        Field virtualDestinationsField = AdvisoryBroker.class.getDeclaredField("virtualDestinations");
        Field consumersField = AdvisoryBroker.class.getDeclaredField("virtualDestinationConsumers");
        Field brokerConsumerDestsField = AdvisoryBroker.class.getDeclaredField("brokerConsumerDests");

        virtualDestinationsField.setAccessible(true);
        consumersField.setAccessible(true);
        brokerConsumerDestsField.setAccessible(true);

        // Snapshot the references first, then assert, so all reads precede the first assertion.
        Set<?> virtualDestinations = (Set<?>) virtualDestinationsField.get(this.remoteAdvisoryBroker);
        ConcurrentMap<?, ?> virtualDestinationConsumers =
                (ConcurrentMap<?, ?>) consumersField.get(this.remoteAdvisoryBroker);
        ConcurrentMap<?, ?> brokerConsumerDests =
                (ConcurrentMap<?, ?>) brokerConsumerDestsField.get(this.remoteAdvisoryBroker);

        Assert.assertEquals(i, virtualDestinations.size());
        Assert.assertEquals(i2, virtualDestinationConsumers.size());
        Assert.assertEquals(i3, brokerConsumerDests.size());
    }
}
