package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.class */
public class JmsTransactionCommitFailureTest {
    private static final Log LOGGER = LogFactory.getLog(JmsTransactionCommitFailureTest.class);
    private static final String OUTPUT_DIR = "target/" + JmsTransactionCommitFailureTest.class.getSimpleName();
    private Properties originalSystemProps;
    private DataSource dataSource;
    private CommitFailurePersistenceAdapter persistenceAdapter;
    private BrokerService broker;
    private ConnectionFactory connectionFactory;
    private int messageCounter = 1;

    /* loaded from: input_file:org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest$CommitFailurePersistenceAdapter.class */
    private static class CommitFailurePersistenceAdapter extends JDBCPersistenceAdapter {
        private boolean isCommitFailureEnabled;
        private int transactionIsolation;

        public CommitFailurePersistenceAdapter(DataSource dataSource) {
            setDataSource(dataSource);
        }

        public void setCommitFailureEnabled(boolean z) {
            this.isCommitFailureEnabled = z;
        }

        public void setTransactionIsolation(int i) {
            super.setTransactionIsolation(i);
            this.transactionIsolation = i;
        }

        public TransactionContext getTransactionContext() throws IOException {
            TransactionContext transactionContext = new TransactionContext(this, -1, -1) { // from class: org.apache.activemq.store.jdbc.JmsTransactionCommitFailureTest.CommitFailurePersistenceAdapter.1
                public void executeBatch() throws SQLException {
                    if (CommitFailurePersistenceAdapter.this.isCommitFailureEnabled) {
                        throw new SQLException("Test commit failure exception");
                    }
                    super.executeBatch();
                }
            };
            if (this.transactionIsolation > 0) {
                transactionContext.setTransactionIsolation(this.transactionIsolation);
            }
            return transactionContext;
        }
    }

    @Before
    public void setUp() throws Exception {
        this.originalSystemProps = System.getProperties();
        Properties properties = (Properties) this.originalSystemProps.clone();
        properties.setProperty("derby.stream.error.file", OUTPUT_DIR + "/derby.log");
        System.setProperties(properties);
        this.dataSource = createDataSource();
        this.persistenceAdapter = new CommitFailurePersistenceAdapter(this.dataSource);
        this.broker = createBroker(this.persistenceAdapter);
        this.broker.start();
        this.connectionFactory = createConnectionFactory(this.broker.getBrokerName());
    }

    private DataSource createDataSource() {
        EmbeddedDataSource embeddedDataSource = new EmbeddedDataSource();
        embeddedDataSource.setDatabaseName(OUTPUT_DIR + "/derby-db");
        embeddedDataSource.setCreateDatabase("create");
        return embeddedDataSource;
    }

    private BrokerService createBroker(PersistenceAdapter persistenceAdapter) throws IOException {
        String simpleName = JmsTransactionCommitFailureTest.class.getSimpleName();
        BrokerService brokerService = new BrokerService();
        brokerService.setDataDirectory(OUTPUT_DIR + "/activemq");
        brokerService.setBrokerName(simpleName);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setAdvisorySupport(false);
        brokerService.setUseJmx(false);
        if (persistenceAdapter != null) {
            brokerService.setPersistent(true);
            brokerService.setPersistenceAdapter(persistenceAdapter);
        }
        return brokerService;
    }

