package org.apache.activemq.transport.failover;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.class */
public class FailoverConsumerOutstandingCommitTest {
    /** Logger used by the tests and by the anonymous plugin, listener and producer classes they create. */
    private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class);
    /** Name of the queue consumed in {@code testRollbackFailoverConsumerTx}. */
    private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
    // NOTE(review): MESSAGE_TEXT and TRANSPORT_URI are not referenced by name in this class; the same
    // text appears as inline literals in produceMessage and createBroker(boolean) - presumably the
    // result of constant inlining. Confirm before removing.
    private static final String MESSAGE_TEXT = "Test message ";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    /** Connect URI of the most recently created broker; assigned in {@code createBroker(boolean, String)}. */
    private String url;
    // NOTE(review): not read anywhere in this class; the queue names hard-code "prefetchSize=10",
    // presumably this value - confirm.
    final int prefetch = 10;
    /** Broker currently under test; reassigned by every {@code createBroker} call and stopped by {@code stopBroker}. */
    BrokerService broker;

    /**
     * Stops the broker held in {@link #broker} after each test; does nothing when no broker was created.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        final BrokerService current = this.broker;
        if (current == null) {
            return;
        }
        current.stop();
    }

    /**
     * Creates a broker through {@link #createBroker(boolean)}, stores it in {@link #broker} and starts it.
     *
     * @param z forwarded to {@code createBroker}, which passes it to {@code setDeleteAllMessagesOnStartup}
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean z) throws Exception {
        final BrokerService created = createBroker(z);
        this.broker = created;
        created.start();
    }

    /**
     * Creates (without starting) a broker bound to {@code tcp://localhost:0}.
     *
     * @param z forwarded to {@link #createBroker(boolean, String)}
     * @return the broker returned by the two-argument overload
     * @throws Exception if the broker cannot be configured
     */
    public BrokerService createBroker(boolean z) throws Exception {
        final String bindAddress = "tcp://localhost:0";
        return createBroker(z, bindAddress);
    }

    /**
     * Builds a new, not yet started broker and remembers it in {@link #broker}.
     *
     * <p>The broker gets one connector on {@code str}, a default destination policy with optimized
     * dispatch enabled, and the connect URI of its first transport connector is stored in {@code url}.
     *
     * @param z   value passed to {@code setDeleteAllMessagesOnStartup}
     * @param str bind address handed to {@code addConnector}
     * @return the newly configured broker (the same instance now held in {@link #broker})
     * @throws Exception if adding the connector or reading its connect URI fails
     */
    public BrokerService createBroker(boolean z, String str) throws Exception {
        final BrokerService service = new BrokerService();
        // Publish to the field first so stopBroker() can still reach it if configuration fails below.
        this.broker = service;
        service.addConnector(str);
        service.setDeleteAllMessagesOnStartup(z);

        final PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setOptimizedDispatch(true);
        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);
        service.setDestinationPolicy(policies);

        final TransportConnector firstConnector = (TransportConnector) service.getTransportConnectors().get(0);
        this.url = firstConnector.getConnectUri().toString();
        return service;
    }

    /**
     * Runs {@link #doTestFailoverConsumerDups(boolean)} with topic advisories being watched.
     */
    @Test
    public void testFailoverConsumerDups() throws Exception {
        final boolean watchTopicAdvisories = true;
        doTestFailoverConsumerDups(watchTopicAdvisories);
    }

    /**
     * Exercises a transacted consumer whose commit never gets a reply because the broker goes away.
     *
     * <p>The broker plugin installed here intercepts {@code commitTransaction}: it does not delegate
     * to {@code super}, flags the context via {@code setDontSendReponse(true)} and stops the broker
     * from a separate thread. The test then waits for that stop, starts a replacement broker on the
     * same URL (without the plugin and without deleting messages) and expects the message listener
     * to have been invoked at least twice in total.
     *
     * @param z value passed to {@code setWatchTopicAdvisories} on the connection factory
     * @throws Exception if broker or connection setup fails, or a wait is interrupted
     */
    public void doTestFailoverConsumerDups(boolean z) throws Exception {
        this.broker = createBroker(true);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
            @Override
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z2) throws Exception {
                // No super call on purpose: the commit is not passed down the broker chain and no
                // response is sent, so the client is left with an outstanding commit.
                connectionContext.setDontSendReponse(true);
                // Stop on another thread; presumably stopping inline would block the thread that
                // is currently servicing this commit - TODO confirm.
                final ExecutorService stopExecutor = Executors.newSingleThreadExecutor();
                stopExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        FailoverConsumerOutstandingCommitTest.LOG.info("Stopping broker before commit...");
                        try {
                            FailoverConsumerOutstandingCommitTest.this.broker.stop();
                        } catch (Exception e) {
                            FailoverConsumerOutstandingCommitTest.LOG.error("failed to stop broker", e);
                        }
                    }
                });
                // The submitted task still runs; this only lets the worker thread exit afterwards.
                stopExecutor.shutdown();
            }
        }});
        this.broker.start();

        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        activeMQConnectionFactory.setWatchTopicAdvisories(z);
        activeMQConnectionFactory.setDispatchAsync(false);
        // createConnection() is declared to return javax.jms.Connection, hence the explicit cast.
        final ActiveMQConnection createConnection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        try {
            createConnection.start();
            final Session producerSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue destination = producerSession.createQueue("FailoverWithOutstandingCommit?consumer.prefetchSize=10");
            final Session consumerSession = createConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            final CountDownLatch commitDoneLatch = new CountDownLatch(1);
            final CountDownLatch messagesReceived = new CountDownLatch(2);

            consumerSession.createConsumer(destination).setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    FailoverConsumerOutstandingCommitTest.LOG.info("consume one and commit");
                    Assert.assertNotNull("got message", message);
                    try {
                        consumerSession.commit();
                    } catch (JMSException e) {
                        FailoverConsumerOutstandingCommitTest.LOG.info("commit exception", e);
                    }
                    // Both latches advance whether or not the commit succeeded.
                    commitDoneLatch.countDown();
                    messagesReceived.countDown();
                    FailoverConsumerOutstandingCommitTest.LOG.info("done commit");
                }
            });

            final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
            producerExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    FailoverConsumerOutstandingCommitTest.LOG.info("producer started");
                    try {
                        FailoverConsumerOutstandingCommitTest.this.produceMessage(producerSession, destination, 20L);
                    } catch (IllegalStateException ignored) {
                        // Deliberately ignored; presumably raised when the connection is closed
                        // while the producer is still sending - confirm.
                    } catch (JMSException e2) {
                        FailoverConsumerOutstandingCommitTest.LOG.error("unexpected producer failure", e2);
                        // NOTE(review): this runs on the executor thread, so the AssertionError
                        // is not seen by the test thread and does not fail the test.
                        Assert.fail("unexpceted ex on producer: " + e2);
                    }
                    FailoverConsumerOutstandingCommitTest.LOG.info("producer done");
                }
            });
            producerExecutor.shutdown();

            // The plugin stops the first broker; wait for that, then bring up a replacement on the same URL.
            this.broker.waitUntilStopped();
            this.broker = createBroker(false, this.url);
            this.broker.start();

            Assert.assertTrue("consumer added through failover", commitDoneLatch.await(20L, TimeUnit.SECONDS));
            Assert.assertTrue("another message was recieved after failover", messagesReceived.await(20L, TimeUnit.SECONDS));
        } finally {
            // Close even when an assertion fails so the connection does not outlive the test.
            createConnection.close();
        }
    }

    /**
     * Runs {@link #doTestFailoverConsumerOutstandingSendTx(boolean)} with the flag set to {@code false},
     * i.e. the plugin does not delegate the commit to its superclass.
     */
    @Test
    public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
        final boolean doActualBrokerCommit = false;
        doTestFailoverConsumerOutstandingSendTx(doActualBrokerCommit);
    }

    /**
     * Runs {@link #doTestFailoverConsumerOutstandingSendTx(boolean)} with the flag set to {@code true},
     * i.e. the plugin delegates the commit to its superclass before suppressing the reply.
     */
    @Test
    public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
        final boolean doActualBrokerCommit = true;
        doTestFailoverConsumerOutstandingSendTx(doActualBrokerCommit);
    }

    /**
     * Exercises a transacted consume-and-send whose commit reply is lost because the broker stops.
     *
     * <p>For every message the listener records it, sends one message to the signal queue on the
     * same transacted session and commits. The broker plugin optionally delegates the commit to
     * {@code super} (when {@code z} is true), then flags the context via
     * {@code setDontSendReponse(true)} and stops the broker from a separate thread. After a
     * replacement broker is started on the same URL the test expects: at least one failed commit,
     * "Test message 0" received first, received a second time only when {@code z} is false, and
     * "Test message 1" received next.
     *
     * @param z whether the plugin delegates the commit to {@code super.commitTransaction} before
     *          suppressing the reply
     * @throws Exception if broker or connection setup fails, or a wait is interrupted
     */
    public void doTestFailoverConsumerOutstandingSendTx(final boolean z) throws Exception {
        this.broker = createBroker(true);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
            @Override
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z2) throws Exception {
                if (z) {
                    FailoverConsumerOutstandingCommitTest.LOG.info("doing actual broker commit...");
                    super.commitTransaction(connectionContext, transactionId, z2);
                }
                // Either way no response is sent, so the client cannot tell whether the commit happened.
                connectionContext.setDontSendReponse(true);
                // Stop on another thread; presumably stopping inline would block the thread that
                // is currently servicing this commit - TODO confirm.
                final ExecutorService stopExecutor = Executors.newSingleThreadExecutor();
                stopExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        FailoverConsumerOutstandingCommitTest.LOG.info("Stopping broker before commit...");
                        try {
                            FailoverConsumerOutstandingCommitTest.this.broker.stop();
                        } catch (Exception e) {
                            FailoverConsumerOutstandingCommitTest.LOG.error("failed to stop broker", e);
                        }
                    }
                });
                // The submitted task still runs; this only lets the worker thread exit afterwards.
                stopExecutor.shutdown();
            }
        }});
        this.broker.start();

        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        activeMQConnectionFactory.setWatchTopicAdvisories(true);
        activeMQConnectionFactory.setDispatchAsync(false);
        // createConnection() is declared to return javax.jms.Connection, hence the explicit cast.
        final ActiveMQConnection createConnection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        try {
            createConnection.start();
            final Session producerSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Queue destination = producerSession.createQueue("FailoverWithOutstandingCommit?consumer.prefetchSize=10");
            final Queue signalDestination = producerSession.createQueue("FailoverWithOutstandingCommit.signal?consumer.prefetchSize=10");
            final Session consumerSession = createConnection.createSession(true, Session.SESSION_TRANSACTED);
            final CountDownLatch commitDoneLatch = new CountDownLatch(1);
            final CountDownLatch messagesReceived = new CountDownLatch(3);
            final AtomicBoolean gotCommitException = new AtomicBoolean(false);
            // Written by the listener thread while the test thread reads it, so use a synchronized view.
            final List<TextMessage> receivedMessages = Collections.synchronizedList(new ArrayList<TextMessage>());

            consumerSession.createConsumer(destination).setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    FailoverConsumerOutstandingCommitTest.LOG.info("consume one and commit: " + message);
                    Assert.assertNotNull("got message", message);
                    receivedMessages.add((TextMessage) message);
                    try {
                        // The signal send is part of the same transaction as the consume.
                        FailoverConsumerOutstandingCommitTest.this.produceMessage(consumerSession, signalDestination, 1L);
                        consumerSession.commit();
                    } catch (JMSException e) {
                        FailoverConsumerOutstandingCommitTest.LOG.info("commit exception", e);
                        gotCommitException.set(true);
                    }
                    // Both latches advance whether or not the commit succeeded.
                    commitDoneLatch.countDown();
                    messagesReceived.countDown();
                    FailoverConsumerOutstandingCommitTest.LOG.info("done commit");
                }
            });

            final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
            producerExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    FailoverConsumerOutstandingCommitTest.LOG.info("producer started");
                    try {
                        FailoverConsumerOutstandingCommitTest.this.produceMessage(producerSession, destination, 20L);
                    } catch (IllegalStateException ignored) {
                        // Deliberately ignored; presumably raised when the connection is closed
                        // while the producer is still sending - confirm.
                    } catch (JMSException e2) {
                        FailoverConsumerOutstandingCommitTest.LOG.error("unexpected producer failure", e2);
                        // NOTE(review): this runs on the executor thread, so the AssertionError
                        // is not seen by the test thread and does not fail the test.
                        Assert.fail("unexpceted ex on producer: " + e2);
                    }
                    FailoverConsumerOutstandingCommitTest.LOG.info("producer done");
                }
            });
            producerExecutor.shutdown();

            // The plugin stops the first broker; wait for that, then bring up a replacement on the same URL.
            this.broker.waitUntilStopped();
            this.broker = createBroker(false, this.url);
            this.broker.start();

            Assert.assertTrue("commit done through failover", commitDoneLatch.await(20L, TimeUnit.SECONDS));
            Assert.assertTrue("commit failed", gotCommitException.get());
            Assert.assertTrue("another message was received after failover", messagesReceived.await(20L, TimeUnit.SECONDS));

            // Walk the received messages in arrival order; each check reads the current index and
            // then advances, so the optional redelivery check looks at index 1, not index 2.
            int receivedIndex = 0;
            Assert.assertEquals("get message 0 first", "Test message 0", receivedMessages.get(receivedIndex++).getText());
            if (!z) {
                // Without the broker-side commit, message 0 is expected a second time.
                Assert.assertEquals("get message 0 second", "Test message 0", receivedMessages.get(receivedIndex++).getText());
            }
            Assert.assertTrue("another message was received", messagesReceived.await(20L, TimeUnit.SECONDS));
            Assert.assertEquals("get message 1 eventually", "Test message 1", receivedMessages.get(receivedIndex).getText());
        } finally {
            // Close even when an assertion fails so the connection does not outlive the test.
            createConnection.close();
        }
    }

    /**
     * Verifies that a transacted consumer can roll back after the broker was restarted underneath it.
     *
     * <p>One message is produced and received in a transacted session, the broker is stopped and a
     * replacement is started on the same URL (without deleting messages), the session is rolled
     * back, and the message is expected to arrive again. After committing, a fresh connection must
     * find nothing left on the queue.
     *
     * @throws Exception if broker or connection handling fails
     */
    @Test
    public void testRollbackFailoverConsumerTx() throws Exception {
        this.broker = createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        // NOTE(review): the wait period and the receive timeout below borrow a constant from an
        // unrelated test class; presumably a plain timeout literal was intended - confirm.
        activeMQConnectionFactory.setConsumerFailoverRedeliveryWaitPeriod(DurableSubProcessWithRestartTest.BROKER_RESTART);
        // createConnection() is declared to return javax.jms.Connection, hence the explicit cast.
        final ActiveMQConnection createConnection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        try {
            createConnection.start();
            Session producerSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = producerSession.createQueue(QUEUE_NAME);
            Session consumerSession = createConnection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = consumerSession.createConsumer(destination);
            Assert.assertNull("no message yet", consumer.receiveNoWait());
            produceMessage(producerSession, destination, 1L);
            producerSession.close();
            Assert.assertNotNull(consumer.receive(5000L));

            // Restart the broker while the consumer transaction is still open.
            this.broker.stop();
            this.broker.waitUntilStopped();
            this.broker = createBroker(false, this.url);
            this.broker.start();

            consumerSession.rollback();
            Assert.assertNotNull("got message again after rollback", consumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART));
            consumerSession.commit();
            consumerSession.close();
            Assert.assertNull("should be nothing left after commit", receiveMessage(activeMQConnectionFactory, destination));
        } finally {
            // Close even when an assertion fails so the connection does not outlive the test.
            createConnection.close();
        }
    }

    /**
     * Opens a fresh connection, tries to receive one message from {@code queue} in a transacted
     * session, commits, and closes the connection.
     *
     * @param activeMQConnectionFactory factory used to create the temporary connection
     * @param queue                     destination to receive from
     * @return the received message, or whatever {@code receive(5000L)} returns when none arrives
     *         in time (the caller asserts {@code null} for that case)
     * @throws Exception if connecting, receiving or committing fails
     */
    private Message receiveMessage(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) throws Exception {
        // createConnection() is declared to return javax.jms.Connection, hence the explicit cast.
        final ActiveMQConnection connection = (ActiveMQConnection) activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            final Message received = session.createConsumer(queue).receive(5000L);
            session.commit();
            return received;
        } finally {
            // Always release the connection, including when receive or commit throws.
            connection.close();
        }
    }

    /**
     * Sends {@code j} text messages to {@code queue}; message {@code i} carries the text
     * {@code "Test message " + i}, starting at 0.
     *
     * <p>The producer is closed whether or not sending succeeds. If a send fails, a failure while
     * closing is suppressed so that the original exception reaches the caller.
     *
     * @param session session used to create the producer and the messages
     * @param queue   destination of the messages
     * @param j       number of messages to send; nothing is sent when it is zero or negative
     * @throws JMSException if creating the producer, sending, or (after a fully successful send)
     *                      closing the producer fails
     */
    private void produceMessage(Session session, Queue queue, long j) throws JMSException {
        final MessageProducer producer = session.createProducer(queue);
        boolean allSent = false;
        try {
            // long counter: an int counter could never reach a bound above Integer.MAX_VALUE.
            for (long index = 0; index < j; index++) {
                producer.send(session.createTextMessage("Test message " + index));
            }
            allSent = true;
        } finally {
            if (allSent) {
                producer.close();
            } else {
                try {
                    producer.close();
                } catch (JMSException ignored) {
                    // A send already failed; keep that exception as the one the caller sees.
                }
            }
        }
    }
}
