package org.apache.activemq.usecases;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DispatchMultipleConsumersTest.class */
public class DispatchMultipleConsumersTest extends TestCase {
    // Logger shared by the test and by both worker-thread inner classes.
    private static final Logger logger = LoggerFactory.getLogger(DispatchMultipleConsumersTest.class);
    // Embedded broker: created and started in setUp(), stopped in tearDown().
    BrokerService broker;
    // Queue that producers send to and consumers receive from; built from destinationName in setUp().
    Destination dest;
    // Running total of messages sent; each ProducerThread adds its own tally when it finishes.
    AtomicInteger sentCount;
    // Running total of messages received; each ConsumerThread adds its own tally when it finishes.
    AtomicInteger consumedCount;
    // Initialised to producerThreads and counted down once per finished producer.
    // Consumers also read its count: they only start counting empty polls once it has reached zero.
    CountDownLatch producerLatch;
    // Initialised to consumerCount and counted down once per finished consumer.
    CountDownLatch consumerLatch;
    // Connect string of the broker's first transport connector; assigned in setUp().
    String brokerURL;
    // Name of the queue used by the test.
    String destinationName = "TEST.Q";
    // Text body of every message that is sent.
    String msgStr = "Test text message";
    // Number of messages each producer thread sends.
    int messagesPerThread = 20;
    // Number of producer threads started per dispatch round.
    int producerThreads = 50;
    // Number of consumer threads started per dispatch round.
    int consumerCount = 2;
    // Credentials handed to ActiveMQConnectionFactory (empty strings here).
    String userName = "";
    String password = "";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/usecases/DispatchMultipleConsumersTest$ConsumerThread.class */
    public class ConsumerThread extends Thread {
        Session session;
        MessageConsumer consumer;

