package org.apache.activemq.usecases;

import java.net.MalformedURLException;
import java.net.URI;
import java.util.Iterator;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.bugs.AMQ4607Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assume;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.class */
/**
 * Brings up a hub broker and a spoke broker joined by a duplex network connector, creates and
 * closes a durable topic subscriber on the hub, publishes persistent messages on the spoke, and
 * then restarts the spoke twice, each time waiting for the hub's {@code networkBridge} MBean
 * count to reach the expected value.
 */
public class DurableSubscriberWithNetworkRestartTest extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkRestartTest.class);
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";
    protected static final int MESSAGE_COUNT = 10;

    /** Broker URI (without options) of the hub; the hub listens on port 61617. */
    private static final String HUB_BROKER_URI = "broker:(tcp://localhost:61617)/" + HUB;
    /** Broker URI (without options) of the spoke; the spoke listens on port 61616. */
    private static final String SPOKE_BROKER_URI = "broker:(tcp://localhost:61616)/" + SPOKE;
    /** Common broker options; the caller appends the {@code deleteAllMessagesOnStartup} value. */
    private static final String BROKER_OPTIONS = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=";
    /** Numeric value of {@code javax.jms.DeliveryMode.PERSISTENT}. */
    private static final int PERSISTENT_DELIVERY_MODE = 2;
    /** Size in bytes of the array the "payload" string property is built from. */
    private static final int PAYLOAD_SIZE = 10 * 1024;
    /** Number of times the spoke broker is stopped and recreated. */
    private static final int SPOKE_RESTARTS = 2;

    /** Passed to {@code bridgeBrokers} by {@link #bridge}; also selects the expected bridge MBean count. */
    public boolean dynamicOnly = false;

    /**
     * Runs {@link #testSendOnAReceiveOnBWithTransportDisconnect()} with {@link #dynamicOnly} set,
     * resetting the flag afterwards even if the test fails.
     */
    public void testSendOnAReceiveOnBWithTransportDisconnectDynamicOnly() throws Exception {
        this.dynamicOnly = true;
        try {
            testSendOnAReceiveOnBWithTransportDisconnect();
        } finally {
            this.dynamicOnly = false;
        }
    }

    /**
     * Bridges spoke to hub, registers a durable subscription on the hub, publishes
     * {@link #MESSAGE_COUNT} persistent messages on the spoke, then restarts the spoke
     * and asserts that the hub exposes the expected number of bridge MBeans after each restart.
     */
    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        bridge(SPOKE, HUB);
        startAllBrokers();
        verifyDuplexBridgeMbean();

        URI hubUri = connectUri(HUB);
        URI spokeUri = connectUri(SPOKE);
        ActiveMQConnectionFactory hubFactory = new ActiveMQConnectionFactory(hubUri);
        ActiveMQConnectionFactory spokeFactory = new ActiveMQConnectionFactory(spokeUri);

        Connection hubConnection = null;
        Connection spokeConnection = null;
        try {
            hubConnection = hubFactory.createConnection();
            spokeConnection = spokeFactory.createConnection();
            hubConnection.setClientID("clientHUB");
            spokeConnection.setClientID("clientSPOKE");
            hubConnection.start();
            spokeConnection.start();
            Session hubSession = hubConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session spokeSession = spokeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
            createAndCloseDurableSubscriber(hubSession, topic);
            publishPersistentMessages(spokeSession, topic);

            for (int restart = 0; restart < SPOKE_RESTARTS; restart++) {
                restartSpokeAndAwaitBridges(restart);
            }
        } finally {
            // The connections were previously never closed; release them without letting a
            // close failure (e.g. against the restarted spoke) change the test verdict.
            closeQuietly(spokeConnection);
            closeQuietly(hubConnection);
        }
    }

    /** Returns the publishable connect URI of the first transport connector of the named broker. */
    private URI connectUri(String brokerName) throws Exception {
        TransportConnector connector =
                (TransportConnector) this.brokers.get(brokerName).broker.getTransportConnectors().get(0);
        return connector.getPublishableConnectURI();
    }

    /** Creates the durable subscriber "consumerName" on the topic, waits one second, then closes it. */
    private void createAndCloseDurableSubscriber(Session session, ActiveMQTopic topic) throws Exception {
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "consumerName");
        // NOTE(review): the pause presumably lets the subscription propagate over the bridge - confirm.
        sleep(1000);
        subscriber.close();
    }

    /** Sends {@link #MESSAGE_COUNT} persistent text messages, each carrying a large "payload" property. */
    private void publishPersistentMessages(Session session, ActiveMQTopic topic) throws Exception {
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(PERSISTENT_DELIVERY_MODE);
        String payload = new String(new byte[PAYLOAD_SIZE]);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            TextMessage message = session.createTextMessage("test-" + i);
            message.setStringProperty("payload", payload);
            producer.send(message);
        }
        producer.close();
    }

    /**
     * Stops the spoke, recreates it without deleting its messages, re-bridges it to the hub,
     * starts it and asserts that the hub reaches the expected bridge MBean count.
     */
    private void restartSpokeAndAwaitBridges(int restart) throws Exception {
        this.brokers.get(SPOKE).broker.stop();
        sleep(1000);
        createBroker(new URI(SPOKE_BROKER_URI + BROKER_OPTIONS + false));
        bridge(SPOKE, HUB);
        this.brokers.get(SPOKE).broker.start();
        LOG.info("restarted spoke..:" + restart);
        assertTrue("got mbeans on restart", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return hubHasExpectedBridgeCount();
            }
        }));
    }

    /** True when the hub reports one bridge MBean (dynamicOnly) or two (otherwise). */
    private boolean hubHasExpectedBridgeCount() throws Exception {
        int found = countMbeans(this.brokers.get(HUB).broker, "networkBridge", AMQ4607Test.TIMEOUT);
        return found == (this.dynamicOnly ? 1 : 2);
    }

    /** Best-effort close used only for cleanup; failures are logged, never rethrown. */
    private void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.warn("Failed to close connection during test cleanup", e);
        }
    }

    /** Asserts that exactly one {@code networkBridge} MBean is registered on the hub. */
    private void verifyDuplexBridgeMbean() throws Exception {
        assertEquals(1, countMbeans(this.brokers.get(HUB).broker, "networkBridge", 5000));
    }

    /**
     * Counts the MBeans of the given broker whose object name matches a key pattern, polling
     * until at least one is found or the timeout elapses.
     *
     * @param brokerService broker whose management context is queried
     * @param str object-name key to match; {@code "=*"} is appended when it contains no {@code '='}
     * @param i timeout in milliseconds; when positive the query is retried every 100 ms and a
     *          {@code null} query result makes JUnit skip the test via {@link Assume}
     * @return size of the last non-null query result, or 0 if none was obtained
     */
    public int countMbeans(BrokerService brokerService, String str, int i) throws Exception {
        long deadline = System.currentTimeMillis() + i;
        String typeFilter = str.contains("=") ? str : str + "=*";
        ObjectName pattern = new ObjectName("org.apache.activemq:type=Broker,brokerName="
                + brokerService.getBrokerName() + "," + typeFilter + ",*");
        Set<?> names;
        int count = 0;
        do {
            if (i > 0) {
                Thread.sleep(100L);
            }
            names = brokerService.getManagementContext().queryNames(pattern, (QueryExp) null);
            if (names != null) {
                count = names.size();
                LOG.info("Found: " + count + ", matching type: " + typeFilter);
                for (Object name : names) {
                    LOG.info("" + name);
                }
            }
            // Keep polling only while nothing matched and the deadline has not passed.
        } while ((names == null || names.isEmpty()) && deadline > System.currentTimeMillis());
        if (i > 0) {
            Assume.assumeNotNull(names);
        }
        return count;
    }

    /** Debugging aid (currently not called): logs every MBean name known to the broker. */
    private void logAllMbeans(BrokerService brokerService) {
        try {
            Set<?> names = brokerService.getManagementContext().queryNames((ObjectName) null, (QueryExp) null);
            LOG.info("Total MBean count=" + names.size());
            for (Object name : names) {
                LOG.info(name);
            }
        } catch (Exception e) {
            LOG.warn("getMBeanServer ex: " + e, e);
        }
    }

    /**
     * Creates a duplex network connector between two brokers via {@code bridgeBrokers}, passing
     * {@link #dynamicOnly}, {@code -1} and {@code true} through (their meaning is defined by
     * {@code JmsMultipleBrokersTestSupport}).
     *
     * @param str name of the broker the connector is created from
     * @param str2 name of the broker it connects to
     * @return the configured connector
     */
    public NetworkConnector bridge(String str, String str2) throws Exception {
        NetworkConnector connector = bridgeBrokers(str, str2, this.dynamicOnly, -1, true);
        connector.setSuppressDuplicateQueueSubscriptions(true);
        connector.setDecreaseNetworkConsumerPriority(true);
        connector.setConsumerTTL(1);
        connector.setDuplex(true);
        return connector;
    }

    /** Starts the hub before the spoke, then pauses before the test continues. */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void startAllBrokers() throws Exception {
        this.brokers.get(HUB).broker.start();
        this.brokers.get(SPOKE).broker.start();
        // NOTE(review): the pause length comes from an unrelated test's constant (looks like a
        // decompiler substitution); its value is not visible here - confirm and replace with a literal.
        sleep(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE);
    }

    /** Disables auto-fail, runs the base set-up and creates both brokers with clean stores. */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(false);
        super.setUp();
        createBrokers(true);
    }

    /** Creates the hub and spoke brokers; the flag becomes their {@code deleteAllMessagesOnStartup} option. */
    private void createBrokers(boolean deleteAllMessages) throws Exception {
        String options = BROKER_OPTIONS + deleteAllMessages;
        createBroker(new URI(HUB_BROKER_URI + options));
        createBroker(new URI(SPOKE_BROKER_URI + options));
    }

    /**
     * Applies per-broker settings: durable subscriptions are not kept active, no JMX connector is
     * created, memory usage is limited to 100 MiB, and the hub additionally gets a 1 MiB store
     * limit with a store-usage high-water mark of 2.
     */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    protected void configureBroker(BrokerService brokerService) {
        brokerService.setKeepDurableSubsActive(false);
        brokerService.getManagementContext().setCreateConnector(false);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        if (brokerService.getBrokerName().equals(HUB)) {
            defaultEntry.setStoreUsageHighWaterMark(2);
            brokerService.getSystemUsage().getStoreUsage().setLimit(1024L * 1024L);
        }
        policyMap.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.getSystemUsage().getMemoryUsage().setLimit(100L * 1024L * 1024L);
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void tearDown() throws Exception {
        super.tearDown();
    }

    /** Sleeps for the given number of milliseconds, preserving the interrupt status if interrupted. */
    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: let later blocking calls observe it.
            Thread.currentThread().interrupt();
        }
    }
}
