package org.apache.activemq.bugs;

import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.class */
public class TrapMessageInJDBCStoreTest extends TestCase {
    private static final String MY_TEST_Q = "MY_TEST_Q";
    private static final Logger LOG = LoggerFactory.getLogger(TrapMessageInJDBCStoreTest.class);
    private String transportUrl = "tcp://127.0.0.1:0";
    private BrokerService broker;
    private TestTransactionContext testTransactionContext;
    private TestJDBCPersistenceAdapter jdbc;

    /* loaded from: input_file:org/apache/activemq/bugs/TrapMessageInJDBCStoreTest$TestJDBCPersistenceAdapter.class */
    public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
        public TestJDBCPersistenceAdapter() {
        }

        public TransactionContext getTransactionContext() throws IOException {
            return TrapMessageInJDBCStoreTest.this.testTransactionContext;
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/TrapMessageInJDBCStoreTest$TestTransactionContext.class */
    public class TestTransactionContext extends TransactionContext {
        private int count;

        public TestTransactionContext(JDBCPersistenceAdapter jDBCPersistenceAdapter) throws IOException {
            super(jDBCPersistenceAdapter);
        }

        public void executeBatch() throws SQLException {
            super.executeBatch();
            this.count++;
            TrapMessageInJDBCStoreTest.LOG.debug("ExecuteBatchOverride: count:" + this.count, new RuntimeException("executeBatch"));
            if (this.count == 16) {
                throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + this.count);
            }
        }
    }

    protected BrokerService createBroker(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(z);
        DataSource dataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
        this.jdbc = new TestJDBCPersistenceAdapter();
        this.jdbc.setDataSource(dataSource);
        this.jdbc.setCleanupPeriod(0);
        this.testTransactionContext = new TestTransactionContext(this.jdbc);
        this.jdbc.setLockKeepAlivePeriod(1000L);
        Locker leaseDatabaseLocker = new LeaseDatabaseLocker();
        leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
        this.jdbc.setLocker(leaseDatabaseLocker);
        brokerService.setPersistenceAdapter(this.jdbc);
        brokerService.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());
        this.transportUrl = brokerService.addConnector(this.transportUrl).getPublishableConnectString();
        return brokerService;
    }

    public void testDBCommitException() throws Exception {
        org.apache.log4j.Logger.getLogger(TransportConnection.class.getName() + ".Service").setLevel(Level.TRACE);
        this.broker = createBroker(false);
        this.broker.deleteAllMessages();
        this.broker.start();
        this.broker.waitUntilStarted();
        LOG.info("***Broker started...");
        String str = "failover:(" + this.transportUrl + ")?timeout=5000";
        sendMessage(MY_TEST_Q, str);
        LOG.info("*** after send: db contains message seq " + dbMessageCount());
        assertEquals("number of consumed messages", 3, consumeMessages(MY_TEST_Q, str).size());
        ArrayList<Long> dbMessageCount = dbMessageCount();
        LOG.info("*** after consume - db contains message seq " + dbMessageCount);
        assertEquals("number of messages in DB after test", 0, dbMessageCount.size());
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    public List<TextMessage> consumeMessages(String str, String str2) throws JMSException {
        Connection connection = null;
        LOG.debug("*** consumeMessages() called ...");
        try {
            connection = new ActiveMQConnectionFactory(str2).createConnection();
            connection.start();
            Session createSession = connection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(str);
            ArrayList arrayList = new ArrayList();
            MessageConsumer createConsumer = createSession.createConsumer(createQueue);
            while (true) {
                TextMessage receive = createConsumer.receive(4000L);
                LOG.debug("*** consumed Messages :" + receive);
                if (receive == null) {
                    break;
                }
                arrayList.add(receive);
            }
            if (connection != null) {
                connection.close();
            }
            return arrayList;
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    public void sendMessage(String str, String str2) throws Exception {
        Connection connection = null;
        try {
            connection = new ActiveMQConnectionFactory(str2).createConnection();
            Session createSession = connection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(str));
            createProducer.setDeliveryMode(2);
            TextMessage createTextMessage = createSession.createTextMessage("1");
            LOG.debug("*** send message 1 to broker...");
            createProducer.send(createTextMessage);
            LOG.debug("***  send message 2 to broker");
            createTextMessage.setText("2");
            createProducer.send(createTextMessage);
            ArrayList<Long> dbMessageCount = dbMessageCount();
            LOG.info("*** after send 2 - db contains message seq " + dbMessageCount);
            assertEquals("number of messages in DB after send 2", 2, dbMessageCount.size());
            LOG.debug("***  send  message 3 to broker");
            createTextMessage.setText("3");
            createProducer.send(createTextMessage);
            LOG.debug("*** Finished sending messages to broker");
            if (connection != null) {
                connection.close();
            }
        } catch (Throwable th) {
            if (connection != null) {
                connection.close();
            }
            throw th;
        }
    }

    private ArrayList<Long> dbMessageCount() throws SQLException, IOException {
        java.sql.Connection connection = this.broker.getPersistenceAdapter().getDataSource().getConnection();
        PreparedStatement prepareStatement = connection.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
        try {
            ResultSet executeQuery = prepareStatement.executeQuery();
            ArrayList<Long> arrayList = new ArrayList<>();
            while (executeQuery.next()) {
                arrayList.add(Long.valueOf(executeQuery.getLong(1)));
            }
            return arrayList;
        } finally {
            prepareStatement.close();
            connection.close();
        }
    }
}
