package org.apache.activemq.transport.failover;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import jakarta.jms.TransactionRolledBackException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest.class */
public class FailoverDurableSubTransactionTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverDurableSubTransactionTest.class);
    // Name of the topic the durable subscription is created on.
    private static final String TOPIC_NAME = "Failover.WithTx";
    // Default connector bind address. Presumably port 0 requests an ephemeral port, which would be
    // why createBroker reads the effective URI back from the connector — TODO confirm.
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    // Connect URI of the broker's first transport connector; assigned by createBroker(boolean, String).
    private String url;
    // Broker under test; assigned by createBroker(...) and stopped by stopBroker()/tearDown().
    BrokerService broker;

    // Failure point for the current run; populated from the rows returned by parameters().
    @Parameterized.Parameter(0)
    public FailType failType;

    /* loaded from: input_file:org/apache/activemq/transport/failover/FailoverDurableSubTransactionTest$FailType.class */
    /**
     * Point in the consume/commit cycle at which the broker plugin installed by the tests
     * forcibly fails the client connections.
     */
    public enum FailType {
        /** Fail from the plugin's {@code postProcessDispatch} hook. */
        ON_DISPATCH,
        /** Fail from the plugin's {@code acknowledge} hook. */
        ON_ACK,
        /** Fail from the plugin's {@code commitTransaction} hook instead of committing. */
        ON_COMMIT,
        /**
         * Same trigger as {@link #ON_DISPATCH}, but {@code configureConnectionFactory} does not
         * zero the redelivery delays for this value.
         * NOTE(review): "DISPACH" is a typo for "DISPATCH"; left as is because the constant is public.
         */
        ON_DISPACH_WITH_REPLAY_DELAY
    }

    /**
     * Supplies one single-element argument row per {@link FailType} the suite runs with.
     *
     * @return the rows, in the order ON_DISPATCH, ON_DISPACH_WITH_REPLAY_DELAY, ON_ACK, ON_COMMIT
     */
    @Parameterized.Parameters(name = "failType=#{0}")
    public static Iterable<Object[]> parameters() {
        final Object[][] rows = {
            {FailType.ON_DISPATCH},
            {FailType.ON_DISPACH_WITH_REPLAY_DELAY},
            {FailType.ON_ACK},
            {FailType.ON_COMMIT},
        };
        return Arrays.asList(rows);
    }

    /**
     * Stops the broker (if one was created) after each test.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        this.stopBroker();
    }

    /**
     * Stops the current broker; does nothing when no broker has been assigned.
     *
     * @throws Exception if the broker fails to stop
     */
    public void stopBroker() throws Exception {
        final BrokerService current = this.broker;
        if (current == null) {
            return;
        }
        current.stop();
    }

    /**
     * Creates a broker via {@link #createBroker(boolean)}, stores it in {@link #broker} and starts it.
     *
     * @param z forwarded to {@code createBroker}, which passes it to
     *          {@code setDeleteAllMessagesOnStartup}
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean z) throws Exception {
        final BrokerService created = createBroker(z);
        this.broker = created;
        created.start();
    }

    /**
     * Creates a broker bound to the given address via {@link #createBroker(boolean, String)},
     * stores it in {@link #broker} and starts it.
     *
     * @param z   forwarded to {@code createBroker}, which passes it to
     *            {@code setDeleteAllMessagesOnStartup}
     * @param str connector bind address
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean z, String str) throws Exception {
        final BrokerService created = createBroker(z, str);
        this.broker = created;
        created.start();
    }

    /**
     * Creates (but does not start) a broker bound to the default {@code TRANSPORT_URI}.
     *
     * @param z value passed on to {@code setDeleteAllMessagesOnStartup}
     * @return the configured broker
     * @throws Exception if the broker cannot be configured
     */
    public BrokerService createBroker(boolean z) throws Exception {
        final BrokerService configured = createBroker(z, TRANSPORT_URI);
        return configured;
    }

    /**
     * Creates (but does not start) a broker with JMX and advisory support disabled, one
     * connector on the given address, a default destination policy and
     * {@code keepDurableSubsActive} enabled. The new broker is stored in {@link #broker} and the
     * connect URI of its first connector is recorded in {@link #url}.
     *
     * @param z   value passed to {@code setDeleteAllMessagesOnStartup}
     * @param str connector bind address
     * @return the configured broker (the same instance now held in {@link #broker})
     * @throws Exception if the connector cannot be added or its URI cannot be read
     */
    public BrokerService createBroker(boolean z, String str) throws Exception {
        final BrokerService service = new BrokerService();
        // Publish to the field first so tearDown() can stop it even if configuration fails below.
        this.broker = service;

        service.setUseJmx(false);
        service.setAdvisorySupport(false);
        service.addConnector(str);
        service.setDeleteAllMessagesOnStartup(z);

        final PolicyMap policies = new PolicyMap();
        final PolicyEntry defaultPolicy = new PolicyEntry();
        policies.setDefaultEntry(defaultPolicy);
        service.setDestinationPolicy(policies);
        service.setKeepDurableSubsActive(true);

        final TransportConnector firstConnector = service.getTransportConnectors().get(0);
        this.url = firstConnector.getConnectUri().toString();
        return service;
    }

    /**
     * Applies the settings shared by the test's client connections: topic advisory watching is
     * turned off and maximum redeliveries is set to -1 (presumably "no limit" — confirm against
     * the redelivery policy documentation). Unless the run uses
     * {@link FailType#ON_DISPACH_WITH_REPLAY_DELAY}, both redelivery delays are also set to zero.
     *
     * @param activeMQConnectionFactory the factory to configure in place
     */
    public void configureConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        activeMQConnectionFactory.getRedeliveryPolicy().setMaximumRedeliveries(-1);

        final boolean keepReplayDelay = this.failType == FailType.ON_DISPACH_WITH_REPLAY_DELAY;
        if (!keepReplayDelay) {
            activeMQConnectionFactory.getRedeliveryPolicy().setInitialRedeliveryDelay(0L);
            activeMQConnectionFactory.getRedeliveryPolicy().setRedeliveryDelay(0L);
        }
    }

    /**
     * Consumes a batch of ten messages from a durable subscription inside a transacted session,
     * using synchronous {@code receive} calls, while a broker plugin fails every client
     * connection once at the point selected by {@link #failType}. The batch is rolled back and
     * re-read until a commit succeeds; afterwards {@code ActiveMQ.DLQ} must hold no messages.
     *
     * @throws Exception if the broker or the JMS client fails unexpectedly
     */
    @Test
    public void testFailoverCommit() throws Exception {
        final int messageCount = 10;
        final AtomicInteger hookCalls = new AtomicInteger(0);
        // Fail on the first intercepted commit, or on the ninth intercepted dispatch/ack.
        final int failOnCall = FailType.ON_COMMIT.equals(this.failType) ? 1 : 9;

        this.broker = createBroker(true);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
            @Override
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean onePhase) throws Exception {
                if (FailType.ON_COMMIT.equals(FailoverDurableSubTransactionTest.this.failType)
                        && hookCalls.incrementAndGet() == failOnCall) {
                    // The real commit is deliberately skipped on the failing call.
                    whackConnections("commit");
                    return;
                }
                super.commitTransaction(connectionContext, transactionId, onePhase);
            }

            @Override
            public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
                // Only delivered-acks are counted; the ack is still passed on after the failure.
                if (FailType.ON_ACK.equals(FailoverDurableSubTransactionTest.this.failType)
                        && messageAck.getAckType() == MessageAck.DELIVERED_ACK_TYPE
                        && hookCalls.incrementAndGet() == failOnCall) {
                    whackConnections("ack");
                }
                super.acknowledge(consumerBrokerExchange, messageAck);
            }

            @Override
            public void postProcessDispatch(MessageDispatch messageDispatch) {
                super.postProcessDispatch(messageDispatch);
                final FailType current = FailoverDurableSubTransactionTest.this.failType;
                if ((FailType.ON_DISPATCH.equals(current) || FailType.ON_DISPACH_WITH_REPLAY_DELAY.equals(current))
                        && hookCalls.incrementAndGet() == failOnCall) {
                    whackConnections("dispatch");
                }
            }

            /** Raises an IOException on every connection of the broker's first transport connector. */
            private void whackConnections(String trigger) {
                final TransportConnector connector =
                        FailoverDurableSubTransactionTest.this.broker.getTransportConnectors().get(0);
                for (TransportConnection transportConnection : connector.getConnections()) {
                    FailoverDurableSubTransactionTest.LOG.error("Whacking connection on {}: {}", trigger, transportConnection);
                    transportConnection.serviceException(new IOException("ERROR NOW"));
                }
            }
        }});
        this.broker.start();

        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        factory.setAlwaysSyncSend(true);
        factory.setAlwaysSessionAsync(false);
        factory.getPrefetchPolicy().setDurableTopicPrefetch(FailType.ON_ACK.equals(this.failType) ? 2 : 100);
        configureConnectionFactory(factory);

        final Connection connection = factory.createConnection();
        try {
            connection.setClientID("CID");
            connection.start();
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            final Topic topic = session.createTopic(TOPIC_NAME);
            // Create the durable subscription and close it again before any message is produced.
            session.createDurableSubscriber(topic, "DS").close();

            produceMessage(topic, messageCount);
            LOG.info("Production done! " + this.broker.getDestination(ActiveMQDestination.transform(topic)));

            final TopicSubscriber subscriber = session.createDurableSubscriber(topic, "DS");
            final HashSet<Integer> seenIds = new HashSet<>();
            boolean committed = false;
            while (!committed) {
                // Every attempt starts from scratch: a rollback makes the whole batch eligible again.
                seenIds.clear();
                int received = 0;
                while (received < messageCount) {
                    final Message message = subscriber.receive(5000L);
                    if (message == null) {
                        LOG.info("Failed to receive on: " + received);
                        break;
                    }
                    LOG.info("Received: @" + received + ":" + message.getJMSMessageID() + ", ID:" + message.getIntProperty("ID"));
                    Assert.assertTrue("single instance of: " + received, seenIds.add(message.getIntProperty("ID")));
                    received++;
                }

                if (received < messageCount) {
                    session.rollback();
                    continue;
                }
                try {
                    session.commit();
                    committed = true;
                } catch (TransactionRolledBackException e) {
                    LOG.info("Got expected", e);
                    session.rollback();
                }
            }
            subscriber.close();
        } finally {
            // Always release the failover connection, even when an assertion above fails.
            connection.close();
        }

        final Destination dlq = this.broker.getDestination(ActiveMQDestination.transform(new ActiveMQQueue("ActiveMQ.DLQ")));
        LOG.info("DLQ: " + dlq);
        Assert.assertEquals("DLQ empty ", 0L, dlq.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Listener-based variant: a {@link MessageListener} on a transacted durable subscription
     * commits after ten messages while a broker plugin fails every client connection on the
     * first intercepted commit ({@link FailType#ON_COMMIT}) or ack ({@link FailType#ON_ACK}).
     * This method's plugin has no dispatch hook, so the two dispatch fail types inject no failure
     * here. When the commit is rolled back the listener rolls back, resets its bookkeeping and
     * tries again. The test passes once a commit succeeds and {@code ActiveMQ.DLQ} holds no
     * messages.
     *
     * @throws Exception if the broker or the JMS client fails unexpectedly
     */
    @Test
    public void testFailoverCommitListener() throws Exception {
        final int commitBatchSize = 10;
        final AtomicInteger hookCalls = new AtomicInteger(0);
        // The failure is always injected on the first intercepted call, whatever the fail type.
        final int failOnCall = 1;

        this.broker = createBroker(true);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
            @Override
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean onePhase) throws Exception {
                FailoverDurableSubTransactionTest.LOG.info("commit request: " + transactionId);
                if (FailType.ON_COMMIT.equals(FailoverDurableSubTransactionTest.this.failType)
                        && hookCalls.incrementAndGet() == failOnCall) {
                    // The real commit is deliberately skipped on the failing call.
                    whackConnections("commit");
                    return;
                }
                super.commitTransaction(connectionContext, transactionId, onePhase);
            }

            @Override
            public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
                FailoverDurableSubTransactionTest.LOG.info("ack request: " + messageAck);
                if (FailType.ON_ACK.equals(FailoverDurableSubTransactionTest.this.failType)
                        && hookCalls.incrementAndGet() == failOnCall) {
                    // The ack is deliberately dropped on the failing call.
                    whackConnections("ack");
                    return;
                }
                super.acknowledge(consumerBrokerExchange, messageAck);
            }

            /** Raises an IOException on every connection of the broker's first transport connector. */
            private void whackConnections(String trigger) {
                final TransportConnector connector =
                        FailoverDurableSubTransactionTest.this.broker.getTransportConnectors().get(0);
                for (TransportConnection transportConnection : connector.getConnections()) {
                    FailoverDurableSubTransactionTest.LOG.error("Whacking connection on {}: {}", trigger, transportConnection);
                    transportConnection.serviceException(new IOException("ERROR NOW"));
                }
            }
        }});
        this.broker.start();

        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        factory.setAlwaysSyncSend(true);
        factory.setAlwaysSessionAsync(true);
        factory.setWatchTopicAdvisories(false);

        // First connection: only creates the durable subscription, then goes away.
        final Topic topic;
        final Connection setupConnection = factory.createConnection();
        try {
            setupConnection.setClientID("CID");
            setupConnection.start();
            final Session setupSession = setupConnection.createSession(true, Session.SESSION_TRANSACTED);
            topic = setupSession.createTopic(TOPIC_NAME);
            setupSession.createDurableSubscriber(topic, "DS").close();
        } finally {
            setupConnection.close();
        }

        produceMessage(topic, 20);
        LOG.info("Production done! " + this.broker.getDestination(ActiveMQDestination.transform(topic)));

        final Connection connection = factory.createConnection();
        try {
            connection.setClientID("CID");
            connection.start();
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            final TopicSubscriber subscriber = session.createDurableSubscriber(topic, "DS");
            final AtomicBoolean committed = new AtomicBoolean(false);
            final HashSet<Integer> seenIds = new HashSet<>();
            final AtomicInteger receivedCount = new AtomicInteger();

            subscriber.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        final int index = receivedCount.getAndIncrement();
                        FailoverDurableSubTransactionTest.LOG.info("Received: @" + index + ":" + message.getJMSMessageID() + ", ID:" + message.getIntProperty("ID"));
                        // NOTE(review): an AssertionError raised here surfaces on the listener's
                        // thread; the test thread would presumably only see the "committed ok"
                        // wait fail — confirm.
                        Assert.assertTrue("single instance of: " + index, seenIds.add(message.getIntProperty("ID")));
                        if (receivedCount.get() == commitBatchSize) {
                            session.commit();
                            committed.set(true);
                        }
                    } catch (TransactionRolledBackException e) {
                        FailoverDurableSubTransactionTest.LOG.info("Got expected", e);
                        try {
                            session.rollback();
                        } catch (JMSException rollbackFailure) {
                            FailoverDurableSubTransactionTest.LOG.error("Rollback after rolled-back commit failed", rollbackFailure);
                        }
                        // Start counting the batch again from zero.
                        seenIds.clear();
                        receivedCount.set(0);
                    } catch (JMSException e) {
                        FailoverDurableSubTransactionTest.LOG.error("Unexpected JMS failure in message listener", e);
                    }
                }
            });

            Assert.assertTrue("committed ok", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return committed.get();
                }
            }));
            subscriber.close();
        } finally {
            // Always release the failover connection, even when the wait above fails.
            connection.close();
        }

        final Destination dlq = this.broker.getDestination(ActiveMQDestination.transform(new ActiveMQQueue("ActiveMQ.DLQ")));
        LOG.info("DLQ: " + dlq);
        Assert.assertEquals("DLQ empty ", 0L, dlq.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Sends {@code i} text messages to the topic over a direct (non-failover) connection to
     * {@link #url}. A single {@code TextMessage} instance is reused; its int property
     * {@code "ID"} is set to 0, 1, ..., {@code i - 1} before each send.
     *
     * @param topic destination to publish to
     * @param i     number of messages to send
     * @throws JMSException if connecting or sending fails; the connection is closed either way
     */
    private void produceMessage(Topic topic, int i) throws JMSException {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.url);
        configureConnectionFactory(factory);
        final Connection connection = factory.createConnection();
        try {
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = session.createProducer(topic);
            final TextMessage message = session.createTextMessage("Test message");
            for (int id = 0; id < i; id++) {
                message.setIntProperty("ID", id);
                producer.send(message);
            }
        } finally {
            // Closing in finally prevents a leaked connection when a send fails part-way.
            connection.close();
        }
    }
}
