/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.scheduler;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import jakarta.jms.TemporaryTopic;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.scheduler.JobSchedulerTestSupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.IdGenerator;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the scheduler management topic: each test schedules delayed messages on
 * the test destination, sends BROWSE / REMOVE / REMOVEALL requests to the
 * {@code ActiveMQ.Scheduler.Management} topic and checks which messages are browsed
 * and which are eventually delivered.
 */
public class JobSchedulerManagementTest
extends JobSchedulerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);

    /** Name of the topic that scheduler management requests are sent to. */
    private static final String MANAGEMENT_DESTINATION = "ActiveMQ.Scheduler.Management";
    /** Message property naming the management action to perform. */
    private static final String ACTION = "AMQ_SCHEDULER_ACTION";
    private static final String ACTION_BROWSE = "BROWSE";
    private static final String ACTION_REMOVE = "REMOVE";
    private static final String ACTION_REMOVEALL = "REMOVEALL";
    /** Message properties bounding the time window of an action. */
    private static final String ACTION_START_TIME = "ACTION_START_TIME";
    private static final String ACTION_END_TIME = "ACTION_END_TIME";
    /** Message property carrying the delivery delay of a scheduled message. */
    private static final String SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
    /** Message property carrying the id of a scheduled job. */
    private static final String SCHEDULED_ID = "scheduledJobId";
    /** System property overridden (and restored) by {@link #testRemoveAllScheduled()}. */
    private static final String MAX_TX_SIZE_PROPERTY = "maxKahaDBTxSize";

    /**
     * Creates the broker and, when the test runs persistent, tunes the job scheduler
     * store's cleanup interval and maximum journal file length.
     *
     * @return the configured broker
     * @throws Exception if the broker cannot be created
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = this.createBroker(true);
        if (this.isPersistent()) {
            JobSchedulerStoreImpl store = (JobSchedulerStoreImpl) brokerService.getJobSchedulerStore();
            // NOTE(review): presumably milliseconds and bytes respectively - confirm against JobSchedulerStoreImpl.
            store.setCleanupInterval(500L);
            store.setJournalMaxFileLength(102400);
        }
        return brokerService;
    }

    /**
     * Schedules 5000 messages 180 seconds out, sends a REMOVEALL request and verifies
     * that none of them is delivered within 10 seconds. In persistent mode it also
     * asserts that exactly one journal file is left.
     */
    @Test
    public void testRemoveAllScheduled() throws Exception {
        final int messageCount = 5000;
        org.apache.logging.log4j.core.Logger txLogger =
                (org.apache.logging.log4j.core.Logger) LogManager.getLogger(Transaction.class);
        // Remember global state so it can be put back and does not leak into other tests.
        Level previousLevel = txLogger.getLevel();
        String previousMaxTxSize = System.getProperty(MAX_TX_SIZE_PROPERTY);
        txLogger.setLevel(Level.DEBUG);
        System.setProperty(MAX_TX_SIZE_PROPERTY, "512000");
        try (Connection connection = this.createConnection()) {
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(180L), messageCount);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(messageCount);
            consumer.setMessageListener(countingListener(latch));
            connection.start();

            MessageProducer producer = session.createProducer(management);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_REMOVEALL);
            producer.send(request);

            // Nothing should arrive, so this wait is expected to time out.
            latch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(messageCount, latch.getCount());
            if (this.isPersistent()) {
                Assert.assertEquals(1L, this.getNumberOfJournalFiles());
            }
        } finally {
            txLogger.setLevel(previousLevel);
            restoreSystemProperty(MAX_TX_SIZE_PROPERTY, previousMaxTxSize);
        }
    }

    /**
     * Schedules messages at 6, 15 and 20 seconds, removes everything in the
     * [now+10s, now+30s] window, then verifies that only one message is browsed and
     * only one message is delivered.
     */
    @Test
    public void testRemoveAllScheduledAtTime() throws Exception {
        final int messageCount = 3;
        try (Connection connection = this.createConnection()) {
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6L));
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(15L));
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20L));
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(messageCount);
            consumer.setMessageListener(countingListener(latch));
            MessageConsumer browser = session.createConsumer(browseDest);
            CountDownLatch browsedLatch = new CountDownLatch(messageCount);
            browser.setMessageListener(browseListener(browsedLatch));
            connection.start();

            long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
            MessageProducer producer = session.createProducer(management);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_REMOVEALL);
            request.setStringProperty(ACTION_START_TIME, Long.toString(start));
            request.setStringProperty(ACTION_END_TIME, Long.toString(end));
            producer.send(request);

            request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            // Neither latch is expected to reach zero; both waits act as fixed 10 second delays.
            latch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(2L, browsedLatch.getCount());
            latch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(2L, latch.getCount());
        }
    }

    /**
     * Schedules 10 messages 9 seconds out, browses them and verifies that browsing
     * does not trigger early delivery, that all 10 are browsed, and that all 10 are
     * eventually delivered.
     */
    @Test
    public void testBrowseAllScheduled() throws Exception {
        final int messageCount = 10;
        try (Connection connection = this.createConnection()) {
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9L), messageCount);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic requestBrowse = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(messageCount);
            consumer.setMessageListener(countingListener(latch));
            MessageConsumer browser = session.createConsumer(browseDest);
            CountDownLatch browsedLatch = new CountDownLatch(messageCount);
            browser.setMessageListener(browseListener(browsedLatch));
            connection.start();

            MessageProducer producer = session.createProducer(requestBrowse);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            // Browsing must not cause early delivery.
            Thread.sleep(2000L);
            Assert.assertEquals(messageCount, latch.getCount());

            // Wait on the latch that is actually asserted on.
            browsedLatch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, browsedLatch.getCount());

            // 20 seconds keeps the overall time budget of the two former 10 second waits.
            latch.await(20L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, latch.getCount());
        }
    }

    /**
     * Schedules 1 message at 5 seconds, 10 at 10 seconds and 1 at 20 seconds, browses
     * the [now+6s, now+15s] window and verifies that 10 messages are browsed and that
     * all 12 are eventually delivered.
     */
    @Test
    public void testBrowseWindowlScheduled() throws Exception {
        final int windowCount = 10;
        final int totalCount = windowCount + 2;
        try (Connection connection = this.createConnection()) {
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5L));
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10L), windowCount);
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20L));
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic requestBrowse = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(totalCount);
            consumer.setMessageListener(countingListener(latch));
            MessageConsumer browser = session.createConsumer(browseDest);
            CountDownLatch browsedLatch = new CountDownLatch(windowCount);
            browser.setMessageListener(browseListener(browsedLatch));
            connection.start();

            long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6L);
            long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15L);
            MessageProducer producer = session.createProducer(requestBrowse);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setStringProperty(ACTION_START_TIME, Long.toString(start));
            request.setStringProperty(ACTION_END_TIME, Long.toString(end));
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            // Browsing must not cause early delivery.
            Thread.sleep(2000L);
            Assert.assertEquals(totalCount, latch.getCount());

            // Wait on the latch that is actually asserted on.
            browsedLatch.await(15L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, browsedLatch.getCount());

            // 35 seconds keeps the overall time budget of the former 15 + 20 second waits.
            latch.await(35L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, latch.getCount());
        }
    }

    /**
     * Schedules 10 messages 9 seconds out, browses them, sends one REMOVE request per
     * browsed job id and verifies that none of the messages is delivered.
     */
    @Test
    public void testRemoveScheduled() throws Exception {
        final int messageCount = 10;
        try (Connection connection = this.createConnection()) {
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9L), messageCount);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            MessageProducer producer = session.createProducer(management);
            CountDownLatch latch = new CountDownLatch(messageCount);
            consumer.setMessageListener(countingListener(latch));
            // The browser receives synchronously, so it gets its own session separate from
            // the one that has an asynchronous listener attached.
            Session browseSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer browser = browseSession.createConsumer(browseDest);
            connection.start();

            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            for (int i = 0; i < messageCount; ++i) {
                Message browsed = browser.receive(2000L);
                Assert.assertNotNull(browsed);
                // Failures while sending the REMOVE request now fail the test directly
                // instead of being swallowed and surfacing later as a count mismatch.
                Message remove = session.createMessage();
                remove.setStringProperty(ACTION, ACTION_REMOVE);
                remove.setStringProperty(SCHEDULED_ID, browsed.getStringProperty(SCHEDULED_ID));
                producer.send(remove);
            }

            // Nothing should arrive, so this wait is expected to time out.
            latch.await(11L, TimeUnit.SECONDS);
            Assert.assertEquals(messageCount, latch.getCount());
        }
    }

    /**
     * Sends a removal request carrying a freshly generated job id that was never
     * scheduled and verifies that sending it does not throw.
     */
    @Test
    public void testRemoveNotScheduled() throws Exception {
        try (Connection connection = this.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            MessageProducer producer = session.createProducer(management);
            try {
                Message remove = session.createMessage();
                // NOTE(review): the action is REMOVEALL although a single job id is supplied;
                // possibly REMOVE was intended - confirm before changing.
                remove.setStringProperty(ACTION, ACTION_REMOVEALL);
                remove.setStringProperty(SCHEDULED_ID, new IdGenerator().generateId());
                producer.send(remove);
            } catch (Exception e) {
                // Keep the original failure message but preserve the cause for diagnosis.
                throw new AssertionError("Caught unexpected exception during remove of unscheduled message.", e);
            }
        }
    }

    /**
     * Schedules four messages with different delays and browses through a consumer
     * whose selector matches only the 45 second delay; verifies that exactly one
     * message is returned and that it carries the expected delay and original
     * destination.
     */
    @Test
    public void testBrowseWithSelector() throws Exception {
        try (Connection connection = this.createConnection()) {
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9L));
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10L));
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5L));
            this.scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45L));
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic requestBrowse = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryTopic browseDest = session.createTemporaryTopic();
            MessageConsumer browser = session.createConsumer(browseDest, "AMQ_SCHEDULED_DELAY = 45000");
            connection.start();

            MessageProducer producer = session.createProducer(requestBrowse);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            Message message = browser.receive(5000L);
            Assert.assertNotNull(message);
            Assert.assertEquals(45000L, message.getLongProperty(SCHEDULED_DELAY));
            Assert.assertEquals((Object) this.destination, (Object) ((ActiveMQMessage) message).getOriginalDestination());

            // No second message may match the selector.
            message = browser.receive(5000L);
            Assert.assertNull(message);
        }
    }

    /**
     * Schedules a single message on the test destination.
     *
     * @param connection connection used to send the message
     * @param delay value written to the {@code AMQ_SCHEDULED_DELAY} property
     * @throws Exception if sending fails
     */
    protected void scheduleMessage(Connection connection, long delay) throws Exception {
        this.scheduleMessage(connection, delay, 1);
    }

    /**
     * Sends the same text message {@code count} times to the test destination, each
     * carrying the given {@code AMQ_SCHEDULED_DELAY}.
     *
     * @param connection connection used to send the messages
     * @param delay value written to the {@code AMQ_SCHEDULED_DELAY} property
     * @param count number of times the message is sent
     * @throws Exception if sending fails
     */
    protected void scheduleMessage(Connection connection, long delay, int count) throws Exception {
        // Close both the producer and its session so repeated calls do not accumulate sessions.
        try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer producer = session.createProducer((Destination) this.destination)) {
            TextMessage message = session.createTextMessage("test msg");
            message.setLongProperty(SCHEDULED_DELAY, delay);
            for (int i = 0; i < count; ++i) {
                producer.send(message);
            }
        }
    }

    /**
     * Counts the non-null entries in the job scheduler store's journal file map.
     *
     * @return number of journal data files currently tracked by the store
     */
    private int getNumberOfJournalFiles() throws IOException, InterruptedException {
        JobSchedulerStoreImpl store = (JobSchedulerStoreImpl) this.broker.getJobSchedulerStore();
        Collection<DataFile> files = store.getJournal().getFileMap().values();
        int journalFiles = 0;
        for (DataFile file : files) {
            if (file != null) {
                ++journalFiles;
            }
        }
        return journalFiles;
    }

    /** Returns a listener that counts down {@code latch} once per received message. */
    private static MessageListener countingListener(CountDownLatch latch) {
        return message -> latch.countDown();
    }

    /** Returns a listener that counts down {@code latch} and logs each browsed message at debug level. */
    private static MessageListener browseListener(CountDownLatch latch) {
        return message -> {
            latch.countDown();
            LOG.debug("Scheduled Message Browser got Message: {}", message);
        };
    }

    /** Restores a system property to a previously captured value, clearing it if it was unset. */
    private static void restoreSystemProperty(String key, String previousValue) {
        if (previousValue == null) {
            System.clearProperty(key);
        } else {
            System.setProperty(key, previousValue);
        }
    }
}

