package org.apache.activemq.network;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;

/* loaded from: input_file:org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.class */
public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
    private boolean duplex = true;
    private boolean deletePersistentMessagesOnStartup = true;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public NetworkConnector bridgeBrokers(String str, String str2) throws Exception {
        NetworkConnector bridgeBrokers = super.bridgeBrokers(str, str2);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ActiveMQTopic("TEST.FOO?forceDurable=true"));
        bridgeBrokers.setDynamicallyIncludedDestinations(arrayList);
        bridgeBrokers.setDuplex(this.duplex);
        bridgeBrokers.setDecreaseNetworkConsumerPriority(false);
        bridgeBrokers.setConduitSubscriptions(true);
        bridgeBrokers.setSyncDurableSubs(true);
        bridgeBrokers.setNetworkTTL(-1);
        bridgeBrokers.setClientIdToken("|");
        return bridgeBrokers;
    }

    public void testDurablePropagationBrokerRestart() throws Exception {
        this.duplex = true;
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        bridgeBrokers("Broker_C_C", "Broker_D_D");
        bridgeBrokers("Broker_D_D", "Broker_E_E");
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        Connection createConnection = this.brokers.get("Broker_A_A").factory.createConnection();
        createConnection.setClientID("clientId1");
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(activeMQTopic, "subA2");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_E_E").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        Connection createConnection2 = this.brokers.get("Broker_E_E").factory.createConnection();
        createConnection2.setClientID("clientId2");
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        TopicSubscriber createDurableSubscriber3 = createSession2.createDurableSubscriber(activeMQTopic, "subE");
        TopicSubscriber createDurableSubscriber4 = createSession2.createDurableSubscriber(activeMQTopic, "subE2");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_E_E").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        createDurableSubscriber3.close();
        createDurableSubscriber4.close();
        destroyAllBrokers();
        this.deletePersistentMessagesOnStartup = false;
        String str = new String("?persistent=true&useJmx=false");
        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + str));
        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + str));
        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + str));
        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + str));
        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + str));
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        bridgeBrokers("Broker_C_C", "Broker_D_D");
        bridgeBrokers("Broker_D_D", "Broker_E_E");
        startAllBrokers();
        Connection createConnection3 = this.brokers.get("Broker_A_A").factory.createConnection();
        createConnection3.setClientID("clientId1");
        createConnection3.start();
        Session createSession3 = createConnection3.createSession(false, 1);
        Connection createConnection4 = this.brokers.get("Broker_E_E").factory.createConnection();
        createConnection4.setClientID("clientId2");
        createConnection4.start();
        Session createSession4 = createConnection4.createSession(false, 1);
        createSession4.createDurableSubscriber(activeMQTopic, "subE").close();
        createSession3.unsubscribe("subA");
        createSession3.unsubscribe("subA2");
        createSession4.unsubscribe("subE");
        createSession4.unsubscribe("subE2");
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_E_E").broker, activeMQTopic, 0);
    }

    public void testDurablePropagationDuplex() throws Exception {
        this.duplex = true;
        testDurablePropagation();
    }

    public void testDurablePropagationOneWay() throws Exception {
        this.duplex = false;
        testDurablePropagation();
    }

    protected void testDurablePropagation() throws Exception {
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        if (!this.duplex) {
            bridgeBrokers("Broker_B_B", "Broker_A_A");
            bridgeBrokers("Broker_C_C", "Broker_B_B");
        }
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        Session createSession = createSession("Broker_A_A");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(activeMQTopic, "subB");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        sendMessages("Broker_C_C", activeMQTopic, 1);
        assertNotNull(createDurableSubscriber.receive(1000L));
        assertNotNull(createDurableSubscriber2.receive(1000L));
        Session createSession2 = createSession("Broker_C_C");
        TopicSubscriber createDurableSubscriber3 = createSession2.createDurableSubscriber(activeMQTopic, "subC");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        createDurableSubscriber3.close();
        createSession.unsubscribe("subA");
        createSession.unsubscribe("subB");
        createSession2.unsubscribe("subC");
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
    }

    public void testDurablePropagationConsumerAllBrokersDuplex() throws Exception {
        this.duplex = true;
        testDurablePropagationConsumerAllBrokers();
    }

    public void testDurablePropagationConsumerAllBrokersOneWay() throws Exception {
        this.duplex = false;
        testDurablePropagationConsumerAllBrokers();
    }

    protected void testDurablePropagationConsumerAllBrokers() throws Exception {
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        if (!this.duplex) {
            bridgeBrokers("Broker_B_B", "Broker_A_A");
            bridgeBrokers("Broker_C_C", "Broker_B_B");
        }
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        Session createSession = createSession("Broker_A_A");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        Session createSession2 = createSession("Broker_B_B");
        TopicSubscriber createDurableSubscriber2 = createSession2.createDurableSubscriber(activeMQTopic, "subB");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        Session createSession3 = createSession("Broker_C_C");
        TopicSubscriber createDurableSubscriber3 = createSession3.createDurableSubscriber(activeMQTopic, "subC");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        createDurableSubscriber3.close();
        createSession.unsubscribe("subA");
        createSession2.unsubscribe("subB");
        createSession3.unsubscribe("subC");
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
    }

    public void testDurablePropagation5BrokerDuplex() throws Exception {
        this.duplex = true;
        testDurablePropagation5Broker();
    }

    public void testDurablePropagation5BrokerOneWay() throws Exception {
        this.duplex = false;
        testDurablePropagation5Broker();
    }

    protected void testDurablePropagation5Broker() throws Exception {
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        bridgeBrokers("Broker_C_C", "Broker_D_D");
        bridgeBrokers("Broker_D_D", "Broker_E_E");
        if (!this.duplex) {
            bridgeBrokers("Broker_B_B", "Broker_A_A");
            bridgeBrokers("Broker_C_C", "Broker_B_B");
            bridgeBrokers("Broker_D_D", "Broker_C_C");
            bridgeBrokers("Broker_E_E", "Broker_D_D");
        }
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        Session createSession = createSession("Broker_A_A");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_E_E").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        sendMessages("Broker_E_E", activeMQTopic, 1);
        assertNotNull(createDurableSubscriber.receive(1000L));
        Session createSession2 = createSession("Broker_E_E");
        TopicSubscriber createDurableSubscriber2 = createSession2.createDurableSubscriber(activeMQTopic, "subE");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_E_E").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        createSession.unsubscribe("subA");
        createSession2.unsubscribe("subE");
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_E_E").broker, activeMQTopic, 0);
    }

    public void testDurablePropagationSpokeDuplex() throws Exception {
        this.duplex = true;
        testDurablePropagationSpoke();
    }

    public void testDurablePropagationSpokeOneWay() throws Exception {
        this.duplex = false;
        testDurablePropagationSpoke();
    }

    protected void testDurablePropagationSpoke() throws Exception {
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        bridgeBrokers("Broker_B_B", "Broker_D_D");
        if (!this.duplex) {
            bridgeBrokers("Broker_B_B", "Broker_A_A");
            bridgeBrokers("Broker_C_C", "Broker_B_B");
            bridgeBrokers("Broker_D_D", "Broker_B_B");
        }
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        Session createSession = createSession("Broker_A_A");
        Session createSession2 = createSession("Broker_B_B");
        Session createSession3 = createSession("Broker_C_C");
        Session createSession4 = createSession("Broker_D_D");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(activeMQTopic, "subAB");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        TopicSubscriber createDurableSubscriber3 = createSession4.createDurableSubscriber(activeMQTopic, "subD");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        sendMessages("Broker_A_A", activeMQTopic, 1);
        assertNotNull(createDurableSubscriber3.receive(1000L));
        sendMessages("Broker_C_C", activeMQTopic, 1);
        assertNotNull(createDurableSubscriber3.receive(1000L));
        TopicSubscriber createDurableSubscriber4 = createSession2.createDurableSubscriber(activeMQTopic, "subB");
        TopicSubscriber createDurableSubscriber5 = createSession3.createDurableSubscriber(activeMQTopic, "subC");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 3);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        createDurableSubscriber4.close();
        createDurableSubscriber5.close();
        createDurableSubscriber3.close();
        createSession.unsubscribe("subA");
        createSession.unsubscribe("subAB");
        createSession2.unsubscribe("subB");
        createSession3.unsubscribe("subC");
        createSession4.unsubscribe("subD");
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_D_D").broker, activeMQTopic, 0);
    }

    public void testForceDurablePropagationDuplex() throws Exception {
        this.duplex = true;
        testForceDurablePropagation();
    }

    public void testForceDurablePropagationOneWay() throws Exception {
        this.duplex = false;
        testForceDurablePropagation();
    }

    protected void testForceDurablePropagation() throws Exception {
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        if (!this.duplex) {
            bridgeBrokers("Broker_B_B", "Broker_A_A");
            bridgeBrokers("Broker_C_C", "Broker_B_B");
        }
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        MessageConsumer createConsumer = createSession("Broker_A_A").createConsumer(activeMQTopic);
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        sendMessages("Broker_C_C", activeMQTopic, 1);
        assertNotNull(createConsumer.receive(1000L));
        MessageConsumer createConsumer2 = createSession("Broker_C_C").createConsumer(activeMQTopic);
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createConsumer.close();
        createConsumer2.close();
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
    }

    public void testDurablePropagationSyncDuplex() throws Exception {
        this.duplex = true;
        testDurablePropagationSync();
    }

    public void testDurablePropagationSyncOneWay() throws Exception {
        this.duplex = false;
        testDurablePropagationSync();
    }

    protected void testDurablePropagationSync() throws Exception {
        NetworkConnector bridgeBrokers = bridgeBrokers("Broker_A_A", "Broker_B_B");
        NetworkConnector bridgeBrokers2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
        NetworkConnector networkConnector = null;
        NetworkConnector networkConnector2 = null;
        if (!this.duplex) {
            networkConnector = bridgeBrokers("Broker_B_B", "Broker_A_A");
            networkConnector2 = bridgeBrokers("Broker_C_C", "Broker_B_B");
        }
        startAllBrokers();
        bridgeBrokers.stop();
        bridgeBrokers2.stop();
        if (!this.duplex) {
            networkConnector.stop();
            networkConnector2.stop();
        }
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        Session createSession = createSession("Broker_A_A");
        Session createSession2 = createSession("Broker_C_C");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(activeMQTopic, "subB");
        TopicSubscriber createDurableSubscriber3 = createSession2.createDurableSubscriber(activeMQTopic, "subC");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
        bridgeBrokers.start();
        bridgeBrokers2.start();
        if (!this.duplex) {
            networkConnector.start();
            networkConnector2.start();
        }
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        createDurableSubscriber3.close();
    }

    public void testDurablePropagationMultipleBridgesDifferentDestinations() throws Exception {
        this.duplex = true;
        bridgeBrokers("Broker_A_A", "Broker_B_B");
        bridgeBrokers("Broker_B_B", "Broker_C_C");
        NetworkConnector bridgeBrokers = bridgeBrokers("Broker_A_A", "Broker_B_B");
        NetworkConnector bridgeBrokers2 = bridgeBrokers("Broker_B_B", "Broker_C_C");
        bridgeBrokers.setName("nc_3_3");
        bridgeBrokers2.setName("nc_4_4");
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ActiveMQTopic("TEST.FOO2?forceDurable=true"));
        bridgeBrokers.setDynamicallyIncludedDestinations(arrayList);
        bridgeBrokers2.setDynamicallyIncludedDestinations(arrayList);
        startAllBrokers();
        ActiveMQTopic activeMQTopic = (ActiveMQTopic) createDestination("TEST.FOO", true);
        ActiveMQTopic activeMQTopic2 = (ActiveMQTopic) createDestination("TEST.FOO2", true);
        Session createSession = createSession("Broker_A_A");
        Session createSession2 = createSession("Broker_C_C");
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "subA");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(activeMQTopic2, "subAa");
        TopicSubscriber createDurableSubscriber3 = createSession2.createDurableSubscriber(activeMQTopic, "subC");
        TopicSubscriber createDurableSubscriber4 = createSession2.createDurableSubscriber(activeMQTopic2, "subCc");
        Thread.sleep(1000L);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic2, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic2, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic2, 1);
        createDurableSubscriber.close();
        createDurableSubscriber3.close();
        createSession.unsubscribe("subA");
        createSession2.unsubscribe("subC");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic2, 2);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic2, 1);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic2, 1);
        createDurableSubscriber2.close();
        createDurableSubscriber4.close();
        createSession.unsubscribe("subAa");
        createSession2.unsubscribe("subCc");
        assertNCDurableSubsCount(this.brokers.get("Broker_B_B").broker, activeMQTopic2, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_C_C").broker, activeMQTopic2, 0);
        assertNCDurableSubsCount(this.brokers.get("Broker_A_A").broker, activeMQTopic2, 0);
    }

    protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic activeMQTopic, final int i) throws Exception {
        assertTrue(Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.DurableFiveBrokerNetworkBridgeTest.1
            public boolean isSatisified() throws Exception {
                return i == DurableFiveBrokerNetworkBridgeTest.this.getNCDurableSubs(brokerService, activeMQTopic).size();
            }
        }, DurableSubProcessWithRestartTest.BROKER_RESTART, 500L));
    }

    protected List<DurableTopicSubscription> getNCDurableSubs(BrokerService brokerService, ActiveMQTopic activeMQTopic) throws Exception {
        DurableTopicSubscription durableTopicSubscription;
        ArrayList arrayList = new ArrayList();
        DestinationFilter destination = brokerService.getDestination(activeMQTopic);
        Topic topic = destination instanceof DestinationFilter ? (Topic) destination.getAdaptor(Topic.class) : (Topic) destination;
        for (SubscriptionKey subscriptionKey : topic.getDurableTopicSubs().keySet()) {
            if (subscriptionKey.getSubscriptionName().startsWith("NC-DS_") && (durableTopicSubscription = (DurableTopicSubscription) topic.getDurableTopicSubs().get(subscriptionKey)) != null) {
                arrayList.add(durableTopicSubscription);
            }
        }
        return arrayList;
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        this.deletePersistentMessagesOnStartup = true;
        String str = new String("?persistent=true&useJmx=false");
        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + str));
        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + str));
        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + str));
        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + str));
        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + str));
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    protected void configureBroker(BrokerService brokerService) {
        brokerService.setBrokerId(brokerService.getBrokerName());
        brokerService.setDeleteAllMessagesOnStartup(this.deletePersistentMessagesOnStartup);
        brokerService.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
    }

    protected Session createSession(String str) throws Exception {
        Connection createConnection = createConnection(str);
        createConnection.start();
        return createConnection.createSession(false, 1);
    }

    public static Test suite() {
        return suite(DurableFiveBrokerNetworkBridgeTest.class);
    }
}
