package org.apache.activemq.store;

import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/StoreOrderTest.class */
public abstract class StoreOrderTest {
    private static final Logger LOG = LoggerFactory.getLogger(StoreOrderTest.class);
    protected BrokerService broker;
    private ActiveMQConnection connection;
    public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");

    /* loaded from: input_file:org/apache/activemq/store/StoreOrderTest$TransactedSend.class */
    public class TransactedSend implements Runnable {
        private CountDownLatch readyForCommit;
        private CountDownLatch firstDone;
        private boolean first;
        private Session session;
        private MessageProducer producer;

        public TransactedSend(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, boolean z) throws Exception {
            this.readyForCommit = countDownLatch;
            this.firstDone = countDownLatch2;
            this.first = z;
            this.session = StoreOrderTest.this.connection.createSession(true, 0);
            this.producer = this.session.createProducer(StoreOrderTest.this.destination);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!this.first) {
                    this.firstDone.await(30L, TimeUnit.SECONDS);
                }
                this.producer.send(this.session.createTextMessage(this.first ? "first" : "second"));
                if (this.first) {
                    this.firstDone.countDown();
                }
                this.readyForCommit.countDown();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail("unexpected ex on run " + e);
            }
        }

        public void commit() throws Exception {
            this.session.commit();
            this.session.close();
        }
    }

    /**
     * Installs the persistence adapter under test on the given broker. Called
     * from {@code configureBroker} before the broker is started.
     *
     * @param brokerService the not-yet-started broker to configure
     * @throws Exception if the adapter cannot be created or installed
     */
    protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;

    /**
     * Diagnostic hook invoked at several points in the tests. This base
     * implementation does nothing; the method is protected and non-final, so a
     * subclass can override it (presumably to dump the store contents).
     *
     * @throws Exception declared so that overriding implementations may throw
     */
    protected void dumpMessages() throws Exception {
    }

    /**
     * Starts a fresh broker and opens the client connection used by each test.
     *
     * @throws Exception if the broker or the connection cannot be started
     */
    @Before
    public void setup() throws Exception {
        final BrokerService freshBroker = createBroker();
        broker = freshBroker;
        initConnection();
    }

    /**
     * Opens and starts a new connection to the broker over the
     * {@code vm://localhost?create=false} transport and stores it in
     * {@code connection}, replacing any previous reference.
     *
     * @throws Exception if the connection cannot be created or started
     */
    public void initConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        // createConnection() is declared to return jakarta.jms.Connection, so the
        // ActiveMQ-specific type has to be obtained with an explicit cast.
        this.connection = (ActiveMQConnection) factory.createConnection();
        this.connection.setWatchTopicAdvisories(false);
        this.connection.start();
    }

    /**
     * Closes the client connection and stops the broker. Also called explicitly
     * by tests that restart the broker mid-test.
     *
     * <p>The broker is stopped even if closing the connection throws, so a
     * failing close cannot leave a running broker behind for the next test.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                // Make sure shutdown has completed before a replacement broker is started.
                this.broker.waitUntilStopped();
            }
        }
    }

    /**
     * Sends one message to the destination "StoreOrderTest,SecondStoreOrderTest",
     * restarts the broker, and then expects to receive one message from each of
     * the two queues individually.
     */
    @Test
    public void testCompositeSendReceiveAfterRestart() throws Exception {
        // Send once to the comma-separated destination.
        destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest");
        enqueueOneMessage();

        // Bounce the broker without wiping the store, then reconnect.
        LOG.info("restart broker");
        stopBroker();
        broker = createRestartedBroker();
        dumpMessages();
        initConnection();

        // The first queue must hold a message.
        destination = new ActiveMQQueue("StoreOrderTest");
        final String fromFirstQueue = receiveOne();
        Assert.assertNotNull("got one message from first dest", fromFirstQueue);
        dumpMessages();

        // ...and so must the second queue.
        destination = new ActiveMQQueue("SecondStoreOrderTest");
        final String fromSecondQueue = receiveOne();
        Assert.assertNotNull("got one message from second dest", fromSecondQueue);
    }

    /**
     * Verifies that messages are delivered in commit order rather than send order.
     *
     * <p>"first" and "second" are sent in two separate transactions (in that
     * order), then committed as: second, a separately committed "middle", first.
     * The expected delivery order "second", "middle", "first" is checked three
     * times: before a broker restart (rolled back), after the restart (rolled
     * back), and finally with each message received and committed individually.
     */
    @Test
    public void validateUnorderedTxCommit() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch readyForCommit = new CountDownLatch(2);
            CountDownLatch firstDone = new CountDownLatch(1);
            TransactedSend firstSend = new TransactedSend(readyForCommit, firstDone, true);
            TransactedSend secondSend = new TransactedSend(readyForCommit, firstDone, false);
            executor.execute(firstSend);
            executor.execute(secondSend);
            Assert.assertTrue("both started", readyForCommit.await(20L, TimeUnit.SECONDS));

            LOG.info("commit out of order");
            secondSend.commit();
            enqueueOneMessage();
            firstSend.commit();
        } finally {
            // Release the pool threads; also interrupts a sender still blocked on
            // its latch when the assertion above has failed.
            executor.shutdownNow();
        }
        LOG.info("send/commit done..");
        dumpMessages();

        LOG.info("receive and rollback...");
        assertCommitOrderThenRollback();

        LOG.info("restart broker");
        stopBroker();
        this.broker = createRestartedBroker();
        initConnection();

        LOG.info("receive and rollback after restart...");
        assertCommitOrderThenRollback();

        LOG.info("receive and ack each message");
        String received1 = receiveOne();
        String received2 = receiveOne();
        String received3 = receiveOne();
        Assert.assertEquals("second", received1);
        Assert.assertEquals("middle", received2);
        Assert.assertEquals("first", received3);
    }

    /**
     * Receives three messages in one transacted session, asserts they arrive as
     * "second", "middle", "first", and rolls the session back so the messages
     * remain on the queue. The session is closed even if an assertion fails.
     */
    private void assertCommitOrderThenRollback() throws Exception {
        Session session = this.connection.createSession(true, 0);
        try {
            String received1 = receive(session);
            String received2 = receive(session);
            String received3 = receive(session);
            Assert.assertEquals("second", received1);
            Assert.assertEquals("middle", received2);
            Assert.assertEquals("first", received3);
            session.rollback();
        } finally {
            session.close();
        }
    }

    /**
     * Sends a single "middle" text message to the current {@code destination}
     * in its own transacted session and commits it.
     */
    private void enqueueOneMessage() throws Exception {
        final Session txSession = connection.createSession(true, 0);
        final MessageProducer sender = txSession.createProducer(destination);
        final TextMessage middle = txSession.createTextMessage("middle");
        sender.send(middle);
        txSession.commit();
        txSession.close();
    }

    /**
     * Receives one message from the current {@code destination} in a dedicated
     * transacted session and commits the receive.
     *
     * @return the text of the received message, or {@code null} if none arrived
     */
    private String receiveOne() throws Exception {
        final Session txSession = connection.createSession(true, 0);
        final String text = receive(txSession);
        txSession.commit();
        txSession.close();
        return text;
    }

    /**
     * Receives at most one text message from the current {@code destination}
     * using a short-lived consumer on the given session. Committing or rolling
     * back the session is left to the caller.
     *
     * @param session the session on which the consumer is created
     * @return the message text, or {@code null} if nothing arrived within 5 seconds
     * @throws Exception if the consumer cannot be created, or receiving or closing fails
     */
    private String receive(Session session) throws Exception {
        MessageConsumer consumer = session.createConsumer(this.destination);
        try {
            // receive(long) is declared to return Message; the test only ever
            // sends text messages, so narrow the type explicitly.
            TextMessage message = (TextMessage) consumer.receive(5000L);
            if (message == null) {
                return null;
            }
            LOG.info("got message: {}", message);
            return message.getText();
        } finally {
            // Close the consumer even when reading the message body fails.
            consumer.close();
        }
    }

    /**
     * Starts a broker with delete-all-messages-on-startup enabled.
     *
     * @return the started broker
     */
    protected BrokerService createBroker() throws Exception {
        final boolean deleteAllMessagesOnStartup = true;
        return startBroker(deleteAllMessagesOnStartup);
    }

    /**
     * Starts a broker with delete-all-messages-on-startup disabled, as used
     * by the tests after stopping the previous broker.
     *
     * @return the started broker
     */
    protected BrokerService createRestartedBroker() throws Exception {
        final boolean deleteAllMessagesOnStartup = false;
        return startBroker(deleteAllMessagesOnStartup);
    }

    /**
     * Creates, configures and starts a new broker.
     *
     * @param z value passed to {@code setDeleteAllMessagesOnStartup}
     * @return the started broker
     * @throws Exception if configuration or startup fails
     */
    protected BrokerService startBroker(boolean z) throws Exception {
        final BrokerService service = new BrokerService();
        // Apply the shared test configuration before the startup flag is set.
        configureBroker(service);
        service.setDeleteAllMessagesOnStartup(z);
        service.start();
        return service;
    }

    /**
     * Applies the configuration shared by all brokers in this test: the
     * persistence adapter under test, advisory support switched off, and a
     * default destination policy.
     *
     * @param brokerService the not-yet-started broker to configure
     * @throws Exception if the persistence adapter cannot be installed
     */
    protected void configureBroker(BrokerService brokerService) throws Exception {
        setPersistentAdapter(brokerService);
        brokerService.setAdvisorySupport(false);

        // Default policy applied to every destination.
        final PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setMemoryLimit(3072L);
        defaultPolicy.setCursorMemoryHighWaterMark(68);
        defaultPolicy.setExpireMessagesPeriod(0L);

        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);
        brokerService.setDestinationPolicy(policies);
    }
}
