package org.apache.activemq.broker.store;

import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerRestartTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;

/* loaded from: input_file:org/apache/activemq/broker/store/RecoverExpiredMessagesTest.class */
public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport {
    final ArrayList<String> expected = new ArrayList<>();
    final ActiveMQDestination destination = new ActiveMQQueue("TEST");
    public PendingQueueMessageStoragePolicy queuePendingPolicy;
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.broker.BrokerTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    public void initCombosForTestRecovery() throws Exception {
        addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[]{new FilePendingQueueMessageStoragePolicy(), new VMPendingQueueMessageStoragePolicy()});
        addCombinationValues("persistenceAdapterChoice", new TestSupport.PersistenceAdapterChoice[]{TestSupport.PersistenceAdapterChoice.JDBC, TestSupport.PersistenceAdapterChoice.KahaDB});
    }

    public void testRecovery() throws Exception {
        sendSomeMessagesThatWillExpireIn5AndThenOne();
        this.broker.stop();
        this.broker.waitUntilStopped();
        TimeUnit.SECONDS.sleep(6L);
        this.broker = createRestartedBroker();
        this.broker.start();
        consumeExpected();
    }

    private void consumeExpected() throws Exception {
        StubConnection createConnection = createConnection();
        ConnectionInfo createConnectionInfo = createConnectionInfo();
        SessionInfo createSessionInfo = createSessionInfo(createConnectionInfo);
        createConnection.send(createConnectionInfo);
        createConnection.send(createSessionInfo);
        ConsumerInfo createConsumerInfo = createConsumerInfo(createSessionInfo, this.destination);
        createConnection.send(createConsumerInfo);
        Message receiveMessage = receiveMessage(createConnection);
        assertNotNull("Should have received message " + this.expected.get(0) + " by now!", receiveMessage);
        assertEquals(this.expected.get(0), receiveMessage.getMessageId().toString());
        createConnection.send(createAck(createConsumerInfo, receiveMessage, 1, (byte) 2));
        assertNoMessagesLeft(createConnection);
        createConnection.request(closeConnectionInfo(createConnectionInfo));
    }

    private void sendSomeMessagesThatWillExpireIn5AndThenOne() throws Exception {
        StubConnection createConnection = createConnection();
        ConnectionInfo createConnectionInfo = createConnectionInfo();
        SessionInfo createSessionInfo = createSessionInfo(createConnectionInfo);
        ProducerInfo createProducerInfo = createProducerInfo(createSessionInfo);
        createConnection.send(createConnectionInfo);
        createConnection.send(createSessionInfo);
        createConnection.send(createProducerInfo);
        for (int i = 0; i < 10; i++) {
            Message createMessage = createMessage(createProducerInfo, this.destination);
            createMessage.setExpiration(System.currentTimeMillis() + 5000);
            createMessage.setPersistent(true);
            createConnection.send(createMessage);
        }
        Message createMessage2 = createMessage(createProducerInfo, this.destination);
        createMessage2.setPersistent(true);
        createConnection.send(createMessage2);
        this.expected.add(createMessage2.getMessageId().toString());
        createConnection.request(closeConnectionInfo(createConnectionInfo));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.broker.BrokerTestSupport
    public PolicyEntry getDefaultPolicy() {
        PolicyEntry defaultPolicy = super.getDefaultPolicy();
        defaultPolicy.setPendingQueuePolicy(this.queuePendingPolicy);
        defaultPolicy.setExpireMessagesPeriod(0L);
        return defaultPolicy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.broker.BrokerRestartTestSupport
    public void configureBroker(BrokerService brokerService) throws Exception {
        super.configureBroker(brokerService);
        TestSupport.setPersistenceAdapter(brokerService, this.persistenceAdapterChoice);
    }

    public static Test suite() {
        return suite(RecoverExpiredMessagesTest.class);
    }
}
