package org.apache.activemq.broker.virtual;

import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/virtual/VirtualTopicDLQTest.class */
public class VirtualTopicDLQTest extends TestCase {
    private static BrokerService broker;
    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class);
    static final String jmsConnectionURI = "failover:(vm://localhost)";
    private static final String virtualTopicName = "VirtualTopic.Test";
    private static final String consumer1Prefix = "Consumer.A.";
    private static final String consumer2Prefix = "Consumer.B.";
    private static final String consumer3Prefix = "Consumer.C.";
    private static final String dlqPrefix = "ActiveMQ.DLQ.Queue.";
    private static final int numberMessages = 6;

    /* loaded from: input_file:org/apache/activemq/broker/virtual/VirtualTopicDLQTest$TestConsumer.class */
    private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
        private String destinationName;
        private boolean isTopic;
        private CountDownLatch latch;
        private boolean bFakeFail;
        private int maxRedeliveries = 0;
        private int receivedMessageCounter = 0;
        private boolean bStop = false;
        private ActiveMQConnectionFactory connectionFactory = null;
        private ActiveMQConnection connection = null;
        private Session session = null;
        private MessageConsumer consumer = null;

        public TestConsumer(String str, boolean z, int i, boolean z2) {
            this.destinationName = null;
            this.isTopic = true;
            this.latch = null;
            this.bFakeFail = false;
            this.destinationName = str;
            this.isTopic = z;
            this.latch = new CountDownLatch(i * (this.bFakeFail ? this.maxRedeliveries + 1 : 1));
            this.bFakeFail = z2;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    VirtualTopicDLQTest.LOG.info("Started TestConsumer for destination (" + this.destinationName + ")");
                    this.connectionFactory = new ActiveMQConnectionFactory(VirtualTopicDLQTest.jmsConnectionURI);
                    this.connection = this.connectionFactory.createConnection();
                    this.connection.start();
                    this.session = this.connection.createSession(true, 0);
                    RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
                    redeliveryPolicy.setInitialRedeliveryDelay(1L);
                    redeliveryPolicy.setUseExponentialBackOff(false);
                    redeliveryPolicy.setMaximumRedeliveries(this.maxRedeliveries);
                    this.connection.setExceptionListener(this);
                    this.consumer = this.session.createConsumer(this.isTopic ? this.session.createTopic(this.destinationName) : this.session.createQueue(this.destinationName));
                    this.consumer.setMessageListener(this);
                    while (!this.bStop) {
                        Thread.sleep(100L);
                    }
                    Logger logger = VirtualTopicDLQTest.LOG;
                    String str = this.destinationName;
                    long count = this.latch.getCount();
                    toString();
                    logger.info("Finished TestConsumer for destination name (" + str + ") remaining " + count + " messages " + logger);
                    try {
                        if (this.connection != null) {
                            this.connection.close();
                        }
                    } catch (Exception e) {
                        VirtualTopicDLQTest.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e);
                    }
                } catch (Exception e2) {
                    VirtualTopicDLQTest.LOG.error("Consumer (" + this.destinationName + ") Caught: " + e2);
                    try {
                        if (this.connection != null) {
                            this.connection.close();
                        }
                    } catch (Exception e3) {
                        VirtualTopicDLQTest.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e3);
                    }
                }
            } catch (Throwable th) {
                try {
                    if (this.connection != null) {
                        this.connection.close();
                    }
                } catch (Exception e4) {
                    VirtualTopicDLQTest.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e4);
                }
                throw th;
            }
        }

        public synchronized void onException(JMSException jMSException) {
            jMSException.printStackTrace();
            VirtualTopicDLQTest.LOG.error("Consumer for destination, (" + this.destinationName + "), JMS Exception occured.  Shutting down client.");
        }

        public synchronized void setStop(boolean z) {
            this.bStop = z;
        }

        public synchronized void onMessage(Message message) {
            this.receivedMessageCounter++;
            this.latch.countDown();
            Logger logger = VirtualTopicDLQTest.LOG;
            String str = this.destinationName;
            long count = this.latch.getCount();
            int i = this.receivedMessageCounter;
            logger.info("Consumer for destination (" + str + ") latch countdown: " + count + " :: Number messages received " + logger);
            try {
                VirtualTopicDLQTest.LOG.info("Consumer for destination (" + this.destinationName + ") Received message id :: " + message.getJMSMessageID());
                if (this.bFakeFail) {
                    VirtualTopicDLQTest.LOG.info("Consumer on destination " + this.destinationName + " rolling back JMS Session for message: " + message.toString());
                    this.session.rollback();
                } else {
                    VirtualTopicDLQTest.LOG.info("Consumer on destination " + this.destinationName + " committing JMS Session for message: " + message.toString());
                    this.session.commit();
                }
            } catch (JMSException e) {
                VirtualTopicDLQTest.LOG.error("Error reading JMS Message from destination " + this.destinationName + ".");
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/broker/virtual/VirtualTopicDLQTest$TestProducer.class */
    private class TestProducer implements Runnable {
        private String destinationName;
        private boolean isTopic;
        private int numberMessages;
        private CountDownLatch latch;

        public TestProducer(String str, boolean z, int i) {
            this.destinationName = null;
            this.isTopic = true;
            this.numberMessages = 0;
            this.latch = null;
            this.destinationName = str;
            this.isTopic = z;
            this.numberMessages = i;
            this.latch = new CountDownLatch(i);
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        @Override // java.lang.Runnable
        public void run() {
            ActiveMQConnection activeMQConnection = null;
            try {
                try {
                    VirtualTopicDLQTest.LOG.info("Started TestProducer for destination (" + this.destinationName + ")");
                    activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(VirtualTopicDLQTest.jmsConnectionURI).createConnection();
                    activeMQConnection.start();
                    ActiveMQSession createSession = activeMQConnection.createSession(false, 1);
                    ActiveMQMessageProducer createProducer = createSession.createProducer(this.isTopic ? createSession.createTopic(this.destinationName) : createSession.createQueue(this.destinationName));
                    createProducer.setDeliveryMode(1);
                    for (int i = 0; i < this.numberMessages; i++) {
                        try {
                            createProducer.send(createSession.createTextMessage("I am a message :: " + String.valueOf(i)));
                        } catch (Exception e) {
                            VirtualTopicDLQTest.LOG.info("Producer for destination (" + this.destinationName + ") Caught: " + e);
                        }
                        this.latch.countDown();
                        Thread.sleep(1000L);
                    }
                    VirtualTopicDLQTest.LOG.info("Finished TestProducer for destination (" + this.destinationName + ")");
                    if (activeMQConnection != null) {
                        try {
                            activeMQConnection.close();
                        } catch (Exception e2) {
                            VirtualTopicDLQTest.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e2);
                        }
                    }
                } catch (Throwable th) {
                    if (activeMQConnection != null) {
                        try {
                            activeMQConnection.close();
                        } catch (Exception e3) {
                            VirtualTopicDLQTest.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e3);
                            throw th;
                        }
                    }
                    throw th;
                }
            } catch (Exception e4) {
                VirtualTopicDLQTest.LOG.error("Terminating TestProducer(" + this.destinationName + ")Caught: " + e4);
                if (activeMQConnection != null) {
                    try {
                        activeMQConnection.close();
                    } catch (Exception e5) {
                        VirtualTopicDLQTest.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e5);
                    }
                }
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        try {
            broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true);
            broker.start();
            broker.waitUntilStarted();
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    @After
    public void tearDown() throws Exception {
        try {
            purgeDestination("ActiveMQ.DLQ.Queue.Consumer.A.VirtualTopic.Test");
            purgeDestination("ActiveMQ.DLQ.Queue.Consumer.B.VirtualTopic.Test");
            purgeDestination("ActiveMQ.DLQ.Queue.Consumer.C.VirtualTopic.Test");
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
            broker = null;
        }
    }

    @Test
    public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception {
        TestConsumer testConsumer = null;
        TestConsumer testConsumer2 = null;
        TestConsumer testConsumer3 = null;
        TestConsumer testConsumer4 = null;
        TestConsumer testConsumer5 = null;
        TestConsumer testConsumer6 = null;
        try {
            try {
                testConsumer = new TestConsumer("Consumer.A.VirtualTopic.Test", false, 6, true);
                thread(testConsumer, false);
                testConsumer2 = new TestConsumer("Consumer.B.VirtualTopic.Test", false, 6, true);
                thread(testConsumer2, false);
                testConsumer3 = new TestConsumer("Consumer.C.VirtualTopic.Test", false, 6, false);
                thread(testConsumer3, false);
                testConsumer4 = new TestConsumer("ActiveMQ.DLQ.Queue.Consumer.A.VirtualTopic.Test", false, 6, false);
                thread(testConsumer4, false);
                testConsumer5 = new TestConsumer("ActiveMQ.DLQ.Queue.Consumer.B.VirtualTopic.Test", false, 6, false);
                thread(testConsumer5, false);
                testConsumer6 = new TestConsumer("ActiveMQ.DLQ.Queue.Consumer.C.VirtualTopic.Test", false, 6, false);
                thread(testConsumer6, false);
                Thread.sleep(1000L);
                TestProducer testProducer = new TestProducer(virtualTopicName, true, 6);
                thread(testProducer, false);
                assertTrue("sent all producer messages in time, count is: " + testProducer.getLatch().getCount(), testProducer.getLatch().await(10L, TimeUnit.SECONDS));
                LOG.info("producer successful, count = " + testProducer.getLatch().getCount());
                assertTrue("remaining consumer1 count should be zero, is: " + testConsumer.getLatch().getCount(), testConsumer.getLatch().await(10L, TimeUnit.SECONDS));
                LOG.info("consumer1 successful, count = " + testConsumer.getLatch().getCount());
                assertTrue("remaining consumer2 count should be zero, is: " + testConsumer2.getLatch().getCount(), testConsumer2.getLatch().await(10L, TimeUnit.SECONDS));
                LOG.info("consumer2 successful, count = " + testConsumer2.getLatch().getCount());
                assertTrue("remaining consumer3 count should be zero, is: " + testConsumer3.getLatch().getCount(), testConsumer3.getLatch().await(10L, TimeUnit.SECONDS));
                LOG.info("consumer3 successful, count = " + testConsumer3.getLatch().getCount());
                assertTrue("remaining dlqConsumer1 count should be zero, is: " + testConsumer4.getLatch().getCount(), testConsumer4.getLatch().await(10L, TimeUnit.SECONDS));
                LOG.info("dlqConsumer1 successful, count = " + testConsumer4.getLatch().getCount());
                assertTrue("remaining dlqConsumer2 count should be zero, is: " + testConsumer5.getLatch().getCount(), testConsumer5.getLatch().await(10L, TimeUnit.SECONDS));
                LOG.info("dlqConsumer2 successful, count = " + testConsumer5.getLatch().getCount());
                assertTrue("remaining dlqConsumer3 count should be 6, is: " + testConsumer6.getLatch().getCount(), testConsumer6.getLatch().getCount() == 6);
                LOG.info("dlqConsumer2 successful, count = " + testConsumer5.getLatch().getCount());
                if (testConsumer != null) {
                    testConsumer.setStop(true);
                }
                if (testConsumer2 != null) {
                    testConsumer2.setStop(true);
                }
                if (testConsumer3 != null) {
                    testConsumer3.setStop(true);
                }
                if (testConsumer4 != null) {
                    testConsumer4.setStop(true);
                }
                if (testConsumer5 != null) {
                    testConsumer5.setStop(true);
                }
                if (testConsumer6 != null) {
                    testConsumer6.setStop(true);
                }
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
        } catch (Throwable th) {
            if (testConsumer != null) {
                testConsumer.setStop(true);
            }
            if (testConsumer2 != null) {
                testConsumer2.setStop(true);
            }
            if (testConsumer3 != null) {
                testConsumer3.setStop(true);
            }
            if (testConsumer4 != null) {
                testConsumer4.setStop(true);
            }
            if (testConsumer5 != null) {
                testConsumer5.setStop(true);
            }
            if (testConsumer6 != null) {
                testConsumer6.setStop(true);
            }
            throw th;
        }
    }

    private static Thread thread(Runnable runnable, boolean z) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(z);
        thread.start();
        return thread;
    }

    private static void purgeDestination(String str) throws Exception {
        Queue queue = (Queue) broker.getRegionBroker().getQueueRegion().getDestinationMap().get(new ActiveMQQueue(str));
        queue.purge();
        assertEquals(0L, queue.getDestinationStatistics().getMessages().getCount());
    }
}
