/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base JUnit test that checks message delivery order against a persistent store,
 * both on a running broker and after the broker has been restarted on the same store.
 *
 * <p>Concrete subclasses choose the store under test by implementing
 * {@link #setPersistentAdapter(BrokerService)} and may override {@link #dumpMessages()}
 * to print store contents for debugging.
 */
public abstract class StoreOrderTest {
    private static final Logger LOG = LoggerFactory.getLogger(StoreOrderTest.class);

    /** Broker under test; replaced whenever a test restarts the broker. */
    protected BrokerService broker;
    private ActiveMQConnection connection;

    /** Destination used by the send/receive helpers; individual tests may reassign it. */
    public Destination destination = new ActiveMQQueue("StoreOrderTest?consumer.prefetchSize=0");

    /**
     * Installs the persistence adapter under test on the given broker.
     *
     * @param brokerService broker that is being configured and has not been started yet
     * @throws Exception if the adapter cannot be created or configured
     */
    protected abstract void setPersistentAdapter(BrokerService brokerService) throws Exception;

    /**
     * Debugging hook invoked at interesting points of the tests; does nothing by default.
     *
     * @throws Exception if a subclass implementation fails
     */
    protected void dumpMessages() throws Exception {
    }

    /**
     * Starts a broker with an emptied store and opens the shared connection.
     *
     * @throws Exception if the broker or the connection cannot be started
     */
    @Before
    public void setup() throws Exception {
        this.broker = this.createBroker();
        this.initConnection();
    }

    /**
     * Opens and starts the shared connection to the already-running in-VM broker.
     *
     * @throws Exception if the connection cannot be created or started
     */
    public void initConnection() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        this.connection = (ActiveMQConnection) connectionFactory.createConnection();
        this.connection.setWatchTopicAdvisories(false);
        this.connection.start();
    }

    /**
     * Closes the shared connection and stops the broker. Also called mid-test to
     * simulate a restart, so it must tolerate being invoked more than once.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            // Drop the closed connection and still stop the broker when close() threw,
            // otherwise a running broker would leak into the following tests.
            this.connection = null;
            if (this.broker != null) {
                this.broker.stop();
            }
        }
    }

    /**
     * Sends one message to a composite destination, restarts the broker, and checks
     * that each of the two component queues delivers a message afterwards.
     *
     * @throws Exception on any broker or messaging failure
     */
    @Test
    public void testCompositeSendReceiveAfterRestart() throws Exception {
        this.destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest");
        this.enqueueOneMessage();

        LOG.info("restart broker");
        this.stopBroker();
        this.broker = this.createRestartedBroker();
        this.dumpMessages();
        this.initConnection();

        this.destination = new ActiveMQQueue("StoreOrderTest");
        Assert.assertNotNull("got one message from first dest", this.receiveOne());
        this.dumpMessages();

        this.destination = new ActiveMQQueue("SecondStoreOrderTest");
        Assert.assertNotNull("got one message from second dest", this.receiveOne());
    }

    /**
     * Sends "first" then "second" in two open transactions, commits them in reverse
     * order with a "middle" message committed in between, and checks that messages
     * are delivered in commit order ("second", "middle", "first") before a restart,
     * after a restart, and when finally consumed.
     *
     * @throws Exception on any broker or messaging failure
     */
    @Test
    public void validateUnorderedTxCommit() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch readyForCommit = new CountDownLatch(2);
            CountDownLatch firstDone = new CountDownLatch(1);
            TransactedSend first = new TransactedSend(readyForCommit, firstDone, true);
            TransactedSend second = new TransactedSend(readyForCommit, firstDone, false);
            executor.execute(first);
            executor.execute(second);
            Assert.assertTrue("both started", readyForCommit.await(20L, TimeUnit.SECONDS));
            // Failures on the pool threads are invisible to JUnit; surface them here.
            assertSendSucceeded("first", first);
            assertSendSucceeded("second", second);

            LOG.info("commit out of order");
            second.commit();
            this.enqueueOneMessage();
            first.commit();
        } finally {
            // Always release the pool threads, even when the test fails early.
            executor.shutdownNow();
        }
        LOG.info("send/commit done..");
        this.dumpMessages();

        LOG.info("receive and rollback...");
        this.receiveAllAndRollback();

        LOG.info("restart broker");
        this.stopBroker();
        this.broker = this.createRestartedBroker();
        this.initConnection();

        LOG.info("receive and rollback after restart...");
        this.receiveAllAndRollback();

        LOG.info("receive and ack each message");
        String received1 = this.receiveOne();
        String received2 = this.receiveOne();
        String received3 = this.receiveOne();
        assertCommitOrder(received1, received2, received3);
    }

    /** Rethrows, on the calling (test) thread, any failure a sender hit on its worker thread. */
    private static void assertSendSucceeded(String label, TransactedSend send) {
        Throwable failure = send.getFailure();
        if (failure != null) {
            throw new AssertionError("unexpected ex on run of " + label + " send: " + failure, failure);
        }
    }

    /** Asserts that three received bodies match the order in which the sends were committed. */
    private static void assertCommitOrder(String received1, String received2, String received3) {
        Assert.assertEquals("second", received1);
        Assert.assertEquals("middle", received2);
        Assert.assertEquals("first", received3);
    }

    /**
     * Receives three messages in one transacted session, checks their order, and
     * rolls the session back so that the messages stay on the queue.
     */
    private void receiveAllAndRollback() throws Exception {
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            String received1 = this.receive(session);
            String received2 = this.receive(session);
            String received3 = this.receive(session);
            assertCommitOrder(received1, received2, received3);
            session.rollback();
        } finally {
            session.close();
        }
    }

    /** Sends and commits a single "middle" text message to the current destination. */
    private void enqueueOneMessage() throws Exception {
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            MessageProducer producer = session.createProducer(this.destination);
            producer.send(session.createTextMessage("middle"));
            session.commit();
        } finally {
            session.close();
        }
    }

    /**
     * Receives one message from the current destination and commits its consumption.
     *
     * @return the message text, or {@code null} if nothing arrived within the receive timeout
     */
    private String receiveOne() throws Exception {
        Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            String received = this.receive(session);
            session.commit();
            return received;
        } finally {
            session.close();
        }
    }

    /**
     * Receives one text message on the given session using a short-lived consumer.
     * Whether the message is acknowledged is left to the caller's commit or rollback.
     *
     * @param session session on which the temporary consumer is created
     * @return the message text, or {@code null} if nothing arrived within five seconds
     */
    private String receive(Session session) throws Exception {
        MessageConsumer consumer = session.createConsumer(this.destination);
        try {
            TextMessage message = (TextMessage) consumer.receive(5000L);
            if (message == null) {
                return null;
            }
            LOG.info("got message: {}", message);
            return message.getText();
        } finally {
            consumer.close();
        }
    }

    /**
     * Starts a broker that deletes all stored messages on startup.
     *
     * @return the started broker
     * @throws Exception if the broker cannot be configured or started
     */
    protected BrokerService createBroker() throws Exception {
        return this.startBroker(true);
    }

    /**
     * Starts a broker that keeps whatever the store already contains.
     *
     * @return the started broker
     * @throws Exception if the broker cannot be configured or started
     */
    protected BrokerService createRestartedBroker() throws Exception {
        return this.startBroker(false);
    }

    /**
     * Creates, configures and starts a new broker.
     *
     * @param deleteMessagesOnStartup value passed to
     *        {@link BrokerService#setDeleteAllMessagesOnStartup(boolean)}
     * @return the started broker
     * @throws Exception if the broker cannot be configured or started
     */
    protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exception {
        BrokerService newBroker = new BrokerService();
        this.configureBroker(newBroker);
        newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
        newBroker.start();
        return newBroker;
    }

    /**
     * Applies the persistence adapter and the default destination policy to a broker
     * that has not been started yet.
     *
     * @param brokerService broker to configure
     * @throws Exception if the persistence adapter cannot be installed
     */
    protected void configureBroker(BrokerService brokerService) throws Exception {
        this.setPersistentAdapter(brokerService);
        brokerService.setAdvisorySupport(false);

        PolicyEntry defaultEntry = new PolicyEntry();
        // NOTE(review): the small memory limit and cursor high-water mark presumably force
        // messages out of memory so that ordering is decided by the store - confirm.
        defaultEntry.setMemoryLimit(3072L);
        defaultEntry.setCursorMemoryHighWaterMark(68);
        // NOTE(review): a period of 0 presumably turns off the periodic expiry scan - confirm.
        defaultEntry.setExpireMessagesPeriod(0L);

        PolicyMap map = new PolicyMap();
        map.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(map);
    }

    /**
     * Sends a single message inside its own transacted session when run, and leaves
     * the transaction open until {@link #commit()} is called by the test thread.
     *
     * <p>The instance created with {@code first == true} sends "first" and then releases
     * {@code firstDone}; the other instance waits on {@code firstDone} before sending
     * "second". Both count down {@code readyForCommit} when they finish, whether or not
     * the send succeeded; check {@link #getFailure()} to tell the two apart.
     */
    public class TransactedSend
    implements Runnable {
        private final CountDownLatch readyForCommit;
        private final CountDownLatch firstDone;
        private final boolean first;
        private final Session session;
        private final MessageProducer producer;
        /** Written on the worker thread, read on the test thread. */
        private volatile Throwable failure;

        /**
         * @param readyForCommit latch counted down once this sender has finished its send attempt
         * @param firstDone latch released by the first sender and awaited by the second
         * @param first {@code true} for the sender that must send before the other one
         * @throws Exception if the session or producer cannot be created
         */
        public TransactedSend(CountDownLatch readyForCommit, CountDownLatch firstDone, boolean first) throws Exception {
            this.readyForCommit = readyForCommit;
            this.firstDone = firstDone;
            this.first = first;
            this.session = StoreOrderTest.this.connection.createSession(true, Session.SESSION_TRANSACTED);
            this.producer = this.session.createProducer(StoreOrderTest.this.destination);
        }

        @Override
        public void run() {
            try {
                if (!this.first && !this.firstDone.await(30L, TimeUnit.SECONDS)) {
                    // Sending anyway would silently break the first-then-second send order.
                    throw new IllegalStateException("timed out waiting for the first send");
                }
                this.producer.send(this.session.createTextMessage(this.first ? "first" : "second"));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.recordFailure(e);
            }
            catch (Exception e) {
                this.recordFailure(e);
            }
            finally {
                // Release the latches on every path so that nobody waits out a full
                // timeout when a send has already failed.
                if (this.first) {
                    this.firstDone.countDown();
                }
                this.readyForCommit.countDown();
            }
        }

        /** Logs the failure and keeps it for the test thread; asserting here would go unnoticed. */
        private void recordFailure(Throwable cause) {
            LOG.error("unexpected ex on run", cause);
            this.failure = cause;
        }

        /**
         * @return the exception that made {@link #run()} fail, or {@code null} if it succeeded
         *         or has not finished yet
         */
        public Throwable getFailure() {
            return this.failure;
        }

        /**
         * Commits the pending send and closes the session.
         *
         * @throws Exception if the commit or the close fails
         */
        public void commit() throws Exception {
            try {
                this.session.commit();
            } finally {
                this.session.close();
            }
        }
    }
}

