/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubSelectorDelayTest {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayTest.class);
    public static final long RUNTIME = 180000L;
    private BrokerService broker;
    private ActiveMQTopic topic;
    private String connectionUri;

    @Test
    public void testProcess() throws Exception {
        MsgProducer msgProducer = new MsgProducer();
        msgProducer.start();
        DurableSubscriber[] subscribers = new DurableSubscriber[10];
        for (int i = 0; i < subscribers.length; ++i) {
            subscribers[i] = new DurableSubscriber(i);
            subscribers[i].process();
        }
        msgProducer.join();
        for (int j = 0; j < subscribers.length; ++j) {
            LOG.info("Unsubscribing subscriber " + String.valueOf(subscribers[j]));
            subscribers[j].unsubscribe();
        }
        TimeUnit.MINUTES.sleep(2L);
        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter();
        Assert.assertTrue((String)("less than two journal file should be left, was: " + pa.getStore().getJournal().getFileMap().size()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return pa.getStore().getJournal().getFileMap().size() <= 2;
            }
        }, (long)TimeUnit.MINUTES.toMillis(2L)));
        LOG.info("DONE.");
    }

    @Before
    public void setUp() throws Exception {
        this.topic = new ActiveMQTopic("TopicT");
        this.startBroker();
    }

    @After
    public void tearDown() throws Exception {
        this.destroyBroker();
    }

    private void startBroker() throws Exception {
        this.startBroker(true);
    }

    private void startBroker(boolean deleteAllMessages) throws Exception {
        if (this.broker != null) {
            return;
        }
        this.broker = BrokerFactory.createBroker((String)("broker:(vm://" + DurableSubSelectorDelayTest.getName() + ")"));
        this.broker.setBrokerName(DurableSubSelectorDelayTest.getName());
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        File kahadbData = new File("activemq-data/" + DurableSubSelectorDelayTest.getName() + "-kahadb");
        if (deleteAllMessages) {
            DurableSubSelectorDelayTest.delete(kahadbData);
        }
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
        kahadb.setDirectory(kahadbData);
        kahadb.setJournalMaxFileLength(512000);
        this.broker.setPersistenceAdapter((PersistenceAdapter)kahadb);
        this.connectionUri = this.broker.addConnector("tcp://localhost:0").getPublishableConnectString();
        this.broker.getSystemUsage().getMemoryUsage().setLimit(0x10000000L);
        this.broker.getSystemUsage().getTempUsage().setLimit(0x10000000L);
        this.broker.getSystemUsage().getStoreUsage().setLimit(0x10000000L);
        this.broker.start();
    }

    protected static String getName() {
        return "DurableSubSelectorDelayTest";
    }

    private static boolean delete(File path) {
        if (path == null) {
            return true;
        }
        if (path.isDirectory()) {
            for (File file : path.listFiles()) {
                DurableSubSelectorDelayTest.delete(file);
            }
        }
        return path.delete();
    }

    private void destroyBroker() throws Exception {
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker = null;
    }

    final class MsgProducer
    extends Thread {
        final String url;
        final ConnectionFactory cf;
        int transRover;
        int messageRover;
        int count;

        public MsgProducer() {
            super("MsgProducer");
            this.url = "vm://" + DurableSubSelectorDelayTest.getName();
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.transRover = 0;
            this.messageRover = 0;
            this.count = 40;
            this.setDaemon(true);
        }

        public MsgProducer(int count) {
            super("MsgProducer");
            this.url = "vm://" + DurableSubSelectorDelayTest.getName();
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.transRover = 0;
            this.messageRover = 0;
            this.count = 40;
            this.setDaemon(true);
            this.count = count;
        }

        @Override
        public void run() {
            long endTime = 180000L + System.currentTimeMillis();
            try {
                while (endTime > System.currentTimeMillis()) {
                    Thread.sleep(400L);
                    this.send();
                }
            }
            catch (Throwable e) {
                e.printStackTrace(System.out);
                throw new RuntimeException(e);
            }
        }

        public void send() throws JMSException {
            int trans = ++this.transRover;
            boolean relevantTrans = true;
            LOG.info("Sending Trans[id=" + trans + ", count=" + this.count + "]");
            Connection con = this.cf.createConnection();
            Session sess = con.createSession(false, 1);
            MessageProducer prod = sess.createProducer(null);
            for (int i = 0; i < this.count; ++i) {
                Message message = sess.createMessage();
                message.setIntProperty("ID", ++this.messageRover);
                message.setIntProperty("TRANS", trans);
                message.setBooleanProperty("RELEVANT", false);
                prod.send((Destination)DurableSubSelectorDelayTest.this.topic, message);
            }
            Message message = sess.createMessage();
            message.setIntProperty("ID", ++this.messageRover);
            message.setIntProperty("TRANS", trans);
            message.setBooleanProperty("COMMIT", true);
            message.setBooleanProperty("RELEVANT", relevantTrans);
            prod.send((Destination)DurableSubSelectorDelayTest.this.topic, message);
            LOG.info("Committed Trans[id=" + trans + ", count=" + this.count + "], ID=" + this.messageRover);
            sess.close();
            con.close();
        }
    }

    private final class DurableSubscriber {
        final ConnectionFactory cf;
        private final String subName;
        private final int id;
        private final String conClientId;
        private final String selector;

        public DurableSubscriber(int id) throws JMSException {
            this.cf = new ActiveMQConnectionFactory(DurableSubSelectorDelayTest.this.connectionUri);
            this.id = id;
            this.conClientId = "cli" + id;
            this.subName = "subscription" + id;
            this.selector = "RELEVANT = true";
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void process() throws JMSException {
            long end = System.currentTimeMillis() + 20000L;
            int transCount = 0;
            LOG.info(this.toString() + " ONLINE.");
            Connection con = this.openConnection();
            Session sess = con.createSession(false, 1);
            TopicSubscriber consumer = sess.createDurableSubscriber((Topic)DurableSubSelectorDelayTest.this.topic, this.subName, this.selector, false);
            try {
                long max;
                while ((max = end - System.currentTimeMillis()) > 0L) {
                    Message message = consumer.receive(max);
                    if (message == null) continue;
                    LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + String.valueOf(this) + ".");
                }
            }
            finally {
                sess.close();
                con.close();
                LOG.info(this.toString() + " OFFLINE.");
            }
        }

        private Connection openConnection() throws JMSException {
            Connection con = this.cf.createConnection();
            con.setClientID(this.conClientId);
            con.start();
            return con;
        }

        private void unsubscribe() throws JMSException {
            Connection con = this.openConnection();
            Session session = con.createSession(false, 1);
            session.unsubscribe(this.subName);
            session.close();
            con.close();
        }

        public String toString() {
            return "DurableSubscriber[id=" + this.id + "]";
        }
    }
}

