package org.apache.activemq.transport.failover;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverTxSlowAckTest.class */
public class FailoverTxSlowAckTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTxSlowAckTest.class);
    private static final String QUEUE_IN = "IN";
    private static final String QUEUE_OUT = "OUT";
    private static final String MESSAGE_TEXT = "Test message ";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    final int prefetch = 1;
    BrokerService broker;

    /* renamed from: org.apache.activemq.transport.failover.FailoverTxSlowAckTest$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/transport/failover/FailoverTxSlowAckTest$2.class */
    class AnonymousClass2 implements MessageListener {
        final /* synthetic */ AtomicInteger val$receivedCount;
        final /* synthetic */ CountDownLatch val$messagesReceived;
        final /* synthetic */ MessageProducer val$consumerProducer;
        final /* synthetic */ Session val$consumerSession;
        final /* synthetic */ AtomicBoolean val$gotDisconnect;
        final /* synthetic */ CountDownLatch val$brokerDisconnectedLatch;
        final /* synthetic */ AtomicBoolean val$gotReconnected;
        final /* synthetic */ CountDownLatch val$commitDoneLatch;

        AnonymousClass2(AtomicInteger atomicInteger, CountDownLatch countDownLatch, MessageProducer messageProducer, Session session, AtomicBoolean atomicBoolean, CountDownLatch countDownLatch2, AtomicBoolean atomicBoolean2, CountDownLatch countDownLatch3) {
            this.val$receivedCount = atomicInteger;
            this.val$messagesReceived = countDownLatch;
            this.val$consumerProducer = messageProducer;
            this.val$consumerSession = session;
            this.val$gotDisconnect = atomicBoolean;
            this.val$brokerDisconnectedLatch = countDownLatch2;
            this.val$gotReconnected = atomicBoolean2;
            this.val$commitDoneLatch = countDownLatch3;
        }

        public void onMessage(Message message) {
            FailoverTxSlowAckTest.LOG.info("consume one and commit");
            Assert.assertNotNull("got message", message);
            this.val$receivedCount.incrementAndGet();
            this.val$messagesReceived.countDown();
            try {
                TimeUnit.SECONDS.sleep(1L);
                this.val$consumerProducer.send(message);
                this.val$consumerSession.getTransactionContext().addSynchronization(new Synchronization() { // from class: org.apache.activemq.transport.failover.FailoverTxSlowAckTest.2.1
                    public void beforeEnd() throws Exception {
                        FailoverTxSlowAckTest.LOG.info("waiting for failover reconnect");
                        AnonymousClass2.this.val$gotDisconnect.set(Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverTxSlowAckTest.2.1.1
                            public boolean isSatisified() throws Exception {
                                return !AnonymousClass2.this.val$consumerSession.getConnection().getTransport().isConnected();
                            }
                        }));
                        AnonymousClass2.this.val$brokerDisconnectedLatch.countDown();
                        FailoverTxSlowAckTest.LOG.info("got disconnect");
                        AnonymousClass2.this.val$gotReconnected.set(Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.transport.failover.FailoverTxSlowAckTest.2.1.2
                            public boolean isSatisified() throws Exception {
                                return AnonymousClass2.this.val$consumerSession.getConnection().getTransport().isConnected();
                            }
                        }));
                        FailoverTxSlowAckTest.LOG.info("got failover reconnect");
                    }
                });
                this.val$consumerSession.commit();
                FailoverTxSlowAckTest.LOG.info("done commit");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.val$commitDoneLatch.countDown();
            }
        }
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void startBroker(boolean z) throws Exception {
        this.broker = createBroker(z);
        this.broker.start();
    }

    public BrokerService createBroker(boolean z) throws Exception {
        return createBroker(z, "tcp://localhost:0");
    }

    public BrokerService createBroker(boolean z, String str) throws Exception {
        this.broker = new BrokerService();
        this.broker.addConnector(str);
        this.broker.setDeleteAllMessagesOnStartup(z);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setOptimizedDispatch(true);
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        return this.broker;
    }

    @Test
    public void testFailoverDuringAckRollsback() throws Exception {
        this.broker = createBroker(true);
        final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTxSlowAckTest.1
            int sendCount = 0;

            public void send(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws Exception {
                super.send(producerBrokerExchange, message);
                this.sendCount++;
                if (this.sendCount > 1) {
                    newFixedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTxSlowAckTest.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            FailoverTxSlowAckTest.LOG.info("Stopping broker before commit...");
                            try {
                                FailoverTxSlowAckTest.this.broker.stop();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        this.url = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        activeMQConnectionFactory.setDispatchAsync(false);
        ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        final Queue createQueue = createSession.createQueue("IN?consumer.prefetchSize=1");
        Session createSession2 = createConnection.createSession(true, 1);
        Queue createQueue2 = createSession2.createQueue(QUEUE_OUT);
        MessageProducer createProducer = createSession2.createProducer(createQueue2);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        createSession2.createConsumer(createQueue).setMessageListener(new AnonymousClass2(atomicInteger, countDownLatch2, createProducer, createSession2, atomicBoolean, countDownLatch3, atomicBoolean2, countDownLatch));
        newFixedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTxSlowAckTest.3
            @Override // java.lang.Runnable
            public void run() {
                FailoverTxSlowAckTest.LOG.info("producer started");
                try {
                    FailoverTxSlowAckTest.this.produceMessage(createSession, createQueue, 1L);
                } catch (IllegalStateException e) {
                } catch (JMSException e2) {
                    e2.printStackTrace();
                    Assert.fail("unexpceted ex on producer: " + e2);
                }
                FailoverTxSlowAckTest.LOG.info("producer done");
            }
        });
        this.broker.waitUntilStopped();
        countDownLatch3.await();
        this.broker = createBroker(false, this.url);
        this.broker.start();
        Assert.assertTrue("message was recieved ", countDownLatch2.await(20L, TimeUnit.SECONDS));
        Assert.assertTrue("tx complete through failover", countDownLatch.await(40L, TimeUnit.SECONDS));
        Assert.assertEquals("one delivery", 1L, atomicInteger.get());
        Assert.assertTrue("got disconnect/reconnect", atomicBoolean.get());
        Assert.assertTrue("got reconnect", atomicBoolean2.get());
        Assert.assertNull("No message produced", receiveMessage(activeMQConnectionFactory, createQueue2));
    }

    private Message receiveMessage(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) throws Exception {
        ActiveMQConnection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        Message receive = createSession.createConsumer(queue).receive(4000L);
        createSession.commit();
        createConnection.close();
        return receive;
    }

    private void produceMessage(Session session, Queue queue, long j) throws JMSException {
        MessageProducer createProducer = session.createProducer(queue);
        for (int i = 0; i < j; i++) {
            createProducer.send(session.createTextMessage("Test message " + i), 2, 4, 500L);
        }
        createProducer.close();
    }
}
