package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/broker/scheduler/LostScheduledMessagesTest.class */
public class LostScheduledMessagesTest {
    private BrokerService broker;
    private static final File schedulerDirectory = new File("target/test/ScheduledDB");
    private static final File messageDirectory = new File("target/test/MessageDB");
    private static final String QUEUE_NAME = "test";

    @Before
    public void setup() throws Exception {
        IOHelper.mkdirs(schedulerDirectory);
        IOHelper.deleteChildren(schedulerDirectory);
        IOHelper.mkdirs(messageDirectory);
        IOHelper.deleteChildren(messageDirectory);
    }

    private void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setSchedulerSupport(true);
        this.broker.setPersistent(true);
        this.broker.setDeleteAllMessagesOnStartup(false);
        this.broker.setDataDirectory("target");
        this.broker.setSchedulerDirectoryFile(schedulerDirectory);
        this.broker.setDataDirectoryFile(messageDirectory);
        this.broker.setUseJmx(false);
        this.broker.addConnector("vm://localhost");
        this.broker.start();
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
        BasicConfigurator.resetConfiguration();
    }

    @Test
    public void MessagePassedNotUsingScheduling() throws Exception {
        doTest(false);
    }

    @Test
    public void MessageLostWhenUsingScheduling() throws Exception {
        doTest(true);
    }

    private void doTest(boolean z) throws Exception {
        startBroker();
        long currentTimeMillis = System.currentTimeMillis();
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE_NAME));
        TextMessage createTextMessage = createSession.createTextMessage(QUEUE_NAME);
        if (z) {
            createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 5000);
        }
        createProducer.send(createTextMessage);
        createSession.close();
        createConnection.close();
        this.broker.getServices();
        this.broker.stop();
        this.broker.waitUntilStopped();
        Assert.assertTrue("Failed to shut down broker in expected time. Test results inconclusive", System.currentTimeMillis() - currentTimeMillis < ((long) 5000));
        TimeUnit.MILLISECONDS.sleep(5000);
        startBroker();
        final AtomicLong atomicLong = new AtomicLong();
        Connection createConnection2 = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        createSession2.createConsumer(createSession2.createQueue(QUEUE_NAME)).setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.LostScheduledMessagesTest.1
            public void onMessage(Message message) {
                atomicLong.incrementAndGet();
            }
        });
        TimeUnit.MILLISECONDS.sleep(5000 * 2);
        createSession2.close();
        createConnection2.close();
        Assert.assertEquals(1L, atomicLong.get());
    }
}
