package org.apache.activemq.transport.failover;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverReadInactivityBlockWriteTimeoutClientTest.class */
public class FailoverReadInactivityBlockWriteTimeoutClientTest extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverReadInactivityBlockWriteTimeoutClientTest.class);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
        brokerService.setPersistenceAdapter(kahaDBPersistenceAdapter);
        brokerService.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
        return brokerService;
    }

    public void testBlockedFailoverSendWillReactToReadInactivityTimeout() throws Exception {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("testClientWriteTimeout");
        this.messageTextPrefix = initMessagePrefix(81920);
        URI removeQuery = URISupport.removeQuery(((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri());
        LOG.info("consuming using uri: " + removeQuery);
        Connection createConnection = new ActiveMQConnectionFactory(removeQuery).createConnection();
        createConnection.start();
        MessageConsumer createConsumer = createConnection.createSession(false, 1).createConsumer(activeMQQueue);
        SocketProxy socketProxy = new SocketProxy();
        socketProxy.setTarget(removeQuery);
        socketProxy.open();
        final ActiveMQConnection createConnection2 = new ActiveMQConnectionFactory("failover:(" + socketProxy.getUrl() + "?wireFormat.maxInactivityDuration=5000&ignoreRemoteWireFormat=true)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400").createConnection();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        createConnection2.addTransportListener(new TransportListener() { // from class: org.apache.activemq.transport.failover.FailoverReadInactivityBlockWriteTimeoutClientTest.1
            public void onCommand(Object obj) {
            }

            public void onException(IOException iOException) {
                FailoverReadInactivityBlockWriteTimeoutClientTest.LOG.info("Got: " + iOException);
            }

            public void transportInterupted() {
                atomicInteger.incrementAndGet();
            }

            public void transportResumed() {
            }
        });
        createConnection2.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newCachedThreadPool().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverReadInactivityBlockWriteTimeoutClientTest.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Session createSession = createConnection2.createSession(false, 1);
                    MessageProducer createProducer = createSession.createProducer(activeMQQueue);
                    for (int i = 0; i < 200; i++) {
                        createProducer.send(createSession.createTextMessage(FailoverReadInactivityBlockWriteTimeoutClientTest.this.messageTextPrefix + i));
                        countDownLatch.countDown();
                    }
                    createProducer.close();
                    createSession.close();
                    FailoverReadInactivityBlockWriteTimeoutClientTest.LOG.info("Done with send of: 200");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        countDownLatch.await(5L, TimeUnit.SECONDS);
        socketProxy.pause();
        assertTrue("Got interrupted", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverReadInactivityBlockWriteTimeoutClientTest.3
            public boolean isSatisified() throws Exception {
                return atomicInteger.get() > 0;
            }
        }));
        socketProxy.goOn();
        for (int i = 0; i < 200; i++) {
            assertNotNull("Got message " + i + " after reconnect", createConsumer.receive(5000L));
        }
        assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverReadInactivityBlockWriteTimeoutClientTest.4
            public boolean isSatisified() throws Exception {
                FailoverReadInactivityBlockWriteTimeoutClientTest.LOG.info("current total message count: " + FailoverReadInactivityBlockWriteTimeoutClientTest.this.broker.getAdminView().getTotalMessageCount());
                return FailoverReadInactivityBlockWriteTimeoutClientTest.this.broker.getAdminView().getTotalMessageCount() == 0;
            }
        }));
    }

    private String initMessagePrefix(int i) {
        return new String(new byte[i]);
    }
}
