package org.apache.activemq.usecases;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/MessageGroupCloseTest.class */
/**
 * Exercises message-group "close" handling with one producer and two competing consumers.
 *
 * <p>The producer sends {@value #GROUP_COUNT} groups of {@value #MESSAGES_PER_GROUP} messages to a
 * queue named after this class. Every {@value #CLOSE_INTERVAL}th message of a group carries
 * {@code JMSXGroupSeq = -1}, which this test treats as the group's close message. Once everything
 * has been sent, two consumers drain the queue and {@link #checkMessage} records any violation of
 * the expected group behaviour; the test asserts that no violations were recorded.
 */
public class MessageGroupCloseTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MessageGroupCloseTest.class);

    private static final String CONNECTION_URI = "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";

    /** Number of distinct message groups the producer creates. */
    private static final int GROUP_COUNT = 10;
    /** Number of messages sent within each group. */
    private static final int MESSAGES_PER_GROUP = 10;
    /** Every n-th message of a group is sent with the close sequence. */
    private static final int CLOSE_INTERVAL = 5;
    /** {@code JMSXGroupSeq} value this test treats as "close the group". */
    private static final int CLOSE_SEQ = -1;
    /** How long a consumer waits for a message before assuming the queue is drained. */
    private static final long RECEIVE_TIMEOUT_MS = 5000L;
    /** Progress is logged every this many groups / received messages. */
    private static final int PROGRESS_LOG_INTERVAL = 100;

    private Connection connection;

    // Written only by the producer thread and read by the test thread after join(),
    // so plain ints are sufficient here.
    private int messagesSent;
    private int messageGroupCount;

    // Each receive counter is updated by exactly one consumer thread.
    private final AtomicInteger messagesRecvd1 = new AtomicInteger();
    private final AtomicInteger messagesRecvd2 = new AtomicInteger();

    // Error counters are incremented from both consumer threads via checkMessage(),
    // so they must be atomic.
    private final AtomicInteger errorCountFirstForConsumer = new AtomicInteger();
    private final AtomicInteger errorCountWrongConsumerClose = new AtomicInteger();
    private final AtomicInteger errorCountDuplicateClose = new AtomicInteger();

    /** Released by the producer once it has finished (successfully or not). */
    private final CountDownLatch latchMessagesCreated = new CountDownLatch(1);

    // Per-consumer bookkeeping: group id -> messages seen, and the groups currently closed.
    private final Map<String, Integer> messageGroups1 = new HashMap<>();
    private final Map<String, Integer> messageGroups2 = new HashMap<>();
    private final Set<String> closedGroups1 = new HashSet<>();
    private final Set<String> closedGroups2 = new HashSet<>();

    /**
     * Runs the producer and both consumers to completion and then verifies the recorded counters.
     *
     * @throws JMSException if the connection cannot be created, started or closed
     * @throws InterruptedException if the test thread is interrupted while waiting for the workers
     */
    public void testNewConsumer() throws JMSException, InterruptedException {
        this.connection = new ActiveMQConnectionFactory(CONNECTION_URI).createConnection();
        try {
            this.connection.start();
            final String queueName = getClass().getSimpleName();

            Thread producerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    produceMessages(queueName);
                }
            });
            Thread consumer1Thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    consumeMessages(1, queueName, messagesRecvd1, messageGroups1, closedGroups1);
                }
            });
            Thread consumer2Thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    consumeMessages(2, queueName, messagesRecvd2, messageGroups2, closedGroups2);
                }
            });

            // Consumers are started first; they block on the latch until the producer is done.
            consumer2Thread.start();
            consumer1Thread.start();
            producerThread.start();

            producerThread.join();
            consumer1Thread.join();
            consumer2Thread.join();
        } finally {
            // Always release the connection, even if start() or a join() failed.
            this.connection.close();
        }

        assertEquals("consumers should get all the messages", this.messagesSent, this.messagesRecvd1.get() + this.messagesRecvd2.get());
        assertEquals("not all message groups closed for consumer 1", this.messageGroups1.size(), this.closedGroups1.size());
        assertEquals("not all message groups closed for consumer 2", this.messageGroups2.size(), this.closedGroups2.size());
        assertTrue("producer failed to send any messages", this.messagesSent > 0);
        assertEquals("JMSXGroupFirstForConsumer not set", 0, this.errorCountFirstForConsumer.get());
        assertEquals("wrong consumer got close message", 0, this.errorCountWrongConsumerClose.get());
        assertEquals("consumer got duplicate close message", 0, this.errorCountDuplicateClose.get());
    }

    /**
     * Producer body: sends all groups, committing after every message, then releases the consumers.
     *
     * <p>Failures are logged rather than propagated; the latch is released in {@code finally} so
     * that a producer failure cannot leave the consumers (and therefore the test) blocked forever.
     */
    private void produceMessages(String queueName) {
        try {
            Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue(queueName));
            for (int group = 0; group < GROUP_COUNT; group++) {
                for (int position = 1; position <= MESSAGES_PER_GROUP; position++) {
                    // Every CLOSE_INTERVAL-th message closes the group instead of carrying its position.
                    int seq = (position % CLOSE_INTERVAL == 0) ? CLOSE_SEQ : position;
                    producer.send(generateMessage(session, Integer.toString(group), seq));
                    session.commit();
                    this.messagesSent++;
                    LOG.info("Sent message: group=" + group + ", seq=" + seq);
                }
                if (group % PROGRESS_LOG_INTERVAL == 0) {
                    LOG.info("Sent messages: group=" + group);
                }
                this.messageGroupCount++;
            }
            LOG.info(this.messagesSent + " messages sent");
            producer.close();
            session.close();
        } catch (Exception e) {
            LOG.error("Producer failed", e);
        } finally {
            this.latchMessagesCreated.countDown();
        }
    }

    /**
     * Consumer body: waits for the producer, then receives and checks messages until a receive
     * times out.
     *
     * @param consumerNumber 1 or 2; only used to build the log labels
     * @param queueName queue to consume from
     * @param received counter incremented once per committed message
     * @param groups this consumer's group-id to message-count map
     * @param closedGroups this consumer's set of currently closed groups
     */
    private void consumeMessages(int consumerNumber, String queueName, AtomicInteger received,
                                 Map<String, Integer> groups, Set<String> closedGroups) {
        String tag = "Con" + consumerNumber;
        try {
            this.latchMessagesCreated.await();
            LOG.info("starting consumer" + consumerNumber);
            Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));

            Message message;
            // A null return means the timeout elapsed with nothing delivered: treat as drained.
            while ((message = consumer.receive(RECEIVE_TIMEOUT_MS)) != null) {
                LOG.info(tag + ": got message " + formatMessage(message));
                checkMessage(message, tag, groups, closedGroups);
                session.commit();
                int count = received.incrementAndGet();
                if (count % PROGRESS_LOG_INTERVAL == 0) {
                    LOG.info(tag + ": got messages count=" + count);
                }
            }

            LOG.info(tag + ": total messages=" + received.get());
            LOG.info(tag + ": total message groups=" + groups.size());
            consumer.close();
            session.close();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            LOG.error("Consumer " + consumerNumber + " failed", e);
        } catch (Exception e) {
            LOG.error("Consumer " + consumerNumber + " failed", e);
        }
    }

    /**
     * Builds a text message tagged with the given group id and group sequence.
     *
     * @param session session used to create the message
     * @param str value for the {@code JMSXGroupID} property
     * @param i value for the {@code JMSXGroupSeq} property
     * @return the populated message
     * @throws JMSException if the message cannot be created or populated
     */
    public Message generateMessage(Session session, String str, int i) throws JMSException {
        TextMessage createTextMessage = session.createTextMessage();
        createTextMessage.setJMSType("TEST_MESSAGE");
        createTextMessage.setStringProperty("JMSXGroupID", str);
        createTextMessage.setIntProperty("JMSXGroupSeq", i);
        createTextMessage.setText("<?xml?><testMessage/>");
        return createTextMessage;
    }

    /**
     * Renders a message's group id and sequence for logging.
     *
     * @param message message to describe
     * @return {@code "group=<id>, seq=<n>"}, or the exception's simple class name and message if
     *         the properties cannot be read
     */
    public String formatMessage(Message message) {
        try {
            return "group=" + message.getStringProperty("JMSXGroupID") + ", seq=" + message.getIntProperty("JMSXGroupSeq");
        } catch (Exception e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    /**
     * Validates one received message against a consumer's bookkeeping and updates it.
     *
     * <ul>
     *   <li>First message this consumer sees for a group: {@code JMSXGroupFirstForConsumer} must be
     *       set, and the message must not be a close message.</li>
     *   <li>Message for a group this consumer had recorded as closed: the group is reopened,
     *       {@code JMSXGroupFirstForConsumer} must be set again, and the message must not be
     *       another close message.</li>
     *   <li>Any close message ({@code JMSXGroupSeq == -1}) marks the group closed.</li>
     * </ul>
     * Violations are logged and counted rather than thrown.
     *
     * @param message received message
     * @param str consumer label used as the log prefix
     * @param map this consumer's group-id to message-count map; updated in place
     * @param set this consumer's set of closed group ids; updated in place
     * @throws JMSException if a message property cannot be read
     */
    public void checkMessage(Message message, String str, Map<String, Integer> map, Set<String> set) throws JMSException {
        String groupId = message.getStringProperty("JMSXGroupID");
        int seq = message.getIntProperty("JMSXGroupSeq");
        boolean isClose = seq == CLOSE_SEQ;
        Integer seenSoFar = map.get(groupId);

        if (seenSoFar == null) {
            // Group is new to this consumer.
            requireFirstForConsumer(message, str, groupId, seq);
            if (isClose) {
                set.add(groupId);
                LOG.info(str + ": wrong consumer got close message for group=" + groupId);
                this.errorCountWrongConsumerClose.incrementAndGet();
            }
            map.put(groupId, 1);
            return;
        }

        // remove() reports whether the group was recorded as closed, reopening it in the same step.
        if (set.remove(groupId)) {
            requireFirstForConsumer(message, str, groupId, seq);
            if (isClose) {
                LOG.info(str + ": consumer got duplicate close message for group=" + groupId);
                this.errorCountDuplicateClose.incrementAndGet();
            }
        }
        if (isClose) {
            set.add(groupId);
        }
        map.put(groupId, seenSoFar + 1);
    }

    /** Logs and counts an error unless the message carries a true {@code JMSXGroupFirstForConsumer}. */
    private void requireFirstForConsumer(Message message, String label, String groupId, int seq) throws JMSException {
        boolean flagged = message.propertyExists("JMSXGroupFirstForConsumer")
                && message.getBooleanProperty("JMSXGroupFirstForConsumer");
        if (!flagged) {
            LOG.info(label + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" + seq);
            this.errorCountFirstForConsumer.incrementAndGet();
        }
    }
}
