package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6059Test.class */
public class AMQ6059Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6059Test.class);
    private BrokerService broker;

    @Before
    public void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    @Test
    public void testDLQRecovery() throws Exception {
        sendMessage(new ActiveMQQueue("QName"));
        TimeUnit.SECONDS.sleep(3L);
        LOG.info("### Check for expired message moving to DLQ.");
        Queue queue = (Queue) createDlqDestination();
        verifyIsDlq(queue);
        final QueueViewMBean proxyToQueue = getProxyToQueue(queue.getQueueName());
        Assert.assertTrue("The message expired", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6059Test.1
            public boolean isSatisified() throws Exception {
                AMQ6059Test.LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, Inflight {}", new Object[]{Long.valueOf(proxyToQueue.getEnqueueCount()), Long.valueOf(proxyToQueue.getDispatchCount()), Long.valueOf(proxyToQueue.getExpiredCount()), Long.valueOf(proxyToQueue.getInFlightCount())});
                return proxyToQueue.getEnqueueCount() == 1;
            }
        }));
        verifyMessageIsRecovered(queue);
        restartBroker();
        verifyIsDlq(queue);
        verifyMessageIsRecovered(queue);
    }

    @Test
    public void testSetDlqFlag() throws Exception {
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("QNameToFlip");
        sendMessage(activeMQQueue);
        QueueViewMBean proxyToQueue = getProxyToQueue(activeMQQueue.getQueueName());
        Assert.assertFalse(proxyToQueue.isDLQ());
        proxyToQueue.setDLQ(true);
        Assert.assertTrue(proxyToQueue.isDLQ());
    }

    protected BrokerService createBroker() throws Exception {
        return createBrokerWithDLQ(true);
    }

    private BrokerService createBrokerWithDLQ(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        ActiveMQDestination activeMQQueue = new ActiveMQQueue("ActiveMQ.DLQ?isDLQ=true");
        brokerService.setDestinations(new ActiveMQDestination[]{activeMQQueue});
        PolicyMap policyMap = new PolicyMap();
        SharedDeadLetterStrategy sharedDeadLetterStrategy = new SharedDeadLetterStrategy();
        sharedDeadLetterStrategy.setProcessNonPersistent(true);
        sharedDeadLetterStrategy.setProcessExpired(true);
        sharedDeadLetterStrategy.setDeadLetterQueue(activeMQQueue);
        sharedDeadLetterStrategy.setExpiration(DurableSubProcessWithRestartTest.BROKER_RESTART);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setDeadLetterStrategy(sharedDeadLetterStrategy);
        policyEntry.setExpireMessagesPeriod(2000L);
        policyEntry.setUseCache(false);
        policyMap.put(new ActiveMQQueue(">"), policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        if (z) {
            brokerService.setDeleteAllMessagesOnStartup(true);
        }
        return brokerService;
    }

    private void restartBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.broker = createBrokerWithDLQ(false);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    private void verifyMessageIsRecovered(Queue queue) throws Exception, JMSException {
        Connection createConnection = createConnection();
        createConnection.start();
        Enumeration enumeration = createConnection.createSession(false, 1).createBrowser(queue).getEnumeration();
        Assert.assertTrue(enumeration.hasMoreElements());
        Assert.assertNotNull("Recover message after broker restarts", (Message) enumeration.nextElement());
    }

    private void sendMessage(Destination destination) throws Exception {
        Connection createConnection = createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        createSession.createProducer(destination).send(destination, createSession.createTextMessage("DLQ message"), 2, 4, 1000L);
        createConnection.stop();
        LOG.info("### Send message that will expire.");
    }

    private Connection createConnection() throws Exception {
        return new ActiveMQConnectionFactory(this.broker.getVmConnectorURI()).createConnection();
    }

    private Destination createDlqDestination() {
        return new ActiveMQQueue("ActiveMQ.DLQ");
    }

    private void verifyIsDlq(Queue queue) throws Exception {
        Assert.assertTrue("is dlq", getProxyToQueue(queue.getQueueName()).isDLQ());
    }

    private QueueViewMBean getProxyToQueue(String str) throws MalformedObjectNameException, JMSException {
        return (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + str), QueueViewMBean.class, true);
    }
}
