package org.apache.activemq.transport;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/SoWriteTimeoutTest.class */
public class SoWriteTimeoutTest extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);
    final int receiveBufferSize = 16384;
    public String brokerTransportScheme = "nio";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService createBroker = super.createBroker();
        createBroker.setPersistent(true);
        createBroker.setDeleteAllMessagesOnStartup(true);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
        createBroker.setPersistenceAdapter(kahaDBPersistenceAdapter);
        createBroker.addConnector(this.brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000");
        if ("nio".equals(this.brokerTransportScheme)) {
            createBroker.addConnector("stomp+" + this.brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=16384&trace=true");
        }
        return createBroker;
    }

    public void initCombosForTestWriteTimeout() {
        addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
    }

    public void testWriteTimeout() throws Exception {
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("testWriteTimeout");
        this.messageTextPrefix = initMessagePrefix(8192);
        sendMessages(activeMQQueue, DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP);
        URI removeQuery = URISupport.removeQuery(((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri());
        LOG.info("consuming using uri: " + removeQuery);
        SocketProxy socketProxy = new SocketProxy();
        socketProxy.setTarget(removeQuery);
        socketProxy.setReceiveBufferSize(16384);
        socketProxy.open();
        Connection createConnection = new ActiveMQConnectionFactory(socketProxy.getUrl()).createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        socketProxy.pause();
        TimeUnit.SECONDS.sleep(10L);
        socketProxy.goOn();
        assertNotNull("can receive buffered messages", createConsumer.receive(500L));
        try {
            createSession.commit();
            fail("expect commit to fail as server has aborted writeTimeout connection");
        } catch (JMSException e) {
        }
    }

    public void testWriteTimeoutStompNio() throws Exception {
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("testWriteTimeout");
        this.messageTextPrefix = initMessagePrefix(8192);
        sendMessages(activeMQQueue, DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP);
        URI removeQuery = URISupport.removeQuery(((TransportConnector) this.broker.getTransportConnectors().get(1)).getConnectUri());
        LOG.info("consuming using uri: " + removeQuery);
        SocketProxy socketProxy = new SocketProxy();
        socketProxy.setTarget(new URI("tcp://localhost:" + removeQuery.getPort()));
        socketProxy.setReceiveBufferSize(16384);
        socketProxy.open();
        StompConnection stompConnection = new StompConnection();
        stompConnection.open(new Socket("localhost", socketProxy.getUrl().getPort()));
        stompConnection.getStompSocket().setTcpNoDelay(true);
        stompConnection.sendFrame("CONNECT\nlogin:system\npasscode:manager\n\n��");
        assertTrue(stompConnection.receiveFrame().startsWith("CONNECTED"));
        stompConnection.sendFrame("SUBSCRIBE\ndestination:/queue/" + activeMQQueue.getQueueName() + "\nack:client\n\n��");
        socketProxy.pause();
        TimeUnit.SECONDS.sleep(1L);
        TimeUnit.SECONDS.sleep(10L);
        socketProxy.goOn();
        assertTrue(stompConnection.receiveFrame().startsWith("MESSAGE"));
        for (int i = 0; i < 200; i++) {
            try {
                stompConnection.send("/queue/" + activeMQQueue.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
            } catch (SocketException e) {
                LOG.info("got exception on send after timeout: " + e);
                return;
            }
        }
        fail("expected send to fail with timeout out connection");
    }

    private String initMessagePrefix(int i) {
        return new String(new byte[i]);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.JmsTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    public static Test suite() {
        return suite(SoWriteTimeoutTest.class);
    }
}
