package org.apache.activemq;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.nio.NIOSSLWindowSizeTest;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/ProducerFlowControlTest.class */
/**
 * Tests how producers behave when a destination reaches the memory limit configured in
 * {@link #createBroker()} with producer flow control enabled.
 *
 * <p>Each test opens one or more connections against the embedded broker's transport connector,
 * fills a queue until the sending thread stops making progress, and then checks whether other
 * producers (or the same producer, after messages are consumed) can continue.
 *
 * <p>Test method names (including the historical "Pubisher" spelling) are kept as-is because
 * subclasses may override them.
 */
public class ProducerFlowControlTest extends JmsTestSupport {
    static final Logger LOG = LoggerFactory.getLogger(ProducerFlowControlTest.class);

    /** Fragment that {@link #testDisableWarning()} looks for in {@link Queue} log output. */
    private static final String MEMORY_LIMIT_LOG_FRAGMENT = "Usage Manager Memory Limit";

    /** How long a test waits for an asynchronous send to complete, in seconds. */
    private static final long SEND_TIMEOUT_SECONDS = 2L;

    /** Polling interval / receive timeout used by the blocking checks, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Number of messages consumed to free space for a blocked producer. */
    private static final int MESSAGES_TO_DRAIN = 5;

    protected TransportConnector connector;
    protected ActiveMQConnection connection;
    ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
    ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");

