package org.apache.activemq;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.XAConnection;
import jakarta.jms.XASession;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/XAConsumerTest.class */
public class XAConsumerTest extends TestCase {
    private static final String TEST_AMQ_BROKER_URI = "tcp://localhost:0";
    private String brokerUri;
    private BrokerService broker;
    static final Logger LOG = LoggerFactory.getLogger(XAConsumerTest.class);
    private static long txGenerator = 21;

    protected void setUp() throws Exception {
        this.broker = createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerUri = this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
    }

    protected void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void testPullRequestXAConsumer() throws Exception {
        XAConnection createXAConnection = new ActiveMQXAConnectionFactory("admin", "admin", this.brokerUri + "?trace=true&jms.prefetchPolicy.all=0").createXAConnection();
        createXAConnection.start();
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory("admin", "admin", this.brokerUri + "?trace=true&jms.prefetchPolicy.all=0");
        activeMQXAConnectionFactory.setXaAckMode(1);
        Connection createConnection = activeMQXAConnectionFactory.createConnection();
        createConnection.start();
        try {
            LOG.info(">>>INVOKE XA receive with PullRequest Consumer...");
            XASession createXASession = createXAConnection.createXASession();
            XAResource xAResource = createXASession.getXAResource();
            Xid createXid = createXid();
            xAResource.start(createXid, 0);
            Queue createQueue = createXASession.createQueue("TEST.T2");
            final MessageConsumer createConsumer = createXASession.createConsumer(createQueue);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final CountDownLatch countDownLatch2 = new CountDownLatch(1);
            new Thread(new Runnable() { // from class: org.apache.activemq.XAConsumerTest.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        createConsumer.receive(600000L);
                    } catch (JMSException e) {
                        countDownLatch2.countDown();
                        XAConsumerTest.LOG.info("got expected ex: ", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            }).start();
            LOG.info(">>>simulate Transaction Rollback");
            xAResource.end(createXid, 536870912);
            xAResource.rollback(createXid);
            LOG.info(">>>Sending message...");
            Session createSession = createConnection.createSession(false, 1);
            createSession.createProducer(createQueue).send(createSession.createMessage());
            countDownLatch.await(30L, TimeUnit.SECONDS);
            countDownLatch2.await(5L, TimeUnit.SECONDS);
            createConsumer.close();
            createXASession.close();
            Message receive = createSession.createConsumer(createQueue).receive(5000L);
            assertNotNull("Got message", receive);
            LOG.info("Got message on new session", receive);
            receive.acknowledge();
            LOG.info(">>>Closing Connection");
            if (createXAConnection != null) {
                createXAConnection.close();
            }
            if (createConnection != null) {
                createConnection.close();
            }
        } catch (Throwable th) {
            LOG.info(">>>Closing Connection");
            if (createXAConnection != null) {
                createXAConnection.close();
            }
            if (createConnection != null) {
                createConnection.close();
            }
            throw th;
        }
    }

    public void testPullRequestXAConsumerSingleConsumer() throws Exception {
        XAConnection createXAConnection = new ActiveMQXAConnectionFactory("admin", "admin", this.brokerUri + "?trace=true&jms.prefetchPolicy.all=0").createXAConnection();
        createXAConnection.start();
        try {
            LOG.info(">>>INVOKE XA receive with PullRequest Consumer...");
            XASession createXASession = createXAConnection.createXASession();
            XAResource xAResource = createXASession.getXAResource();
            Xid createXid = createXid();
            xAResource.start(createXid, 0);
            Queue createQueue = createXASession.createQueue("TEST.T2");
            final MessageConsumer createConsumer = createXASession.createConsumer(createQueue);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final CountDownLatch countDownLatch2 = new CountDownLatch(1);
            new Thread(new Runnable() { // from class: org.apache.activemq.XAConsumerTest.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        createConsumer.receive(600000L);
                    } catch (JMSException e) {
                        countDownLatch2.countDown();
                        XAConsumerTest.LOG.info("got expected ex: ", e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            }).start();
            LOG.info(">>>simulate Transaction Rollback");
            xAResource.end(createXid, 536870912);
            xAResource.rollback(createXid);
            XASession createXASession2 = createXAConnection.createXASession();
            XAResource xAResource2 = createXASession2.getXAResource();
            Xid createXid2 = createXid();
            xAResource2.start(createXid2, 0);
            LOG.info(">>>Sending message...");
            ActiveMQMessage createMessage = createXASession2.createMessage();
            createMessage.setTransactionId(new XATransactionId(createXid2));
            createXASession2.createProducer(createQueue).send(createMessage);
            xAResource2.end(createXid2, 67108864);
            xAResource2.commit(createXid2, true);
            countDownLatch.await(30L, TimeUnit.SECONDS);
            countDownLatch2.await(5L, TimeUnit.SECONDS);
            createConsumer.close();
            MessageConsumer createConsumer2 = createXASession.createConsumer(createQueue);
            Xid createXid3 = createXid();
            xAResource.start(createXid3, 0);
            Message receive = createConsumer2.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
            assertNotNull("Got message", receive);
            LOG.info("Got message on new session", receive);
            xAResource.end(createXid3, 67108864);
            xAResource.commit(createXid3, true);
            LOG.info(">>>Closing Connection");
            if (createXAConnection != null) {
                createXAConnection.close();
            }
        } catch (Throwable th) {
            LOG.info(">>>Closing Connection");
            if (createXAConnection != null) {
                createXAConnection.close();
            }
            throw th;
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.io.DataOutputStream, long] */
    public Xid createXid() throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ?? dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        long j = txGenerator + 1;
        txGenerator = dataOutputStream;
        dataOutputStream.writeLong(j);
        dataOutputStream.close();
        final byte[] byteArray = byteArrayOutputStream.toByteArray();
        return new Xid() { // from class: org.apache.activemq.XAConsumerTest.3
            public int getFormatId() {
                return 86;
            }

            public byte[] getGlobalTransactionId() {
                return byteArray;
            }

            public byte[] getBranchQualifier() {
                return byteArray;
            }
        };
    }

    private BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        PolicyMap policyMap = new PolicyMap();
        ArrayList arrayList = new ArrayList();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setProducerFlowControl(true);
        policyEntry.setUseCache(true);
        policyEntry.setPrioritizedMessages(false);
        policyEntry.setExpireMessagesPeriod(0L);
        policyEntry.setQueuePrefetch(0);
        policyEntry.setQueue(">");
        arrayList.add(policyEntry);
        policyMap.setPolicyEntries(arrayList);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.addConnector("tcp://localhost:0");
        brokerService.deleteAllMessages();
        return brokerService;
    }
}
