/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Checks which messages are delivered from the {@code Test} queue after the
 * broker has been stopped, left down for a while, and started again.
 *
 * <p>Each test publishes a mix of messages with and without a time-to-live,
 * restarts the broker after a delay longer than that time-to-live, and then
 * asserts that exactly the messages published without a time-to-live are
 * received.
 */
public class KahaDBStoreRecoveryExpiryTest {
    /** Priority passed to {@code MessageProducer.send} for every published message. */
    private static final int MESSAGE_PRIORITY = 5;

    /** Maximum wait, in milliseconds, for each message that is expected to arrive. */
    private static final long RECEIVE_TIMEOUT_MS = 4000L;

    /** Maximum wait, in milliseconds, when asserting that no further message arrives. */
    private static final long NONE_LEFT_TIMEOUT_MS = 2000L;

    private BrokerService broker;
    private ActiveMQConnection connection;
    private final Destination destination = new ActiveMQQueue("Test");
    private Session session;

    /**
     * Publishes three messages, the middle one with a 2000 ms time-to-live,
     * restarts the broker after a 3000 ms pause and expects exactly two
     * messages to be delivered.
     */
    @Test
    public void testRestartWitExpired() throws Exception {
        publishMessages(1, 0);
        publishMessages(1, 2000);
        publishMessages(1, 0);
        restartBroker(3000);
        consumeMessages(2);
    }

    /**
     * Publishes 210 messages with a 2000 ms time-to-live followed by 10 without
     * one, restarts the broker after a 3000 ms pause and expects exactly ten
     * messages to be delivered.
     */
    // NOTE(review): 210 is presumably chosen to exceed the store's recovery
    // batch size, as the method name suggests - confirm against the KahaDB default.
    @Test
    public void testRestartWitExpiredLargerThanBatchRecovery() throws Exception {
        publishMessages(210, 2000);
        publishMessages(10, 0);
        restartBroker(3000);
        consumeMessages(10);
    }

    /**
     * Receives {@code count} messages from the test destination and asserts
     * that nothing else arrives afterwards.
     *
     * @param count number of messages that must be received
     * @throws Exception if the consumer cannot be created or a receive fails
     */
    private void consumeMessages(int count) throws Exception {
        // The consumer is closed as soon as the assertions are done so it does
        // not linger on the session for the remainder of the test.
        try (MessageConsumer consumer = session.createConsumer(destination)) {
            for (int i = 0; i < count; ++i) {
                Assert.assertNotNull("got message " + i, consumer.receive(RECEIVE_TIMEOUT_MS));
            }
            Assert.assertNull("none left over", consumer.receive(NONE_LEFT_TIMEOUT_MS));
        }
    }

    /**
     * Stops the broker, waits, and starts a fresh broker instance.
     *
     * @param restartDelay time in milliseconds to keep the broker down
     * @throws Exception if stopping, sleeping or starting fails
     */
    private void restartBroker(int restartDelay) throws Exception {
        stopBroker();
        TimeUnit.MILLISECONDS.sleep(restartDelay);
        startBroker();
    }

    /**
     * Closes the client connection and then stops the broker, waiting until
     * it has fully stopped.
     *
     * <p>The connection is closed first so that it is not leaked when the
     * fields are overwritten by the next {@link #startBroker()} call. The
     * null checks cover the case where {@link #startBroker()} failed part-way
     * and this method still runs as the {@code @After} hook.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            // Drop the stale handles so they cannot be used against a stopped broker.
            connection = null;
            session = null;
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        }
    }

    /**
     * Sends {@code count} empty text messages to the test destination using
     * persistent delivery.
     *
     * @param count  number of messages to send
     * @param expiry time-to-live in milliseconds passed to the producer for each message
     * @throws Exception if the producer cannot be created or a send fails
     */
    private void publishMessages(int count, int expiry) throws Exception {
        // One producer per call; close it when the batch is sent instead of
        // leaving it attached to the session.
        try (MessageProducer producer = session.createProducer(destination)) {
            for (int i = 0; i < count; ++i) {
                producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, MESSAGE_PRIORITY, expiry);
            }
        }
    }

    /**
     * Creates and starts a new broker, then opens a connection and an
     * auto-acknowledge, non-transacted session against it.
     *
     * <p>The broker is configured with a KahaDB index cache size of 0, a
     * default destination policy using {@link VMPendingQueueMessageStoragePolicy}
     * for pending queue messages, and JMX disabled.
     *
     * @throws Exception if the broker or the connection cannot be started
     */
    @Before
    public void startBroker() throws Exception {
        broker = new BrokerService();
        ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setIndexCacheSize(0);

        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);
        broker.setDestinationPolicy(policyMap);
        broker.setUseJmx(false);
        broker.start();

        // create=false: presumably attaches to the broker started above rather
        // than letting the VM transport create its own - confirm against the
        // ActiveMQ VM transport reference.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.setWatchTopicAdvisories(false);
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}

