package org.apache.activemq.transport.failover;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverTransactionTest.class */
public class FailoverTransactionTest {
    private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
    private static final String QUEUE_NAME = "FailoverWithTx";
    private String url = NetworkedSyncTest.broker1URL;
    BrokerService broker;

    public void startCleanBroker() throws Exception {
        startBroker(true);
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void startBroker(boolean z) throws Exception {
        this.broker = createBroker(z);
        this.broker.start();
    }

    public BrokerService createBroker(boolean z) throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.addConnector(this.url);
        this.broker.setDeleteAllMessagesOnStartup(z);
        return this.broker;
    }

    @Test
    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
        startCleanBroker();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        this.broker.stop();
        startBroker(false);
        createSession.commit();
        Assert.assertNotNull("we got the message", createConsumer.receive(20000L));
        createSession.commit();
        createConnection.close();
    }

    @Test
    public void testFailoverCommitReplyLost() throws Exception {
        doTestFailoverCommitReplyLost(0);
    }

    @Test
    public void testFailoverCommitReplyLostJdbc() throws Exception {
        doTestFailoverCommitReplyLost(1);
    }

    @Test
    public void testFailoverCommitReplyLostKahaDB() throws Exception {
        doTestFailoverCommitReplyLost(2);
    }

    public void doTestFailoverCommitReplyLost(int i) throws Exception {
        this.broker = createBroker(true);
        setPersistenceAdapter(i);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.1
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z) throws Exception {
                super.commitTransaction(connectionContext, transactionId, z);
                connectionContext.setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")").createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.2
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    createSession.commit();
                } catch (JMSException e) {
                    Assert.assertTrue(e instanceof TransactionRolledBackException);
                    FailoverTransactionTest.LOG.info("got commit exception: ", e);
                }
                countDownLatch.countDown();
                FailoverTransactionTest.LOG.info("done async commit");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false);
        setPersistenceAdapter(i);
        this.broker.start();
        Assert.assertTrue("tx committed trough failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        Message receive = createConsumer.receive(20000L);
        LOG.info("Received: " + receive);
        Assert.assertNotNull("we got the message", receive);
        Assert.assertNull("we got just one message", createConsumer.receive(2000L));
        createSession.commit();
        createConsumer.close();
        createConnection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false);
        setPersistenceAdapter(i);
        this.broker.start();
        Connection createConnection2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")").createConnection();
        createConnection2.start();
        MessageConsumer createConsumer2 = createConnection2.createSession(false, 1).createConsumer(createQueue);
        Message receive2 = createConsumer2.receive(1000L);
        if (receive2 == null) {
            receive2 = createConsumer2.receive(5000L);
        }
        LOG.info("Received: " + receive2);
        Assert.assertNull("no messges left dangling but got: " + receive2, receive2);
        createConnection2.close();
    }

    private void setPersistenceAdapter(int i) throws IOException {
        switch (i) {
            case 0:
            default:
                return;
            case 1:
                this.broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
                return;
            case 2:
                KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
                kahaDBPersistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/FailoverTransactionTest"));
                this.broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
                return;
        }
    }

    @Test
    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
        startCleanBroker();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")?trackTransactionProducers=false").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        this.broker.stop();
        startBroker(false);
        createSession.commit();
        Assert.assertNull("we got the message", createConsumer.receive(5000L));
        createSession.commit();
        createConnection.close();
    }

    @Test
    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
        startCleanBroker();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        for (int i = 0; i < 10; i++) {
            MessageProducer createProducer = createSession.createProducer(createQueue);
            createProducer.send(createSession.createTextMessage("Test message: 10"));
            createProducer.close();
        }
        this.broker.stop();
        startBroker(false);
        createSession.commit();
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertNotNull("we got all the message: 10", createConsumer.receive(20000L));
        }
        createSession.commit();
        createConnection.close();
    }

    @Test
    public void testFailoverConsumerAckLost() throws Exception {
        for (int i = 0; i < 3; i++) {
            try {
                doTestFailoverConsumerAckLost(i);
                stopBroker();
            } catch (Throwable th) {
                stopBroker();
                throw th;
            }
        }
    }

    public void doTestFailoverConsumerAckLost(final int i) throws Exception {
        this.broker = createBroker(true);
        setPersistenceAdapter(0);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.3
            public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, final MessageAck messageAck) throws Exception {
                consumerBrokerExchange.getConnectionContext().setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.3.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker on ack: " + messageAck);
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        Vector vector = new Vector();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        vector.add(createConnection);
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("FailoverWithTx?consumer.prefetchSize=1");
        Connection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        vector.add(createConnection2);
        final Session createSession2 = createConnection2.createSession(true, 1);
        Connection createConnection3 = activeMQConnectionFactory.createConnection();
        createConnection3.start();
        vector.add(createConnection3);
        Session createSession3 = createConnection3.createSession(true, 1);
        final MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        MessageConsumer createConsumer2 = createSession3.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        produceMessage(createSession, createQueue);
        final Vector vector2 = new Vector();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.4
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit after consume...");
                try {
                    Message receive = createConsumer.receive(20000L);
                    FailoverTransactionTest.LOG.info("consumer1 first attempt got message: " + receive);
                    vector2.add(receive);
                    TimeUnit.SECONDS.sleep(i * 2);
                    Message receive2 = createConsumer.receive(5000L);
                    FailoverTransactionTest.LOG.info("consumer1 second attempt got message: " + receive2);
                    if (receive2 != null) {
                        vector2.add(receive2);
                    }
                    FailoverTransactionTest.LOG.info("committing consumer1 session: " + vector2.size() + " messsage(s)");
                    try {
                        createSession2.commit();
                    } catch (JMSException e) {
                        FailoverTransactionTest.LOG.info("got exception ex on commit", e);
                        if (!(e instanceof TransactionRolledBackException)) {
                            throw e;
                        }
                        atomicBoolean.set(true);
                    }
                    countDownLatch.countDown();
                    FailoverTransactionTest.LOG.info("done async commit");
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false);
        setPersistenceAdapter(0);
        this.broker.start();
        Assert.assertTrue("tx committed trough failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        LOG.info("received message count: " + vector2.size());
        Message receive = createConsumer.receive(atomicBoolean.get() ? 5000L : 20000L);
        LOG.info("post: from consumer1 received: " + receive);
        if (atomicBoolean.get()) {
            Assert.assertNotNull("should be available again after commit rollback ex", receive);
        } else {
            Assert.assertNull("should be nothing left for consumer as recieve should have committed", receive);
        }
        createSession2.commit();
        if (atomicBoolean.get() || (!atomicBoolean.get() && vector2.size() == 1)) {
            Message receive2 = createConsumer2.receive(10000L);
            LOG.info("post: from consumer2 received: " + receive2);
            Assert.assertNotNull("got second message on consumer2", receive2);
            createSession3.commit();
        }
        Iterator it = vector.iterator();
        while (it.hasNext()) {
            ((Connection) it.next()).close();
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false);
        setPersistenceAdapter(0);
        this.broker.start();
        Connection createConnection4 = new ActiveMQConnectionFactory("failover:(" + this.url + ")").createConnection();
        createConnection4.start();
        MessageConsumer createConsumer3 = createConnection4.createSession(false, 1).createConsumer(createQueue);
        Message receive3 = createConsumer3.receive(1000L);
        if (receive3 == null) {
            receive3 = createConsumer3.receive(5000L);
        }
        LOG.info("Sweep received: " + receive3);
        Assert.assertNull("no messges left dangling but got: " + receive3, receive3);
        createConnection4.close();
    }

    @Test
    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
        this.broker = createBroker(true);
        this.broker.start();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("FailoverWithTx?consumer.prefetchSize=1");
        Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        Assert.assertNotNull(createConsumer.receive(20000L));
        this.broker.stop();
        this.broker = createBroker(false);
        setPersistenceAdapter(1);
        this.broker.start();
        try {
            createSession2.commit();
            Assert.fail("expected transaciton rolledback ex");
        } catch (TransactionRolledBackException e) {
        }
        this.broker.stop();
        this.broker = createBroker(false);
        this.broker.start();
        Assert.assertNotNull("should get rolledback message from original restarted broker", createConsumer.receive(20000L));
        createConnection.close();
    }

    @Test
    public void testWaitForMissingRedeliveries() throws Exception {
        LOG.info("testWaitForMissingRedeliveries()");
        this.broker = createBroker(true);
        this.broker.start();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        final Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        Message receive = createConsumer.receive(20000L);
        if (receive == null) {
            AutoFailTestSupport.dumpAllThreads("missing-");
        }
        Assert.assertNotNull("got message just produced", receive);
        this.broker.stop();
        this.broker = createBroker(false);
        setPersistenceAdapter(1);
        this.broker.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.5
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    createSession2.commit();
                    countDownLatch.countDown();
                } catch (JMSException e) {
                }
            }
        });
        this.broker.stop();
        this.broker = createBroker(false);
        this.broker.start();
        Assert.assertTrue("commit was successfull", countDownLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertNull("should not get committed message", createConsumer.receive(5000L));
        createConnection.close();
    }

    @Test
    public void testPoisonOnDeliveryWhilePending() throws Exception {
        LOG.info("testWaitForMissingRedeliveries()");
        this.broker = createBroker(true);
        this.broker.start();
        Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("FailoverWithTx?consumer.prefetchSize=0");
        final Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        Message receive = createConsumer.receive(20000L);
        if (receive == null) {
            AutoFailTestSupport.dumpAllThreads("missing-");
        }
        Assert.assertNotNull("got message just produced", receive);
        this.broker.stop();
        this.broker = createBroker(false);
        this.broker.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.6
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    createSession2.commit();
                } catch (JMSException e) {
                    countDownLatch.countDown();
                }
            }
        });
        Assert.assertNull("consumer2 not get a message while pending to 1", createSession2.createConsumer(createSession2.createQueue("FailoverWithTx?consumer.prefetchSize=1")).receive(2000L));
        Assert.assertTrue("commit completed with ex", countDownLatch.await(15L, TimeUnit.SECONDS));
        Assert.assertNull("consumer should not get rolledback and non redelivered message", createConsumer.receive(5000L));
        TextMessage receive2 = createSession2.createConsumer(createSession2.createQueue("ActiveMQ.DLQ")).receive(5000L);
        Assert.assertNotNull("found message in dlq", receive2);
        Assert.assertEquals("text matches", "Test message", receive2.getText());
        createSession2.commit();
        createConnection.close();
    }

    private void produceMessage(Session session, Queue queue) throws JMSException {
        MessageProducer createProducer = session.createProducer(queue);
        createProducer.send(session.createTextMessage("Test message"));
        createProducer.close();
    }
}
