package org.apache.activemq.bugs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6293Test.class */
/**
 * Regression test for AMQ-6293: purges a queue while several consumers are
 * concurrently receiving from it, then checks that the destination statistics
 * (pending message count and dequeue count) are consistent.
 */
public class AMQ6293Test {
    static final Logger LOG = LoggerFactory.getLogger(AMQ6293Test.class);
    private BrokerService brokerService;
    private String connectionUri;
    private Connection connection;
    private Session session;
    private final ExecutorService service = Executors.newFixedThreadPool(6);
    private final ActiveMQQueue queue = new ActiveMQQueue("test");
    private final int numMessages = 10000;
    /** Set by any worker task that fails, so the test thread can assert on it. */
    private final AtomicBoolean isException = new AtomicBoolean();

    /**
     * Worker that drains messages from a consumer until a receive times out
     * (no message within one second).
     */
    private class TestConsumer implements Runnable {
        private final MessageConsumer consumer;

        public TestConsumer(MessageConsumer messageConsumer) {
            this.consumer = messageConsumer;
        }

        @Override
        public void run() {
            // The whole receive loop lives inside the try: receive() throws the
            // checked JMSException, and a failed receive must flag the test.
            try {
                int received = 0;
                while (consumer.receive(1000L) != null) {
                    received++;
                    if (received % 1000 == 0) {
                        LOG.info("Received {} messages", received);
                    }
                }
            } catch (Exception e) {
                isException.set(true);
                LOG.warn(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Starts a non-persistent broker with a queue prefetch of 100 and opens the
     * connection and auto-acknowledge session shared by the test.
     */
    @Before
    public void before() throws Exception {
        brokerService = new BrokerService();
        connectionUri = brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).getPublishableConnectString();
        brokerService.setPersistent(false);
        brokerService.getManagementContext().setCreateConnector(false);

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setQueuePrefetch(100);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        brokerService.setDestinationPolicy(policyMap);

        brokerService.start();
        brokerService.waitUntilStarted();

        connection = new ActiveMQConnectionFactory(connectionUri).createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Releases the worker pool, the client connection and the broker. The broker
     * is stopped even when closing the connection fails.
     */
    @After
    public void after() throws Exception {
        // No-op when the pool already terminated; otherwise interrupts workers
        // left behind by a failed or timed-out test.
        service.shutdownNow();
        try {
            if (connection != null) {
                // close() (not stop()) so sessions, consumers and the transport are released.
                connection.close();
            }
        } finally {
            if (brokerService != null) {
                brokerService.stop();
                brokerService.waitUntilStopped();
            }
        }
    }

    /**
     * Sends {@code numMessages} messages, then runs five consumers and one purge
     * concurrently. Afterwards the queue must report zero pending messages and
     * exactly {@code numMessages} dequeues.
     */
    @Test(timeout = 90000)
    public void testDestinationStatisticsOnPurge() throws Exception {
        sendTestMessages(numMessages);

        final Queue brokerQueue = (Queue) brokerService.getRegionBroker().getDestinationMap().get(queue);
        Assert.assertNotNull("Broker-side queue was not created", brokerQueue);

        // NOTE(review): all consumers share one Session but are driven from
        // different threads; kept as in the original test - confirm this is intended.
        for (int n = 0; n < 5; n++) {
            service.submit(new TestConsumer(session.createConsumer(queue)));
        }

        service.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    brokerQueue.purge();
                } catch (Exception e) {
                    isException.set(true);
                    LOG.warn(e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            }
        });

        service.shutdown();
        Assert.assertTrue("Took too long to shutdown service", service.awaitTermination(1L, TimeUnit.MINUTES));
        Assert.assertFalse("Exception encountered", isException.get());
        Assert.assertEquals(0L, brokerQueue.getDestinationStatistics().getMessages().getCount());
        Assert.assertEquals(numMessages, brokerQueue.getDestinationStatistics().getDequeues().getCount());
    }

    /**
     * Publishes {@code count} identical text messages to the test queue using a
     * transacted session, committing after every 1000 messages and once more for
     * any remainder.
     *
     * @param count number of messages to send
     * @throws JMSException if creating the session/producer, sending or committing fails
     */
    private void sendTestMessages(int count) throws JMSException {
        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            MessageProducer producer = producerSession.createProducer(queue);
            TextMessage message = producerSession.createTextMessage();
            message.setText("Message");
            for (int sent = 1; sent <= count; sent++) {
                producer.send(message);
                if (sent % 1000 == 0) {
                    LOG.info("Sent {} messages", sent);
                    producerSession.commit();
                }
            }
            // Commit the tail batch; without this, messages past the last
            // multiple of 1000 would be rolled back when the session closes.
            if (count > 0 && count % 1000 != 0) {
                producerSession.commit();
            }
        } finally {
            producerSession.close();
        }
    }
}
