/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MessageExpirationReaperTest {
    private BrokerService broker;
    private ConnectionFactory factory;
    private ActiveMQConnection connection;
    private final String destinationName = "TEST.Q";
    private final String brokerUrl = "tcp://localhost:0";
    private final String brokerName = "testBroker";
    private String connectionUri;

    @Before
    public void init() throws Exception {
        this.createBroker();
        this.connectionUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        this.factory = this.createConnectionFactory();
        this.connection = (ActiveMQConnection)this.factory.createConnection();
        this.connection.setClientID("test-connection");
        this.connection.start();
    }

    @After
    public void cleanUp() throws Exception {
        this.connection.close();
        this.broker.stop();
    }

    protected void createBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setBrokerName("testBroker");
        this.broker.addConnector("tcp://localhost:0");
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(500L);
        policyMap.setDefaultEntry(defaultEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
    }

    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.connectionUri);
    }

    protected Session createSession() throws Exception {
        return this.connection.createSession(false, 1);
    }

    @Test
    public void testExpiredMessageReaping() throws Exception {
        Session producerSession = this.createSession();
        ActiveMQDestination destination = (ActiveMQDestination)producerSession.createQueue("TEST.Q");
        MessageProducer producer = producerSession.createProducer((jakarta.jms.Destination)destination);
        producer.setTimeToLive(1000L);
        int count = 3;
        for (int i = 0; i < 3; ++i) {
            TextMessage message = producerSession.createTextMessage("" + i);
            producer.send((Message)message);
        }
        Thread.sleep(2000L);
        DestinationViewMBean view = this.createView(destination);
        Assert.assertEquals((String)("Incorrect inflight count: " + view.getInFlightCount()), (long)0L, (long)view.getInFlightCount());
        Assert.assertEquals((String)"Incorrect queue size count", (long)0L, (long)view.getQueueSize());
        Assert.assertEquals((String)"Incorrect expired size count", (long)view.getEnqueueCount(), (long)view.getExpiredCount());
        for (int i = 0; i < 3; ++i) {
            TextMessage message = producerSession.createTextMessage("" + i);
            producer.send((Message)message);
        }
        Thread.sleep(2000L);
        Session browserSession = this.createSession();
        QueueBrowser browser = browserSession.createBrowser((Queue)destination);
        Assert.assertFalse((String)"no message in the browser", (boolean)browser.getEnumeration().hasMoreElements());
        Assert.assertEquals((String)("Wrong inFlightCount: " + view.getInFlightCount()), (long)0L, (long)view.getInFlightCount());
    }

    @Test
    public void testExpiredMessagesOnTopic() throws Exception {
        Session session = this.createSession();
        ActiveMQTopic destination = new ActiveMQTopic("TEST.Q?consumer.prefetchSize=0");
        MessageProducer producer = session.createProducer((jakarta.jms.Destination)destination);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)destination, "test-durable");
        producer.setTimeToLive(500L);
        int count = 3;
        for (int i = 0; i < 3; ++i) {
            TextMessage message = session.createTextMessage("" + i);
            producer.send((Message)message);
        }
        DestinationViewMBean view = this.createView((ActiveMQDestination)destination);
        Assert.assertEquals((String)"Incorrect enqueue count", (long)3L, (long)view.getEnqueueCount());
        consumer.close();
        Assert.assertTrue((String)"Incorrect queue size count", (boolean)Wait.waitFor(() -> view.getQueueSize() == 0L, (long)3000L, (long)100L));
        Assert.assertTrue((String)("Incorrect inflight count: " + view.getInFlightCount()), (boolean)Wait.waitFor(() -> view.getInFlightCount() == 0L, (long)3000L, (long)100L));
        Assert.assertTrue((String)"Incorrect expired size count", (boolean)Wait.waitFor(() -> view.getEnqueueCount() == view.getExpiredCount(), (long)3000L, (long)100L));
        Assert.assertEquals((long)0L, (long)this.broker.getDestination((ActiveMQDestination)destination).getMemoryUsage().getUsage());
    }

    @Test
    public void testExpiredMessagesOnTopic2Durables() throws Exception {
        Session session = this.createSession();
        ActiveMQTopic destination = new ActiveMQTopic("TEST.Q?consumer.prefetchSize=0");
        MessageProducer producer = session.createProducer((jakarta.jms.Destination)destination);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)destination, "test-durable");
        TopicSubscriber consumer2 = session.createDurableSubscriber((Topic)destination, "test-durable-2");
        producer.setTimeToLive(500L);
        int count = 3;
        for (int i = 0; i < 3; ++i) {
            TextMessage message = session.createTextMessage("" + i);
            producer.send((Message)message);
        }
        DestinationViewMBean view = this.createView((ActiveMQDestination)destination);
        Assert.assertEquals((String)"Incorrect enqueue count", (long)3L, (long)view.getEnqueueCount());
        consumer.close();
        consumer2.close();
        Assert.assertTrue((String)"Incorrect queue size count", (boolean)Wait.waitFor(() -> view.getQueueSize() == 0L, (long)3000L, (long)100L));
        Assert.assertTrue((String)("Incorrect inflight count: " + view.getInFlightCount()), (boolean)Wait.waitFor(() -> view.getInFlightCount() == 0L, (long)3000L, (long)100L));
        Assert.assertTrue((String)"Incorrect expired size count", (boolean)Wait.waitFor(() -> view.getEnqueueCount() * 2L == view.getExpiredCount(), (long)3000L, (long)100L));
        Destination brokerDest = this.broker.getDestination((ActiveMQDestination)destination);
        Assert.assertEquals((String)"Incorrect queue size count", (long)0L, (long)brokerDest.getMessageStore().getMessageCount());
        Assert.assertEquals((long)0L, (long)brokerDest.getMemoryUsage().getUsage());
    }

    @Test
    public void testExpiredMessagesOnTopicRecoveryListener() throws Exception {
        Session session = this.createSession();
        ActiveMQTopic destination = new ActiveMQTopic("TEST.Q?consumer.prefetchSize=0");
        MessageProducer producer = session.createProducer((jakarta.jms.Destination)destination);
        TopicSubscriber consumer = session.createDurableSubscriber((Topic)destination, "test-durable");
        producer.setTimeToLive(1000L);
        int count = 3;
        for (int i = 0; i < 3; ++i) {
            TextMessage message = session.createTextMessage("" + i);
            producer.send((Message)message);
        }
        this.broker.getSystemUsage().getMemoryUsage().setLimit(1024L);
        DestinationViewMBean view = this.createView((ActiveMQDestination)destination);
        Assert.assertEquals((String)"Incorrect enqueue count", (long)3L, (long)view.getEnqueueCount());
        consumer.close();
        Thread.sleep(3000L);
        Assert.assertEquals((long)0L, (long)view.getExpiredCount());
    }

    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
        String domain = "org.apache.activemq";
        ObjectName name = destination.isQueue() ? new ObjectName(domain + ":type=Broker,brokerName=testBroker,destinationType=Queue,destinationName=TEST.Q") : new ObjectName(domain + ":type=Broker,brokerName=testBroker,destinationType=Topic,destinationName=TEST.Q");
        return (DestinationViewMBean)this.broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
    }
}

