package org.apache.activemq.bugs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.XASession;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ7067Test.class */
public class AMQ7067Test {
    /** Address that both client connection factories below connect to. */
    static final String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";
    /** Broker instance under test; assigned by {@code createBroker()}. */
    protected BrokerService broker;
    /** XA connection opened by {@code setupXAConnection()}. */
    protected ActiveMQXAConnection connection;
    /** XA session created on {@link #connection}. */
    protected XASession xaSession;
    /** XA resource obtained from {@link #xaSession}. */
    protected XAResource xaRes;
    /** URI scheme prefix of the broker configuration location. */
    private final String xbean = "xbean:";
    /** Directory holding this test's broker configuration. */
    private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq7067";
    /** Random source used when generating XA transaction ids. */
    protected static Random r = new Random();
    // Both factories share the single endpoint constant instead of repeating the literal.
    private static final ActiveMQXAConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY = new ActiveMQXAConnectionFactory(WIRE_LEVEL_ENDPOINT);
    private static final ActiveMQConnectionFactory ACTIVE_MQ_NON_XA_CONNECTION_FACTORY = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

    /**
     * Per-test fixture: removes the {@code target/data} directory tree and then
     * creates and starts the broker.
     *
     * @throws Exception if deleting the data or starting the broker fails
     */
    @Before
    public void setup() throws Exception {
        final File dataRoot = new File("target/data");
        deleteData(dataRoot);
        createBroker();
    }

    /**
     * Per-test teardown: closes the XA connection (if one was opened) and stops the broker.
     *
     * <p>Both fields are null-checked because this method also runs when {@code setup()}
     * failed before the broker was assigned, and because the non-XA tests never open an
     * XA connection. The broker is stopped in a {@code finally} block so that a failure
     * while closing the connection cannot leave it running for the next test.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void shutdown() throws Exception {
        try {
            if (this.connection != null) {
                // Several tests re-open the connection after a broker restart and never close it.
                this.connection.close();
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }

    /**
     * Opens and starts a new XA connection from {@code ACTIVE_MQ_CONNECTION_FACTORY}, then
     * stores the connection, an XA session created on it, and that session's XA resource
     * in the {@code connection}, {@code xaSession} and {@code xaRes} fields.
     *
     * <p>A connection already held in {@code connection} is not closed here; the field is
     * simply overwritten.
     *
     * @throws Exception if creating or starting the connection, or creating the session, fails
     */
    public void setupXAConnection() throws Exception {
        // NOTE(review): the factory result is assigned to an ActiveMQXAConnection field without
        // a cast; confirm the factory's declared return type permits this.
        this.connection = ACTIVE_MQ_CONNECTION_FACTORY.createXAConnection();
        this.connection.start();
        this.xaSession = this.connection.createXASession();
        this.xaRes = this.xaSession.getXAResource();
    }

    /**
     * Creates the broker from the {@code activemq.xml} configuration under {@code confBase}
     * and starts it, storing the instance in {@code broker}.
     *
     * @throws Exception if the broker cannot be created or started
     */
    private void createBroker() throws Exception {
        // The factory builds the instance; no throw-away BrokerService is allocated first.
        this.broker = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
        this.broker.start();
    }