    /** Alternative exit condition for {@link #waitForBlockedOrResourceLimit(AtomicBoolean)}. */
    protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);

    /**
     * With a producer window configured on the connection factory, a full queue A must not
     * prevent a second producer on the same connection from sending to queue B.
     */
    public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
        // NOTE(review): this constant comes from an unrelated test class and looks like a
        // decompiler-substituted literal; confirm the intended window size and consider inlining.
        factory.setProducerWindowSize(NIOSSLWindowSizeTest.MESSAGE_SIZE);
        startConnection(factory);

        MessageConsumer consumer = createClientAckConsumer(this.queueB);
        fillQueue(this.queueA);

        assertSendAndReceive(consumer, this.queueB, "Message 1");
        assertSendAndReceive(consumer, this.queueB, "Message 2");
    }

    /**
     * A producer that stalled on a full queue must start sending again once a consumer has
     * drained some messages (default connection settings).
     */
    public void testPubisherRecoverAfterBlock() throws Exception {
        startConnection(createConnectionFactory());
        assertProducerResumesAfterConsume();
    }

    /**
     * Same as {@link #testPubisherRecoverAfterBlock()} but with async send enabled and a
     * producer window of 5120 set on the connection factory.
     */
    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
        factory.setProducerWindowSize(5120);
        factory.setUseAsyncSend(true);
        startConnection(factory);
        assertProducerResumesAfterConsume();
    }

    /**
     * With always-sync-send enabled, a full queue A must not prevent a second producer on the
     * same connection from sending to queue B.
     */
    public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
        factory.setAlwaysSyncSend(true);
        startConnection(factory);

        MessageConsumer consumer = createClientAckConsumer(this.queueB);
        fillQueue(this.queueA);

        assertSendAndReceive(consumer, this.queueB, "Message 1");
        assertSendAndReceive(consumer, this.queueB, "Message 2");
    }

    /** Baseline: two messages sent to queue A are received in order when nothing is blocked. */
    public void testSimpleSendReceive() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
        factory.setAlwaysSyncSend(true);
        startConnection(factory);

        MessageConsumer consumer = createClientAckConsumer(this.queueA);

        assertSendAndReceive(consumer, this.queueA, "Message 1");
        assertSendAndReceive(consumer, this.queueA, "Message 2");
    }

    /**
     * With default connection settings, once queue A is full a send to queue B on the same
     * connection is expected NOT to complete within the timeout.
     */
    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
        startConnection(createConnectionFactory());
        fillQueue(this.queueA);
        assertFalse(asyncSendTo(this.queueB, "Message 1").await(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /**
     * Checks the effect of {@code blockedProducerWarningInterval} on {@link Queue} logging:
     * filling a queue first yields exactly one matching WARN event; after the interval is set to
     * {@code 0} on the default policy entry, filling a fresh queue yields no matching WARN event
     * and more than one matching DEBUG event.
     */
    public void testDisableWarning() throws Exception {
        final AtomicInteger warnings = new AtomicInteger();
        final AtomicInteger debugs = new AtomicInteger();

        DefaultTestAppender appender = new DefaultTestAppender() {
            @Override
            public void append(LogEvent event) {
                Level level = event.getLevel();
                boolean isWarn = Level.WARN.equals(level);
                boolean isDebug = Level.DEBUG.equals(level);
                // Check the level first so the message is only formatted for relevant events.
                if (!isWarn && !isDebug) {
                    return;
                }
                if (!event.getMessage().getFormattedMessage().contains(MEMORY_LIMIT_LOG_FRAGMENT)) {
                    return;
                }
                if (isWarn) {
                    ProducerFlowControlTest.LOG.info("received warn log message: " + event.getMessage());
                    warnings.incrementAndGet();
                } else {
                    ProducerFlowControlTest.LOG.info("received debug log message: " + event.getMessage());
                    debugs.incrementAndGet();
                }
            }
        };
        appender.start();

        org.apache.logging.log4j.core.Logger queueLogger =
                (org.apache.logging.log4j.core.Logger) LogManager.getLogger(Queue.class);
        queueLogger.addAppender(appender);
        queueLogger.setLevel(Level.DEBUG);
        try {
            ConnectionFactory factory = createConnectionFactory();

            startConnection(factory);
            fillQueue(this.queueB);
            assertEquals(1, warnings.get());

            this.broker.getDestinationPolicy().getDefaultEntry().setBlockedProducerWarningInterval(0L);
            warnings.set(0);

            // A second connection and a brand-new queue, so the changed policy is what applies.
            startConnection(factory);
            fillQueue(new ActiveMQQueue("SomeOtherQueueToPickUpNewPolicy"));
            assertEquals(0, warnings.get());
            assertTrue(debugs.get() > 1);
        } finally {
            // Always detach the appender, whether the assertions passed or not.
            queueLogger.removeAppender(appender);
        }
    }

    /**
     * Creates a connection from {@code factory}, stores it in {@link #connection}, registers it
     * in {@code connections} so {@link #tearDown()} disposes it, and starts it.
     */
    private void startConnection(ConnectionFactory factory) throws JMSException {
        this.connection = (ActiveMQConnection) factory.createConnection();
        this.connections.add(this.connection);
        this.connection.start();
    }

    /** Creates a consumer on {@code queue} using a new client-acknowledge session. */
    private MessageConsumer createClientAckConsumer(ActiveMQQueue queue) throws JMSException {
        return this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE).createConsumer(queue);
    }

    /**
     * Sends {@code text} to {@code queue} from a background thread, asserts the send finishes
     * within the timeout, then receives, verifies and acknowledges the message.
     */
    private void assertSendAndReceive(MessageConsumer consumer, ActiveMQQueue queue, String text)
            throws JMSException, InterruptedException {
        assertTrue(asyncSendTo(queue, text).await(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        TextMessage received = (TextMessage) consumer.receive();
        assertEquals(text, received.getText());
        received.acknowledge();
    }

    /**
     * Shared body of the recover-after-block tests. Starts a "Filler" thread that sends to queue A
     * in a loop, waits until it stops making progress, consumes a few messages, and then asserts
     * that the filler made progress again (i.e. cleared the {@code done} flag).
     */
    private void assertProducerResumesAfterConsume() throws Exception {
        final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(this.queueA);
        final AtomicBoolean done = new AtomicBoolean(true);
        final AtomicBoolean keepGoing = new AtomicBoolean(true);

        new Thread("Filler") {
            private int sent;

            @Override
            public void run() {
                while (keepGoing.get()) {
                    // Each loop iteration signals "still making progress" to the waiter.
                    done.set(false);
                    try {
                        producer.send(session.createTextMessage("Test message " + ++sent));
                        ProducerFlowControlTest.LOG.info("sent: {}", sent);
                    } catch (JMSException e) {
                        // Best effort: a failed send must not stop the filler loop.
                        ProducerFlowControlTest.LOG.debug("Filler send failed (ignored)", e);
                    }
                }
            }
        }.start();

        try {
            waitForBlockedOrResourceLimit(done);

            MessageConsumer consumer = session.createConsumer(this.queueA);
            for (int idx = 0; idx < MESSAGES_TO_DRAIN; idx++) {
                TextMessage msg = (TextMessage) consumer.receive(POLL_INTERVAL_MILLIS);
                // Fail with a clear assertion rather than an NPE if nothing arrives in time.
                assertNotNull("Got a message", msg);
                LOG.info("received: {}, msg: {}", idx, msg.getJMSMessageID());
                msg.acknowledge();
            }
            // Give the unblocked producer time to run at least one more loop iteration.
            Thread.sleep(POLL_INTERVAL_MILLIS);
        } finally {
            // Always let the filler loop end, even when an assertion above failed.
            keepGoing.set(false);
        }
        assertFalse("producer has resumed", done.get());
    }

    /**
     * Starts a background thread that sends "Hello World" messages to {@code queue} on
     * {@link #connection} and returns once that thread has stopped making progress (or
     * {@link #gotResourceException} is set). The thread is told to stop afterwards; its session is
     * closed when its loop exits.
     */
    private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
        final AtomicBoolean done = new AtomicBoolean(true);
        final AtomicBoolean keepGoing = new AtomicBoolean(true);

        new Thread("Fill thread.") {
            @Override
            public void run() {
                Session session = null;
                try {
                    session = ProducerFlowControlTest.this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(queue);
                    producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
                    while (keepGoing.get()) {
                        done.set(false);
                        producer.send(session.createTextMessage("Hello World"));
                    }
                } catch (JMSException e) {
                    // Best effort: a failure here just ends the fill thread.
                    ProducerFlowControlTest.LOG.debug("Fill thread stopped by JMS error (ignored)", e);
                } finally {
                    ProducerFlowControlTest.this.safeClose(session);
                }
            }
        }.start();

        waitForBlockedOrResourceLimit(done);
        keepGoing.set(false);
    }

    /**
     * Polls once per second until either {@code atomicBoolean} is still {@code true} from the
     * previous poll (meaning the sending thread did not reset it in between) or
     * {@link #gotResourceException} is set. There is no timeout in this method itself.
     *
     * @param atomicBoolean progress flag that the sending thread sets to {@code false} on every
     *        loop iteration; this method sets it back to {@code true} after each poll
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    public void waitForBlockedOrResourceLimit(AtomicBoolean atomicBoolean) throws InterruptedException {
        while (true) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            if (atomicBoolean.get() || this.gotResourceException.get()) {
                return;
            }
            atomicBoolean.set(true);
        }
    }

    /**
     * Sends one non-persistent text message to {@code queue} from a new thread.
     *
     * @return a latch that reaches zero only if the send completed without a JMS error
     */
    private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String text) throws JMSException {
        final CountDownLatch sent = new CountDownLatch(1);

        new Thread("Send thread.") {
            @Override
            public void run() {
                Session session = null;
                try {
                    session = ProducerFlowControlTest.this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(queue);
                    producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
                    producer.send(session.createTextMessage(text));
                    sent.countDown();
                } catch (JMSException e) {
                    // The latch is left untouched, so callers observe the failure as a timeout.
                    ProducerFlowControlTest.LOG.debug("Async send failed (ignored)", e);
                } finally {
                    ProducerFlowControlTest.this.safeClose(session);
                }
            }
        }.start();

        return sent;
    }

    /**
     * Builds a non-persistent, JMX-less broker whose default destination policy has producer flow
     * control enabled, VM pending-message storage, and a memory limit of {@code 1} (presumably
     * bytes, so destinations fill almost immediately; confirm against PolicyEntry docs). Also
     * registers the transport connector used by {@link #createConnectionFactory()}.
     */
    @Override // org.apache.activemq.JmsTestSupport
    public BrokerService createBroker() throws Exception {
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setMemoryLimit(1L);
        defaultPolicy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
        defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        defaultPolicy.setProducerFlowControl(true);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);

        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setDestinationPolicy(policyMap);
        this.connector = brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        return brokerService;
    }

    /** Enables auto-fail before delegating to the superclass set-up. */
    @Override // org.apache.activemq.JmsTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    /**
     * Signals an {@link IOException} to each registered connection's TCP transport listener and
     * stops the transport before delegating to the superclass tear-down (presumably so that
     * producers blocked by flow control fail fast instead of hanging the close; confirm).
     * The superclass tear-down runs even if disposing a connection throws.
     */
    @Override // org.apache.activemq.JmsTestSupport
    public void tearDown() throws Exception {
        try {
            for (Connection registered : this.connections) {
                ActiveMQConnection amqConnection = (ActiveMQConnection) registered;
                TcpTransport tcp = (TcpTransport) amqConnection.getTransport().narrow(TcpTransport.class);
                if (tcp != null) {
                    tcp.getTransportListener().onException(new IOException("Disposed."));
                }
                amqConnection.getTransport().stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /** Returns a factory that connects to the URI of the connector created in {@link #createBroker()}. */
    @Override // org.apache.activemq.JmsTestSupport
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.connector.getConnectUri());
    }
}
