package org.apache.activemq.usecases;

import java.io.File;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest.class */
public class DurableSubSelectorDelayWithRestartTest {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayWithRestartTest.class);
    public static final long RUNTIME = 60000;
    private boolean RESTART = true;
    private int NUMBER_SUBSCRIBERS = 3;
    private BrokerService broker;
    private ActiveMQTopic topic;

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest$DurableSubscriber.class */
    private final class DurableSubscriber {
        private final String subName;
        private final int id;
        private final String conClientId;
        final String url = "failover:(tcp://localhost:61656)";
        final ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61656)");
        private final String selector = "RELEVANT = true";

        public DurableSubscriber(int i) throws JMSException {
            this.id = i;
            this.conClientId = "cli" + i;
            this.subName = "subscription" + i;
        }

        private void process() throws JMSException {
            long currentTimeMillis = System.currentTimeMillis() + 20000;
            DurableSubSelectorDelayWithRestartTest.LOG.info(toString() + " ONLINE.");
            Connection openConnection = openConnection();
            Session createSession = openConnection.createSession(false, 1);
            TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(DurableSubSelectorDelayWithRestartTest.this.topic, this.subName, this.selector, false);
            while (true) {
                try {
                    long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                    if (currentTimeMillis2 <= 0) {
                        break;
                    }
                    Message receive = createDurableSubscriber.receive(currentTimeMillis2);
                    if (receive != null) {
                        DurableSubSelectorDelayWithRestartTest.LOG.info("Received Trans[id=" + receive.getIntProperty("TRANS") + ", count=" + 0 + "] in " + this + ".");
                    }
                } finally {
                    try {
                        createSession.close();
                        openConnection.close();
                    } catch (Exception e) {
                    }
                    DurableSubSelectorDelayWithRestartTest.LOG.info(toString() + " OFFLINE.");
                }
            }
        }

        private Connection openConnection() throws JMSException {
            Connection createConnection = this.cf.createConnection();
            createConnection.setClientID(this.conClientId);
            createConnection.start();
            return createConnection;
        }

        public void subscribe() throws JMSException {
            DurableSubSelectorDelayWithRestartTest.LOG.info(toString() + "SUBSCRIBING");
            Connection openConnection = openConnection();
            Session createSession = openConnection.createSession(false, 1);
            createSession.createDurableSubscriber(DurableSubSelectorDelayWithRestartTest.this.topic, this.subName, this.selector, false);
            createSession.close();
            openConnection.close();
        }

        private void unsubscribe() throws JMSException {
            Connection openConnection = openConnection();
            Session createSession = openConnection.createSession(false, 1);
            createSession.unsubscribe(this.subName);
            createSession.close();
            openConnection.close();
        }

        public String toString() {
            return "DurableSubscriber[id=" + this.id + "]";
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubSelectorDelayWithRestartTest$MsgProducer.class */
    final class MsgProducer extends Thread {
        final String url = "failover:(tcp://localhost:61656)";
        final ConnectionFactory cf;
        int transRover;
        int messageRover;

        public MsgProducer() {
            super("MsgProducer");
            this.url = "failover:(tcp://localhost:61656)";
            this.cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61656)");
            this.transRover = 0;
            this.messageRover = 0;
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long currentTimeMillis = 60000 + System.currentTimeMillis();
            while (currentTimeMillis > System.currentTimeMillis()) {
                try {
                    Thread.sleep(400L);
                    send();
                    if (DurableSubSelectorDelayWithRestartTest.this.RESTART) {
                        DurableSubSelectorDelayWithRestartTest.this.destroyBroker();
                        DurableSubSelectorDelayWithRestartTest.this.startBroker(false);
                    }
                } catch (Throwable th) {
                    th.printStackTrace(System.out);
                    throw new RuntimeException(th);
                }
            }
        }

        public void send() throws JMSException {
            int i = this.transRover + 1;
            this.transRover = i;
            DurableSubSelectorDelayWithRestartTest.LOG.info("Sending Trans[id=" + i + ", count=" + 40 + "]");
            Connection createConnection = this.cf.createConnection();
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer((Destination) null);
            for (int i2 = 0; i2 < 40; i2++) {
                Message createMessage = createSession.createMessage();
                int i3 = this.messageRover + 1;
                this.messageRover = i3;
                createMessage.setIntProperty("ID", i3);
                createMessage.setIntProperty("TRANS", i);
                createMessage.setBooleanProperty("RELEVANT", false);
                createProducer.send(DurableSubSelectorDelayWithRestartTest.this.topic, createMessage);
            }
            Message createMessage2 = createSession.createMessage();
            int i4 = this.messageRover + 1;
            this.messageRover = i4;
            createMessage2.setIntProperty("ID", i4);
            createMessage2.setIntProperty("TRANS", i);
            createMessage2.setBooleanProperty("COMMIT", true);
            createMessage2.setBooleanProperty("RELEVANT", true);
            createProducer.send(DurableSubSelectorDelayWithRestartTest.this.topic, createMessage2);
            DurableSubSelectorDelayWithRestartTest.LOG.info("Committed Trans[id=" + i + ", count=" + 40 + "], ID=" + this.messageRover);
            createSession.close();
            createConnection.close();
        }
    }

    @Test
    public void testProcess() throws Exception {
        MsgProducer msgProducer = new MsgProducer();
        msgProducer.start();
        DurableSubscriber[] durableSubscriberArr = new DurableSubscriber[this.NUMBER_SUBSCRIBERS];
        for (int i = 0; i < durableSubscriberArr.length - 1; i++) {
            durableSubscriberArr[i] = new DurableSubscriber(i);
            durableSubscriberArr[i].process();
        }
        msgProducer.join();
        durableSubscriberArr[durableSubscriberArr.length - 1] = new DurableSubscriber(durableSubscriberArr.length - 1);
        durableSubscriberArr[durableSubscriberArr.length - 1].subscribe();
        new MsgProducer().send();
        durableSubscriberArr[durableSubscriberArr.length - 1].process();
        for (int i2 = 0; i2 < durableSubscriberArr.length - 1; i2++) {
            LOG.info("Unsubscribing subscriber " + durableSubscriberArr[i2]);
            durableSubscriberArr[i2].unsubscribe();
        }
        final KahaDBPersistenceAdapter persistenceAdapter = this.broker.getPersistenceAdapter();
        Assert.assertTrue("small number of journal files should be left ", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.DurableSubSelectorDelayWithRestartTest.1
            public boolean isSatisified() throws Exception {
                DurableSubSelectorDelayWithRestartTest.LOG.info("journal data file count - expected {} actual {}", 4, Integer.valueOf(persistenceAdapter.getStore().getJournal().getFileMap().size()));
                return persistenceAdapter.getStore().getJournal().getFileMap().size() <= 4;
            }
        }, TimeUnit.MINUTES.toMillis(3L)));
        LOG.info("DONE.");
    }

    @Before
    public void setUp() throws Exception {
        this.topic = new ActiveMQTopic("TopicT");
        startBroker();
    }

    @After
    public void tearDown() throws Exception {
        destroyBroker();
    }

    private void startBroker() throws Exception {
        startBroker(true);
    }

    private void startBroker(boolean z) throws Exception {
        if (this.broker != null) {
            return;
        }
        this.broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
        this.broker.setBrokerName(getName());
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(z);
        File file = new File("activemq-data/" + getName() + "-kahadb");
        if (z) {
            delete(file);
        }
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(file);
        kahaDBPersistenceAdapter.setJournalMaxFileLength(10240);
        kahaDBPersistenceAdapter.setCleanupInterval(5000L);
        this.broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
        this.broker.addConnector("tcp://localhost:61656");
        this.broker.getSystemUsage().getMemoryUsage().setLimit(268435456L);
        this.broker.getSystemUsage().getTempUsage().setLimit(268435456L);
        this.broker.getSystemUsage().getStoreUsage().setLimit(268435456L);
        LOG.info(toString() + "Starting Broker...");
        this.broker.start();
        this.broker.waitUntilStarted();
        LOG.info(toString() + " Broker started!!");
    }

    protected static String getName() {
        return "DurableSubSelectorDelayTest";
    }

    private static boolean delete(File file) {
        if (file == null) {
            return true;
        }
        if (file.isDirectory()) {
            for (File file2 : file.listFiles()) {
                delete(file2);
            }
        }
        return file.delete();
    }

    private void destroyBroker() throws Exception {
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker = null;
    }
}
