package org.apache.activemq.usecases;

import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Broker-level tests for message expiry on a queue.
 *
 * <p>{@link #testExpiredMessages()} floods a queue with short-lived messages while a slow
 * consumer is attached and then checks the destination statistics and the dead-letter queue.
 * {@link #testRecoverExpiredMessages()} sends persistent messages, restarts the broker after
 * their time-to-live has elapsed and checks that the recovered queue drains through expiry.
 *
 * <p>The public fields are part of the test's combination interface (see
 * {@link #initCombosForTestRecoverExpiredMessages()}) and must stay public and non-final.
 */
public class ExpiredMessagesTest extends CombinationTestSupport {
    private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);

    /** Number of messages the producer thread of {@link #testExpiredMessages()} attempts to send. */
    private static final int EXPIRY_TEST_MESSAGE_COUNT = 10000;

    /** Number of messages sent by {@link #testRecoverExpiredMessages()} before the broker restart. */
    private static final int RECOVERY_TEST_MESSAGE_COUNT = 1000;

    BrokerService broker;
    Connection connection;
    Session session;
    MessageProducer producer;
    MessageConsumer consumer;
    public ActiveMQDestination destination = new ActiveMQQueue("test");
    public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
    public boolean useTextMessage = true;
    public boolean useVMCursor = true;

    /**
     * Counts the messages delivered to it; used to drain and count the dead-letter queue.
     */
    class DLQListener implements MessageListener {
        // Written by the JMS delivery thread and polled by the test thread, hence volatile.
        // The increment itself is not atomic; this relies on the session delivering to the
        // listener from one thread at a time.
        volatile int count = 0;

        DLQListener() {
        }

        public void onMessage(Message message) {
            this.count++;
        }
    }

    public static Test suite() {
        return suite(ExpiredMessagesTest.class);
    }

    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }

    /**
     * Starts a broker that deletes all stored messages on startup and uses an
     * expire-messages period of 100.
     */
    @Override
    public void setUp() throws Exception {
        this.broker = createBroker(true, 100L);
    }

    /**
     * Sends {@value #EXPIRY_TEST_MESSAGE_COUNT} messages with a 100 ms time-to-live while a
     * deliberately slow consumer runs for about three seconds, then verifies that:
     * <ul>
     *   <li>the in-flight count returns to zero,</li>
     *   <li>the enqueue count stops changing and the queue size drops to zero,</li>
     *   <li>the DLQ holds every expired message plus every message that was never enqueued,</li>
     *   <li>memory usage of the test queue is back to zero while the DLQ's is above zero,</li>
     *   <li>a listener on the DLQ receives exactly that number of messages.</li>
     * </ul>
     */
    public void testExpiredMessages() throws Exception {
        this.connection = new ActiveMQConnectionFactory(NetworkedSyncTest.broker1URL).createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.producer.setTimeToLive(100L);
        this.consumer = this.session.createConsumer(this.destination);
        this.connection.start();

        final AtomicLong received = new AtomicLong();

        Thread consumerThread = new Thread("Consumer Thread") {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    // Consume for roughly three seconds, pausing between receives so that
                    // messages have time to expire before they can be delivered.
                    while (System.currentTimeMillis() - start < 3000) {
                        if (ExpiredMessagesTest.this.consumer.receive(1000L) != null) {
                            received.incrementAndGet();
                        }
                        Thread.sleep(100L);
                    }
                    ExpiredMessagesTest.this.consumer.close();
                } catch (Throwable th) {
                    ExpiredMessagesTest.LOG.error("Consumer thread failed", th);
                }
            }
        };
        consumerThread.start();

        Thread producerThread = new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    for (int sent = 0; sent < EXPIRY_TEST_MESSAGE_COUNT; sent++) {
                        ExpiredMessagesTest.this.producer.send(
                                ExpiredMessagesTest.this.session.createTextMessage("test"));
                    }
                    ExpiredMessagesTest.this.producer.close();
                } catch (Throwable th) {
                    // A failed send is tolerated: the DLQ expectation below accounts for
                    // messages that never reached the queue.
                    ExpiredMessagesTest.LOG.error("Producer thread failed", th);
                }
            }
        };
        producerThread.start();

        consumerThread.join();
        producerThread.join();
        this.session.close();

        final DestinationStatistics queueStats =
                org.apache.activemq.TestSupport.getDestinationStatistics(this.broker, this.destination);

        assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return queueStats.getInflight().getCount() == 0;
            }
        }));
        assertEquals("Wrong inFlightCount: ", 0L, queueStats.getInflight().getCount());
        LOG.info("Stats: received: " + received.get() + ", " + describeCounters(queueStats));

        assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Satisfied once the enqueue counter is unchanged across a 200 ms window.
                long enqueuesBefore = queueStats.getEnqueues().getCount();
                Thread.sleep(200L);
                ExpiredMessagesTest.LOG.info(describeQueue(received.get(), queueStats));
                return enqueuesBefore == queueStats.getEnqueues().getCount();
            }
        }, 60000L));
        LOG.info(describeQueue(received.get(), queueStats));

        assertTrue("got at least what did not expire",
                received.get() >= queueStats.getDequeues().getCount() - queueStats.getExpired().getCount());

        assertTrue("all messages expired - queue size gone to zero " + queueStats.getMessages().getCount(),
                Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        ExpiredMessagesTest.LOG.info(describeQueue(received.get(), queueStats));
                        return queueStats.getMessages().getCount() == 0;
                    }
                }));

        // Expected DLQ content: everything the queue expired, plus every message that was
        // sent but never counted as enqueued on the test queue.
        final long expectedDlqCount = queueStats.getExpired().getCount()
                + (EXPIRY_TEST_MESSAGE_COUNT - queueStats.getEnqueues().getCount());

        final DestinationStatistics dlqStats =
                org.apache.activemq.TestSupport.getDestinationStatistics(this.broker, this.dlqDestination);
        LOG.info("DLQ stats: size= " + dlqStats.getMessages().getCount() + ", " + describeCounters(dlqStats));

        // Result intentionally ignored: the assertion below reports the actual numbers.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return expectedDlqCount == dlqStats.getMessages().getCount();
            }
        });
        assertEquals("dlq contains all expired", expectedDlqCount, dlqStats.getMessages().getCount());

        assertEquals("memory usage is back to duck egg", 0,
                org.apache.activemq.TestSupport.getDestination(this.broker, this.destination)
                        .getMemoryUsage().getPercentUsage());
        assertTrue("memory usage is increased ",
                0 < org.apache.activemq.TestSupport.getDestination(this.broker, this.dlqDestination)
                        .getMemoryUsage().getPercentUsage());

        MessageConsumer dlqConsumer = createDlqConsumer(this.connection);
        final DLQListener dlqListener = new DLQListener();
        dlqConsumer.setMessageListener(dlqListener);

        // Result intentionally ignored: the assertion below reports the actual numbers.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return expectedDlqCount == ((long) dlqListener.count);
            }
        }, 60000L);
        assertEquals("dlq returned all expired", expectedDlqCount, dlqListener.count);
    }

    /**
     * Creates a consumer on the dead-letter queue using a new auto-acknowledge session.
     * The session is not closed here; it is released when {@code connection} is closed.
     */
    private MessageConsumer createDlqConsumer(Connection connection) throws Exception {
        Session dlqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return dlqSession.createConsumer(this.dlqDestination);
    }

    /** Registers both values of {@link #useVMCursor} for {@link #testRecoverExpiredMessages()}. */
    public void initCombosForTestRecoverExpiredMessages() {
        addCombinationValues("useVMCursor", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Sends {@value #RECOVERY_TEST_MESSAGE_COUNT} messages with a two second time-to-live,
     * stops the broker, waits five seconds, restarts it without deleting stored messages and
     * verifies that the queue empties and that every dequeue is accounted for as an expiry.
     */
    public void testRecoverExpiredMessages() throws Exception {
        this.connection = new ActiveMQConnectionFactory("failover://tcp://localhost:61616").createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(this.destination);
        this.producer.setTimeToLive(2000L);
        // 2 is the value of javax.jms.DeliveryMode.PERSISTENT.
        this.producer.setDeliveryMode(2);

        Thread producerThread = new Thread("Producing Thread") {
            @Override
            public void run() {
                try {
                    for (int sent = 0; sent < RECOVERY_TEST_MESSAGE_COUNT; sent++) {
                        Message message;
                        if (ExpiredMessagesTest.this.useTextMessage) {
                            message = ExpiredMessagesTest.this.session.createTextMessage("test");
                        } else {
                            message = ExpiredMessagesTest.this.session.createObjectMessage("test");
                        }
                        ExpiredMessagesTest.this.producer.send(message);
                    }
                    ExpiredMessagesTest.this.producer.close();
                } catch (Throwable th) {
                    ExpiredMessagesTest.LOG.error("Producer thread failed", th);
                }
            }
        };
        producerThread.start();
        producerThread.join();

        DestinationStatistics beforeRestart =
                org.apache.activemq.TestSupport.getDestinationStatistics(this.broker, this.destination);
        LOG.info(describeRecovery(beforeRestart));

        LOG.info("stopping broker");
        this.broker.stop();
        this.broker.waitUntilStopped();

        // Stay down longer than the two second time-to-live set on the producer above.
        Thread.sleep(5000L);

        LOG.info("recovering broker");
        this.broker = createBroker(false, 5000L);

        // Result intentionally ignored: the assertions below report the actual numbers.
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Looked up on every poll because the broker instance was replaced above.
                DestinationStatistics current = org.apache.activemq.TestSupport.getDestinationStatistics(
                        ExpiredMessagesTest.this.broker, ExpiredMessagesTest.this.destination);
                ExpiredMessagesTest.LOG.info(describeRecovery(current));
                return current.getMessages().getCount() == 0;
            }
        });

        DestinationStatistics afterRestart =
                org.apache.activemq.TestSupport.getDestinationStatistics(this.broker, this.destination);
        assertEquals("Expect empty queue, QueueSize: ", 0L, afterRestart.getMessages().getCount());
        assertEquals("all dequeues were expired",
                afterRestart.getDequeues().getCount(), afterRestart.getExpired().getCount());
    }

    /**
     * Builds, starts and returns a broker named "localhost" that stores messages with an
     * {@link AMQPersistenceAdapter} under {@code target/expiredtest-data/}.
     *
     * @param deleteAllMessages whether the broker deletes all stored messages on startup
     * @param expireMessagesPeriod value passed to {@link PolicyEntry#setExpireMessagesPeriod}
     *        for the default destination policy
     * @return the started broker
     */
    private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
        AMQPersistenceAdapter persistenceAdapter = new AMQPersistenceAdapter();
        persistenceAdapter.setDirectory(new File("target/expiredtest-data/"));
        persistenceAdapter.setForceRecoverReferenceStore(true);

        PolicyEntry defaultPolicy = new PolicyEntry();
        if (this.useVMCursor) {
            defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        }
        defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
        defaultPolicy.setMaxExpirePageSize(1200);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);

        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("localhost");
        brokerService.setPersistenceAdapter(persistenceAdapter);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
        brokerService.addConnector(NetworkedSyncTest.broker1URL);
        brokerService.start();
        brokerService.waitUntilStarted();
        return brokerService;
    }

    /** Formats the counters shared by every statistics log line in this test. */
    private static String describeCounters(DestinationStatistics stats) {
        return "enqueues: " + stats.getEnqueues().getCount()
                + ", dequeues: " + stats.getDequeues().getCount()
                + ", dispatched: " + stats.getDispatched().getCount()
                + ", inflight: " + stats.getInflight().getCount()
                + ", expiries: " + stats.getExpired().getCount();
    }

    /** Formats the received count, queue size and counters for {@link #testExpiredMessages()}. */
    private static String describeQueue(long received, DestinationStatistics stats) {
        return "Stats: received: " + received + ", size= " + stats.getMessages().getCount()
                + ", " + describeCounters(stats);
    }

    /** Formats the queue size and counters for {@link #testRecoverExpiredMessages()}. */
    private static String describeRecovery(DestinationStatistics stats) {
        return "Stats: size: " + stats.getMessages().getCount() + ", " + describeCounters(stats);
    }

    /**
     * Closes the client connection (if a test created one) and stops the broker.
     *
     * <p>Both references are null-checked because {@link #setUp()} only creates the broker;
     * a test that fails before opening its connection must not be masked by a
     * {@link NullPointerException} here. The broker is stopped in a {@code finally} block so
     * it is shut down even when closing the connection throws.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (this.connection != null) {
                // close() rather than stop(): stop() only pauses delivery and would leave the
                // connection (including the failover one) alive for the next test's broker.
                this.connection.close();
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }
}
