/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that floods a durable topic subscription with short-lived
 * messages followed by one long-lived message, waits for the broker to report
 * the short-lived ones as expired, and then checks that the durable subscriber
 * can still read the remaining unexpired message.
 *
 * <p>An embedded broker is started before each test and stopped afterwards.
 */
public class DurableSubscriptionHangTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
    static final String brokerName = "DurableSubscriptionHangTestCase";
    static final String clientID = "myId";
    private static final String topicName = "myTopic";
    private static final String durableSubName = "mySub";

    /** VM-transport URL of the embedded broker; derived from {@link #brokerName} so the two cannot drift. */
    private static final String BROKER_URL = "vm://" + brokerName;

    /** Number of short-lived messages sent; also the expired count the test waits for. */
    private static final int EXPIRED_MESSAGE_COUNT = 1000;

    BrokerService brokerService;

    /**
     * Starts an embedded broker with a clean message store and a default
     * destination policy whose expire-messages period is 1000 ms.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.setBrokerName(brokerName);

        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(1000L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);
        this.brokerService.setDestinationPolicy(policyMap);

        this.brokerService.start();
    }

    /**
     * Stops the embedded broker and waits for shutdown to complete.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void brokerStop() throws Exception {
        // @After runs even when @Before failed, so the field may never have been assigned.
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    /**
     * Registers the durable subscription, publishes the expiring backlog plus
     * one long-lived message, waits until the broker reports the backlog as
     * expired, and asserts that the subscriber still receives a message.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testHanging() throws Exception {
        this.registerDurableSubscription();
        this.produceExpiredAndOneNonExpiredMessages();

        ActiveMQTopic topic = new ActiveMQTopic(topicName);
        boolean allExpired = Wait.waitFor(
                () -> this.brokerService.getDestination(topic)
                        .getDestinationStatistics().getExpired().getCount() == EXPIRED_MESSAGE_COUNT,
                30000L, 500L);
        Assert.assertTrue("Expected " + EXPIRED_MESSAGE_COUNT + " messages to be expired", allExpired);

        Message message = this.getUnexpiredMessageFromDurableSubscription();
        LOG.info("got message:{}", message);
        Assert.assertNotNull("Unable to read unexpired message", message);
    }

    /**
     * Sends {@link #EXPIRED_MESSAGE_COUNT} messages with a one-second time-to-live
     * followed by a single message with a one-day time-to-live.
     *
     * @throws JMSException if connecting or sending fails
     */
    private void produceExpiredAndOneNonExpiredMessages() throws JMSException {
        // try-with-resources guarantees the connection is closed even when a send fails.
        try (TopicConnection connection = this.createTopicConnection()) {
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            MessageProducer producer = session.createProducer(topic);

            producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1L));
            for (int i = 0; i < EXPIRED_MESSAGE_COUNT; ++i) {
                this.sendRandomMessage(session, producer);
            }

            producer.setTimeToLive(TimeUnit.DAYS.toMillis(1L));
            this.sendRandomMessage(session, producer);
        }
        LOG.info("produceExpiredAndOneNonExpiredMessages done");
    }

    /**
     * Creates the durable subscription under {@link #clientID} and immediately
     * closes the subscriber, so messages published afterwards are retained for it.
     *
     * @throws JMSException if connecting or subscribing fails
     */
    private void registerDurableSubscription() throws JMSException {
        try (TopicConnection connection = this.createTopicConnection()) {
            connection.setClientID(clientID);
            TopicSession topicSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Topic topic = topicSession.createTopic(topicName);
            TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName);
            connection.start();
            durableSubscriber.close();
        }
        LOG.info("Durable Sub Registered");
    }

    /**
     * Re-attaches to the durable subscription and waits up to one second for a message.
     *
     * @return the received message, or {@code null} if none arrived within the timeout
     * @throws Exception if connecting, subscribing or receiving fails
     */
    private Message getUnexpiredMessageFromDurableSubscription() throws Exception {
        Message message;
        try (TopicConnection connection = this.createTopicConnection()) {
            connection.setClientID(clientID);
            TopicSession topicSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Topic topic = topicSession.createTopic(topicName);
            connection.start();
            TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
            LOG.info("About to receive messages");
            message = subscriber.receive(1000L);
            subscriber.close();
        }
        LOG.info("getUnexpiredMessageFromDurableSubscription done");
        return message;
    }

    /**
     * Sends one text message whose body is 500 random lowercase letters.
     *
     * @param session  session used to create the message
     * @param producer producer used to send it; its current time-to-live applies
     * @throws JMSException if the message cannot be created or sent
     */
    private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText(RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz"));
        producer.send(textMessage);
    }

    /**
     * Opens a new, not-yet-started topic connection to the embedded broker.
     *
     * @return a fresh connection that the caller must close
     * @throws JMSException if the connection cannot be created
     */
    private TopicConnection createTopicConnection() throws JMSException {
        return new ActiveMQConnectionFactory(BROKER_URL).createTopicConnection();
    }
}

