package org.apache.activemq.store.kahadb;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Exercises durable topic subscriptions on a broker using the KahaDB persistence adapter and
 * checks that the per-subscription pending message state seen before a broker restart is the
 * state seen after it.
 *
 * <p>Every test runs once for each combination of the two parameters: whether the restart is
 * performed with forced index recovery, and whether subscription statistics are enabled on the
 * adapter.
 */
@RunWith(Parameterized.class)
public class KahaDBDurableMessageRecoveryTest {

    /** Client id used by every connection opened by this test. */
    private static final String CLIENT_ID = "clientId1";

    /**
     * ActiveMQ-specific acknowledge mode (the value of {@code ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE})
     * in which {@code acknowledge()} acknowledges only the message it is called on.
     */
    private static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /** Number of messages published by each test. */
    private static final int MESSAGE_COUNT = 10;

    private static final long RECEIVE_TIMEOUT_MS = 1000L;
    private static final long WAIT_TIMEOUT_MS = 3000L;
    private static final long WAIT_POLL_MS = 500L;

    /** Journal file length (20 KiB) applied to the KahaDB adapter. */
    private static final int JOURNAL_MAX_FILE_LENGTH = 20 * 1024;

    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));

    /** Connections opened through {@link #getSession(int)}; closed whenever the broker is stopped. */
    private final List<Connection> connections = new ArrayList<>();
    private final boolean recoverIndex;
    private final boolean enableSubscriptionStats;
    private BrokerService broker;
    private URI brokerConnectURI;

    /**
     * Supplies the parameter combinations.
     *
     * @return one {@code {recoverIndex, enableSubscriptionStats}} pair per run
     */
    @Parameterized.Parameters(name = "recoverIndex={0},enableSubscriptionStats={1}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] {
            {false, false},
            {false, true},
            {true, false},
            {true, true},
        });
    }

    /**
     * @param recoverIndex whether the broker restart inside a test forces index recovery
     * @param enableSubscriptionStats whether subscription statistics are enabled on the adapter
     */
    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex, boolean enableSubscriptionStats) {
        this.recoverIndex = recoverIndex;
        this.enableSubscriptionStats = enableSubscriptionStats;
    }

    /** Starts a fresh broker (without forced index recovery) before each test. */
    @Before
    public void setUpBroker() throws Exception {
        startBroker(false);
    }

    /**
     * Closes every connection opened by the test and then stops the broker. Also used by
     * {@link #restartBroker(boolean)}, so sessions obtained before a restart must not be reused.
     */
    @After
    public void stopBroker() throws Exception {
        try {
            closeConnections();
        } finally {
            // The broker is null when startBroker failed before assigning it.
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }

    /**
     * Creates and starts a persistent broker on the temporary data directory, listening on an
     * ephemeral TCP port, and records the URI clients should connect to.
     *
     * @param forceRecoverIndex passed to the persistence adapter's force-recover-index setting
     */
    protected void startBroker(boolean forceRecoverIndex) throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        this.broker.setDataDirectoryFile(this.dataFileDir.getRoot());
        TransportConnector connector = this.broker.addConnector(new TransportConnector());
        connector.setUri(new URI("tcp://0.0.0.0:0"));
        connector.setName("tcp");
        configurePersistence(this.broker, forceRecoverIndex);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    /**
     * Applies the test's settings to the broker's KahaDB persistence adapter.
     *
     * @param brokerService broker whose persistence adapter is configured; the adapter is expected
     *     to be a {@link KahaDBPersistenceAdapter}
     * @param forceRecoverIndex value for the adapter's force-recover-index setting
     */
    protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
        adapter.setForceRecoverIndex(forceRecoverIndex);
        adapter.setEnableSubscriptionStatistics(this.enableSubscriptionStats);
        adapter.setJournalMaxFileLength(JOURNAL_MAX_FILE_LENGTH);
    }

    /**
     * Stops the running broker (closing the test's client connections) and starts a new one on
     * the same data directory.
     *
     * @param forceRecoverIndex whether the new broker forces index recovery
     */
    protected void restartBroker(boolean forceRecoverIndex) throws Exception {
        stopBroker();
        startBroker(forceRecoverIndex);
    }

    /**
     * Opens a new started connection with client id {@value #CLIENT_ID} and creates a
     * non-transacted session on it. The connection is tracked and closed by {@link #stopBroker()}.
     *
     * @param ackMode acknowledge mode for the session
     * @return the new session
     */
    protected Session getSession(int ackMode) throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        // Track before configuring so the connection is still closed if a later call throws.
        this.connections.add(connection);
        connection.setClientID(CLIENT_ID);
        connection.start();
        return connection.createSession(false, ackMode);
    }

    /**
     * One durable subscriber acknowledges only the 5th of 10 messages; after a restart the other
     * nine must still be pending and be redelivered in order.
     */
    @Test
    public void durableRecoveryIndividualAcknowledge() throws Exception {
        Session session = getSession(INDIVIDUAL_ACKNOWLEDGE);
        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic("test.topic");
        MessageProducer producer = session.createProducer(topic);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");

        sendMessages(session, producer);
        assertPendingCount(topic, "sub1", 10);

        receiveAllAcknowledging(subscriber, 5);
        assertPendingCount(topic, "sub1", 9);
        subscriber.close();

        restartBroker(this.recoverIndex);
        assertPendingCount(topic, "sub1", 9);

        TopicSubscriber recovered = getSession(Session.AUTO_ACKNOWLEDGE).createDurableSubscriber(topic, "sub1");
        assertReceivesInOrder(recovered, 1, 4);
        assertReceivesInOrder(recovered, 6, 10);
        recovered.close();

        assertPendingCount(topic, "sub1", 0);
    }

    /**
     * Two durable subscribers on the same topic: "sub1" acknowledges messages 3 and 7, "sub2"
     * acknowledges nothing. Pending counts and sizes must be unchanged by the restart, and each
     * subscriber must then receive exactly its unacknowledged messages in order.
     */
    @Test
    public void multipleDurableRecoveryIndividualAcknowledge() throws Exception {
        Session session = getSession(INDIVIDUAL_ACKNOWLEDGE);
        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic("test.topic");
        MessageProducer producer = session.createProducer(topic);
        TopicSubscriber sub1 = session.createDurableSubscriber(topic, "sub1");
        TopicSubscriber sub2 = session.createDurableSubscriber(topic, "sub2");

        sendMessages(session, producer);
        assertPendingCount(topic, "sub1", 10);
        assertPendingCount(topic, "sub2", 10);

        receiveAllAcknowledging(sub1, 3, 7);
        assertPendingCount(topic, "sub1", 8);
        assertPendingCount(topic, "sub2", 10);

        long sub1Size = getPendingMessageSize(topic, CLIENT_ID, "sub1");
        long sub2Size = getPendingMessageSize(topic, CLIENT_ID, "sub2");
        Assert.assertTrue(sub1Size > 0);
        Assert.assertTrue(sub2Size > 0);
        Assert.assertTrue(sub1Size < sub2Size);
        sub1.close();
        sub2.close();

        restartBroker(this.recoverIndex);
        assertPendingCount(topic, "sub1", 8);
        assertPendingCount(topic, "sub2", 10);
        Assert.assertEquals(sub1Size, getPendingMessageSize(topic, CLIENT_ID, "sub1"));
        Assert.assertEquals(sub2Size, getPendingMessageSize(topic, CLIENT_ID, "sub2"));

        Session recoveredSession = getSession(Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber recoveredSub1 = recoveredSession.createDurableSubscriber(topic, "sub1");
        TopicSubscriber recoveredSub2 = recoveredSession.createDurableSubscriber(topic, "sub2");
        assertReceivesInOrder(recoveredSub1, 1, 2);
        assertReceivesInOrder(recoveredSub1, 4, 6);
        assertReceivesInOrder(recoveredSub1, 8, 10);
        assertReceivesInOrder(recoveredSub2, 1, 10);
        recoveredSub1.close();
        recoveredSub2.close();

        assertPendingCount(topic, "sub1", 0);
        assertPendingCount(topic, "sub2", 0);
    }

    /**
     * Same setup as the two-subscriber test, but after the restart the store is asked directly to
     * recover each subscription: "sub1" must yield 8 messages and never messages 3 or 7, "sub2"
     * must yield all 10.
     */
    @Test
    public void multipleDurableTestRecoverSubscription() throws Exception {
        Session session = getSession(INDIVIDUAL_ACKNOWLEDGE);
        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic("test.topic");
        MessageProducer producer = session.createProducer(topic);
        TopicSubscriber sub1 = session.createDurableSubscriber(topic, "sub1");
        TopicSubscriber sub2 = session.createDurableSubscriber(topic, "sub2");

        sendMessages(session, producer);
        receiveAllAcknowledging(sub1, 3, 7);
        assertPendingCount(topic, "sub1", 8);
        assertPendingCount(topic, "sub2", 10);
        sub1.close();
        sub2.close();

        restartBroker(this.recoverIndex);

        TopicMessageStore store = topicStore(topic);
        CountingRecoveryListener sub1Listener = new CountingRecoveryListener("msg: 3", "msg: 7");
        CountingRecoveryListener sub2Listener = new CountingRecoveryListener();
        store.recoverSubscription(CLIENT_ID, "sub1", sub1Listener);
        store.recoverSubscription(CLIENT_ID, "sub2", sub2Listener);

        Assert.assertEquals(8L, sub1Listener.recoveredCount());
        Assert.assertEquals(10L, sub2Listener.recoveredCount());
    }

    /**
     * Adds every sent message to the message store a second time, directly, and checks that the
     * subscription still reports 10 pending messages and drains to 0 once all are consumed.
     */
    @Test
    public void durableRecoveryDuplicates() throws Exception {
        Session session = getSession(Session.AUTO_ACKNOWLEDGE);
        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic("test.topic.duplicates");
        MessageProducer producer = session.createProducer(topic);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
        MessageStore store = this.broker.getDestination(topic).getMessageStore();

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            ActiveMQMessage message = (ActiveMQMessage) session.createTextMessage("msg: " + i);
            producer.send(message);
            // Hand the already-sent message to the store again to simulate a duplicate add.
            store.addMessage(this.broker.getAdminConnectionContext(), message);
        }
        producer.close();
        assertPendingCount(topic, "sub1", 10);

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            Assert.assertNotNull(subscriber.receive(RECEIVE_TIMEOUT_MS));
        }
        subscriber.close();

        assertPendingCount(topic, "sub1", 0);
    }

    /**
     * @param activeMQTopic topic whose store is queried
     * @param clientId client id of the durable subscription
     * @param subscriptionName name of the durable subscription
     * @return the message count the topic's store reports for that subscription
     */
    protected long getPendingMessageCount(ActiveMQTopic activeMQTopic, String clientId, String subscriptionName)
            throws Exception {
        return topicStore(activeMQTopic).getMessageCount(clientId, subscriptionName);
    }

    /**
     * @param activeMQTopic topic whose store is queried
     * @param clientId client id of the durable subscription
     * @param subscriptionName name of the durable subscription
     * @return the message size the topic's store reports for that subscription
     */
    protected long getPendingMessageSize(ActiveMQTopic activeMQTopic, String clientId, String subscriptionName)
            throws Exception {
        return topicStore(activeMQTopic).getMessageSize(clientId, subscriptionName);
    }

    /** Returns the current broker's message store for the topic, viewed as a topic store. */
    private TopicMessageStore topicStore(ActiveMQTopic topic) throws Exception {
        return (TopicMessageStore) this.broker.getDestination(topic).getMessageStore();
    }

    /** Closes all tracked connections, attempting every one before rethrowing the first failure. */
    private void closeConnections() throws Exception {
        Exception failure = null;
        for (Connection connection : this.connections) {
            try {
                connection.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        this.connections.clear();
        if (failure != null) {
            throw failure;
        }
    }

    /** Sends "msg: 1" .. "msg: 10" through the producer and then closes it. */
    private static void sendMessages(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            producer.send(session.createTextMessage("msg: " + i));
        }
        producer.close();
    }

    /**
     * Receives all {@value #MESSAGE_COUNT} messages, acknowledging only those whose 1-based
     * position in the receive order is listed.
     */
    private static void receiveAllAcknowledging(TopicSubscriber subscriber, int... positionsToAck) throws Exception {
        for (int position = 1; position <= MESSAGE_COUNT; position++) {
            TextMessage received = (TextMessage) subscriber.receive(RECEIVE_TIMEOUT_MS);
            Assert.assertNotNull("no message received at position " + position, received);
            for (int toAck : positionsToAck) {
                if (toAck == position) {
                    received.acknowledge();
                }
            }
        }
    }

    /** Asserts that the next messages received are "msg: first" .. "msg: last", in that order. */
    private static void assertReceivesInOrder(TopicSubscriber subscriber, int first, int last) throws Exception {
        for (int i = first; i <= last; i++) {
            TextMessage received = (TextMessage) subscriber.receive(RECEIVE_TIMEOUT_MS);
            Assert.assertNotNull("expected msg: " + i + " but nothing was received", received);
            Assert.assertEquals("msg: " + i, received.getText());
        }
    }

    /** Polls until the subscription's pending count equals {@code expected}, failing on timeout. */
    private void assertPendingCount(ActiveMQTopic topic, String subscriptionName, long expected) throws Exception {
        Assert.assertTrue(
                "pending count for " + subscriptionName + " never reached " + expected,
                Wait.waitFor(
                        () -> expected == getPendingMessageCount(topic, CLIENT_ID, subscriptionName),
                        WAIT_TIMEOUT_MS,
                        WAIT_POLL_MS));
    }

    /**
     * Recovery listener that counts recovered messages and fails if a message with one of the
     * given texts is recovered.
     */
    private static final class CountingRecoveryListener implements MessageRecoveryListener {

        private final AtomicInteger recovered = new AtomicInteger();
        private final List<String> unexpectedTexts;

        CountingRecoveryListener(String... unexpectedTexts) {
            this.unexpectedTexts = Arrays.asList(unexpectedTexts);
        }

        int recoveredCount() {
            return this.recovered.get();
        }

        @Override
        public boolean recoverMessageReference(MessageId messageId) throws Exception {
            return false;
        }

        @Override
        public boolean recoverMessage(Message message) throws Exception {
            // Only inspect the body when there is something to reject.
            if (!this.unexpectedTexts.isEmpty()) {
                String text = ((TextMessage) message).getText();
                if (this.unexpectedTexts.contains(text)) {
                    throw new IllegalStateException("Got wrong message: " + text);
                }
            }
            this.recovered.incrementAndGet();
            return true;
        }

        @Override
        public boolean isDuplicate(MessageId messageId) {
            return false;
        }

        @Override
        public boolean hasSpace() {
            return true;
        }
    }
}
