package org.apache.activemq.store.kahadb;

import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.io.File;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/kahadb/KahaDBInFlightTxMemoryUsageTest.class */
public class KahaDBInFlightTxMemoryUsageTest {
    static final Logger LOG = LoggerFactory.getLogger(KahaDBInFlightTxMemoryUsageTest.class);

    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    private BrokerService broker;
    private URI brokerConnectURI;
    private Map<TransactionId, List<MessageDatabase.Operation<?>>> inflightTransactions;

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        this.broker.setDataDirectoryFile(this.dataFileDir.getRoot());
        this.broker.setDeleteAllMessagesOnStartup(true);
        TransportConnector addConnector = this.broker.addConnector(new TransportConnector());
        addConnector.setUri(new URI("tcp://0.0.0.0:0"));
        addConnector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
        KahaDBPersistenceAdapter persistenceAdapter = this.broker.getPersistenceAdapter();
        Field declaredField = MessageDatabase.class.getDeclaredField("inflightTransactions");
        declaredField.setAccessible(true);
        this.inflightTransactions = (LinkedHashMap) declaredField.get(persistenceAdapter.getStore());
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Test
    public void testKahaDBInFlightTxMessagesClearedFromMemory() throws JMSException {
        ActiveMQConnection createConnection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection.setAlwaysSyncSend(true);
        createConnection.start();
        Session createSession = createConnection.createSession(true, 0);
        Queue createQueue = createSession.createQueue("test.queue");
        try {
            Assert.assertTrue(this.inflightTransactions.isEmpty());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            for (int i = 0; i < 10; i++) {
                createProducer.send(createSession.createTextMessage("test"));
            }
            Assert.assertEquals(this.inflightTransactions.size(), 1L);
            List<MessageDatabase.Operation<?>> orElseThrow = this.inflightTransactions.values().stream().findFirst().orElseThrow();
            Assert.assertEquals(10L, orElseThrow.size());
            Iterator<MessageDatabase.Operation<?>> it = orElseThrow.iterator();
            while (it.hasNext()) {
                MessageDatabase.AddOperation addOperation = (MessageDatabase.Operation) it.next();
                Assert.assertTrue(addOperation instanceof MessageDatabase.AddOperation);
                KahaAddMessageCommand command = addOperation.getCommand();
                TestCase.assertNotNull(addOperation.getLocation());
                TestCase.assertNotNull(command);
                TestCase.assertNotNull(command.getMessageId());
                TestCase.assertNotNull(command.getDestination());
                Assert.assertNull(command.getMessage());
            }
            createSession.commit();
            Assert.assertTrue(this.inflightTransactions.isEmpty());
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }
}
