package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/ThreeBrokerTempDestDemandSubscriptionCleanupTest.class */
public class ThreeBrokerTempDestDemandSubscriptionCleanupTest extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerTempDestDemandSubscriptionCleanupTest.class);
    boolean enableTempDestinationBridging = true;
    private static final String BROKER_A = "BrokerA";
    private static final String BROKER_B = "BrokerB";
    private static final String BROKER_C = "BrokerC";
    private static final String ECHO_QUEUE_NAME = "echo";
    private static final int NUM_ITER = 100;
    private static final long CONSUME_TIMEOUT = 500;

    public void testSubscriptionsCleanedUpRace() throws Exception {
        final JmsMultipleBrokersTestSupport.BrokerItem brokerItem = this.brokers.get(BROKER_A);
        Runnable runnable = new Runnable() { // from class: org.apache.activemq.usecases.ThreeBrokerTempDestDemandSubscriptionCleanupTest.1
            @Override // java.lang.Runnable
            public void run() {
                for (int i = 0; i < 100; i++) {
                    try {
                        Connection createConnection = brokerItem.createConnection();
                        createConnection.start();
                        Session createSession = createConnection.createSession(false, 1);
                        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(ThreeBrokerTempDestDemandSubscriptionCleanupTest.ECHO_QUEUE_NAME));
                        ThreeBrokerTempDestDemandSubscriptionCleanupTest.LOG.info("Starting iter: " + i);
                        TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
                        MessageConsumer createConsumer = createSession.createConsumer(createTemporaryQueue);
                        TextMessage createTextMessage = createSession.createTextMessage("Iteration: " + i);
                        createTextMessage.setJMSReplyTo(createTemporaryQueue);
                        createProducer.send(createTextMessage);
                        TextMessage receive = createConsumer.receive(ThreeBrokerTempDestDemandSubscriptionCleanupTest.CONSUME_TIMEOUT);
                        TestCase.assertNotNull("We should have gotten a response, but didn't for iter: " + i, receive);
                        TestCase.assertEquals("We got the wrong response from the echo service", "Iteration: " + i, receive.getText());
                        createConsumer.close();
                        createConnection.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                        TestCase.fail();
                    }
                }
            }
        };
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        newFixedThreadPool.submit(runnable);
        newFixedThreadPool.submit(runnable);
        newFixedThreadPool.shutdown();
        assertTrue("executor done on time", newFixedThreadPool.awaitTermination(30L, TimeUnit.SECONDS));
        final AbstractRegion tempQueueRegion = this.brokers.get(BROKER_C).broker.getRegionBroker().getTempQueueRegion();
        assertTrue("There were no lingering temp-queue destinations", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.ThreeBrokerTempDestDemandSubscriptionCleanupTest.2
            public boolean isSatisified() throws Exception {
                ThreeBrokerTempDestDemandSubscriptionCleanupTest.LOG.info("Lingering temps: " + tempQueueRegion.getSubscriptions().size());
                return 0 == tempQueueRegion.getSubscriptions().size();
            }
        }));
    }

    public void testSubscriptionsCleanedUpAfterConnectionClose() throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerItem = this.brokers.get(BROKER_A);
        for (int i = 0; i < 100; i++) {
            try {
                Connection createConnection = brokerItem.createConnection();
                createConnection.start();
                Session createSession = createConnection.createSession(false, 1);
                MessageProducer createProducer = createSession.createProducer(createSession.createQueue(ECHO_QUEUE_NAME));
                LOG.info("Starting iter: " + i);
                TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
                MessageConsumer createConsumer = createSession.createConsumer(createTemporaryQueue);
                TextMessage createTextMessage = createSession.createTextMessage("Iteration: " + i);
                createTextMessage.setJMSReplyTo(createTemporaryQueue);
                createProducer.send(createTextMessage);
                TextMessage receive = createConsumer.receive(CONSUME_TIMEOUT);
                assertNotNull("We should have gotten a response, but didn't for iter: " + i, receive);
                assertEquals("We got the wrong response from the echo service", "Iteration: " + i, receive.getText());
                createConnection.close();
            } catch (Exception e) {
                e.printStackTrace();
                fail();
            }
        }
        final AbstractRegion tempQueueRegion = this.brokers.get(BROKER_C).broker.getRegionBroker().getTempQueueRegion();
        assertTrue("There were no lingering temp-queue destinations", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.ThreeBrokerTempDestDemandSubscriptionCleanupTest.3
            public boolean isSatisified() throws Exception {
                ThreeBrokerTempDestDemandSubscriptionCleanupTest.LOG.info("Lingering temps: " + tempQueueRegion.getSubscriptions().size());
                return 0 == tempQueueRegion.getSubscriptions().size();
            }
        }));
    }

    private void installEchoClientOnBrokerC() throws Exception {
        Connection createConnection = this.brokers.get(BROKER_C).createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        createSession.createConsumer(createSession.createQueue(ECHO_QUEUE_NAME)).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.ThreeBrokerTempDestDemandSubscriptionCleanupTest.4
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    MessageProducer createProducer = createSession.createProducer(message.getJMSReplyTo());
                    TextMessage createTextMessage = createSession.createTextMessage(textMessage.getText());
                    ThreeBrokerTempDestDemandSubscriptionCleanupTest.LOG.info("Replying to this request: " + textMessage.getText());
                    createProducer.send(createTextMessage);
                    createProducer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                    TestCase.fail("Could not respond to an echo request");
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setUp();
        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
        bridgeBrokers(BROKER_A, BROKER_B, false, 3);
        bridgeBrokers(BROKER_B, BROKER_C, false, 3);
        startAllBrokers();
        installEchoClientOnBrokerC();
    }

    protected NetworkConnector bridgeBrokers(String str, String str2, boolean z, int i) throws Exception {
        NetworkConnector bridgeBrokers = super.bridgeBrokers(str, str2, z, i, true);
        bridgeBrokers.setBridgeTempDestinations(this.enableTempDestinationBridging);
        bridgeBrokers.setDuplex(true);
        return bridgeBrokers;
    }
}
