package org.apache.activemq.broker.scheduler;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.scheduler.JobSchedulerTestSupport;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests for scheduled message delivery over JMS: delayed, repeated and cron-driven
 * sends, survival of scheduled jobs across broker restarts, recovery of the scheduler store, and
 * enforcement of the job scheduler usage limit.
 *
 * <p>The tests use the inherited {@code broker} and {@code destination} members and the
 * {@code createConnection()} / {@code restartBroker(...)} helpers, which are presumably provided
 * by {@link JobSchedulerTestSupport} (not visible from this file).
 */
public class JmsSchedulerTest extends JobSchedulerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(JmsSchedulerTest.class);

    /** Message property: delay before the (first) delivery. */
    private static final String SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";

    /** Message property: pause between repeated deliveries. */
    private static final String SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD";

    /** Message property: number of additional deliveries after the first one. */
    private static final String SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";

    /** Message property: cron expression controlling when the job fires. */
    private static final String SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";

    /**
     * Sends one message scheduled by a cron entry combined with delay/period/repeat settings and
     * expects exactly one job in the scheduler and ten deliveries in total.
     *
     * @throws Exception on any JMS or broker failure
     */
    @Test
    public void testCron() throws Exception {
        final AtomicInteger received = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(10);
        Connection connection = createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(this.destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    received.incrementAndGet();
                    latch.countDown();
                    LOG.info("Received scheduled message, waiting for {} more", latch.getCount());
                }
            });
            connection.start();

            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage("test msg");
            message.setStringProperty(SCHEDULED_CRON, "* * * * *");
            message.setLongProperty(SCHEDULED_DELAY, 1000L);
            message.setLongProperty(SCHEDULED_PERIOD, 500L);
            message.setIntProperty(SCHEDULED_REPEAT, 9);
            producer.send(message);
            producer.close();

            // Give the broker a moment to register the job before inspecting the scheduler.
            Thread.sleep(500L);
            SchedulerBroker schedulerBroker =
                    (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
            Assert.assertEquals(1, schedulerBroker.getJobScheduler().getAllJobs().size());

            // The cron entry may need to wait for the next matching time, hence the long timeout.
            Assert.assertTrue("Timed out waiting for the scheduled messages",
                    latch.await(240L, TimeUnit.SECONDS));
            Assert.assertEquals(10, received.get());
        } finally {
            connection.close();
        }
    }

    /**
     * Sends a message with a five second delay and verifies it is not delivered after two seconds
     * but does arrive within a further five seconds.
     *
     * @throws Exception on any JMS failure
     */
    @Test
    public void testSchedule() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        Connection connection = createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(this.destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    latch.countDown();
                }
            });
            connection.start();

            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage("test msg");
            message.setLongProperty(SCHEDULED_DELAY, 5000L);
            producer.send(message);
            producer.close();

            // Part-way through the delay nothing must have been delivered yet.
            Thread.sleep(2000L);
            Assert.assertEquals(1L, latch.getCount());

            Assert.assertTrue("Delayed message was not delivered in time",
                    latch.await(5L, TimeUnit.SECONDS));
        } finally {
            connection.close();
        }
    }

    /**
     * Same as {@link #testSchedule()} but with a transacted session: the send is committed by the
     * test and the receipt is committed from inside the listener.
     *
     * @throws Exception on any JMS failure
     */
    @Test
    public void testTransactedSchedule() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        Connection connection = createConnection();
        try {
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(this.destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        LOG.error("Failed to commit the received scheduled message", e);
                    }
                    latch.countDown();
                }
            });
            connection.start();

            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage("test msg");
            message.setLongProperty(SCHEDULED_DELAY, 5000L);
            producer.send(message);
            session.commit();
            producer.close();

            // Part-way through the delay nothing must have been delivered yet.
            Thread.sleep(2000L);
            Assert.assertEquals(1L, latch.getCount());

            Assert.assertTrue("Delayed transacted message was not delivered in time",
                    latch.await(10L, TimeUnit.SECONDS));
        } finally {
            connection.close();
        }
    }

    /**
     * Sends one message with delay, period and a repeat count of nine and verifies that exactly
     * ten copies are delivered (no fewer, and no extra ones in the following second).
     *
     * @throws Exception on any JMS failure
     */
    @Test
    public void testScheduleRepeated() throws Exception {
        final AtomicInteger received = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(10);
        Connection connection = createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(this.destination);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    received.incrementAndGet();
                    latch.countDown();
                    LOG.info("Received scheduled message, waiting for {} more", latch.getCount());
                }
            });
            connection.start();

            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage("test msg");
            message.setLongProperty(SCHEDULED_DELAY, 1000L);
            message.setLongProperty(SCHEDULED_PERIOD, 500L);
            message.setIntProperty(SCHEDULED_REPEAT, 9);
            producer.send(message);
            producer.close();

            // Immediately after the send the initial delay has not elapsed yet.
            Assert.assertEquals(10L, latch.getCount());
            Assert.assertTrue("Timed out waiting for the repeated messages",
                    latch.await(10L, TimeUnit.SECONDS));

            // Wait a little longer to make sure no surplus copies show up.
            Thread.sleep(1000L);
            Assert.assertEquals(10, received.get());
        } finally {
            connection.close();
        }
    }

    /**
     * Verifies a scheduled message survives a normal broker restart.
     *
     * @throws Exception on any JMS or broker failure
     */
    @Test
    public void testScheduleRestart() throws Exception {
        testScheduleRestart(JobSchedulerTestSupport.RestartType.NORMAL);
    }

    /**
     * Verifies a scheduled message survives a broker restart of type {@code FULL_RECOVERY}.
     *
     * @throws Exception on any JMS or broker failure
     */
    @Test
    public void testScheduleFullRecoveryRestart() throws Exception {
        testScheduleRestart(JobSchedulerTestSupport.RestartType.FULL_RECOVERY);
    }

    /**
     * Schedules 1000 jobs, remembers the journal's last append location at iteration 500, stops
     * the broker, rewinds the journal to that location and reloads the store. The store is
     * expected to log "Removed Job past last appened in the journal" 500 times.
     *
     * @throws Exception on any JMS, broker or store failure
     */
    @Test
    public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
        final int messageCount = 1000;
        final AtomicInteger discardedJobs = new AtomicInteger();
        JobSchedulerStoreImpl store = (JobSchedulerStoreImpl) this.broker.getJobSchedulerStore();

        // Counts the store's log lines that report a discarded job.
        Appender appender = new DefaultTestAppender() {
            public void doAppend(LoggingEvent loggingEvent) {
                if (loggingEvent.getMessage().toString().contains("Removed Job past last appened in the journal")) {
                    discardedJobs.incrementAndGet();
                }
            }
        };
        org.apache.log4j.Logger storeLogger = org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
        Level previousLevel = storeLogger.getLevel();
        registerLogAppender(appender);
        try {
            Location rewindLocation = null;
            Connection connection = createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                connection.start();
                MessageProducer producer = session.createProducer(this.destination);
                for (int i = 0; i < messageCount; i++) {
                    TextMessage message = session.createTextMessage("test msg");
                    message.setLongProperty(SCHEDULED_DELAY, 5000L);
                    producer.send(message);
                    if (i == messageCount / 2) {
                        // Journal position the store is rewound to after the broker stops.
                        rewindLocation = store.getJournal().getLastAppendLocation();
                    }
                }
                producer.close();
            } finally {
                connection.close();
            }

            this.broker.stop();
            this.broker.waitUntilStopped();

            // Make the journal look older than the index, then reload the store.
            store.getJournal().setLastAppendLocation(rewindLocation);
            store.load();

            Assert.assertEquals(messageCount / 2, discardedJobs.get());
        } finally {
            // Do not leak the appender or the TRACE level into other tests.
            storeLogger.removeAppender(appender);
            storeLogger.setLevel(previousLevel);
        }
    }

    /**
     * Attaches {@code appender} to the {@link JobSchedulerStoreImpl} log4j logger and switches
     * that logger to TRACE.
     *
     * @param appender the appender to attach
     */
    private void registerLogAppender(Appender appender) {
        org.apache.log4j.Logger storeLogger = org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
        storeLogger.addAppender(appender);
        storeLogger.setLevel(Level.TRACE);
    }

    /**
     * Schedules a delayed message, restarts the broker with the given restart type, verifies the
     * message is still delivered afterwards, and finally schedules the message once more on the
     * restarted broker.
     *
     * @param restartType how the broker is restarted
     * @throws Exception on any JMS or broker failure
     */
    private void testScheduleRestart(JobSchedulerTestSupport.RestartType restartType) throws Exception {
        TextMessage message;
        Connection connection = createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            MessageProducer producer = session.createProducer(this.destination);
            message = session.createTextMessage("test msg");
            message.setLongProperty(SCHEDULED_DELAY, 5000L);
            producer.send(message);
            producer.close();
        } finally {
            // Release the client before the broker it is attached to goes away.
            connection.close();
        }

        restartBroker(restartType);

        Connection restartedConnection = createConnection();
        try {
            restartedConnection.start();
            Session session = restartedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(this.destination);
            // NOTE(review): the receive timeout is borrowed from an unrelated test's constant;
            // confirm the coupling is intended.
            Message delivered = consumer.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
            Assert.assertNotNull("Didn't receive the message", delivered);

            // Scheduling must still work on the restarted broker.
            MessageProducer producer = session.createProducer(this.destination);
            message.setLongProperty(SCHEDULED_DELAY, 5000L);
            producer.send(message);
            producer.close();
        } finally {
            restartedConnection.close();
        }
    }

    /**
     * Lowers the job scheduler usage limit, starts a producer of 100 delayed messages and checks
     * that nothing is delivered while the low limit is in place; then raises the limit and expects
     * the producer to finish and all 100 messages to be consumed.
     *
     * @throws Exception on any JMS or broker failure
     */
    @Test
    public void testJobSchedulerStoreUsage() throws Exception {
        final int messageCount = 100;
        this.broker.getSystemUsage().getJobSchedulerUsage().setLimit(10240L);

        Connection connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final ProducerThread producerThread = new ProducerThread(session, this.destination) {
                protected Message createMessage(int i) throws Exception {
                    Message message = super.createMessage(i);
                    message.setLongProperty(SCHEDULED_DELAY, 5000L);
                    return message;
                }
            };
            producerThread.setMessageCount(messageCount);
            producerThread.start();

            MessageConsumer consumer = session.createConsumer(this.destination);
            final CountDownLatch latch = new CountDownLatch(messageCount);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    latch.countDown();
                }
            });

            // NOTE(review): pause lengths are borrowed from an unrelated test's constant;
            // confirm the coupling is intended.
            Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);
            Assert.assertEquals(messageCount, latch.getCount());

            // Raise the limit so that the producer can make progress.
            this.broker.getSystemUsage().getJobSchedulerUsage().setLimit(34603008L);
            Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);

            // The outcome of the wait is checked by the assertion that follows.
            Wait.waitFor(new Wait.Condition() {
                public boolean isSatisified() throws Exception {
                    return producerThread.getSentCount() == producerThread.getMessageCount();
                }
            }, 20000L);
            Assert.assertEquals("Producer didn't send all messages",
                    producerThread.getMessageCount(), producerThread.getSentCount());

            // A timeout here surfaces through the count assertion below.
            latch.await(20000L, TimeUnit.MILLISECONDS);
            Assert.assertEquals("Consumer did not receive all messages.", 0L, latch.getCount());
        } finally {
            connection.close();
        }
    }
}
