package org.apache.activemq.network;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DynamicNetworkTestSupport;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.class */
public class CompositeConsumerNetworkBridgeTest extends DynamicNetworkTestSupport {
    private BrokerService broker1;
    private BrokerService broker2;
    private Session session1;
    private Session session2;
    private final DynamicNetworkTestSupport.FLOW flow;
    protected static final Logger LOG = LoggerFactory.getLogger(CompositeConsumerNetworkBridgeTest.class);
    private static final String testTopic1 = "test.composite.topic.1";
    private static final String testTopic2 = "test.composite.topic.2";
    private static final List<ActiveMQTopic> topics = List.of(new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2));
    private static final String testQueue1 = "test.composite.queue.1";
    private static final String testQueue2 = "test.composite.queue.2";
    private static final List<ActiveMQQueue> queues = List.of(new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2));

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{DynamicNetworkTestSupport.FLOW.FORWARD}, new Object[]{DynamicNetworkTestSupport.FLOW.REVERSE});
    }

    public CompositeConsumerNetworkBridgeTest(DynamicNetworkTestSupport.FLOW flow) {
        this.flow = flow;
    }

    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    @Test
    public void testCompositeDurableSubscriber() throws Exception {
        setUp();
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        TopicSubscriber createDurableSubscriber = this.session1.createDurableSubscriber(activeMQTopic, this.subName);
        assertConsumersCount(this.broker1, activeMQTopic, 1);
        assertConsumersCount(this.broker2, activeMQTopic, 0);
        assertNCDurableSubsCount(this.broker2, activeMQTopic, 0);
        for (ActiveMQTopic activeMQTopic2 : topics) {
            assertConsumersCount(this.broker2, activeMQTopic2, 1);
            assertNCDurableSubsCount(this.broker2, activeMQTopic2, 1);
        }
        assertCompositeMapCounts(1, 1);
        createDurableSubscriber.close();
        Thread.sleep(1000L);
        removeSubscription(this.broker1, this.subName);
        for (ActiveMQTopic activeMQTopic3 : topics) {
            assertConsumersCount(this.broker2, activeMQTopic3, 0);
            assertNCDurableSubsCount(this.broker2, activeMQTopic3, 0);
        }
        assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeAndNormalDurableSub() throws Exception {
        setUp();
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        TopicSubscriber createDurableSubscriber = this.session1.createDurableSubscriber(activeMQTopic, this.subName + "1");
        TopicSubscriber createDurableSubscriber2 = this.session1.createDurableSubscriber(topics.get(0), this.subName + "2");
        Iterator<ActiveMQTopic> it = topics.iterator();
        while (it.hasNext()) {
            assertNCDurableSubsCount(this.broker2, it.next(), 1);
        }
        assertNCDurableSubsCount(this.broker2, activeMQTopic, 0);
        assertCompositeMapCounts(1, 1);
        this.session2.createProducer(topics.get(0)).send(this.session2.createTextMessage("test"));
        Assert.assertNotNull(createDurableSubscriber.receive(1000L));
        Assert.assertNotNull(createDurableSubscriber2.receive(1000L));
        createDurableSubscriber.close();
        createDurableSubscriber2.close();
        Thread.sleep(1000L);
        removeSubscription(this.broker1, this.subName + "1");
        removeSubscription(this.broker1, this.subName + "2");
        assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testTopicCompositeSubs() throws Exception {
        setUp();
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        MessageConsumer createConsumer = this.session1.createConsumer(activeMQTopic);
        MessageConsumer createConsumer2 = this.session1.createConsumer(activeMQTopic);
        for (ActiveMQTopic activeMQTopic2 : topics) {
            assertConsumersCount(this.broker1, activeMQTopic2, 2);
            assertConsumersCount(this.broker2, activeMQTopic2, 1);
        }
        assertCompositeMapCounts(2, 0);
        this.session2.createProducer(topics.get(0)).send(this.session2.createTextMessage("test"));
        Assert.assertNotNull(createConsumer.receive(1000L));
        Assert.assertNotNull(createConsumer2.receive(1000L));
        createConsumer.close();
        createConsumer2.close();
        assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeQueueSubs() throws Exception {
        setUp();
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("test.composite.queue.1,test.composite.queue.2");
        MessageConsumer createConsumer = this.session1.createConsumer(activeMQQueue);
        MessageConsumer createConsumer2 = this.session1.createConsumer(activeMQQueue);
        Iterator<ActiveMQQueue> it = queues.iterator();
        while (it.hasNext()) {
            ActiveMQDestination activeMQDestination = (ActiveMQDestination) it.next();
            assertConsumersCount(this.broker1, activeMQDestination, 2);
            assertConsumersCount(this.broker2, activeMQDestination, 1);
        }
        assertCompositeMapCounts(2, 0);
        this.session2.createProducer(queues.get(0)).send(this.session2.createTextMessage("test"));
        Assert.assertTrue((createConsumer.receive(1000L) == null && createConsumer2.receive(1000L) == null) ? false : true);
        createConsumer.close();
        createConsumer2.close();
        assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeAndNormalQueueSubs() throws Exception {
        setUp();
        MessageConsumer createConsumer = this.session1.createConsumer(new ActiveMQQueue("test.composite.queue.1,test.composite.queue.2"));
        MessageConsumer createConsumer2 = this.session1.createConsumer(new ActiveMQQueue(testQueue2));
        assertConsumersCount(this.broker1, (ActiveMQDestination) queues.get(0), 1);
        assertConsumersCount(this.broker1, (ActiveMQDestination) queues.get(1), 2);
        Iterator<ActiveMQQueue> it = queues.iterator();
        while (it.hasNext()) {
            assertConsumersCount(this.broker2, (ActiveMQDestination) it.next(), 1);
        }
        assertCompositeMapCounts(1, 0);
        this.session2.createProducer(queues.get(0)).send(this.session2.createTextMessage("test"));
        Assert.assertNotNull(createConsumer.receive(1000L));
        createConsumer.close();
        createConsumer2.close();
        assertCompositeMapCounts(0, 0);
    }

    @Test
    public void testCompositeTwoDurableSubscribers() throws Exception {
        setUp();
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("test.composite.topic.1,test.composite.topic.2");
        TopicSubscriber createDurableSubscriber = this.session1.createDurableSubscriber(activeMQTopic, this.subName + "1");
        TopicSubscriber createDurableSubscriber2 = this.session1.createDurableSubscriber(activeMQTopic, this.subName + "2");
        assertConsumersCount(this.broker1, activeMQTopic, 2);
        assertConsumersCount(this.broker2, activeMQTopic, 0);
        assertNCDurableSubsCount(this.broker2, activeMQTopic, 0);
        for (ActiveMQTopic activeMQTopic2 : topics) {
            assertConsumersCount(this.broker2, activeMQTopic2, 1);
            assertNCDurableSubsCount(this.broker2, activeMQTopic2, 1);
        }
        assertCompositeMapCounts(2, 2);
        createDurableSubscriber.close();
        Thread.sleep(1000L);
        removeSubscription(this.broker1, this.subName + "1");
        for (ActiveMQTopic activeMQTopic3 : topics) {
            assertConsumersCount(this.broker2, activeMQTopic3, 1);
            assertNCDurableSubsCount(this.broker2, activeMQTopic3, 1);
        }
        createDurableSubscriber2.close();
        Thread.sleep(1000L);
        removeSubscription(this.broker1, this.subName + "2");
        for (ActiveMQTopic activeMQTopic4 : topics) {
            assertConsumersCount(this.broker2, activeMQTopic4, 0);
            assertNCDurableSubsCount(this.broker2, activeMQTopic4, 0);
        }
        assertCompositeMapCounts(0, 0);
    }

    private void setUp() throws Exception {
        doSetUp(this.tempFolder.newFolder(), this.tempFolder.newFolder());
    }

    protected void doSetUp(File file, File file2) throws Exception {
        doSetUpRemoteBroker(file2);
        doSetUpLocalBroker(file);
        Thread.sleep(1000L);
    }

    protected void doSetUpLocalBroker(File file) throws Exception {
        this.localBroker = createLocalBroker(file);
        this.localBroker.setDeleteAllMessagesOnStartup(true);
        this.localBroker.start();
        this.localBroker.waitUntilStarted();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        activeMQConnectionFactory.setAlwaysSyncSend(true);
        activeMQConnectionFactory.setDispatchAsync(false);
        this.localConnection = activeMQConnectionFactory.createConnection();
        this.localConnection.setClientID("clientId");
        this.localConnection.start();
        Wait.waitFor(() -> {
            return ((NetworkConnector) this.localBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1;
        }, DurableSubProcessWithRestartTest.BROKER_RESTART, 500L);
        this.localSession = this.localConnection.createSession(false, 1);
        if (this.flow.equals(DynamicNetworkTestSupport.FLOW.FORWARD)) {
            this.broker1 = this.localBroker;
            this.session1 = this.localSession;
        } else {
            this.broker2 = this.localBroker;
            this.session2 = this.localSession;
        }
    }

    protected void doSetUpRemoteBroker(File file) throws Exception {
        this.remoteBroker = createRemoteBroker(file);
        this.remoteBroker.setDeleteAllMessagesOnStartup(true);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        this.remoteConnection = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI()).createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();
        this.remoteSession = this.remoteConnection.createSession(false, 1);
        if (this.flow.equals(DynamicNetworkTestSupport.FLOW.FORWARD)) {
            this.broker2 = this.remoteBroker;
            this.session2 = this.remoteSession;
        } else {
            this.broker1 = this.remoteBroker;
            this.session1 = this.remoteSession;
        }
    }

    protected BrokerService createLocalBroker(File file) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setMonitorConnectionSplits(true);
        brokerService.setDataDirectoryFile(file);
        brokerService.setBrokerName("localBroker");
        brokerService.addNetworkConnector(configureLocalNetworkConnector());
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2), new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2)});
        return brokerService;
    }

    protected NetworkConnector configureLocalNetworkConnector() throws Exception {
        DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(" + ((TransportConnector) this.remoteBroker.getTransportConnectors().get(0)).getConnectUri() + ")"));
        discoveryNetworkConnector.setName("networkConnector");
        discoveryNetworkConnector.setDynamicOnly(false);
        discoveryNetworkConnector.setDecreaseNetworkConsumerPriority(false);
        discoveryNetworkConnector.setConduitSubscriptions(true);
        discoveryNetworkConnector.setDuplex(true);
        discoveryNetworkConnector.setStaticBridge(false);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(List.of(new ActiveMQTopic("test.composite.topic.>"), new ActiveMQQueue("test.composite.queue.>")));
        discoveryNetworkConnector.setDynamicallyIncludedDestinations(arrayList);
        return discoveryNetworkConnector;
    }

    protected BrokerService createRemoteBroker(File file) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(file);
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2), new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2)});
        return brokerService;
    }

    protected void assertCompositeMapCounts(int i, int i2) throws Exception {
        DurableConduitBridge findBridge = findBridge();
        Assert.assertTrue(Wait.waitFor(() -> {
            return i == findBridge.compositeConsumerIds.size();
        }, 5000L, 500L));
        Assert.assertTrue(Wait.waitFor(() -> {
            return i2 == findBridge.compositeSubscriptions.size();
        }, 5000L, 500L));
    }

    protected DurableConduitBridge findBridge() throws Exception {
        return this.flow.equals(DynamicNetworkTestSupport.FLOW.FORWARD) ? findBridge(this.remoteBroker) : findBridge(this.localBroker);
    }

    protected DurableConduitBridge findBridge(BrokerService brokerService) throws Exception {
        NetworkBridge findDuplexBridge;
        if (brokerService.getNetworkConnectors().size() > 0) {
            Assert.assertTrue(Wait.waitFor(() -> {
                return ((NetworkConnector) brokerService.getNetworkConnectors().get(0)).activeBridges().size() == 1;
            }, 5000L, 500L));
            findDuplexBridge = (NetworkBridge) ((NetworkConnector) brokerService.getNetworkConnectors().get(0)).activeBridges().iterator().next();
        } else {
            findDuplexBridge = findDuplexBridge(brokerService.getTransportConnectorByScheme("tcp"));
        }
        return (DurableConduitBridge) findDuplexBridge;
    }
}
