package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.List;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Sends persistent topic messages on the hub broker and receives them through a durable
 * subscriber on the spoke broker while the network bridge between the two brokers is
 * interrupted twice (at message 50 and message 150).
 *
 * <p>The bridge is routed through a {@link SocketProxy} so the test can either close the
 * link or pause it (see {@link #simulateStalledNetwork}).
 */
public class DurableSubscriberWithNetworkDisconnectTest extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkDisconnectTest.class);
    /** How long (ms) the bridge stays down before it is resumed/reopened. */
    private static final int NETWORK_DOWN_TIME = 10000;
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";
    private SocketProxy socketProxy;
    /** Non-zero while the proxy is closed/paused; holds the time the outage started. */
    private long networkDownTimeStart;
    protected static final int MESSAGE_COUNT = 200;
    /** When true the proxy is paused/resumed instead of closed/reopened. */
    public boolean simulateStalledNetwork;
    /** Value appended to the connector URI as {@code useExponentialBackOff}. */
    public boolean exponentialBackOff;
    /** Value (ms) used for both wire-format inactivity options on the bridge URI. */
    private long inactiveDuration = 1000;
    /**
     * Number of messages seen by the durable subscriber. Written by the JMS listener thread
     * and polled by the test thread, hence volatile (single writer, so ++ is safe here).
     */
    private volatile long receivedMsgs = 0;
    private boolean useSocketProxy = true;
    public boolean useDuplexNetworkBridge = true;
    public boolean dynamicOnly = true;
    public long networkTTL = 3;
    /** Set by name through the combination values registered below. */
    public boolean failover = false;
    public boolean inactivity = true;

    /** Registers both values of {@code failover} as combinations for the test below. */
    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
        addCombinationValues("failover", new Object[]{Boolean.FALSE, Boolean.TRUE});
    }

    /**
     * Produces {@link #MESSAGE_COUNT} messages on the hub while the bridge is taken down
     * twice, then verifies the durable subscriber on the spoke received at least that many.
     *
     * @throws Exception if broker setup, JMS operations or the proxy fail
     */
    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        bridgeBrokers(SPOKE, HUB);
        startAllBrokers();

        ActiveMQConnectionFactory hubFactory =
                new ActiveMQConnectionFactory(this.brokers.get(HUB).broker.getVmConnectorURI());
        ActiveMQConnectionFactory spokeFactory =
                new ActiveMQConnectionFactory(this.brokers.get(SPOKE).broker.getVmConnectorURI());

        // try-with-resources guarantees both connections are closed even when the test fails.
        try (Connection hubConnection = hubFactory.createConnection();
             Connection spokeConnection = spokeFactory.createConnection()) {
            hubConnection.setClientID("clientHUB");
            spokeConnection.setClientID("clientSPOKE");
            hubConnection.start();
            spokeConnection.start();

            Session hubSession = hubConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session spokeSession = spokeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");

            spokeSession.createDurableSubscriber(topic, "consumerName")
                    .setMessageListener(new CountingListener());

            // Give the subscription a moment to be set up before producing.
            sleep(1000);

            MessageProducer producer = hubSession.createProducer(topic);
            producer.setDeliveryMode(jakarta.jms.DeliveryMode.PERSISTENT);
            sendMessagesAcrossOutages(hubSession, producer);

            LOG.info("waiting for messages to flow");
            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return DurableSubscriberWithNetworkDisconnectTest.this.receivedMsgs >= MESSAGE_COUNT;
                }
            });
            assertTrue("At least message " + MESSAGE_COUNT + " must be received, count=" + this.receivedMsgs,
                    MESSAGE_COUNT <= this.receivedMsgs);

            this.brokers.get(HUB).broker.deleteAllMessages();
            this.brokers.get(SPOKE).broker.deleteAllMessages();
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} text messages, interrupting the proxied bridge at
     * iterations 50 and 150 and restoring it on the following iteration after
     * {@link #NETWORK_DOWN_TIME} ms.
     */
    private void sendMessagesAcrossOutages(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            sleep(50);
            if (i == 50 || i == 150) {
                // Take the link down; the message of this iteration is still sent below.
                if (this.simulateStalledNetwork) {
                    this.socketProxy.pause();
                } else {
                    this.socketProxy.close();
                }
                this.networkDownTimeStart = System.currentTimeMillis();
            } else if (this.networkDownTimeStart > 0) {
                // Link is down: wait out the outage, then bring it back.
                sleep(NETWORK_DOWN_TIME);
                this.networkDownTimeStart = 0L;
                if (this.simulateStalledNetwork) {
                    this.socketProxy.goOn();
                } else {
                    this.socketProxy.reopen();
                }
            } else {
                // NOTE(review): delay constant is borrowed from an unrelated test class
                // (looks like a decompiler constant-folding artifact) - confirm intended literal.
                sleep(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP);
            }
            producer.send(session.createTextMessage("test-" + i));
        }
    }

    /** Counts every delivered message and logs its running number and text. */
    private final class CountingListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                long count = ++DurableSubscriberWithNetworkDisconnectTest.this.receivedMsgs;
                String text = ((TextMessage) message).getText();
                LOG.info("Received messages (" + count + "): " + text);
            } catch (JMSException e) {
                LOG.error("Failed to read text of received message", e);
            }
        }
    }

    /**
     * Starts the hub broker first, then the spoke, then waits briefly.
     *
     * @throws Exception if either broker fails to start
     */
    @Override
    public void startAllBrokers() throws Exception {
        this.brokers.get(HUB).broker.start();
        this.brokers.get(SPOKE).broker.start();
        // NOTE(review): delay constant is borrowed from an unrelated test class
        // (looks like a decompiler constant-folding artifact) - confirm intended literal.
        sleep(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE);
    }

    /**
     * Resets per-test state and creates the persistent hub (port 61617) and spoke
     * (port 61616) brokers.
     *
     * @throws Exception if the superclass setup or broker creation fails
     */
    @Override
    public void setUp() throws Exception {
        this.networkDownTimeStart = 0L;
        this.inactiveDuration = 1000L;
        this.useSocketProxy = true;
        this.receivedMsgs = 0L;
        super.setAutoFail(true);
        super.setUp();
        createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB
                + "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
        createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE
                + "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
    }

    /**
     * Runs the superclass teardown and then closes the socket proxy, if one was created.
     * The proxy is closed even when the superclass teardown throws.
     *
     * @throws Exception if the superclass teardown fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (this.socketProxy != null) {
                this.socketProxy.close();
            }
        }
    }

    public static Test suite() {
        return suite(DurableSubscriberWithNetworkDisconnectTest.class);
    }

    /**
     * Sleeps for the given number of milliseconds. If interrupted, returns early and
     * restores the thread's interrupt flag so callers can still observe the interruption.
     */
    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Bridges {@code brokerService} to the first transport connector of
     * {@code brokerService2}, optionally through a {@link SocketProxy}, using a
     * {@link DiscoveryNetworkConnector} built from this test's option fields.
     *
     * @param brokerService  broker that receives the new network connector
     * @param brokerService2 broker whose first transport connector is the bridge target
     * @param z              not used by this override
     * @param i              network TTL applied to the connector
     * @param z2             not used by this override
     * @param z3             not used by this override
     * @return the connector that was added to {@code brokerService}
     * @throws Exception if {@code brokerService2} has no transport connectors, or the
     *                   proxy or connector cannot be created
     */
    @Override
    public NetworkConnector bridgeBrokers(BrokerService brokerService, BrokerService brokerService2, boolean z, int i, boolean z2, boolean z3) throws Exception {
        List<TransportConnector> transportConnectors = brokerService2.getTransportConnectors();
        if (transportConnectors.isEmpty()) {
            throw new Exception("Remote broker has no registered connectors.");
        }
        URI connectUri = transportConnectors.get(0).getConnectUri();
        if (this.useSocketProxy) {
            // Route the bridge through the proxy so the test can interrupt it.
            this.socketProxy = new SocketProxy(connectUri);
            connectUri = this.socketProxy.getUrl();
        }

        DiscoveryNetworkConnector discoveryNetworkConnector =
                new DiscoveryNetworkConnector(new URI(buildDiscoveryUri(connectUri)));
        discoveryNetworkConnector.setDynamicOnly(this.dynamicOnly);
        discoveryNetworkConnector.setNetworkTTL(i);
        brokerService.addNetworkConnector(discoveryNetworkConnector);
        maxSetupTime = 2000;
        if (this.useDuplexNetworkBridge) {
            discoveryNetworkConnector.setDuplex(true);
        }
        return discoveryNetworkConnector;
    }

    /**
     * Builds the {@code static:(...)} discovery URI for the bridge, wrapping the remote
     * URI in {@code failover:(...)} and adding the inactivity options when enabled.
     */
    private String buildDiscoveryUri(URI remoteUri) {
        StringBuilder uri = new StringBuilder(this.failover ? "static:(failover:(" : "static:(");
        uri.append(remoteUri);
        if (this.inactivity) {
            // "Inital" is reproduced verbatim: it is the option key this test has always sent.
            uri.append("?wireFormat.maxInactivityDuration=").append(this.inactiveDuration)
               .append("&wireFormat.maxInactivityDurationInitalDelay=").append(this.inactiveDuration);
        }
        uri.append(")");
        if (this.failover) {
            uri.append("?maxReconnectAttempts=0)");
        }
        uri.append("?useExponentialBackOff=").append(this.exponentialBackOff);
        return uri.toString();
    }
}
