package org.apache.activemq.usecases;

import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/MessageGroupNewConsumerTest.class */
/**
 * Checks that a consumer which joins a queue late does not receive messages
 * while an earlier consumer is still receiving them.
 *
 * <p>Three worker threads share one connection:
 * <ul>
 *   <li>the producer sends {@link #MESSAGE_ROUNDS} rounds, each containing one
 *       message per entry of {@link #groupNames} (set as {@code JMSXGroupID});</li>
 *   <li>consumer 1 starts receiving once the first round has been sent and keeps
 *       going until a receive times out;</li>
 *   <li>consumer 2 waits until consumer 1 has received one message per group,
 *       then repeatedly opens and closes a consumer for as long as consumer 1
 *       is alive.</li>
 * </ul>
 * The test passes when consumer 1 received every message sent and consumer 2
 * received none.
 */
public class MessageGroupNewConsumerTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MessageGroupNewConsumerTest.class);

    /** Broker URI; the query string disables persistence and JMX and sets every prefetch limit to 1. */
    private static final String connStr = "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";

    /** Values used for the {@code JMSXGroupID} property; one message per value is sent each round. */
    private static final String[] groupNames = {"GrA", "GrB", "GrC"};

    /** Number of rounds the producer sends. */
    private static final int MESSAGE_ROUNDS = 10;

    private Connection connection;

    /** Released by the producer after the first round; consumer 1 waits on it. */
    private final CountDownLatch latchMessagesCreated = new CountDownLatch(1);

    /** Released by consumer 1 after it received one message per group; producer and consumer 2 wait on it. */
    private final CountDownLatch latchGroupsAcquired = new CountDownLatch(1);

    private int messagesSent;
    private int messagesRecvd1;
    private int messagesRecvd2;

    /** First exception raised by any worker thread, or {@code null} if none failed. */
    private volatile Throwable workerFailure;

    /**
     * Runs the producer and both consumers to completion and verifies the
     * distribution of messages between the two consumers.
     *
     * @throws JMSException if the connection cannot be created, started or closed
     * @throws InterruptedException if the test thread is interrupted while joining the workers
     */
    public void testNewConsumer() throws JMSException, InterruptedException {
        this.connection = new ActiveMQConnectionFactory(connStr).createConnection();
        try {
            this.connection.start();
            // The queue is named after the test class.
            final String queueName = getClass().getSimpleName();

            Thread producer = new Thread("producer") {
                @Override
                public void run() {
                    runProducer(queueName);
                }
            };
            final Thread consumer1 = new Thread("consumer1") {
                @Override
                public void run() {
                    runFirstConsumer(queueName);
                }
            };
            Thread consumer2 = new Thread("consumer2") {
                @Override
                public void run() {
                    runSecondConsumer(queueName, consumer1);
                }
            };

            consumer2.start();
            consumer1.start();
            producer.start();

            producer.join();
            consumer1.join();
            consumer2.join();
        } finally {
            // Close the connection even when start() or a join() throws.
            this.connection.close();
        }

        // Report the root cause first: a worker that died early would otherwise
        // show up only as a confusing count mismatch, or not at all.
        Throwable failure = this.workerFailure;
        if (failure != null) {
            fail("worker thread failed: " + failure);
        }
        assertEquals("consumer 2 should not get any messages", 0, this.messagesRecvd2);
        assertEquals("consumer 1 should get all the messages", this.messagesSent, this.messagesRecvd1);
        assertTrue("producer failed to send any messages", this.messagesSent > 0);
    }

    /**
     * Producer body: sends {@link #MESSAGE_ROUNDS} rounds of one message per
     * group, committing after every message. After the third round it pauses
     * until consumer 1 signals {@link #latchGroupsAcquired}.
     */
    private void runProducer(String queueName) {
        Session session = null;
        try {
            session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue(queueName));
            for (int round = 0; round < MESSAGE_ROUNDS; round++) {
                int groupSeq = round + 1;
                for (String group : groupNames) {
                    producer.send(generateMessage(session, group, groupSeq));
                    session.commit();
                    this.messagesSent++;
                }
                LOG.info("Sent message seq " + groupSeq);
                if (round == 0) {
                    this.latchMessagesCreated.countDown();
                }
                if (round == 2) {
                    LOG.info("Prod: Waiting for groups");
                    this.latchGroupsAcquired.await();
                }
                Thread.sleep(20L);
            }
            LOG.info(this.messagesSent + " messages sent");
            producer.close();
        } catch (Exception e) {
            recordFailure("Producer failed", e);
        } finally {
            closeSession(session, "Producer failed");
            // If the producer died before finishing round one, consumer 1 would
            // wait forever; releasing here turns a hang into a test failure.
            this.latchMessagesCreated.countDown();
        }
    }

    /**
     * Consumer 1 body: waits for the first round, then receives and commits
     * messages until a receive times out after one second. Signals
     * {@link #latchGroupsAcquired} once it has received as many messages as
     * there are groups.
     */
    private void runFirstConsumer(String queueName) {
        Session session = null;
        try {
            session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
            this.latchMessagesCreated.await();
            Message message;
            while ((message = consumer.receive(1000L)) != null) {
                LOG.info("Con1 got message " + formatMessage(message));
                session.commit();
                this.messagesRecvd1++;
                if (this.messagesRecvd1 == groupNames.length) {
                    LOG.info("All groups acquired");
                    this.latchGroupsAcquired.countDown();
                    // Extra pause right after signalling, before resuming consumption.
                    Thread.sleep(1000L);
                }
                Thread.sleep(50L);
            }
            LOG.info(this.messagesRecvd1 + " messages received by consumer1");
            consumer.close();
        } catch (Exception e) {
            recordFailure("Consumer 1 failed", e);
        } finally {
            closeSession(session, "Consumer 1 failed");
            // The producer and consumer 2 both block on this latch; release it
            // even when this consumer stopped early so the test cannot hang.
            this.latchGroupsAcquired.countDown();
        }
    }

    /**
     * Consumer 2 body: once {@link #latchGroupsAcquired} is released, keeps
     * opening a fresh session and consumer, draining it until a receive times
     * out after 500 ms, and closing it again, for as long as
     * {@code firstConsumer} is alive.
     */
    private void runSecondConsumer(String queueName, Thread firstConsumer) {
        try {
            this.latchGroupsAcquired.await();
            while (firstConsumer.isAlive()) {
                LOG.info("(re)starting consumer2");
                Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
                try {
                    MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
                    Message message;
                    while ((message = consumer.receive(500L)) != null) {
                        LOG.info("Con2 got message       " + formatMessage(message));
                        session.commit();
                        this.messagesRecvd2++;
                        Thread.sleep(50L);
                    }
                    consumer.close();
                } finally {
                    session.close();
                }
            }
            LOG.info(this.messagesRecvd2 + " messages received by consumer2");
        } catch (Exception e) {
            recordFailure("Consumer 2 failed", e);
        }
    }

    /** Logs a worker failure and remembers the first one so the test method can report it. */
    private synchronized void recordFailure(String description, Exception cause) {
        LOG.error(description, cause);
        if (this.workerFailure == null) {
            this.workerFailure = cause;
        }
    }

    /** Closes {@code session} if it was created, recording (not throwing) any close failure. */
    private void closeSession(Session session, String failureDescription) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException e) {
            recordFailure(failureDescription, e);
        }
    }

    /**
     * Builds a text message tagged with a group id and a group sequence number.
     *
     * @param session session used to create the message
     * @param str value for the {@code JMSXGroupID} property
     * @param i value for the {@code JMSXGroupSeq} property
     * @return the populated message
     * @throws JMSException if the message cannot be created or populated
     */
    public Message generateMessage(Session session, String str, int i) throws JMSException {
        TextMessage message = session.createTextMessage();
        message.setJMSType("TEST_MESSAGE");
        message.setStringProperty("JMSXGroupID", str);
        message.setIntProperty("JMSXGroupSeq", i);
        message.setText("<?xml?><testMessage/>");
        return message;
    }

    /**
     * Renders a message for logging as
     * {@code <JMSXGroupID>-<JMSXGroupSeq>-<JMSXGroupFirstForConsumer>}.
     *
     * @param message message to describe
     * @return the formatted description, or the exception's simple class name
     *         and message if a property cannot be read
     */
    public String formatMessage(Message message) {
        try {
            String groupId = message.getStringProperty("JMSXGroupID");
            int groupSeq = message.getIntProperty("JMSXGroupSeq");
            boolean firstForConsumer = message.getBooleanProperty("JMSXGroupFirstForConsumer");
            return groupId + "-" + groupSeq + "-" + firstForConsumer;
        } catch (Exception e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }
}
