package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6121Test.class */
public class AMQ6121Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6121Test.class);
    private BrokerService broker;

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        LevelDBStore levelDBStore = new LevelDBStore();
        File file = new File("target/activemq-data/myleveldb");
        IOHelper.deleteChildren(file);
        levelDBStore.setDirectory(file);
        levelDBStore.deleteAllMessages();
        PolicyMap policyMap = new PolicyMap();
        ArrayList arrayList = new ArrayList();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setExpireMessagesPeriod(8000L);
        policyEntry.setMaxAuditDepth(25);
        policyEntry.setUseCache(false);
        policyEntry.setLazyDispatch(false);
        policyEntry.setOptimizedDispatch(true);
        policyEntry.setProducerFlowControl(false);
        policyEntry.setEnableAudit(true);
        policyEntry.setQueue(">");
        arrayList.add(policyEntry);
        policyMap.setPolicyEntries(arrayList);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.setPersistenceAdapter(levelDBStore);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    @Test(timeout = 30000)
    public void sendToDLQ() throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI()).createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("ActiveMQ.DLQ"));
        createProducer.setDeliveryMode(2);
        TextMessage createTextMessage = createSession.createTextMessage();
        createTextMessage.setText("Test_Message");
        for (int i = 0; i < 50; i++) {
            createProducer.send(createTextMessage, 2, 4, 1000L);
        }
        final QueueViewMBean proxyToQueue = getProxyToQueue("ActiveMQ.DLQ");
        LOG.info("WAITING for expiry...");
        Assert.assertTrue("Queue drained of expired", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ6121Test.1
            public boolean isSatisified() throws Exception {
                return proxyToQueue.getQueueSize() == 0;
            }
        }));
        LOG.info("FINISHED WAITING for expiry.");
        LOG.info("Queue enqueue counter ==>>>" + proxyToQueue.getEnqueueCount());
        Assert.assertEquals("Enqueue size ", 50L, proxyToQueue.getEnqueueCount());
        createConnection.close();
    }

    protected QueueViewMBean getProxyToQueue(String str) throws MalformedObjectNameException, JMSException {
        return (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + str), QueueViewMBean.class, true);
    }
}
