package org.apache.activemq.network.jms;

import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.class */
public class TopicBridgeStandaloneReconnectTest {
    private static final Logger LOG = LoggerFactory.getLogger(TopicBridgeStandaloneReconnectTest.class);
    private SimpleJmsTopicConnector jmsTopicConnector;
    private BrokerService localBroker;
    private BrokerService foreignBroker;
    private ActiveMQConnectionFactory localConnectionFactory;
    private ActiveMQConnectionFactory foreignConnectionFactory;
    private Destination outbound;
    private Destination inbound;
    private ArrayList<Connection> connections = new ArrayList<>();

    @Test
    public void testSendAndReceiveOverConnectedBridges() throws Exception {
        startLocalBroker();
        startForeignBroker();
        this.jmsTopicConnector.start();
        final MessageConsumer createConsumerForLocalBroker = createConsumerForLocalBroker();
        final MessageConsumer createConsumerForForeignBroker = createConsumerForForeignBroker();
        sendMessageToForeignBroker("to.foreign.broker");
        sendMessageToLocalBroker("to.local.broker");
        Assert.assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.1
            public boolean isSatisified() throws Exception {
                TextMessage receive = createConsumerForLocalBroker.receive(100L);
                return receive != null && receive.getText().equals("to.local.broker");
            }
        }));
        Assert.assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.2
            public boolean isSatisified() throws Exception {
                TextMessage receive = createConsumerForForeignBroker.receive(100L);
                return receive != null && receive.getText().equals("to.foreign.broker");
            }
        }));
    }

    @Test
    public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception {
        this.jmsTopicConnector.start();
        startLocalBroker();
        startForeignBroker();
        Assert.assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.3
            public boolean isSatisified() throws Exception {
                return TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        final MessageConsumer createConsumerForLocalBroker = createConsumerForLocalBroker();
        final MessageConsumer createConsumerForForeignBroker = createConsumerForForeignBroker();
        sendMessageToForeignBroker("to.foreign.broker");
        sendMessageToLocalBroker("to.local.broker");
        Assert.assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.4
            public boolean isSatisified() throws Exception {
                TextMessage receive = createConsumerForLocalBroker.receive(100L);
                return receive != null && receive.getText().equals("to.local.broker");
            }
        }));
        Assert.assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.5
            public boolean isSatisified() throws Exception {
                TextMessage receive = createConsumerForForeignBroker.receive(100L);
                return receive != null && receive.getText().equals("to.foreign.broker");
            }
        }));
    }

    @Test
    public void testSendAndReceiveOverBridgeWithRestart() throws Exception {
        startLocalBroker();
        startForeignBroker();
        this.jmsTopicConnector.start();
        Assert.assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.6
            public boolean isSatisified() throws Exception {
                return TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        stopLocalBroker();
        stopForeignBroker();
        Assert.assertTrue("Should have detected connection drop.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.7
            public boolean isSatisified() throws Exception {
                return !TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        startLocalBroker();
        startForeignBroker();
        Assert.assertTrue("Should have Re-Connected.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.8
            public boolean isSatisified() throws Exception {
                return TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        final MessageConsumer createConsumerForLocalBroker = createConsumerForLocalBroker();
        final MessageConsumer createConsumerForForeignBroker = createConsumerForForeignBroker();
        sendMessageToForeignBroker("to.foreign.broker");
        sendMessageToLocalBroker("to.local.broker");
        Assert.assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.9
            public boolean isSatisified() throws Exception {
                TextMessage receive = createConsumerForLocalBroker.receive(100L);
                return receive != null && receive.getText().equals("to.local.broker");
            }
        }));
        Assert.assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.jms.TopicBridgeStandaloneReconnectTest.10
            public boolean isSatisified() throws Exception {
                TextMessage receive = createConsumerForForeignBroker.receive(100L);
                return receive != null && receive.getText().equals("to.foreign.broker");
            }
        }));
    }

    @Before
    public void setUp() throws Exception {
        this.localConnectionFactory = createLocalConnectionFactory();
        this.foreignConnectionFactory = createForeignConnectionFactory();
        this.outbound = new ActiveMQTopic("RECONNECT.TEST.OUT.TOPIC");
        this.inbound = new ActiveMQTopic("RECONNECT.TEST.IN.TOPIC");
        this.jmsTopicConnector = new SimpleJmsTopicConnector();
        this.jmsTopicConnector.setOutboundTopicBridges(new OutboundTopicBridge[]{new OutboundTopicBridge("RECONNECT.TEST.OUT.TOPIC")});
        this.jmsTopicConnector.setInboundTopicBridges(new InboundTopicBridge[]{new InboundTopicBridge("RECONNECT.TEST.IN.TOPIC")});
        this.jmsTopicConnector.setOutboundTopicConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61617"));
        this.jmsTopicConnector.setLocalTopicConnectionFactory(new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL));
    }

    @After
    public void tearDown() throws Exception {
        disposeConsumerConnections();
        try {
            this.jmsTopicConnector.stop();
            this.jmsTopicConnector = null;
        } catch (Exception e) {
        }
        try {
            stopLocalBroker();
        } catch (Throwable th) {
        }
        try {
            stopForeignBroker();
        } catch (Throwable th2) {
        }
    }

    protected void disposeConsumerConnections() {
        Iterator<Connection> it = this.connections.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Throwable th) {
            }
        }
    }

    protected void startLocalBroker() throws Exception {
        if (this.localBroker == null) {
            this.localBroker = createFirstBroker();
            this.localBroker.start();
            this.localBroker.waitUntilStarted();
        }
    }

    protected void stopLocalBroker() throws Exception {
        if (this.localBroker != null) {
            this.localBroker.stop();
            this.localBroker.waitUntilStopped();
            this.localBroker = null;
        }
    }

    protected void startForeignBroker() throws Exception {
        if (this.foreignBroker == null) {
            this.foreignBroker = createSecondBroker();
            this.foreignBroker.start();
            this.foreignBroker.waitUntilStarted();
        }
    }

    protected void stopForeignBroker() throws Exception {
        if (this.foreignBroker != null) {
            this.foreignBroker.stop();
            this.foreignBroker.waitUntilStopped();
            this.foreignBroker = null;
        }
    }

    protected BrokerService createFirstBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("broker1");
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.addConnector(NetworkedSyncTest.broker1URL);
        return brokerService;
    }

    protected BrokerService createSecondBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("broker2");
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.addConnector("tcp://localhost:61617");
        return brokerService;
    }

    protected ActiveMQConnectionFactory createLocalConnectionFactory() {
        return new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL);
    }

    protected ActiveMQConnectionFactory createForeignConnectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61617");
    }

    protected void sendMessageToForeignBroker(String str) throws JMSException {
        Connection connection = null;
        try {
            connection = this.localConnectionFactory.createConnection();
            Session createSession = connection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(this.outbound);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText(str);
            createProducer.send(createTextMessage);
            try {
                connection.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                connection.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }

    protected void sendMessageToLocalBroker(String str) throws JMSException {
        Connection connection = null;
        try {
            connection = this.foreignConnectionFactory.createConnection();
            Session createSession = connection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(this.inbound);
            TextMessage createTextMessage = createSession.createTextMessage();
            createTextMessage.setText(str);
            createProducer.send(createTextMessage);
            try {
                connection.close();
            } catch (Throwable th) {
            }
        } catch (Throwable th2) {
            try {
                connection.close();
            } catch (Throwable th3) {
            }
            throw th2;
        }
    }

    protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
        Connection createConnection = this.localConnectionFactory.createConnection();
        this.connections.add(createConnection);
        createConnection.start();
        return createConnection.createSession(false, 1).createConsumer(this.inbound);
    }

    protected MessageConsumer createConsumerForForeignBroker() throws JMSException {
        Connection createConnection = this.foreignConnectionFactory.createConnection();
        this.connections.add(createConnection);
        createConnection.start();
        return createConnection.createSession(false, 1).createConsumer(this.outbound);
    }
}
