package org.apache.activemq.network;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.PrintStream;
import java.util.Arrays;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/network/DrainBridgeTest.class */
public class DrainBridgeTest {
    @Test
    public void testDrain() throws Exception {
        prepareBrokerWithMessages();
        final BrokerService prepareDrainTarget = prepareDrainTarget();
        final BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("HOST");
        NetworkConnector addNetworkConnector = brokerService.addNetworkConnector("static:(" + prepareDrainTarget.getTransportConnectorByScheme("tcp").getPublishableConnectString() + ")");
        addNetworkConnector.setStaticBridge(true);
        addNetworkConnector.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQQueue(">")));
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(0L);
        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
        policyEntry.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.start();
        PrintStream printStream = System.out;
        long totalMessageCount = brokerService.getAdminView().getTotalMessageCount();
        prepareDrainTarget.getAdminView().getTotalMessageCount();
        printStream.println("Local count: " + totalMessageCount + ", target count:" + printStream);
        Assert.assertEquals("local messages", 22L, brokerService.getAdminView().getTotalMessageCount());
        Assert.assertEquals("no remote messages", 0L, prepareDrainTarget.getAdminView().getTotalMessageCount());
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.DrainBridgeTest.1
            public boolean isSatisified() throws Exception {
                PrintStream printStream2 = System.out;
                long totalMessageCount2 = brokerService.getAdminView().getTotalMessageCount();
                prepareDrainTarget.getAdminView().getTotalMessageCount();
                printStream2.println("Local count: " + totalMessageCount2 + ", target count:" + printStream2);
                return brokerService.getAdminView().getTotalMessageCount() == 0;
            }
        });
        Assert.assertEquals("no local messages", 0L, brokerService.getAdminView().getTotalMessageCount());
        Assert.assertEquals("remote messages", 22L, prepareDrainTarget.getAdminView().getTotalMessageCount());
        Assert.assertEquals("number of queues match", brokerService.getAdminView().getQueues().length, prepareDrainTarget.getAdminView().getQueues().length);
        brokerService.stop();
        prepareDrainTarget.stop();
    }

    private BrokerService prepareDrainTarget() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setBrokerName("TARGET");
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        brokerService.start();
        return brokerService;
    }

    private void prepareBrokerWithMessages() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setBrokerName("HOST");
        brokerService.start();
        Connection createConnection = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()).createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        TextMessage createTextMessage = createSession.createTextMessage("This is a message.");
        MessageProducer createProducer = createSession.createProducer((Destination) null);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("Q.Foo,Bar");
        for (int i = 0; i < 10; i++) {
            createProducer.send(activeMQQueue, createTextMessage);
        }
        createSession.createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.Y"));
        createSession.createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.Y"));
        createProducer.send(new ActiveMQTopic("VirtualTopic.Y"), createTextMessage);
        createConnection.close();
        brokerService.stop();
    }
}
