/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import java.io.File;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class KahaDBRecoverExpiredTest {
    @Rule
    public Timeout globalTimeout = new Timeout(60L, TimeUnit.SECONDS);
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    private BrokerService broker;
    private URI brokerConnectURI;
    private final ActiveMQTopic topic = new ActiveMQTopic("test.topic");
    private final SubscriptionKey subKey1 = new SubscriptionKey("clientId", "sub1");
    private final SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
    private final MessageRecoveryListener listener = new MessageRecoveryListener(){

        public boolean recoverMessage(org.apache.activemq.command.Message message) {
            return true;
        }

        public boolean recoverMessageReference(MessageId ref) {
            return true;
        }

        public boolean hasSpace() {
            return true;
        }

        public boolean isDuplicate(MessageId ref) {
            return false;
        }
    };

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        persistenceAdapter.setDirectory(this.dataFileDir.getRoot());
        this.broker.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
        TransportConnector connector = this.broker.addConnector(new TransportConnector());
        connector.setUri(new URI("tcp://0.0.0.0:0"));
        connector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    private Session initializeSubs() throws JMSException {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Session session = connection.createSession(false, 1);
        session.createDurableSubscriber((Topic)this.topic, "sub1");
        session.createDurableSubscriber((Topic)this.topic, "sub2");
        return session;
    }

    @Test
    public void testRecoverExpired() throws Exception {
        try (Session session = this.initializeSubs();){
            int i;
            MessageProducer prod = session.createProducer((jakarta.jms.Destination)this.topic);
            Destination dest = this.broker.getDestination((ActiveMQDestination)this.topic);
            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
            Map expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertTrue((boolean)expired.isEmpty());
            for (i = 0; i < 10; ++i) {
                ActiveMQTextMessage message = new ActiveMQTextMessage();
                message.setText("message" + i);
                int ttl = i % 2 == 0 ? 1000 : 0;
                prod.send((Message)message, 2, 4, (long)ttl);
            }
            Thread.sleep(1500L);
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertEquals((long)2L, (long)expired.size());
            Assert.assertEquals((long)5L, (long)((List)expired.get(this.subKey1)).size());
            Assert.assertEquals((long)5L, (long)((List)expired.get(this.subKey2)).size());
            for (i = 0; i < 2; ++i) {
                MessageAck ack = new MessageAck();
                ack.setLastMessageId(((org.apache.activemq.command.Message)((List)expired.get(this.subKey1)).get(i)).getMessageId());
                ack.setAckType((byte)6);
                ack.setDestination((ActiveMQDestination)this.topic);
                store.acknowledge(this.broker.getAdminConnectionContext(), "clientId", "sub1", ack.getLastMessageId(), ack);
            }
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertEquals((long)3L, (long)((List)expired.get(this.subKey1)).size());
            Assert.assertEquals((long)5L, (long)((List)expired.get(this.subKey2)).size());
            for (Map.Entry entry : expired.entrySet()) {
                for (org.apache.activemq.command.Message message : (List)entry.getValue()) {
                    MessageAck ack = new MessageAck();
                    ack.setLastMessageId(message.getMessageId());
                    ack.setAckType((byte)6);
                    ack.setDestination((ActiveMQDestination)this.topic);
                    store.acknowledge(this.broker.getAdminConnectionContext(), ((SubscriptionKey)entry.getKey()).getClientId(), ((SubscriptionKey)entry.getKey()).getSubscriptionName(), ack.getLastMessageId(), ack);
                }
            }
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertTrue((boolean)expired.isEmpty());
        }
    }

    @Test
    public void testRecoverExpiredMax() throws Exception {
        try (Session session = this.initializeSubs();){
            int i;
            MessageProducer prod = session.createProducer((jakarta.jms.Destination)this.topic);
            Destination dest = this.broker.getDestination((ActiveMQDestination)this.topic);
            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
            Map expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertTrue((boolean)expired.isEmpty());
            ActiveMQTextMessage message = new ActiveMQTextMessage();
            for (i = 0; i < 100; ++i) {
                message.setText("message" + i);
                int ttl = i >= 50 ? 1000 : 0;
                prod.send((Message)message, 2, 4, (long)ttl);
            }
            Thread.sleep(1500L);
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertEquals((long)2L, (long)expired.size());
            Assert.assertEquals((long)50L, (long)((List)expired.get(this.subKey1)).size());
            Assert.assertEquals((long)50L, (long)((List)expired.get(this.subKey2)).size());
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 50, this.listener);
            Assert.assertTrue((boolean)expired.isEmpty());
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 75, this.listener);
            Assert.assertEquals((long)2L, (long)expired.size());
            Assert.assertEquals((long)25L, (long)((List)expired.get(this.subKey1)).size());
            Assert.assertEquals((long)25L, (long)((List)expired.get(this.subKey2)).size());
            for (i = 0; i < 25; ++i) {
                MessageAck ack = new MessageAck();
                ack.setLastMessageId(((org.apache.activemq.command.Message)((List)expired.get(this.subKey1)).get(i)).getMessageId());
                ack.setAckType((byte)6);
                ack.setDestination((ActiveMQDestination)this.topic);
                store.acknowledge(this.broker.getAdminConnectionContext(), "clientId", "sub1", ack.getLastMessageId(), ack);
            }
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertEquals((long)2L, (long)expired.size());
            Assert.assertEquals((long)25L, (long)((List)expired.get(this.subKey1)).size());
            Assert.assertEquals((long)50L, (long)((List)expired.get(this.subKey2)).size());
        }
    }

    @Test
    public void testRecoverExpiredSubSet() throws Exception {
        try (Session session = this.initializeSubs();){
            MessageProducer prod = session.createProducer((jakarta.jms.Destination)this.topic);
            Destination dest = this.broker.getDestination((ActiveMQDestination)this.topic);
            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
            Map expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, this.listener);
            Assert.assertTrue((boolean)expired.isEmpty());
            for (int i = 0; i < 10; ++i) {
                ActiveMQTextMessage message = new ActiveMQTextMessage();
                message.setText("message" + i);
                prod.send((Message)message, 2, 4, 1000L);
            }
            Thread.sleep(1500L);
            expired = store.recoverExpired(Set.of(this.subKey2), 100, this.listener);
            Assert.assertEquals((long)1L, (long)expired.size());
            Assert.assertEquals((long)10L, (long)((List)expired.get(this.subKey2)).size());
            MessageAck ack = new MessageAck();
            ack.setLastMessageId(((org.apache.activemq.command.Message)((List)expired.get(this.subKey2)).get(0)).getMessageId());
            ack.setAckType((byte)6);
            ack.setDestination((ActiveMQDestination)this.topic);
            store.acknowledge(this.broker.getAdminConnectionContext(), "clientId", "sub2", ack.getLastMessageId(), ack);
            expired = store.recoverExpired(Set.of(this.subKey2), 100, this.listener);
            Assert.assertEquals((long)1L, (long)expired.size());
            Assert.assertEquals((long)9L, (long)((List)expired.get(this.subKey2)).size());
            expired = store.recoverExpired(Set.of(this.subKey1), 100, this.listener);
            Assert.assertEquals((long)1L, (long)expired.size());
            Assert.assertEquals((long)10L, (long)((List)expired.get(this.subKey1)).size());
            SubscriptionKey unmatched = new SubscriptionKey("clientId", "sub3");
            expired = store.recoverExpired(Set.of(unmatched), 100, this.listener);
            Assert.assertTrue((boolean)expired.isEmpty());
            expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2, unmatched), 100, this.listener);
            Assert.assertEquals((long)2L, (long)expired.size());
        }
    }

    @Test
    public void testRecoverExpiredRecoveryListener() throws Exception {
        try (Session session = this.initializeSubs();){
            MessageProducer prod = session.createProducer((jakarta.jms.Destination)this.topic);
            Destination dest = this.broker.getDestination((ActiveMQDestination)this.topic);
            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
            ActiveMQTextMessage message = new ActiveMQTextMessage();
            for (int i = 0; i < 10; ++i) {
                message.setText("message" + i);
                prod.send((Message)message, 2, 4, 1000L);
            }
            Thread.sleep(1500L);
            final AtomicBoolean hasSpaceCalled = new AtomicBoolean();
            Map expired = store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, new MessageRecoveryListener(){

                public boolean recoverMessage(org.apache.activemq.command.Message message) {
                    return true;
                }

                public boolean recoverMessageReference(MessageId ref) {
                    return false;
                }

                public boolean hasSpace() {
                    hasSpaceCalled.set(true);
                    return false;
                }

                public boolean isDuplicate(MessageId ref) {
                    return false;
                }
            });
            Assert.assertTrue((boolean)expired.isEmpty());
            Assert.assertTrue((boolean)hasSpaceCalled.get());
            final HashSet ids = new HashSet();
            store.recoverExpired(Set.of(this.subKey1, this.subKey2), 100, new MessageRecoveryListener(){

                public boolean recoverMessage(org.apache.activemq.command.Message message) {
                    Assert.assertTrue((String)"duplicate message passed to listener", (boolean)ids.add(message.getMessageId()));
                    return true;
                }

                public boolean recoverMessageReference(MessageId ref) {
                    return false;
                }

                public boolean hasSpace() {
                    return true;
                }

                public boolean isDuplicate(MessageId ref) {
                    return false;
                }
            });
        }
    }
}

