package org.apache.activemq.network;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import javax.net.ServerSocketFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.Response;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.nio.NIOTransport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.class */
public class CheckDuplicateMessagesOnDuplexTest {
    // Shared logger; the nested transport classes log through it as well.
    private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class);
    // Broker named "LOCAL": owns the NIO transport connector and serves the consumer.
    private BrokerService localBroker;
    // Broker named "REMOTE": owns the duplex network connector and serves the producer.
    private BrokerService remoteBroker;
    // Connection factories built from each broker's VM connector URI.
    private ActiveMQConnectionFactory localFactory;
    private ActiveMQConnectionFactory remoteFactory;
    // Consuming side, attached to the local broker.
    private Session localSession;
    private MessageConsumer consumer;
    // Producing side, attached to the remote broker.
    private Session remoteSession;
    private MessageProducer producer;
    private Connection remoteConnection;
    private Connection localConnection;
    // Filter wrapped around the transport most recently created by DebugTransportServer;
    // remains null until that server has created a transport.
    private DebugTransportFilter debugTransportFilter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest$DebugTransportFactory.class */
    /**
     * NIO transport factory whose server side is a {@link DebugTransportServer}, so that
     * transports created for accepted sockets end up wrapped in a {@link DebugTransportFilter}.
     */
    public class DebugTransportFactory extends NIOTransportFactory {
        /** Private: instances are only created from within the enclosing test class. */
        private DebugTransportFactory() {
        }

        /**
         * Builds the server used when this factory binds to a location.
         *
         * @param location      URI the server is bound to
         * @param socketFactory factory handed through to the server unchanged
         * @return a new {@link DebugTransportServer} backed by this factory
         * @throws IOException        propagated from the server constructor
         * @throws URISyntaxException propagated from the server constructor
         */
        protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory socketFactory) throws IOException, URISyntaxException {
            final TcpTransportServer server = new DebugTransportServer(this, location, socketFactory);
            return server;
        }
    }

    /* loaded from: input_file:org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest$DebugTransportFilter.class */
    private class DebugTransportFilter extends TransportFilter {
        boolean closeOnResponse;

        public DebugTransportFilter(Transport transport) {
            super(transport);
            this.closeOnResponse = false;
        }

        public void oneway(Object obj) throws IOException {
            if (!this.closeOnResponse || !(obj instanceof Response)) {
                super.oneway(obj);
                return;
            }
            this.closeOnResponse = false;
            CheckDuplicateMessagesOnDuplexTest.log.warn("\n\nclosing connection before response is sent\n\n");
            try {
                this.next.stop();
            } catch (Exception e) {
                CheckDuplicateMessagesOnDuplexTest.log.error("couldn't stop niotransport", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest$DebugTransportServer.class */
    /**
     * TCP transport server that wraps every transport it creates in a
     * {@link DebugTransportFilter} and publishes that filter to the enclosing test.
     */
    private class DebugTransportServer extends TcpTransportServer {
        public DebugTransportServer(TcpTransportFactory factory, URI location, ServerSocketFactory socketFactory) throws IOException, URISyntaxException {
            super(factory, location, socketFactory);
        }

        /**
         * Creates an {@link NIOTransport} for the socket and wraps it in a new filter.
         * The filter replaces any previously stored one in the enclosing test instance.
         *
         * @param socket accepted socket
         * @param format wire format for the new transport
         * @return the filter wrapping the new transport
         * @throws IOException propagated from the transport constructor
         */
        protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
            final DebugTransportFilter filter = new DebugTransportFilter(new NIOTransport(format, socket));
            CheckDuplicateMessagesOnDuplexTest.this.debugTransportFilter = filter;
            return filter;
        }
    }

    /** Intentionally empty: this class sets up no class-level fixtures. */
    @BeforeClass
    public static void setUpClass() {
    }

    /** Intentionally empty: there is no class-level state to release. */
    @AfterClass
    public static void tearDownClass() {
    }

    /** Intentionally empty: the test method creates its own brokers and connections. */
    @Before
    public void setUp() {
    }

    /**
     * Releases whatever the test left open, so that a failed assertion does not leak
     * connections or a broker that is still bound to the fixed transport port.
     * Every step is best effort: problems are logged and never thrown.
     * After a successful run the resources are already closed; this relies on
     * {@code Connection.close()} and {@code BrokerService.stop()} being harmless when repeated.
     */
    @After
    public void tearDown() {
        closeQuietly("local", this.localConnection);
        closeQuietly("remote", this.remoteConnection);
        // Same order as stopBrokers(): the remote broker first, then the local one.
        stopQuietly(this.remoteBroker);
        stopQuietly(this.localBroker);
    }

    /** Closes the connection if it was ever opened, logging instead of throwing on failure. */
    private static void closeQuietly(String label, Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            log.warn("could not close {} connection during tearDown", label, e);
        }
    }

    /** Stops the broker if it was ever created, logging instead of throwing on failure. */
    private static void stopQuietly(BrokerService broker) {
        if (broker == null) {
            return;
        }
        try {
            broker.stop();
            broker.waitUntilStopped();
        } catch (Exception e) {
            // BrokerService.stop() declares Exception, so the broad catch is unavoidable here.
            log.warn("could not stop broker {} during tearDown", broker.getBrokerName(), e);
        }
    }

    /**
     * Sends three messages from the remote broker to a consumer on the local broker while the
     * transport is torn down before the response to the first send is written.
     * <ol>
     *   <li>Arms the debug filter, sends "hello1" and expects it to arrive.</li>
     *   <li>Sends "hello2" and expects it to arrive.</li>
     *   <li>Re-opens the local connection, sends "hello3" and expects it to arrive.</li>
     *   <li>Stops both brokers, restarts only the local one and expects the queue to be empty,
     *       i.e. nothing was stored a second time.</li>
     * </ol>
     *
     * @throws Exception if broker or JMS setup fails
     */
    @Test
    public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception {
        createBrokers();
        localBroker.deleteAllMessages();
        remoteBroker.deleteAllMessages();
        startBrokers();
        openConnections();
        // Give the brokers time to connect before the filter is armed.
        Thread.sleep(1000L);

        log.info("\n\n==============================================\nsend hello1\n");
        // Fail with a readable message instead of a bare NullPointerException.
        Assert.assertNotNull("no connection was accepted by the debug transport server", debugTransportFilter);
        debugTransportFilter.closeOnResponse = true;
        producer.send(remoteSession.createTextMessage("hello1"));
        // receive() returns Message, so the narrowing cast is required.
        TextMessage hello1 = (TextMessage) consumer.receive(30000L);
        Assert.assertNotNull("expected hello1", hello1);
        Assert.assertEquals("hello1", hello1.getText());
        Thread.sleep(1000L);

        log.info("\n\n------------------------------------------\nsend hello2\n");
        producer.send(remoteSession.createTextMessage("hello2"));
        TextMessage hello2 = (TextMessage) consumer.receive(30000L);
        Assert.assertNotNull("expected hello2", hello2);
        Assert.assertEquals("hello2", hello2.getText());
        closeLocalConnection();
        Thread.sleep(1000L);

        log.info("\n\n------------------------------------------\nsend hello3\n");
        openLocalConnection();
        Thread.sleep(1000L);
        producer.send(remoteSession.createTextMessage("hello3"));
        TextMessage hello3 = (TextMessage) consumer.receive(30000L);
        Assert.assertNotNull("expected hello3", hello3);
        Assert.assertEquals("hello3", hello3.getText());
        Thread.sleep(1000L);

        log.info("\n\n==============================================\n\n");
        closeConnections();
        stopBrokers();
        Thread.sleep(1000L);

        log.info("\n\n##############################################\n\n");
        // Restart only the local broker: anything still on the queue would be a duplicate.
        createLocalBroker();
        startLocalBroker();
        openLocalConnection();
        Message leftover = consumer.receive(1000L);
        closeLocalConnection();
        stopLocalBroker();
        Assert.assertNull(leftover);
    }

    /**
     * Creates both brokers, the local one first; neither is started here.
     *
     * @throws Exception propagated from either create method
     */
    private void createBrokers() throws Exception {
        createLocalBroker();
        createRemoteBroker();
    }

    /**
     * Builds the "LOCAL" broker without starting it: JMX enabled but without a JMX connector,
     * KahaDB storage under {@code target/local}, and one NIO transport connector on
     * {@code 127.0.0.1:23539} that is bound through {@link DebugTransportFactory}.
     *
     * @throws Exception if the transport cannot be bound or the broker rejects the configuration
     */
    private void createLocalBroker() throws Exception {
        this.localBroker = new BrokerService();
        this.localBroker.setBrokerName("LOCAL");
        this.localBroker.setUseJmx(true);
        this.localBroker.setSchedulePeriodForDestinationPurge(5000);
        ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector(false);
        this.localBroker.setManagementContext(managementContext);
        this.localBroker.setPersistenceAdapter(persistanceAdapterFactory("target/local"));

        // Bind through the debug factory so that accepted transports get the debug filter.
        TransportConnector connector = new TransportConnector(new DebugTransportFactory().doBind(URI.create("nio://127.0.0.1:23539")));
        connector.setName("tc");
        connector.setAuditNetworkProducers(true);

        // Parameterized instead of a raw ArrayList, to keep the call type-checked.
        ArrayList<TransportConnector> connectors = new ArrayList<>();
        connectors.add(connector);
        this.localBroker.setTransportConnectors(connectors);
    }

    /**
     * Builds the "REMOTE" broker without starting it: JMX enabled but without a JMX connector,
     * KahaDB storage under {@code target/remote}, and one duplex network connector pointing at
     * {@code tcp://127.0.0.1:23539} with {@code checkDuplicateMessagesOnDuplex} switched on.
     *
     * @throws Exception if the broker rejects the configuration
     */
    private void createRemoteBroker() throws Exception {
        this.remoteBroker = new BrokerService();
        this.remoteBroker.setBrokerName("REMOTE");
        this.remoteBroker.setUseJmx(true);
        this.remoteBroker.setSchedulePeriodForDestinationPurge(5000);
        ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector(false);
        this.remoteBroker.setManagementContext(managementContext);
        this.remoteBroker.setPersistenceAdapter(persistanceAdapterFactory("target/remote"));

        DiscoveryNetworkConnector bridge = new DiscoveryNetworkConnector();
        bridge.setName("to local");
        bridge.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)"));
        bridge.setDuplex(true);
        bridge.setAlwaysSyncSend(true);
        bridge.setDecreaseNetworkConsumerPriority(false);
        bridge.setPrefetchSize(1);
        // The option this test is named after.
        bridge.setCheckDuplicateMessagesOnDuplex(true);

        // Parameterized instead of a raw ArrayList.
        ArrayList<DiscoveryNetworkConnector> bridges = new ArrayList<>();
        bridges.add(bridge);
        this.remoteBroker.setNetworkConnectors(bridges);
    }

    /**
     * Starts the local broker, then the remote broker.
     *
     * @throws Exception propagated from either start method
     */
    private void startBrokers() throws Exception {
        startLocalBroker();
        startRemoteBroker();
    }

    /**
     * Starts the local broker and then calls {@code waitUntilStarted()} on it.
     *
     * @throws Exception propagated from {@code BrokerService.start()}
     */
    private void startLocalBroker() throws Exception {
        localBroker.start();
        localBroker.waitUntilStarted();
    }

    /**
     * Starts the remote broker and then calls {@code waitUntilStarted()} on it.
     *
     * @throws Exception propagated from {@code BrokerService.start()}
     */
    private void startRemoteBroker() throws Exception {
        remoteBroker.start();
        remoteBroker.waitUntilStarted();
    }

    /**
     * Opens the consumer-side connection on the local broker, then the producer-side
     * connection on the remote broker.
     *
     * @throws JMSException propagated from either open method
     */
    private void openConnections() throws JMSException {
        openLocalConnection();
        openRemoteConnection();
    }

    /**
     * Connects to the local broker through its VM connector, starts the connection and creates
     * a non-transacted, auto-acknowledging session with a consumer on queue {@code testqueue}.
     * Overwrites the local factory, connection, session and consumer fields.
     *
     * @throws JMSException if the connection, session or consumer cannot be created
     */
    private void openLocalConnection() throws JMSException {
        this.localFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
        this.localConnection = this.localFactory.createConnection();
        this.localConnection.start();
        // Named constant instead of the literal 1 left behind by constant inlining.
        this.localSession = this.localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.consumer = this.localSession.createConsumer(this.localSession.createQueue("testqueue"));
    }

    /**
     * Connects to the remote broker through its VM connector, starts the connection and creates
     * a non-transacted, auto-acknowledging session with a producer on queue {@code testqueue}.
     * Overwrites the remote factory, connection, session and producer fields.
     *
     * @throws JMSException if the connection, session or producer cannot be created
     */
    private void openRemoteConnection() throws JMSException {
        this.remoteFactory = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI());
        this.remoteConnection = this.remoteFactory.createConnection();
        this.remoteConnection.start();
        // Named constant instead of the literal 1 left behind by constant inlining.
        this.remoteSession = this.remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.remoteSession.createProducer(this.remoteSession.createQueue("testqueue"));
    }

    /**
     * Closes the local connection, then the remote connection. If closing the local
     * connection throws, the remote connection is not closed by this call.
     *
     * @throws JMSException propagated from either close method
     */
    private void closeConnections() throws JMSException {
        closeLocalConnection();
        closeRemoteConnection();
    }

    /**
     * Closes the connection to the local broker; the connection must have been opened before.
     *
     * @throws JMSException propagated from {@code Connection.close()}
     */
    private void closeLocalConnection() throws JMSException {
        localConnection.close();
    }

    /**
     * Closes the connection to the remote broker; the connection must have been opened before.
     *
     * @throws JMSException propagated from {@code Connection.close()}
     */
    private void closeRemoteConnection() throws JMSException {
        remoteConnection.close();
    }

    /**
     * Stops the remote broker first, then the local broker (the reverse of the start order).
     *
     * @throws Exception propagated from either stop method
     */
    private void stopBrokers() throws Exception {
        stopRemoteBroker();
        stopLocalBroker();
    }

    /**
     * Stops the local broker and then calls {@code waitUntilStopped()} on it.
     *
     * @throws Exception propagated from {@code BrokerService.stop()}
     */
    private void stopLocalBroker() throws Exception {
        localBroker.stop();
        localBroker.waitUntilStopped();
    }

    /**
     * Stops the remote broker and then calls {@code waitUntilStopped()} on it.
     *
     * @throws Exception propagated from {@code BrokerService.stop()}
     */
    private void stopRemoteBroker() throws Exception {
        remoteBroker.stop();
        remoteBroker.waitUntilStopped();
    }

    /**
     * Returns the persistence adapter used by both brokers; currently always the KahaDB variant.
     *
     * @param str directory path handed to the KahaDB adapter
     * @return the adapter built by {@link #persistanceAdapterFactory_KahaDB(String)}
     */
    private PersistenceAdapter persistanceAdapterFactory(String str) {
        return persistanceAdapterFactory_KahaDB(str);
    }

    /**
     * Creates a KahaDB persistence adapter rooted at the given directory, with the
     * ignore-missing-journal-files, check-for-corrupt-journal-files and checksum-journal-files
     * options all enabled.
     *
     * @param dataDirectory path of the KahaDB directory
     * @return the configured adapter
     */
    private PersistenceAdapter persistanceAdapterFactory_KahaDB(String dataDirectory) {
        final KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        final File directory = new File(dataDirectory);
        adapter.setDirectory(directory);
        adapter.setIgnoreMissingJournalfiles(true);
        adapter.setCheckForCorruptJournalFiles(true);
        adapter.setChecksumJournalFiles(true);
        return adapter;
    }
}
