package org.apache.activemq.usecases;

import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

/* loaded from: input_file:org/apache/activemq/usecases/ClientRebalanceTest.class */
public class ClientRebalanceTest extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ClientRebalanceTest.class);
    private static final String QUEUE_NAME = "Test.ClientRebalanceTest";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    public void testRebalance() throws Exception {
        createBroker((Resource) new ClassPathResource("org/apache/activemq/usecases/rebalance-broker1.xml"));
        createBroker((Resource) new ClassPathResource("org/apache/activemq/usecases/rebalance-broker2.xml"));
        startAllBrokers();
        this.brokers.get("b1").broker.waitUntilStarted();
        LOG.info("Starting connection");
        Connection createConnection = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageProducer createProducer = createSession.createProducer(createQueue);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        TextMessage createTextMessage = createSession.createTextMessage("Test message");
        createProducer.send(createTextMessage);
        assertNotNull(createConsumer.receive(2000L));
        createBroker((Resource) new ClassPathResource("org/apache/activemq/usecases/rebalance-broker3.xml"));
        this.brokers.get("b3").broker.waitUntilStarted();
        Thread.sleep(3000L);
        LOG.info("Stopping broker 1");
        this.brokers.get("b1").broker.stop();
        this.brokers.get("b1").broker.waitUntilStopped();
        Thread.sleep(3000L);
        createProducer.send(createTextMessage);
        assertNotNull(createConsumer.receive(2000L));
        LOG.info("Stopping broker 2");
        this.brokers.get("b2").broker.stop();
        this.brokers.get("b2").broker.waitUntilStopped();
        createProducer.send(createTextMessage);
        assertNotNull(createConsumer.receive(2000L));
    }

    public void testReconnect() throws Exception {
        createBroker((Resource) new ClassPathResource("org/apache/activemq/usecases/rebalance-broker1.xml"));
        createBroker((Resource) new ClassPathResource("org/apache/activemq/usecases/rebalance-broker2.xml"));
        startAllBrokers();
        this.brokers.get("b1").broker.waitUntilStarted();
        LOG.info("Starting connection");
        Connection createConnection = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?randomize=false").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("Test.ClientReconnectTest");
        MessageProducer createProducer = createSession.createProducer(createQueue);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        TextMessage createTextMessage = createSession.createTextMessage("Test message");
        createProducer.send(createTextMessage);
        assertNotNull(createConsumer.receive(2000L));
        TransportConnector transportConnectorByName = this.brokers.get("b1").broker.getTransportConnectorByName("openwire");
        assertNotNull(transportConnectorByName);
        TransportConnection findClientFailoverTransportConnection = findClientFailoverTransportConnection(transportConnectorByName);
        assertNotNull(findClientFailoverTransportConnection);
        String connectionId = findClientFailoverTransportConnection.getConnectionId();
        String remoteAddress = findClientFailoverTransportConnection.getRemoteAddress();
        ConnectionControl connectionControl = new ConnectionControl();
        connectionControl.setReconnectTo(NetworkedSyncTest.broker1URL);
        findClientFailoverTransportConnection.dispatchSync(connectionControl);
        Thread.sleep(2000L);
        TransportConnection findClientFailoverTransportConnection2 = findClientFailoverTransportConnection(transportConnectorByName);
        assertNotNull(findClientFailoverTransportConnection2);
        assertEquals(connectionId, findClientFailoverTransportConnection2.getConnectionId());
        Assert.assertNotEquals(remoteAddress, findClientFailoverTransportConnection2.getRemoteAddress());
        createProducer.send(createTextMessage);
        assertNotNull(createConsumer.receive(2000L));
        createConnection.close();
    }

    protected TransportConnection findClientFailoverTransportConnection(TransportConnector transportConnector) {
        TransportConnection transportConnection = null;
        Iterator it = transportConnector.getConnections().iterator();
        while (it.hasNext()) {
            TransportConnection transportConnection2 = (TransportConnection) it.next();
            if (!transportConnection2.isNetworkConnection() && transportConnection2.isFaultTolerantConnection()) {
                transportConnection = transportConnection2;
            }
        }
        return transportConnection;
    }
}
