package org.apache.activemq.transport.failover;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSSecurityException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.net.ServerSocketFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LinkStealingTest;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.transport.TransportListener;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverStateTrackingTest.class */
public class FailoverStateTrackingTest {
    private static final Logger LOG = LoggerFactory.getLogger(LinkStealingTest.class);
    private BrokerService brokerService;
    private int serverPort;
    private ActiveMQConnectionFactory cf;
    private String connectionURI;
    private ActiveMQConnection connection;
    private final AtomicLong consumerCounter = new AtomicLong();
    private final AtomicLong producerCounter = new AtomicLong();

    @Before
    public void setUp() throws Exception {
        this.serverPort = getProxyPort();
        createAuthenticatingBroker();
        this.connectionURI = "failover:(tcp://0.0.0.0:" + this.serverPort + ")?jms.watchTopicAdvisories=false";
        this.cf = new ActiveMQConnectionFactory(this.connectionURI);
        this.brokerService.start();
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception e) {
            }
            this.connection = null;
        }
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService = null;
        }
    }

    @Test
    public void testUnauthorizedConsumerIsNotRecreated() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.connection = this.cf.createConnection();
        this.connection.addTransportListener(new TransportListener() { // from class: org.apache.activemq.transport.failover.FailoverStateTrackingTest.1
            public void transportResumed() {
                if (countDownLatch.getCount() == 0) {
                    countDownLatch2.countDown();
                }
            }

            public void transportInterupted() {
                countDownLatch.countDown();
            }

            public void onException(IOException iOException) {
            }

            public void onCommand(Object obj) {
            }
        });
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("testQueue");
        try {
            createSession.createConsumer(createQueue);
            Assert.fail("Should have failed to create this consumer");
        } catch (JMSSecurityException e) {
        }
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        Assert.assertTrue("Connection should be interrupted", countDownLatch.await(10L, TimeUnit.SECONDS));
        createTrackingBroker();
        this.brokerService.start();
        Assert.assertTrue("Connection should be reconnected", countDownLatch2.await(10L, TimeUnit.SECONDS));
        try {
            createSession.createConsumer(createQueue);
        } catch (JMSSecurityException e2) {
            Assert.fail("Should have been able to create this consumer");
        }
        Assert.assertEquals(1L, this.consumerCounter.get());
    }

    @Test
    public void testUnauthorizedProducerIsNotRecreated() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.connection = this.cf.createConnection();
        this.connection.addTransportListener(new TransportListener() { // from class: org.apache.activemq.transport.failover.FailoverStateTrackingTest.2
            public void transportResumed() {
                FailoverStateTrackingTest.LOG.debug("Connection restored");
                if (countDownLatch.getCount() == 0) {
                    countDownLatch2.countDown();
                }
            }

            public void transportInterupted() {
                FailoverStateTrackingTest.LOG.debug("Connection interrupted");
                countDownLatch.countDown();
            }

            public void onException(IOException iOException) {
            }

            public void onCommand(Object obj) {
            }
        });
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("testQueue");
        try {
            createSession.createProducer(createQueue);
            Assert.fail("Should have failed to create this producer");
        } catch (JMSSecurityException e) {
        }
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        Assert.assertTrue("Connection should be interrupted", countDownLatch.await(10L, TimeUnit.SECONDS));
        createTrackingBroker();
        this.brokerService.start();
        Assert.assertTrue("Connection should be reconnected", countDownLatch2.await(10L, TimeUnit.SECONDS));
        try {
            createSession.createProducer(createQueue);
        } catch (JMSSecurityException e2) {
            Assert.fail("Should have been able to create this producer");
        }
        Assert.assertEquals(1L, this.producerCounter.get());
    }

    private void createAuthenticatingBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverStateTrackingTest.3
            public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
                throw new SecurityException();
            }

            public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
                throw new SecurityException();
            }
        }});
        this.brokerService.addConnector("tcp://0.0.0.0:" + this.serverPort);
    }

    private void createTrackingBroker() throws Exception {
        this.consumerCounter.set(0L);
        this.producerCounter.set(0L);
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverStateTrackingTest.4
            public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
                FailoverStateTrackingTest.this.consumerCounter.incrementAndGet();
                return getNext().addConsumer(connectionContext, consumerInfo);
            }

            public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
                FailoverStateTrackingTest.this.producerCounter.incrementAndGet();
                getNext().addProducer(connectionContext, producerInfo);
            }
        }});
        this.brokerService.addConnector("tcp://0.0.0.0:" + this.serverPort);
    }

    protected int getProxyPort() {
        int i = 61616;
        try {
            ServerSocket createServerSocket = ServerSocketFactory.getDefault().createServerSocket(0);
            try {
                i = createServerSocket.getLocalPort();
                if (createServerSocket != null) {
                    createServerSocket.close();
                }
            } finally {
            }
        } catch (IOException e) {
        }
        return i;
    }
}
