package org.apache.activemq.broker.region.cursors;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.io.FileUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pending-message-cursor tests run against a broker with persistence enabled and its data
 * directory placed in a per-test temporary folder under {@code target}.
 *
 * <p>No persistence adapter is set explicitly in this class, so the store in use is whatever the
 * broker selects by default when persistence is turned on.
 */
public class KahaDBPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
    protected static final Logger LOG = LoggerFactory.getLogger(KahaDBPendingMessageCursorTest.class);

    /** Client id used by every connection these tests open. */
    private static final String CLIENT_ID = "clientId";

    /** Name of the single durable subscription the tests publish to. */
    private static final String SUBSCRIPTION_NAME = "sub1";

    /** Number of messages sent by each publish call. */
    private static final int MESSAGES_PER_PUBLISH = 200;

    /** Temporary folder that backs the broker's data directory; must stay public for JUnit. */
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));

    /**
     * Starts the broker, optionally emptying the data directory first.
     *
     * <p>NOTE(review): a decompiler note on this method said its access had been widened from
     * {@code protected}; it is left {@code public} so existing callers keep compiling.
     *
     * @param clearDataDir when {@code true}, delete the contents of the data directory (if it
     *     exists) before delegating to the superclass
     * @throws Exception if cleaning the directory or starting the broker fails
     */
    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest
    public void setUpBroker(boolean clearDataDir) throws Exception {
        File dataRoot = this.dataFileDir.getRoot();
        if (clearDataDir && dataRoot.exists()) {
            FileUtils.cleanDirectory(dataRoot);
        }
        super.setUpBroker(clearDataDir);
    }

    /**
     * Enables persistence on the given broker and points its data directory at the temporary
     * folder.
     *
     * @param brokerService the broker to configure
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest
    protected void initPersistence(BrokerService brokerService) throws IOException {
        // Configure the broker that was handed in rather than reaching for the inherited field.
        brokerService.setPersistent(true);
        brokerService.setDataDirectoryFile(this.dataFileDir.getRoot());
    }

    /**
     * Publishes one batch of persistent messages, restarts the broker without clearing its data,
     * publishes a second batch, and checks that pending and store statistics cover both batches.
     */
    @Test(timeout = 60000)
    public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, SUBSCRIPTION_NAME);

        Connection firstConnection = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(firstConnection, new String[]{SUBSCRIPTION_NAME},
                    MESSAGES_PER_PUBLISH, publishedMessageSize, DeliveryMode.PERSISTENT);
            verifyPendingStats(topic, subKey, MESSAGES_PER_PUBLISH, publishedMessageSize.get());
            verifyStoreStats(topic, MESSAGES_PER_PUBLISH, publishedMessageSize.get());
        } finally {
            // Closing an already-closed JMS connection is a no-op, so this is safe even if the
            // publishing helper closed it.
            firstConnection.close();
        }

        // Restart on the same data directory (false = keep the stored messages).
        stopBroker();
        setUpBroker(false);

        // Two batches are now expected; the total is spelled out here instead of being borrowed
        // from an unrelated test's constant.
        int expectedTotal = 2 * MESSAGES_PER_PUBLISH;

        Connection secondConnection = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(secondConnection, new String[]{SUBSCRIPTION_NAME},
                    MESSAGES_PER_PUBLISH, publishedMessageSize, DeliveryMode.PERSISTENT);
            verifyPendingStats(topic, subKey, expectedTotal, publishedMessageSize.get());
            verifyStoreStats(topic, expectedTotal, publishedMessageSize.get());
        } finally {
            secondConnection.close();
        }
    }

    /**
     * Publishes non-persistent messages to a durable subscription and checks that they are counted
     * as pending while the store statistics stay at zero.
     */
    @Test(timeout = 60000)
    public void testNonPersistentDurableMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, SUBSCRIPTION_NAME);

        Connection connection = openConnection();
        try {
            Topic topic = publishTestMessagesDurable(connection, new String[]{SUBSCRIPTION_NAME},
                    MESSAGES_PER_PUBLISH, publishedMessageSize, DeliveryMode.NON_PERSISTENT);
            verifyPendingStats(topic, subKey, MESSAGES_PER_PUBLISH, publishedMessageSize.get());
            verifyStoreStats(topic, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /** Opens a connection to the broker under test, assigns the shared client id and starts it. */
    private Connection openConnection() throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID(CLIENT_ID);
        connection.start();
        return connection;
    }
}
