package org.apache.activemq.usecases;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.Vector;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.class */
public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSupport {
    private static final transient Logger LOG = LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);
    BrokerService a;
    BrokerService b;
    static final String connectionIdMarker = "ID:marker.";
    Vector<BrokerService> brokers = new Vector<>();
    ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
    ActiveMQTempQueue replyQWildcard = new ActiveMQTempQueue("ID:marker.>");
    private final long receiveTimeout = 30000;

    /* renamed from: org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest$1.class */
    class AnonymousClass1 implements URLStreamHandlerFactory {
        final /* synthetic */ String val$xmlConfigString;

        AnonymousClass1(String str) {
            this.val$xmlConfigString = str;
        }

        @Override // java.net.URLStreamHandlerFactory
        public URLStreamHandler createURLStreamHandler(String str) {
            if ("inline".equalsIgnoreCase(str)) {
                return new URLStreamHandler() { // from class: org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest.1.1
                    @Override // java.net.URLStreamHandler
                    protected URLConnection openConnection(URL url) throws IOException {
                        return new URLConnection(url) { // from class: org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest.1.1.1
                            @Override // java.net.URLConnection
                            public void connect() throws IOException {
                            }

                            @Override // java.net.URLConnection
                            public InputStream getInputStream() throws IOException {
                                return new ByteArrayInputStream(AnonymousClass1.this.val$xmlConfigString.replace("%HOST%", this.url.getFile()).getBytes("UTF-8"));
                            }
                        };
                    }
                };
            }
            return null;
        }
    }

    public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception {
        URL.setURLStreamHandlerFactory(new AnonymousClass1(new String("<beans xmlns=\"http://www.springframework.org/schema/beans\" xmlns:amq=\"http://activemq.apache.org/schema/core\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd\">  <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\"    allowTempAutoCreationOnSend=\"true\" schedulePeriodForDestinationPurge=\"1000\"    brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\" useJmx=\"false\" >   <destinationPolicy>    <policyMap>     <policyEntries>      <policyEntry optimizedDispatch=\"true\"  gcInactiveDestinations=\"true\" gcWithNetworkConsumers=\"true\" inactiveTimoutBeforeGC=\"1000\">       <destination>        <tempQueue physicalName=\"" + this.replyQWildcard.getPhysicalName() + "\"/>       </destination>      </policyEntry>     </policyEntries>    </policyMap>   </destinationPolicy>   <networkConnectors>    <networkConnector uri=\"multicast://default\">     <staticallyIncludedDestinations>      <queue physicalName=\"" + this.sendQ.getPhysicalName() + "\"/>      <tempQueue physicalName=\"" + this.replyQWildcard.getPhysicalName() + "\"/>     </staticallyIncludedDestinations>    </networkConnector>   </networkConnectors>   <transportConnectors>     <transportConnector uri=\"tcp://0.0.0.0:0\" discoveryUri=\"multicast://default\" />   </transportConnectors>  </broker></beans>")));
        this.a = new XBeanBrokerFactory().createBroker(new URI("xbean:inline:A"));
        this.b = new XBeanBrokerFactory().createBroker(new URI("xbean:inline:B"));
        this.brokers.add(this.a);
        this.brokers.add(this.b);
        doTestNonAdvisoryNetworkRequestReply();
    }

    public void testNonAdvisoryNetworkRequestReply() throws Exception {
        createBridgeAndStartBrokers();
        doTestNonAdvisoryNetworkRequestReply();
    }

    public void testNonAdvisoryNetworkRequestReplyWithPIM() throws Exception {
        this.a = configureBroker("A");
        this.b = configureBroker("B");
        BrokerService configureBroker = configureBroker("M");
        configureBroker.setAllowTempAutoCreationOnSend(true);
        configureForPiggyInTheMiddle(bridge(this.a, configureBroker));
        configureForPiggyInTheMiddle(bridge(this.b, configureBroker));
        startBrokers();
        waitForBridgeFormation(configureBroker, 2, 0);
        doTestNonAdvisoryNetworkRequestReply();
    }

    private void configureForPiggyInTheMiddle(NetworkConnector networkConnector) {
        networkConnector.setDuplex(true);
        networkConnector.setNetworkTTL(2);
    }

    public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
        waitForBridgeFormation(this.a, 1, 0);
        waitForBridgeFormation(this.b, 1, 0);
        ActiveMQConnection createConnection = createConnection(createConnectionFactory(this.a));
        ActiveMQSession createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.sendQ);
        ActiveMQTempQueue createTemporaryQueue = createSession.createTemporaryQueue();
        TextMessage createTextMessage = createSession.createTextMessage("1");
        createTextMessage.setJMSReplyTo(createTemporaryQueue);
        createProducer.send(createTextMessage);
        LOG.info("request sent");
        ActiveMQConnection createConnection2 = createConnection(createConnectionFactory(this.b));
        ActiveMQSession createSession2 = createConnection2.createSession(false, 1);
        TextMessage receive = createSession2.createConsumer(this.sendQ).receive(30000L);
        assertNotNull("got request from sender ok", receive);
        LOG.info("got request, sending reply");
        createSession2.createProducer(receive.getJMSReplyTo()).send(createSession2.createTextMessage("got " + receive.getText()));
        createConnection2.close();
        TextMessage receive2 = createSession.createConsumer(createTemporaryQueue).receive(30000L);
        assertNotNull("expected reply message", receive2);
        assertEquals("text is as expected", "got 1", receive2.getText());
        createConnection.close();
        LOG.info("checking for dangling temp destinations");
        Iterator<BrokerService> it = this.brokers.iterator();
        while (it.hasNext()) {
            final RegionBroker regionBroker = it.next().getRegionBroker();
            assertTrue("all temps are gone on " + regionBroker.getBrokerName(), Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.RequestReplyNoAdvisoryNetworkTest.2
                public boolean isSatisified() throws Exception {
                    Map destinationMap = regionBroker.getTempTopicRegion().getDestinationMap();
                    RequestReplyNoAdvisoryNetworkTest.LOG.info("temp topics on " + regionBroker.getBrokerName() + ", " + destinationMap);
                    Map destinationMap2 = regionBroker.getTempQueueRegion().getDestinationMap();
                    RequestReplyNoAdvisoryNetworkTest.LOG.info("temp queues on " + regionBroker.getBrokerName() + ", " + destinationMap2);
                    return destinationMap2.isEmpty() && destinationMap.isEmpty();
                }
            }));
        }
    }

    private ActiveMQConnection createConnection(ActiveMQConnectionFactory activeMQConnectionFactory) throws Exception {
        ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        return createConnection;
    }

    private ActiveMQConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) brokerService.getTransportConnectors().get(0)).getPublishableConnectString());
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        activeMQConnectionFactory.setConnectionIDPrefix("ID:marker." + brokerService.getBrokerName());
        return activeMQConnectionFactory;
    }

    public void createBridgeAndStartBrokers() throws Exception {
        this.a = configureBroker("A");
        this.b = configureBroker("B");
        bridge(this.a, this.b);
        bridge(this.b, this.a);
        startBrokers();
    }

    private void startBrokers() throws Exception {
        Iterator<BrokerService> it = this.brokers.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport
    public void tearDown() throws Exception {
        Iterator<BrokerService> it = this.brokers.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        this.brokers.clear();
    }

    private NetworkConnector bridge(BrokerService brokerService, BrokerService brokerService2) throws Exception {
        NetworkConnector addNetworkConnector = brokerService.addNetworkConnector("static://" + ((TransportConnector) brokerService2.getTransportConnectors().get(0)).getPublishableConnectString());
        addNetworkConnector.addStaticallyIncludedDestination(this.sendQ);
        addNetworkConnector.addStaticallyIncludedDestination(this.replyQWildcard);
        return addNetworkConnector;
    }

    private BrokerService configureBroker(String str) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName(str);
        brokerService.setAdvisorySupport(false);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setSchedulePeriodForDestinationPurge(1000);
        brokerService.setAllowTempAutoCreationOnSend(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setOptimizedDispatch(true);
        policyEntry.setGcInactiveDestinations(true);
        policyEntry.setGcWithNetworkConsumers(true);
        policyEntry.setInactiveTimoutBeforeGC(1000L);
        policyMap.put(this.replyQWildcard, policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokers.add(brokerService);
        return brokerService;
    }
}