    /**
     * Prepares one XA transaction on the {@code holdKahaDb} queue, pushes and purges traffic
     * on the {@code test} queue around it, forces a checkpoint, and asserts that exactly one
     * transaction is reported by {@code recover} both before and after a broker restart.
     *
     * @throws Exception on any broker, JMS or XA failure
     */
    @Test
    public void testXAPrepare() throws Exception {
        setupXAConnection();
        MessageProducer holdProducer = this.xaSession.createProducer(this.xaSession.createQueue("holdKahaDb"));
        XATransactionId txid = createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start(txid, XAResource.TMNOFLAGS);
        holdProducer.send(this.xaSession.createTextMessage(StringUtils.repeat("a", 10)));
        this.xaRes.end(txid, XAResource.TMSUCCESS);

        final Queue testQueue = this.xaSession.createQueue("test");
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        this.xaRes.prepare(txid);
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue)).purge();
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 0 == AMQ7067Test.this.getQueueSize(testQueue.getQueueName());
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);

        // The prepared transaction is still visible on the live broker ...
        Assert.assertEquals(1L, this.xaRes.recover(XAResource.TMSTARTRSCAN).length);

        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        setupXAConnection();

        // ... and after the restart. The result is held in a local so it can be both
        // printed and asserted on.
        javax.transaction.xa.Xid[] recovered = this.xaRes.recover(XAResource.TMSTARTRSCAN);
        System.out.println("****** recovered = " + java.util.Arrays.toString(recovered));
        Assert.assertEquals(1L, recovered.length);
    }

    /**
     * Same scenario as a plain XA prepare, but followed by repeated forced checkpoints
     * (twice {@code compactAcksAfterNoGC + 1} of them) and a five second pause before the
     * checks. Asserts that exactly one transaction is reported by {@code recover} both
     * before and after a broker restart.
     *
     * @throws Exception on any broker, JMS or XA failure
     */
    @Test
    public void testXAPrepareWithAckCompactionDoesNotLooseInflight() throws Exception {
        ((Logger) Logger.class.cast(LogManager.getLogger(MessageDatabase.class))).setLevel(Level.TRACE);
        setupXAConnection();
        MessageProducer holdProducer = this.xaSession.createProducer(this.xaSession.createQueue("holdKahaDb"));
        XATransactionId txid = createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start(txid, XAResource.TMNOFLAGS);
        holdProducer.send(this.xaSession.createTextMessage(StringUtils.repeat("a", 10)));
        this.xaRes.end(txid, XAResource.TMSUCCESS);

        final Queue testQueue = this.xaSession.createQueue("test");
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        this.xaRes.prepare(txid);
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue)).purge();
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 0 == AMQ7067Test.this.getQueueSize(testQueue.getQueueName());
            }
        });

        // NOTE(review): getCompactAcksAfterNoGC looks store-specific; confirm the static type
        // returned by getPersistenceAdapter() exposes it.
        int checkpointsPerRound = this.broker.getPersistenceAdapter().getCompactAcksAfterNoGC() + 1;
        for (int i = 0; i < checkpointsPerRound * 2; i++) {
            this.broker.getPersistenceAdapter().checkpoint(true);
        }
        TimeUnit.SECONDS.sleep(5L);

        Assert.assertEquals(1L, this.xaRes.recover(XAResource.TMSTARTRSCAN).length);

        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        setupXAConnection();

        // The result is held in a local so it can be both printed and asserted on.
        javax.transaction.xa.Xid[] recovered = this.xaRes.recover(XAResource.TMSTARTRSCAN);
        System.out.println("****** recovered = " + java.util.Arrays.toString(recovered));
        Assert.assertEquals(1L, recovered.length);
    }

    /**
     * Runs the shared completion scenario with {@code true}, i.e. the prepared
     * transaction is committed.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testXACommitWithAckCompactionDoesNotLooseOutcomeOnFullRecovery() throws Exception {
        doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(true);
    }

    /**
     * Runs the shared completion scenario with {@code false}, i.e. the prepared
     * transaction is rolled back.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void testXARollbackWithAckCompactionDoesNotLooseOutcomeOnFullRecovery() throws Exception {
        doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(false);
    }

    /**
     * Shared scenario: prepares an XA transaction on {@code holdKahaDb}, interleaves
     * produce/purge traffic on {@code test}, completes the transaction, runs several
     * rounds of forced checkpoints, overwrites the index file and restarts the broker.
     * Asserts that {@code recover} reports no transactions both before and after the restart.
     *
     * @param z {@code true} to commit the prepared transaction, {@code false} to roll it back
     * @throws Exception on any broker, JMS or XA failure
     */
    protected void doTestXACompletionWithAckCompactionDoesNotLooseOutcomeOnFullRecovery(boolean z) throws Exception {
        // NOTE(review): setCompactAcksAfterNoGC/getCompactAcksAfterNoGC look store-specific;
        // confirm the static type returned by getPersistenceAdapter() exposes them.
        this.broker.getPersistenceAdapter().setCompactAcksAfterNoGC(2);
        ((Logger) Logger.class.cast(LogManager.getLogger(MessageDatabase.class))).setLevel(Level.TRACE);
        setupXAConnection();

        Queue holdQueue = this.xaSession.createQueue("holdKahaDb");
        MessageProducer holdProducer = this.xaSession.createProducer(holdQueue);
        XATransactionId txid = createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start(txid, XAResource.TMNOFLAGS);
        holdProducer.send(this.xaSession.createTextMessage(StringUtils.repeat("a", 10)));
        this.xaRes.end(txid, XAResource.TMSUCCESS);

        final Queue testQueue = this.xaSession.createQueue("test");
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        ((org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue)).purge();
        this.xaRes.prepare(txid);
        produce(this.xaRes, this.xaSession, holdQueue, 1, 10);
        produce(this.xaRes, this.xaSession, testQueue, 50, 524288);
        ((org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue)).purge();
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 0 == AMQ7067Test.this.getQueueSize(testQueue.getQueueName());
            }
        });

        if (z) {
            this.xaRes.commit(txid, false);
        } else {
            this.xaRes.rollback(txid);
        }

        produce(this.xaRes, this.xaSession, testQueue, 50, 524288);
        ((org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue)).purge();
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return 0 == AMQ7067Test.this.getQueueSize(testQueue.getQueueName());
            }
        });

        int checkpointsPerRound = this.broker.getPersistenceAdapter().getCompactAcksAfterNoGC() + 1;
        for (int round = 0; round < 4; round++) {
            for (int i = 0; i < checkpointsPerRound; i++) {
                this.broker.getPersistenceAdapter().checkpoint(true);
            }
            TimeUnit.SECONDS.sleep(2L);
        }

        Assert.assertEquals(0L, this.xaRes.recover(XAResource.TMSTARTRSCAN).length);

        this.connection.close();
        // The index file is overwritten while the broker is still running, then the broker is restarted.
        curruptIndexFile(getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        setupXAConnection();

        // The result is held in a local so it can be both printed and asserted on.
        javax.transaction.xa.Xid[] recovered = this.xaRes.recover(XAResource.TMSTARTRSCAN);
        System.out.println("****** recovered = " + java.util.Arrays.toString(recovered));
        Assert.assertEquals(0L, recovered.length);
    }

    /**
     * Leaves one dangling prepared transaction on {@code holdKahaDb}, then prepares and
     * commits a second one with produce/purge traffic on {@code test} around the commit.
     * Asserts that {@code recover} reports exactly one transaction both before and after
     * a broker restart.
     *
     * @throws Exception on any broker, JMS or XA failure
     */
    @Test
    public void testXAcommit() throws Exception {
        setupXAConnection();

        Queue holdQueue = this.xaSession.createQueue("holdKahaDb");
        createDanglingTransaction(this.xaRes, this.xaSession, holdQueue);

        MessageProducer holdProducer = this.xaSession.createProducer(holdQueue);
        XATransactionId txid = createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start(txid, XAResource.TMNOFLAGS);
        holdProducer.send(this.xaSession.createTextMessage(StringUtils.repeat("a", 10)));
        this.xaRes.end(txid, XAResource.TMSUCCESS);
        this.xaRes.prepare(txid);

        final Queue testQueue = this.xaSession.createQueue("test");
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        this.xaRes.commit(txid, false);
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);

        org.apache.activemq.broker.region.Queue regionQueue =
                (org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue);
        regionQueue.purge();
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return AMQ7067Test.this.getQueueSize(testQueue.getQueueName()) == 0;
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);

        int pendingBeforeRestart = this.xaRes.recover(XAResource.TMSTARTRSCAN).length;
        Assert.assertEquals(1L, pendingBeforeRestart);

        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        setupXAConnection();

        int pendingAfterRestart = this.xaRes.recover(XAResource.TMSTARTRSCAN).length;
        Assert.assertEquals(1L, pendingAfterRestart);
    }

    /**
     * Leaves one dangling prepared transaction on {@code holdKahaDb}, then prepares and
     * rolls back a second one with produce traffic on {@code test} around the rollback,
     * and purges {@code test}. Asserts that {@code recover} reports exactly one
     * transaction both before and after a broker restart.
     *
     * @throws Exception on any broker, JMS or XA failure
     */
    @Test
    public void testXArollback() throws Exception {
        setupXAConnection();

        Queue holdQueue = this.xaSession.createQueue("holdKahaDb");
        createDanglingTransaction(this.xaRes, this.xaSession, holdQueue);

        MessageProducer holdProducer = this.xaSession.createProducer(holdQueue);
        XATransactionId txid = createXATransaction();
        System.out.println("****** create new txid = " + txid);
        this.xaRes.start(txid, XAResource.TMNOFLAGS);
        holdProducer.send(this.xaSession.createTextMessage(StringUtils.repeat("a", 10)));
        this.xaRes.end(txid, XAResource.TMSUCCESS);
        this.xaRes.prepare(txid);

        Queue testQueue = this.xaSession.createQueue("test");
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);
        this.xaRes.rollback(txid);
        produce(this.xaRes, this.xaSession, testQueue, 100, 524288);

        org.apache.activemq.broker.region.Queue regionQueue =
                (org.apache.activemq.broker.region.Queue) this.broker.getRegionBroker().getDestinationMap().get(testQueue);
        regionQueue.purge();

        int pendingBeforeRestart = this.xaRes.recover(XAResource.TMSTARTRSCAN).length;
        Assert.assertEquals(1L, pendingBeforeRestart);

        this.connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        setupXAConnection();

        int pendingAfterRestart = this.xaRes.recover(XAResource.TMSTARTRSCAN).length;
        Assert.assertEquals(1L, pendingAfterRestart);
    }

    /**
     * Local-transaction variant: sends one message to {@code holdKahaDb} in a transacted
     * session, commits it between two bursts of traffic on {@code test}, purges
     * {@code test}, checkpoints, overwrites the index file and restarts the broker.
     * Asserts that {@code holdKahaDb} holds exactly one message after the restart.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testCommit() throws Exception {
        Connection conn = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
        conn.start();
        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        Queue holdQueue = txSession.createQueue("holdKahaDb");
        MessageProducer holdProducer = txSession.createProducer(holdQueue);
        holdProducer.send(txSession.createTextMessage(StringUtils.repeat("a", 10)));

        final Queue testQueue = txSession.createQueue("test");
        produce(conn, testQueue, 100, 524288);
        txSession.commit();
        produce(conn, testQueue, 100, 524288);

        System.out.println(String.format("QueueSize %s: %d", holdQueue.getQueueName(), getQueueSize(holdQueue.getQueueName())));
        purgeQueue(testQueue.getQueueName());
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return AMQ7067Test.this.getQueueSize(testQueue.getQueueName()) == 0;
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);
        conn.close();

        curruptIndexFile(getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        this.broker.waitUntilStarted();

        // Diagnostic output only; a failure here is reported but must not abort the test.
        try {
            TimeUnit.SECONDS.sleep(1L);
            System.out.println(String.format("QueueSize %s: %d", holdQueue.getQueueName(), getQueueSize(holdQueue.getQueueName())));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        long heldAfterRestart = getQueueSize(holdQueue.getQueueName());
        Assert.assertEquals(1L, heldAfterRestart);
    }

    /**
     * Local-transaction variant: sends one message to {@code holdKahaDb} in a transacted
     * session, rolls it back between two bursts of traffic on {@code test}, purges
     * {@code test}, checkpoints, overwrites the index file and restarts the broker.
     * Expects the size lookup for {@code holdKahaDb} to fail afterwards with an
     * {@link UndeclaredThrowableException} caused by an {@link InstanceNotFoundException}.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testRollback() throws Exception {
        Connection conn = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
        conn.start();
        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        Queue holdQueue = txSession.createQueue("holdKahaDb");
        MessageProducer holdProducer = txSession.createProducer(holdQueue);
        holdProducer.send(txSession.createTextMessage(StringUtils.repeat("a", 10)));

        final Queue testQueue = txSession.createQueue("test");
        produce(conn, testQueue, 100, 524288);
        txSession.rollback();
        produce(conn, testQueue, 100, 524288);

        System.out.println(String.format("QueueSize %s: %d", holdQueue.getQueueName(), getQueueSize(holdQueue.getQueueName())));
        purgeQueue(testQueue.getQueueName());
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return AMQ7067Test.this.getQueueSize(testQueue.getQueueName()) == 0;
            }
        });
        this.broker.getPersistenceAdapter().checkpoint(true);
        conn.close();

        curruptIndexFile(getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        this.broker.waitUntilStarted();

        try {
            getQueueSize(holdQueue.getQueueName());
            Assert.fail("expect InstanceNotFoundException");
        } catch (UndeclaredThrowableException expected) {
            boolean causedByMissingInstance = expected.getCause() instanceof InstanceNotFoundException;
            Assert.assertTrue(causedByMissingInstance);
        }
    }

    /**
     * Commits one message to {@code holdKahaDb}, then runs five produce-60 / consume-30
     * rounds on {@code test}, followed by ten rounds of forced checkpoints. After
     * overwriting the index file and restarting the broker, asserts that
     * {@code holdKahaDb} holds 1 message and {@code test} holds 150.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testForwardAcksAndCommitsWithLocalTransaction() throws Exception {
        this.broker.getPersistenceAdapter().setCompactAcksAfterNoGC(2);

        Connection conn = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
        conn.start();
        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);

        Queue holdQueue = txSession.createQueue("holdKahaDb");
        MessageProducer holdProducer = txSession.createProducer(holdQueue);
        holdProducer.send(txSession.createTextMessage(StringUtils.repeat("a", 10)));
        txSession.commit();

        final Queue testQueue = txSession.createQueue("test");
        for (int round = 0; round < 5; round++) {
            produce(conn, testQueue, 60, 524288);
            consume(conn, testQueue, 30, true);
        }
        Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return AMQ7067Test.this.getQueueSize(testQueue.getQueueName()) == 150;
            }
        });

        int checkpointsPerRound = this.broker.getPersistenceAdapter().getCompactAcksAfterNoGC() + 1;
        for (int round = 0; round < 10; round++) {
            for (int n = 0; n < checkpointsPerRound; n++) {
                this.broker.getPersistenceAdapter().checkpoint(true);
            }
            TimeUnit.SECONDS.sleep(2L);
        }

        txSession.commit();
        conn.close();

        curruptIndexFile(getDataDirectory());
        this.broker.stop();
        this.broker.waitUntilStopped();
        createBroker();
        this.broker.waitUntilStarted();

        // Diagnostic output only; a failure here is reported but must not abort the test.
        try {
            TimeUnit.SECONDS.sleep(1L);
            System.out.println(String.format("QueueSize %s: %d", holdQueue.getQueueName(), getQueueSize(holdQueue.getQueueName())));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        long heldAfterRestart = getQueueSize(holdQueue.getQueueName());
        Assert.assertEquals(1L, heldAfterRestart);
        long remainingAfterRestart = getQueueSize(testQueue.getQueueName());
        Assert.assertEquals(150L, remainingAfterRestart);
    }

    /**
     * Receives up to {@code i} messages from {@code queue} on a new session and prints how
     * many were consumed. Stops early when a receive times out after one second.
     *
     * <p>The count is checked before each receive so no message beyond the limit is taken
     * off the queue, and {@code commit()} is only issued when the session is transacted.
     * The session is closed even if receiving or committing fails.
     *
     * @param connection connection on which the session is created
     * @param queue      queue to consume from
     * @param i          maximum number of messages to consume
     * @param z          {@code true} for a transacted session committed after every message,
     *                   {@code false} for an auto-acknowledge session
     * @throws JMSException if any JMS operation fails
     */
    private static void consume(Connection connection, Queue queue, int i, boolean z) throws JMSException {
        Session session = connection.createSession(z, z ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
        try {
            MessageConsumer consumer = session.createConsumer(queue);
            int consumed = 0;
            while (consumed < i && consumer.receive(1000L) != null) {
                consumed++;
                if (z) {
                    session.commit();
                }
            }
            System.out.println(consumed + " messages consumed from " + queue.getQueueName());
        } finally {
            session.close();
        }
    }

    /**
     * Sends one message to {@code queue} inside a new XA transaction and leaves that
     * transaction prepared, neither committing nor rolling it back. Prints the
     * transaction id afterwards.
     *
     * @param xAResource XA resource used to drive the transaction
     * @param xASession  XA session used to create the producer and the message
     * @param queue      destination of the message
     * @throws JMSException if sending fails
     * @throws IOException  if the transaction id cannot be generated
     * @throws XAException  if an XA call fails
     */
    protected static void createDanglingTransaction(XAResource xAResource, XASession xASession, Queue queue) throws JMSException, IOException, XAException {
        final MessageProducer producer = xASession.createProducer(queue);
        final XATransactionId danglingTxid = createXATransaction();

        xAResource.start(danglingTxid, XAResource.TMNOFLAGS);
        final String payload = StringUtils.repeat("dangler", 10);
        producer.send(xASession.createTextMessage(payload));
        xAResource.end(danglingTxid, XAResource.TMSUCCESS);
        xAResource.prepare(danglingTxid);

        System.out.println("****** createDanglingTransaction txId = " + danglingTxid);
    }

    /**
     * Sends {@code i} text messages of {@code i2} characters to {@code queue}, each in its
     * own XA transaction that is committed in one phase.
     *
     * @param xAResource XA resource used to drive the transactions
     * @param xASession  XA session used to create the producer and the messages
     * @param queue      destination of the messages
     * @param i          number of messages to send
     * @param i2         number of {@code "a"} characters in each message body
     * @throws JMSException if sending fails
     * @throws IOException  if a transaction id cannot be generated
     * @throws XAException  if an XA call fails
     */
    protected static void produce(XAResource xAResource, XASession xASession, Queue queue, int i, int i2) throws JMSException, IOException, XAException {
        final MessageProducer producer = xASession.createProducer(queue);
        int remaining = i;
        while (remaining > 0) {
            final XATransactionId txid = createXATransaction();
            xAResource.start(txid, XAResource.TMNOFLAGS);
            final String body = StringUtils.repeat("a", i2);
            producer.send(xASession.createTextMessage(body));
            xAResource.end(txid, XAResource.TMSUCCESS);
            xAResource.commit(txid, true);
            remaining--;
        }
    }

    /**
     * Sends {@code i} text messages of {@code i2} characters to {@code queue} on a new
     * transacted session, committing after every message.
     *
     * <p>The session is closed before returning (also on failure), matching
     * {@code consume}, so repeated calls do not pile up open sessions on the connection.
     *
     * @param connection connection on which the session is created
     * @param queue      destination of the messages
     * @param i          number of messages to send
     * @param i2         number of {@code "a"} characters in each message body
     * @throws JMSException if any JMS operation fails
     * @throws IOException  declared for signature compatibility; not thrown by this body
     * @throws XAException  declared for signature compatibility; not thrown by this body
     */
    protected static void produce(Connection connection, Queue queue, int i, int i2) throws JMSException, IOException, XAException {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            MessageProducer producer = session.createProducer(queue);
            for (int sent = 0; sent < i; sent++) {
                producer.send(session.createTextMessage(StringUtils.repeat("a", i2)));
                session.commit();
            }
        } finally {
            session.close();
        }
    }

    /**
     * Builds a new XA transaction id whose global transaction id and branch qualifier are
     * the same eight random bytes and whose format id is 55.
     *
     * <p>The bytes come from {@code nextLong()} so all 64 bits written are random; feeding
     * {@code writeLong} from {@code nextInt()} would leave the upper half as sign extension.
     *
     * @return the newly created transaction id
     * @throws IOException if writing the id bytes fails
     */
    protected static XATransactionId createXATransaction() throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(r.nextLong());
        }
        byte[] idBytes = buffer.toByteArray();

        XATransactionId xid = new XATransactionId();
        xid.setBranchQualifier(idBytes);
        xid.setGlobalTransactionId(idBytes);
        xid.setFormatId(55);
        return xid;
    }

    /**
     * Creates a JMX proxy for the recovered-XA-transaction MBean registered under the
     * encoded string form of {@code transactionId}.
     *
     * @param transactionId id whose {@code toString()} is encoded into the object name
     * @return proxy implementing {@link RecoveredXATransactionViewMBean}
     * @throws MalformedObjectNameException if the object name is invalid
     * @throws JMSException declared by the signature
     */
    private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId transactionId) throws MalformedObjectNameException, JMSException {
        final String namePrefix = "org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=";
        final Object proxy = this.broker.getManagementContext().newProxyInstance(
                new ObjectName(namePrefix + JMXSupport.encodeObjectNamePart(transactionId.toString())),
                RecoveredXATransactionViewMBean.class,
                true);
        return (RecoveredXATransactionViewMBean) proxy;
    }

    /**
     * Creates a JMX proxy for the persistence-adapter MBean named {@code str} under this
     * broker's object name.
     *
     * @param str persistence adapter name passed to the object-name builder
     * @return proxy implementing {@link PersistenceAdapterViewMBean}
     * @throws MalformedObjectNameException if the object name is invalid
     * @throws JMSException declared by the signature
     */
    private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String str) throws MalformedObjectNameException, JMSException {
        final Object proxy = this.broker.getManagementContext().newProxyInstance(
                BrokerMBeanSupport.createPersistenceAdapterName(this.broker.getBrokerObjectName().toString(), str),
                PersistenceAdapterViewMBean.class,
                true);
        return (PersistenceAdapterViewMBean) proxy;
    }

    /**
     * Recursively deletes the contents of {@code file} and then {@code file} itself.
     * Does nothing when {@code file.list()} returns {@code null}. The results of the
     * individual delete calls are not checked.
     *
     * @param file root of the tree to remove
     * @throws Exception declared by the signature
     */
    private void deleteData(File file) throws Exception {
        final String[] childNames = file.list();
        if (childNames != null) {
            for (int idx = 0; idx < childNames.length; idx++) {
                final File child = new File(file.getPath(), childNames[idx]);
                if (child.isDirectory()) {
                    // Empty the sub-directory first so the delete below can remove it.
                    deleteData(child);
                }
                child.delete();
            }
            file.delete();
        }
    }

    /**
     * Reads the size of the queue named {@code str} through a JMX proxy for its
     * destination MBean on broker {@code localhost}.
     *
     * @param str queue name (encoded before being placed in the object name)
     * @return the value reported by {@code getQueueSize()} on the MBean
     * @throws MalformedObjectNameException if the object name is invalid
     */
    private long getQueueSize(String str) throws MalformedObjectNameException {
        final String namePrefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
        final Object proxy = this.broker.getManagementContext().newProxyInstance(
                new ObjectName(namePrefix + JMXSupport.encodeObjectNamePart(str)),
                DestinationViewMBean.class,
                true);
        final DestinationViewMBean destinationView = (DestinationViewMBean) proxy;
        return destinationView.getQueueSize();
    }

    /**
     * Purges the queue named {@code str} through a JMX proxy for its queue MBean on
     * broker {@code localhost}.
     *
     * @param str queue name (encoded before being placed in the object name)
     * @throws MalformedObjectNameException if the object name is invalid
     * @throws Exception if the purge operation fails
     */
    private void purgeQueue(String str) throws MalformedObjectNameException, Exception {
        final String namePrefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
        final Object proxy = this.broker.getManagementContext().newProxyInstance(
                new ObjectName(namePrefix + JMXSupport.encodeObjectNamePart(str)),
                QueueViewMBean.class,
                true);
        final QueueViewMBean queueView = (QueueViewMBean) proxy;
        queueView.purge();
    }

    /**
     * Reads the broker's data directory through a JMX proxy for the broker MBean of
     * broker {@code localhost}.
     *
     * @return the value reported by {@code getDataDirectory()} on the MBean
     * @throws MalformedObjectNameException if the object name is invalid
     */
    private String getDataDirectory() throws MalformedObjectNameException {
        final Object proxy = this.broker.getManagementContext().newProxyInstance(
                new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"),
                BrokerViewMBean.class,
                true);
        final BrokerViewMBean brokerView = (BrokerViewMBean) proxy;
        return brokerView.getDataDirectory();
    }

    /**
     * Replaces the content of {@code <str>/kahadb/db.data} with a single line of junk
     * text, written as UTF-8.
     *
     * @param str data directory containing the {@code kahadb} folder
     * @throws FileNotFoundException if the file cannot be opened for writing
     * @throws UnsupportedEncodingException if the UTF-8 charset name is not supported
     */
    protected static void curruptIndexFile(String str) throws FileNotFoundException, UnsupportedEncodingException {
        final String indexPath = String.format("%s/kahadb/db.data", str);
        try (PrintWriter junkWriter = new PrintWriter(indexPath, "UTF-8")) {
            junkWriter.println("asdasdasd");
        }
    }
}
