package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.class */
public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerQueueNetworkTest.class);
    protected static final int MESSAGE_COUNT = 100;
    private static final long MAX_WAIT_MILLIS = 10000;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest$Condition.class */
    public interface Condition {
        boolean isSatisified() throws Exception;
    }

    public void testABandBCbrokerNetwork() throws Exception {
        bridgeBrokers("BrokerA", "BrokerB");
        bridgeBrokers("BrokerB", "BrokerC");
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerC", createDestination);
        sendMessages("BrokerA", createDestination, 100);
        Thread.sleep(1000L);
        assertEquals(0, getConsumerMessages("BrokerC", createConsumer).getMessageCount());
    }

    public void testBAandBCbrokerNetwork() throws Exception {
        bridgeBrokers("BrokerB", "BrokerA");
        bridgeBrokers("BrokerB", "BrokerC");
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        MessageConsumer createConsumer2 = createConsumer("BrokerC", createDestination);
        Thread.sleep(2000L);
        sendMessages("BrokerB", createDestination, 100);
        Thread.sleep(1000L);
        assertEquals(100, getConsumerMessages("BrokerA", createConsumer).getMessageCount() + getConsumerMessages("BrokerC", createConsumer2).getMessageCount());
    }

    public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
        bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
        bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("broker", "BROKER_A");
        sendMessages("BrokerB", createDestination, 100, hashMap);
        hashMap.clear();
        hashMap.put("broker", "BROKER_C");
        sendMessages("BrokerB", createDestination, 100, hashMap);
        MessageConsumer createConsumer = createConsumer("BrokerA", (Destination) createDestination, "broker = 'BROKER_A'");
        MessageConsumer createConsumer2 = createConsumer("BrokerC", (Destination) createDestination, "broker = 'BROKER_C'");
        Thread.sleep(2000L);
        MessageIdList consumerMessages = getConsumerMessages("BrokerA", createConsumer);
        MessageIdList consumerMessages2 = getConsumerMessages("BrokerC", createConsumer2);
        assertEquals(100, consumerMessages.getMessageCount());
        assertEquals(100, consumerMessages2.getMessageCount());
    }

    public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
        bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
        bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerA", (Destination) createDestination, "broker = 'BROKER_A'");
        MessageConsumer createConsumer2 = createConsumer("BrokerC", (Destination) createDestination, "broker = 'BROKER_C'");
        Thread.sleep(2000L);
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("broker", "BROKER_A");
        sendMessages("BrokerB", createDestination, 100, hashMap);
        hashMap.clear();
        hashMap.put("broker", "BROKER_C");
        sendMessages("BrokerB", createDestination, 100, hashMap);
        Thread.sleep(1000L);
        MessageIdList consumerMessages = getConsumerMessages("BrokerA", createConsumer);
        MessageIdList consumerMessages2 = getConsumerMessages("BrokerC", createConsumer2);
        assertEquals(100, consumerMessages.getMessageCount());
        assertEquals(100, consumerMessages2.getMessageCount());
    }

    public void testABandCBbrokerNetwork() throws Exception {
        bridgeBrokers("BrokerA", "BrokerB");
        bridgeBrokers("BrokerC", "BrokerB");
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerB", createDestination);
        sendMessages("BrokerA", createDestination, 100);
        sendMessages("BrokerC", createDestination, 100);
        MessageIdList consumerMessages = getConsumerMessages("BrokerB", createConsumer);
        consumerMessages.waitForMessagesToArrive(200);
        assertEquals(200, consumerMessages.getMessageCount());
    }

    public void testAllConnectedBrokerNetwork() throws Exception {
        bridgeBrokers("BrokerA", "BrokerB");
        bridgeBrokers("BrokerB", "BrokerA");
        bridgeBrokers("BrokerB", "BrokerC");
        bridgeBrokers("BrokerC", "BrokerB");
        bridgeBrokers("BrokerA", "BrokerC");
        bridgeBrokers("BrokerC", "BrokerA");
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        MessageConsumer createConsumer2 = createConsumer("BrokerB", createDestination);
        MessageConsumer createConsumer3 = createConsumer("BrokerC", createDestination);
        sendMessages("BrokerA", createDestination, 100);
        sendMessages("BrokerB", createDestination, 100);
        sendMessages("BrokerC", createDestination, 100);
        Thread.sleep(1000L);
        assertEquals(300, getConsumerMessages("BrokerA", createConsumer).getMessageCount() + getConsumerMessages("BrokerB", createConsumer2).getMessageCount() + getConsumerMessages("BrokerC", createConsumer3).getMessageCount());
    }

    public void testAllConnectedUsingMulticast() throws Exception {
        bridgeAllBrokers();
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        MessageConsumer createConsumer2 = createConsumer("BrokerB", createDestination);
        MessageConsumer createConsumer3 = createConsumer("BrokerC", createDestination);
        sendMessages("BrokerA", createDestination, 100);
        sendMessages("BrokerB", createDestination, 100);
        sendMessages("BrokerC", createDestination, 100);
        Thread.sleep(1000L);
        final MessageIdList consumerMessages = getConsumerMessages("BrokerA", createConsumer);
        MessageIdList consumerMessages2 = getConsumerMessages("BrokerB", createConsumer2);
        MessageIdList consumerMessages3 = getConsumerMessages("BrokerC", createConsumer3);
        waitFor(new Condition() { // from class: org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest.1
            @Override // org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest.Condition
            public boolean isSatisified() {
                return consumerMessages.getMessageCount() == 100;
            }
        });
        assertEquals(300, consumerMessages.getMessageCount() + consumerMessages2.getMessageCount() + consumerMessages3.getMessageCount());
    }

    private void waitFor(Condition condition) throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + 10000;
        while (!condition.isSatisified() && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(1000L);
        }
        if (System.currentTimeMillis() >= currentTimeMillis) {
            LOG.error("expired while waiting for condition " + condition);
        }
    }

    public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
        bridgeAllBrokers("default", 3, false);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        CountDownLatch countDownLatch = new CountDownLatch(2000);
        MessageConsumer createConsumer = createConsumer("BrokerA", (Destination) createDestination, countDownLatch);
        Thread.sleep(1000L);
        sendMessages("BrokerA", createDestination, 2000);
        assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        assertEquals(2000, getConsumerMessages("BrokerA", createConsumer).getMessageCount());
    }

    public void testAllConnectedWithSpare() throws Exception {
        bridgeAllBrokers("default", 3, false);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        CountDownLatch countDownLatch = new CountDownLatch(2000);
        MessageConsumer createConsumer = createConsumer("BrokerA", (Destination) createDestination, countDownLatch);
        Thread.sleep(2000L);
        sendMessages("BrokerB", createDestination, 2000);
        assertTrue("messaged received within time limit", countDownLatch.await(30L, TimeUnit.SECONDS));
        assertEquals(2000, getConsumerMessages("BrokerA", createConsumer).getMessageCount());
    }

    public void XtestMigrateConsumerStuckMessages() throws Exception {
        bridgeAllBrokers("default", 3, false);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        LOG.info("Consumer on A");
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        Thread.sleep(2000L);
        LOG.info("Consumer on B");
        CountDownLatch countDownLatch = new CountDownLatch(2000 / 2);
        MessageConsumer createConsumer2 = createConsumer("BrokerB", (Destination) createDestination, countDownLatch);
        Thread.sleep(2000L);
        LOG.info("Close consumer on A");
        createConsumer.close();
        Thread.sleep(2000L);
        LOG.info("Send to B");
        sendMessages("BrokerB", createDestination, 2000);
        assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        MessageIdList consumerMessages = getConsumerMessages("BrokerB", createConsumer2);
        Thread.sleep(500L);
        assertEquals(2000 / 2, consumerMessages.getMessageCount());
        CountDownLatch countDownLatch2 = new CountDownLatch(2000 / 2);
        MessageConsumer createConsumer3 = createConsumer("BrokerA", (Destination) createDestination, countDownLatch2);
        assertTrue(countDownLatch2.await(30L, TimeUnit.SECONDS));
        assertEquals(2000 / 2, getConsumerMessages("BrokerA", createConsumer3).getMessageCount());
    }

    public void testMigrateConsumer() throws Exception {
        bridgeAllBrokers("default", 3, true, true);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        LOG.info("Consumer on A");
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        Thread.sleep(2000L);
        LOG.info("Consumer on B");
        CountDownLatch countDownLatch = new CountDownLatch(2000);
        MessageIdList consumerMessages = getConsumerMessages("BrokerB", createConsumer("BrokerB", (Destination) createDestination, countDownLatch));
        consumerMessages.setProcessingDelay(10L);
        Thread.sleep(2000L);
        LOG.info("Close consumer on A");
        createConsumer.close();
        Thread.sleep(2000L);
        LOG.info("Send to B");
        sendMessages("BrokerB", createDestination, 2000);
        assertTrue("messages are received within limit", countDownLatch.await(60L, TimeUnit.SECONDS));
        assertEquals(2000, consumerMessages.getMessageCount());
    }

    public void testNoDuplicateQueueSubs() throws Exception {
        bridgeAllBrokers("default", 3, true);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        Thread.sleep(2000L);
        Collection<JmsMultipleBrokersTestSupport.BrokerItem> values = this.brokers.values();
        Iterator<JmsMultipleBrokersTestSupport.BrokerItem> it = values.iterator();
        while (it.hasNext()) {
            verifyConsumerCount(it.next().broker, 1, createDestination);
        }
        createConsumer.close();
        Thread.sleep(2000L);
        Iterator<JmsMultipleBrokersTestSupport.BrokerItem> it2 = values.iterator();
        while (it2.hasNext()) {
            verifyConsumerCount(it2.next().broker, 0, createDestination);
        }
    }

    public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception {
        bridgeAllBrokers("default", 3, true, true);
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        this.brokers.get("BrokerA").broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() { // from class: org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest.2
            public Broker installPlugin(Broker broker) throws Exception {
                return new BrokerFilter(broker) { // from class: org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest.2.1
                    final AtomicInteger count = new AtomicInteger();

                    public void preProcessDispatch(MessageDispatch messageDispatch) {
                        if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer") && this.count.getAndIncrement() == 0) {
                            ThreeBrokerQueueNetworkTest.LOG.info("Sleeping on first advisory: " + messageDispatch);
                            try {
                                Thread.sleep(2000L);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        super.postProcessDispatch(messageDispatch);
                    }
                };
            }
        }});
        startAllBrokers();
        waitForBridgeFormation();
        createConsumer("BrokerA", createDestination);
        Thread.sleep(5000L);
        Iterator<JmsMultipleBrokersTestSupport.BrokerItem> it = this.brokers.values().iterator();
        while (it.hasNext()) {
            BrokerService brokerService = it.next().broker;
            verifyConsumerCount(brokerService, 1, createDestination);
            if (!"BrokerA".equals(brokerService.getBrokerName())) {
                verifyConsumePriority(brokerService, (byte) -5, createDestination);
            }
        }
    }

    public void testDuplicateQueueSubs() throws Exception {
        createBroker("BrokerD");
        bridgeAllBrokers("default", 3, false);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", false);
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination);
        Thread.sleep(2000L);
        verifyConsumerCount(this.brokers.get("BrokerA").broker, 1, createDestination);
        Collection<JmsMultipleBrokersTestSupport.BrokerItem> values = this.brokers.values();
        Iterator<JmsMultipleBrokersTestSupport.BrokerItem> it = values.iterator();
        while (it.hasNext()) {
            BrokerService brokerService = it.next().broker;
            if (!"BrokerA".equals(brokerService.getBrokerName())) {
                verifyConsumerCount(brokerService, 3, createDestination);
                verifyConsumePriority(brokerService, (byte) 0, createDestination);
            }
        }
        createConsumer.close();
        Thread.sleep(2000L);
        Iterator<JmsMultipleBrokersTestSupport.BrokerItem> it2 = values.iterator();
        while (it2.hasNext()) {
            verifyConsumerCount(it2.next().broker, 0, createDestination);
        }
    }

    private void verifyConsumerCount(BrokerService brokerService, int i, final Destination destination) throws Exception {
        final RegionBroker regionBroker = brokerService.getRegionBroker();
        waitFor(new Condition() { // from class: org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest.3
            @Override // org.apache.activemq.usecases.ThreeBrokerQueueNetworkTest.Condition
            public boolean isSatisified() throws Exception {
                return !regionBroker.getDestinations(ActiveMQDestination.transform(destination)).isEmpty();
            }
        });
        Queue queue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(destination)).iterator().next();
        assertEquals("consumer count on " + brokerService.getBrokerName() + " matches for q: " + queue, i, queue.getConsumers().size());
    }

    private void verifyConsumePriority(BrokerService brokerService, byte b, Destination destination) throws Exception {
        Queue queue = (Queue) brokerService.getRegionBroker().getDestinations(ActiveMQDestination.transform(destination)).iterator().next();
        Iterator it = queue.getConsumers().iterator();
        while (it.hasNext()) {
            assertEquals("consumer on " + brokerService.getBrokerName() + " matches priority: " + queue, b, ((Subscription) it.next()).getConsumerInfo().getPriority());
        }
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
    }
}