        public ConsumerThread(Connection connection, String str) {
            setName(str);
            DispatchMultipleConsumersTest.logger.trace("Created new consumer thread:" + str);
            try {
                this.session = connection.createSession(false, 1);
                this.consumer = this.session.createConsumer(DispatchMultipleConsumersTest.this.dest);
                start();
            } catch (JMSException e) {
                DispatchMultipleConsumersTest.logger.error("Failed to start consumer thread:" + str, e);
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int i = 0;
            int i2 = 0;
            while (true) {
                try {
                    Message receive = this.consumer.receive(1000L);
                    if (receive != null) {
                        i2 = 0;
                        Thread.sleep(100L);
                        if (DispatchMultipleConsumersTest.logger.isTraceEnabled()) {
                            DispatchMultipleConsumersTest.logger.trace("Message received:" + receive.getJMSMessageID());
                        }
                        i++;
                    } else if (DispatchMultipleConsumersTest.this.producerLatch.getCount() <= 0) {
                        i2++;
                        if (i2 > 10) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    DispatchMultipleConsumersTest.logger.error("Interrupted!", e);
                } catch (JMSException e2) {
                    DispatchMultipleConsumersTest.logger.error("Failed to consume:", e2);
                }
            }
            try {
                this.consumer.close();
            } catch (JMSException e3) {
                DispatchMultipleConsumersTest.logger.error("Failed to close consumer " + getName(), e3);
            }
            DispatchMultipleConsumersTest.this.consumedCount.addAndGet(i);
            DispatchMultipleConsumersTest.this.consumerLatch.countDown();
            DispatchMultipleConsumersTest.logger.trace("Consumed " + i + " messages using thread " + getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/usecases/DispatchMultipleConsumersTest$ProducerThread.class */
    public class ProducerThread extends Thread {
        int count;
        Connection conn;
        Session session;
        MessageProducer producer;

        public ProducerThread(ActiveMQConnectionFactory activeMQConnectionFactory, int i, String str) {
            this.count = i;
            setName(str);
            DispatchMultipleConsumersTest.logger.trace("Created new producer thread:" + str);
            try {
                this.conn = activeMQConnectionFactory.createConnection();
                this.conn.start();
                this.session = this.conn.createSession(false, 1);
                this.producer = this.session.createProducer(DispatchMultipleConsumersTest.this.dest);
                start();
            } catch (JMSException e) {
                DispatchMultipleConsumersTest.logger.error("Failed to start producer thread:" + str, e);
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int i = 0;
            while (i < this.count) {
                try {
                    this.producer.send(this.session.createTextMessage(DispatchMultipleConsumersTest.this.msgStr));
                    Thread.sleep(500L);
                    i++;
                } catch (JMSException e) {
                    DispatchMultipleConsumersTest.logger.error(e.getMessage(), e);
                } catch (InterruptedException e2) {
                    DispatchMultipleConsumersTest.logger.error("Interrupted!", e2);
                }
            }
            this.conn.close();
            DispatchMultipleConsumersTest.this.sentCount.addAndGet(i);
            DispatchMultipleConsumersTest.this.producerLatch.countDown();
            if (DispatchMultipleConsumersTest.logger.isTraceEnabled()) {
                DispatchMultipleConsumersTest.logger.trace("Sent " + i + " messages from thread " + getName());
            }
        }
    }

    /**
     * Boots a persistent, JMX-enabled broker with an empty store on an auto-assigned
     * transport, then prepares the destination, the counters and the connect URL.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();

        this.broker = new BrokerService();
        final BrokerService service = this.broker;
        service.setPersistent(true);
        service.setUseJmx(true);
        service.deleteAllMessages();
        service.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        service.start();
        service.waitUntilStarted();

        this.dest = new ActiveMQQueue(this.destinationName);
        resetCounters();

        // Clients connect through the first (and only) connector added above.
        final TransportConnector connector = (TransportConnector) service.getTransportConnectors().get(0);
        this.brokerURL = connector.getPublishableConnectString();
    }

    /**
     * Stops the embedded broker and waits until it has fully shut down.
     *
     * @throws Exception if stopping the broker fails
     */
    @Override
    protected void tearDown() throws Exception {
        final BrokerService running = this.broker;
        running.stop();
        running.waitUntilStopped();
        super.tearDown();
    }

    /**
     * Replaces both message counters with fresh zeroed instances and both latches
     * with new ones sized to the configured number of producers and consumers.
     */
    private void resetCounters() {
        sentCount = new AtomicInteger();
        consumedCount = new AtomicInteger();
        producerLatch = new CountDownLatch(producerThreads);
        consumerLatch = new CountDownLatch(consumerCount);
    }

    /**
     * Runs five produce/consume rounds and checks after each one that the number of
     * messages consumed equals the number of messages sent.
     */
    public void testDispatch1() {
        int iteration = 1;
        while (iteration <= 5) {
            resetCounters();
            dispatch();
            final String failureMessage = "Incorrect messages in Iteration " + iteration;
            final int sent = this.sentCount.get();
            final int consumed = this.consumedCount.get();
            assertEquals(failureMessage, sent, consumed);
            iteration++;
        }
    }

    /**
     * Starts the consumers, then the producers, and blocks until every producer and
     * then every consumer has counted down its latch. Fails the test if the wait is
     * interrupted.
     */
    private void dispatch() {
        startConsumers();
        startProducers();
        // Producers are awaited first, consumers second.
        final CountDownLatch[] pending = {this.producerLatch, this.consumerLatch};
        try {
            for (CountDownLatch latch : pending) {
                latch.await();
            }
        } catch (InterruptedException interrupted) {
            fail("test interrupted!");
        }
    }

    /**
     * Opens one shared connection and starts {@code consumerCount} consumer threads on it.
     *
     * <p>If the connection cannot be created or started the test is failed immediately.
     * No consumer thread exists at that point, so nothing would ever count down
     * {@code consumerLatch} and the caller would otherwise wait on it forever.
     *
     * <p>NOTE(review): on success the connection is deliberately left open because the
     * consumer threads keep using it after this method returns; nothing in this class
     * closes it afterwards.
     */
    private void startConsumers() {
        Connection connection = null;
        try {
            connection = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL).createConnection();
            connection.start();
            for (int i = 0; i < this.consumerCount; i++) {
                // The thread starts itself from its constructor.
                new ConsumerThread(connection, "ConsumerThread" + i);
            }
        } catch (JMSException e) {
            logger.error("Failed to start consumers", e);
            if (connection != null) {
                // Created but could not be started: do not leave it open.
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    logger.error("Failed to close consumer connection", closeFailure);
                }
            }
            fail("Failed to start consumers: " + e);
        }
    }

    /**
     * Starts {@code producerThreads} producer threads, all opening their connections
     * from one shared factory; each is told to send {@code messagesPerThread} messages.
     */
    private void startProducers() {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        int index = 0;
        while (index < producerThreads) {
            // The thread starts itself from its constructor.
            new ProducerThread(factory, messagesPerThread, "ProducerThread" + index);
            index++;
        }
    }
}
