package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.class */
public class DurableSubProcessMultiRestartTest {
    /** Nominal run time in milliseconds; the worker and restart loops in this file run for the same 60000 ms. */
    public static final long RUNTIME = 60000;
    // Broker under test: created in startBroker(boolean), stopped and reset to null in destroyBroker().
    private BrokerService broker;
    // Topic used by the producer and the durable subscriber; assigned in setUp().
    private ActiveMQTopic topic;
    // Constructed in fair mode (the 'true' argument). The subscriber and producer threads hold the
    // read lock while they talk to the broker; restartBroker() holds the write lock, so a restart
    // waits until both workers have released their read locks.
    private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(true);
    // Number of restarts performed so far; incremented and logged by restartBroker().
    private int restartCount = 0;
    // NOTE(review): not referenced anywhere in the code visible in this file.
    private final int SUBSCRIPTION_ID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessMultiRestartTest.class);
    // Failures recorded by exit(...) from any thread; testProcess() asserts that this stays empty.
    static final Vector<Throwable> exceptions = new Vector<>();

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessMultiRestartTest$DurableSubscriber.class */
    /**
     * Daemon thread that acts as a durable topic subscriber over the broker's TCP connector
     * (the same {@code tcp://localhost:61656} address the broker is started with).
     *
     * <p>The durable subscription is registered in the constructor. While running, the thread
     * repeatedly goes online for five seconds at a time, counting the messages it receives, and
     * finally removes the subscription. Every online period is executed under the shared read
     * lock, so it never overlaps a broker restart, which takes the write lock.
     */
    private final class DurableSubscriber extends Thread {
        public static final String SUBSCRIPTION_NAME = "subscription";

        String url;
        final ConnectionFactory cf;
        private final int id;
        private final String conClientId;
        private long msgCount;

        /**
         * Creates the subscriber thread and immediately registers its durable subscription.
         *
         * @param i numeric id used in the thread name, the JMS client id and {@link #toString()}
         * @throws JMSException if the initial subscription cannot be created
         */
        public DurableSubscriber(int i) throws JMSException {
            super("DurableSubscriber" + i);
            this.url = "tcp://localhost:61656";
            this.cf = new ActiveMQConnectionFactory(this.url);
            setDaemon(true);
            this.id = i;
            this.conClientId = "cli" + i;
            subscribe();
        }

        /**
         * Alternates online periods until the run time has elapsed, then unsubscribes.
         *
         * <p>A failing online period does not end the loop; it is reported (or ignored, see
         * {@link #reportJmsFailure(JMSException)}) and the next period is attempted.
         */
        @Override
        public void run() {
            long deadline = System.currentTimeMillis() + DurableSubProcessMultiRestartTest.RUNTIME;
            while (deadline > System.currentTimeMillis()) {
                try {
                    DurableSubProcessMultiRestartTest.this.processLock.readLock().lock();
                    try {
                        process(5000L);
                    } finally {
                        DurableSubProcessMultiRestartTest.this.processLock.readLock().unlock();
                    }
                } catch (JMSException e) {
                    reportJmsFailure(e);
                } catch (Throwable t) {
                    DurableSubProcessMultiRestartTest.exit(toString() + " failed.", t);
                }
            }

            // unsubscribe() throws the checked JMSException, so it needs the same failure
            // handling as the online periods above instead of escaping from run().
            try {
                unsubscribe();
            } catch (JMSException e) {
                reportJmsFailure(e);
            } catch (Throwable t) {
                DurableSubProcessMultiRestartTest.exit(toString() + " failed.", t);
            }
            DurableSubProcessMultiRestartTest.LOG.info(toString() + " DONE. MsgCout=" + this.msgCount);
        }

        /**
         * Records a JMS failure through the test's exit hook unless it was caused by an
         * {@link IOException}. NOTE(review): IOException-caused failures are presumably
         * connection drops around a broker restart and therefore tolerated - confirm.
         */
        private void reportJmsFailure(JMSException e) {
            if (!(e.getCause() instanceof IOException)) {
                DurableSubProcessMultiRestartTest.exit(toString() + " failed with JMSException", e);
            }
        }

        /**
         * Goes online for the given time, polling the durable subscription every 100 ms and
         * counting each received message.
         *
         * <p>The session and connection are closed exactly once, after the polling loop has
         * finished (normally or with an exception) - not after every poll.
         *
         * @param j how long to stay online, in milliseconds
         * @throws JMSException if connecting, subscribing, receiving or closing fails
         */
        private void process(long j) throws JMSException {
            DurableSubProcessMultiRestartTest.LOG.info(toString() + " ONLINE.");
            Connection connection = openConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    TopicSubscriber subscriber = session.createDurableSubscriber(
                            DurableSubProcessMultiRestartTest.this.topic, SUBSCRIPTION_NAME);
                    long deadline = System.currentTimeMillis() + j;
                    while (deadline > System.currentTimeMillis()) {
                        if (subscriber.receive(100L) != null) {
                            DurableSubProcessMultiRestartTest.LOG.info(toString() + "received message...");
                            this.msgCount++;
                        }
                    }
                } finally {
                    session.close();
                }
            } finally {
                connection.close();
                DurableSubProcessMultiRestartTest.LOG.info(toString() + " OFFLINE.");
            }
        }

        /**
         * Opens and starts a connection that carries this subscriber's client id.
         *
         * @return a started connection; the caller is responsible for closing it
         * @throws JMSException if the connection cannot be created, identified or started
         */
        private Connection openConnection() throws JMSException {
            Connection connection = this.cf.createConnection();
            connection.setClientID(this.conClientId);
            connection.start();
            return connection;
        }

        /**
         * Registers the durable subscription on the test topic and disconnects again.
         *
         * @throws JMSException if the subscription cannot be created
         */
        private void subscribe() throws JMSException {
            Connection connection = openConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    session.createDurableSubscriber(DurableSubProcessMultiRestartTest.this.topic, SUBSCRIPTION_NAME);
                    DurableSubProcessMultiRestartTest.LOG.info(toString() + " SUBSCRIBED");
                } finally {
                    session.close();
                }
            } finally {
                // Close even when subscribing fails so the client id is not left in use.
                connection.close();
            }
        }

        /**
         * Removes the durable subscription.
         *
         * @throws JMSException if the subscription cannot be removed
         */
        private void unsubscribe() throws JMSException {
            Connection connection = openConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    session.unsubscribe(SUBSCRIPTION_NAME);
                    DurableSubProcessMultiRestartTest.LOG.info(toString() + " UNSUBSCRIBED");
                } finally {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }

        @Override
        public String toString() {
            return "DurableSubscriber[id=" + this.id + "]";
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubProcessMultiRestartTest$MsgProducer.class */
    /**
     * Daemon thread that publishes one message to the test topic about every 500 ms through the
     * broker's VM transport until the run time has elapsed.
     *
     * <p>Each send happens under the shared read lock, so it never overlaps a broker restart,
     * which takes the write lock.
     */
    final class MsgProducer extends Thread {
        String url;
        final ConnectionFactory cf;
        private long msgCount;
        int messageRover;

        public MsgProducer() {
            super("MsgProducer");
            // Qualified on purpose: an unqualified getName() would resolve to Thread.getName().
            this.url = "vm://" + DurableSubProcessMultiRestartTest.getName();
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.messageRover = 0;
            setDaemon(true);
        }

        /**
         * Sends messages until the run time is over. Any failure (including an interrupt during
         * the pause) is reported through the test's exit hook and ends the thread.
         */
        @Override
        public void run() {
            long deadline = DurableSubProcessMultiRestartTest.RUNTIME + System.currentTimeMillis();
            while (deadline > System.currentTimeMillis()) {
                try {
                    Thread.sleep(500L);
                    DurableSubProcessMultiRestartTest.this.processLock.readLock().lock();
                    try {
                        send();
                    } finally {
                        DurableSubProcessMultiRestartTest.this.processLock.readLock().unlock();
                    }
                    DurableSubProcessMultiRestartTest.LOG.info("MsgProducer msgCount=" + this.msgCount);
                } catch (Throwable t) {
                    DurableSubProcessMultiRestartTest.exit("Server.run failed", t);
                    return;
                }
            }
        }

        /**
         * Publishes a single message carrying an increasing {@code ID} property and
         * {@code COMMIT=true} over a fresh connection.
         *
         * <p>The session and connection are closed in {@code finally} blocks so that a failed
         * send does not leave them open.
         *
         * @throws JMSException if connecting, sending or closing fails
         */
        public void send() throws JMSException {
            DurableSubProcessMultiRestartTest.LOG.info("Sending ... ");
            Connection connection = this.cf.createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Unbound producer: the destination is supplied on each send() call.
                    MessageProducer producer = session.createProducer((Destination) null);
                    Message message = session.createMessage();
                    message.setIntProperty("ID", ++this.messageRover);
                    message.setBooleanProperty("COMMIT", true);
                    producer.send(DurableSubProcessMultiRestartTest.this.topic, message);
                    this.msgCount++;
                    DurableSubProcessMultiRestartTest.LOG.info("Message Sent.");
                } finally {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }
    }

    /**
     * Runs one durable subscriber and one producer while the broker is restarted periodically,
     * then verifies that no worker recorded a failure and that the KahaDB journal shrinks to at
     * most two data files.
     *
     * @throws Exception if a broker restart or the final checks fail
     */
    @Test
    public void testProcess() throws Exception {
        DurableSubscriber durableSubscriber = new DurableSubscriber(1);
        MsgProducer msgProducer = new MsgProducer();
        try {
            // Register the subscription and start the message flow.
            durableSubscriber.start();
            msgProducer.start();
            // 60000 ms, the same value as RUNTIME.
            long deadline = System.currentTimeMillis() + 60000;
            while (deadline > System.currentTimeMillis()) {
                // NOTE(review): BROKER_RESTART lives in a sibling test class that is not visible
                // here; presumably it is the pause between restarts in milliseconds - confirm.
                Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);
                restartBroker();
            }
        } catch (Throwable th) {
            exit("ProcessTest.testProcess failed.", th);
        }

        // Wait for both workers to finish.
        try {
            msgProducer.join();
            durableSubscriber.join();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for the worker threads", e);
            // Keep the interrupt visible to the code that follows instead of swallowing it.
            Thread.currentThread().interrupt();
        }

        // Restart the broker one last time.
        restartBroker();
        Assert.assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());

        // getPersistenceAdapter() is declared with the generic adapter type, so the KahaDB-specific
        // accessors need an explicit downcast; startBroker(boolean) installs a KahaDB adapter.
        final KahaDBPersistenceAdapter persistenceAdapter =
                (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        boolean journalCleanedUp = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return persistenceAdapter.getStore().getJournal().getFileMap().size() <= 2;
            }
        }, TimeUnit.MINUTES.toMillis(3L));
        // Read the file count after waiting so a failure message reports the final state.
        Assert.assertTrue("only less than two journal files should be left: "
                + persistenceAdapter.getStore().getJournal().getFileMap().size(), journalCleanedUp);
        LOG.info("DONE.");
    }

    /**
     * Stops the broker and starts it again without wiping its store, counting the restart.
     * The write lock is held for the whole restart, so it only begins once every worker has
     * released its read lock.
     *
     * @throws Exception if stopping or starting the broker fails
     */
    private void restartBroker() throws Exception {
        LOG.info("Broker restart: waiting for components.");
        final ReentrantReadWriteLock.WriteLock exclusive = this.processLock.writeLock();
        exclusive.lock();
        try {
            destroyBroker();
            startBroker(false);
            this.restartCount += 1;
            LOG.info("Broker restarted. count: " + this.restartCount);
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Records a failure that has no underlying cause.
     *
     * @param str description of the failure
     */
    public static void exit(String str) {
        exit(str, (Throwable) null);
    }

    /**
     * Wraps the failure in a {@link RuntimeException}, logs it, stores it in the shared
     * {@code exceptions} list and finally calls {@code Assert.fail} with its text.
     *
     * @param str description of the failure
     * @param th underlying cause, may be {@code null}
     */
    public static void exit(String str, Throwable th) {
        final RuntimeException failure = new RuntimeException(str, th);
        LOG.error(str, failure);
        exceptions.addElement(failure);
        Assert.fail(failure.toString());
    }

    /**
     * Creates the test topic and starts a broker with a freshly wiped store.
     *
     * @throws Exception if the broker cannot be started
     */
    @Before
    public void setUp() throws Exception {
        final ActiveMQTopic testTopic = new ActiveMQTopic("TopicT");
        this.topic = testTopic;
        this.startBroker();
    }

    /**
     * Stops the broker, if one is still running, after the test.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        this.destroyBroker();
    }

    /**
     * Starts the broker with all previously stored messages deleted.
     *
     * @throws Exception if the broker cannot be started
     */
    private void startBroker() throws Exception {
        this.startBroker(true);
    }

    /**
     * Creates, configures and starts the broker unless one is already present.
     *
     * <p>The broker is persistent, backed by KahaDB with a 20 KiB journal file length, keeps
     * durable subscriptions active, has advisories disabled and listens on
     * {@code tcp://localhost:61656} in addition to its VM transport.
     *
     * @param z when {@code true}, the KahaDB directory is removed first and the broker is told
     *          to delete all messages on startup
     * @throws Exception if the broker cannot be created or started
     */
    private void startBroker(boolean z) throws Exception {
        if (this.broker == null) {
            final String brokerName = getName();
            final long usageLimit = 268435456L; // 256 MiB for memory, temp and store usage alike

            this.broker = BrokerFactory.createBroker("broker:(vm://" + brokerName + ")");
            this.broker.setBrokerName(brokerName);
            this.broker.setAdvisorySupport(false);
            this.broker.setDeleteAllMessagesOnStartup(z);
            this.broker.setKeepDurableSubsActive(true);

            final File dataDirectory = new File("activemq-data/" + brokerName + "-kahadb");
            if (z) {
                delete(dataDirectory);
            }

            this.broker.setPersistent(true);
            final KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
            kahaDb.setDirectory(dataDirectory);
            kahaDb.setJournalMaxFileLength(20480);
            this.broker.setPersistenceAdapter(kahaDb);

            this.broker.addConnector("tcp://localhost:61656");

            this.broker.getSystemUsage().getMemoryUsage().setLimit(usageLimit);
            this.broker.getSystemUsage().getTempUsage().setLimit(usageLimit);
            this.broker.getSystemUsage().getStoreUsage().setLimit(usageLimit);

            this.broker.start();
        }
    }

    /**
     * Returns the fixed name used for the broker, its VM transport URL and its data directory.
     *
     * @return the constant test name
     */
    protected static String getName() {
        final String testName = "DurableSubProcessMultiRestartTest";
        return testName;
    }

    /**
     * Deletes a file, or a directory together with everything below it.
     *
     * @param file the file or directory to remove; {@code null} is treated as already deleted
     * @return {@code true} if {@code file} is {@code null} or was deleted, {@code false} if the
     *         final delete of {@code file} itself failed (results for children are not reported)
     */
    private static boolean delete(File file) {
        if (file == null) {
            return true;
        }
        if (file.isDirectory()) {
            // listFiles() returns null on an I/O error or when the directory disappears between
            // the isDirectory() check and this call; treat that as "nothing to descend into".
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    delete(child);
                }
            }
        }
        return file.delete();
    }

    /**
     * Stops the current broker, if any, and forgets it so that a later
     * {@code startBroker(boolean)} call creates a new one.
     *
     * @throws Exception if stopping the broker fails; the reference is kept in that case
     */
    private void destroyBroker() throws Exception {
        final BrokerService running = this.broker;
        if (running != null) {
            running.stop();
            this.broker = null;
        }
    }
}