    private ConnectionFactory createConnectionFactory(String str) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://" + str);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        return activeMQConnectionFactory;
    }

    private void stopDataSource() {
        if (this.dataSource instanceof EmbeddedDataSource) {
            EmbeddedDataSource embeddedDataSource = this.dataSource;
            embeddedDataSource.setShutdownDatabase("shutdown");
            try {
                embeddedDataSource.getConnection();
            } catch (SQLException e) {
            }
        }
    }

    private void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker = null;
        }
    }

    @After
    public void tearDown() throws Exception {
        try {
            stopBroker();
            stopDataSource();
        } finally {
            System.setProperties(this.originalSystemProps);
        }
    }

    @Test
    public void testJmsTransactionCommitFailure() throws Exception {
        sendMessage("testJmsTransactionCommitFailure", 1);
        Assert.assertEquals(1L, getMessageCount());
        this.persistenceAdapter.setCommitFailureEnabled(true);
        try {
            LOGGER.warn("Attempt to send Message-2/Message-3 (first time)...");
            sendMessage("testJmsTransactionCommitFailure", 2);
            LOGGER.warn("Message-2/Message-3 successfuly sent (first time)");
            Assert.fail();
        } catch (JMSException e) {
            LOGGER.warn("Attempt to send Message-2/Message-3 failed", e);
            this.messageCounter -= 2;
            Assert.assertEquals(1L, getMessageCount());
        }
        this.persistenceAdapter.setCommitFailureEnabled(false);
        LOGGER.warn("Attempt to send Message-2/Message-3 (second time)...");
        sendMessage("testJmsTransactionCommitFailure", 2);
        LOGGER.warn("Message-2/Message-3 successfuly sent (second time)");
        Assert.assertEquals(3L, getMessageCount());
        for (int i = 1; i <= 3; i++) {
            TextMessage receiveMessage = receiveMessage("testJmsTransactionCommitFailure", DurableSubProcessWithRestartTest.BROKER_RESTART);
            LOGGER.warn(i + ". Message received (" + receiveMessage + ")");
            Assert.assertNotNull(receiveMessage);
            Assert.assertTrue(receiveMessage instanceof TextMessage);
            Assert.assertEquals(i, receiveMessage.getIntProperty("MessageId"));
            Assert.assertEquals("Message-" + i, receiveMessage.getText());
        }
        Assert.assertEquals(0L, getMessageCount());
        Assert.assertNull(receiveMessage("testJmsTransactionCommitFailure", 4000L));
    }

    @Test
    public void testQueueMemoryLeak() throws Exception {
        sendMessage("testMemoryLeak", 1);
        this.persistenceAdapter.setCommitFailureEnabled(true);
        for (int i = 0; i < 10; i++) {
            try {
                try {
                    sendMessage("testMemoryLeak", 2);
                } finally {
                    this.persistenceAdapter.setCommitFailureEnabled(false);
                }
            } catch (JMSException e) {
            }
        }
        Queue destination = this.broker.getDestination(new ActiveMQQueue("testMemoryLeak"));
        if (destination instanceof Queue) {
            Queue queue = destination;
            Queue.class.getDeclaredField("indexOrderedCursorUpdates").setAccessible(true);
            Assert.assertEquals(0L, ((List) r0.get(queue)).size());
        }
    }

    @Test
    public void testQueueMemoryLeakNoTx() throws Exception {
        sendMessage("testMemoryLeak", 1);
        this.persistenceAdapter.setCommitFailureEnabled(true);
        for (int i = 0; i < 10; i++) {
            try {
                try {
                    sendMessage("testMemoryLeak", 2, false);
                } finally {
                    this.persistenceAdapter.setCommitFailureEnabled(false);
                }
            } catch (JMSException e) {
            }
        }
        Queue destination = this.broker.getDestination(new ActiveMQQueue("testMemoryLeak"));
        if (destination instanceof Queue) {
            Queue queue = destination;
            Queue.class.getDeclaredField("indexOrderedCursorUpdates").setAccessible(true);
            Assert.assertEquals(0L, ((List) r0.get(queue)).size());
        }
    }

    private void sendMessage(String str, int i) throws JMSException {
        sendMessage(str, i, true);
    }

    /* JADX WARN: Finally extract failed */
    private void sendMessage(String str, int i, boolean z) throws JMSException {
        Connection createConnection = this.connectionFactory.createConnection();
        try {
            Session createSession = createConnection.createSession(z, z ? 0 : 1);
            try {
                MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
                for (int i2 = 0; i2 < i; i2++) {
                    try {
                        TextMessage createTextMessage = createSession.createTextMessage();
                        createTextMessage.setIntProperty("MessageId", this.messageCounter);
                        int i3 = this.messageCounter;
                        this.messageCounter = i3 + 1;
                        createTextMessage.setText("Message-" + i3);
                        createProducer.send(createTextMessage);
                    } catch (Throwable th) {
                        createProducer.close();
                        throw th;
                    }
                }
                if (z) {
                    createSession.commit();
                }
                createProducer.close();
                createSession.close();
            } catch (Throwable th2) {
                createSession.close();
                throw th2;
            }
        } finally {
            createConnection.close();
        }
    }

    /* JADX WARN: Finally extract failed */
    private Message receiveMessage(String str, long j) throws JMSException {
        Connection createConnection = this.connectionFactory.createConnection();
        try {
            createConnection.start();
            try {
                Session createSession = createConnection.createSession(true, 0);
                try {
                    MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(str));
                    try {
                        Message receive = createConsumer.receive(j);
                        createSession.commit();
                        createConsumer.close();
                        createSession.close();
                        createConnection.stop();
                        return receive;
                    } catch (Throwable th) {
                        createConsumer.close();
                        throw th;
                    }
                } catch (Throwable th2) {
                    createSession.close();
                    throw th2;
                }
            } catch (Throwable th3) {
                createConnection.stop();
                throw th3;
            }
        } finally {
            createConnection.close();
        }
    }

    /* JADX WARN: Finally extract failed */
    private long getMessageCount() throws SQLException {
        long j = -1;
        java.sql.Connection connection = this.dataSource.getConnection();
        try {
            Statement createStatement = connection.createStatement();
            try {
                ResultSet executeQuery = createStatement.executeQuery("select count(*) from activemq_msgs");
                while (executeQuery.next()) {
                    try {
                        j = executeQuery.getLong(1);
                    } catch (Throwable th) {
                        executeQuery.close();
                        throw th;
                    }
                }
                executeQuery.close();
                createStatement.close();
                return j;
            } catch (Throwable th2) {
                createStatement.close();
                throw th2;
            }
        } finally {
            connection.close();
        }
    }
}
