package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.util.RedeliveryPlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/scheduler/JobSchedulerRedliveryPluginDLQStoreCleanupTest.class */
/**
 * Integration test that runs an embedded broker with a {@link JobSchedulerStoreImpl} and a
 * {@link RedeliveryPlugin}, repeatedly sends persistent messages that the consumer always rolls
 * back, and then asserts that the scheduler store journal shrinks back to a single file.
 *
 * <p>The broker is created in {@link #setUp()} and stopped in {@link #tearDown()}; the scheduler
 * store lives under {@code target/test/ScheduledJobsDB} and is wiped before every test.
 */
public class JobSchedulerRedliveryPluginDLQStoreCleanupTest {
    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerRedliveryPluginDLQStoreCleanupTest.class);

    /** Queue that the test produces to / consumes from and that has a dedicated redelivery policy. */
    private static final String QUEUE_NAME = "FOO.BAR";

    /** Number of successful sends the test waits for before stopping the producer task. */
    private static final int MESSAGE_COUNT = 8;

    /** Size in bytes of the body of every message sent by the test. */
    private static final int PAYLOAD_SIZE = 8192;

    private JobSchedulerStoreImpl store;
    private BrokerService brokerService;
    private ByteSequence payload;
    private String connectionURI;
    private ActiveMQConnectionFactory cf;

    /**
     * Creates a clean scheduler store directory, starts a broker configured with the redelivery
     * plugin and an individual dead letter strategy, and prepares the message payload and the
     * client connection factory.
     *
     * @throws Exception if the store or the broker cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        // Raise the scheduler store's log level to TRACE for the duration of the test run.
        ((org.apache.logging.log4j.core.Logger) LogManager.getLogger(JobSchedulerStoreImpl.class)).setLevel(Level.TRACE);

        // Start from an empty store directory.
        File storeDirectory = new File("target/test/ScheduledJobsDB");
        IOHelper.mkdirs(storeDirectory);
        IOHelper.deleteChildren(storeDirectory);
        createSchedulerStore(storeDirectory);

        this.brokerService = new BrokerService();
        this.brokerService.setUseJmx(false);
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.setJobSchedulerStore(this.store);
        this.brokerService.setSchedulerSupport(true);
        this.brokerService.setAdvisorySupport(false);
        // Port 0 asks for an ephemeral port; the real address is read back after start.
        TransportConnector connector = this.brokerService.addConnector("tcp://0.0.0.0:0");
        this.brokerService.setPlugins(new BrokerPlugin[]{createRedeliveryPlugin()});

        IndividualDeadLetterStrategy deadLetterStrategy = new IndividualDeadLetterStrategy();
        deadLetterStrategy.setProcessExpired(true);
        deadLetterStrategy.setProcessNonPersistent(false);
        deadLetterStrategy.setUseQueueForQueueMessages(true);
        deadLetterStrategy.setQueuePrefix("DLQ.");

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setDeadLetterStrategy(deadLetterStrategy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        this.brokerService.setDestinationPolicy(policyMap);

        this.brokerService.start();
        this.brokerService.waitUntilStarted();
        this.connectionURI = connector.getPublishableConnectString();

        // Deterministic payload: byte i holds (i mod 256).
        byte[] body = new byte[PAYLOAD_SIZE];
        for (int i = 0; i < body.length; i++) {
            body[i] = (byte) (i % 256);
        }
        this.payload = new ByteSequence(body);

        this.cf = new ActiveMQConnectionFactory(this.connectionURI);
        // No client-side redelivery attempts are configured for connections from this factory.
        this.cf.getRedeliveryPolicy().setMaximumRedeliveries(0);
    }

    /**
     * Creates the scheduler store used by the broker and assigns it to {@code store}.
     *
     * @param file directory in which the store keeps its files
     * @throws Exception declared for subclasses; the visible calls are plain setters
     */
    protected void createSchedulerStore(File file) throws Exception {
        this.store = new JobSchedulerStoreImpl();
        this.store.setDirectory(file);
        this.store.setCheckpointInterval(5000L);
        // NOTE(review): this borrows a constant from an unrelated test class; it looks like a
        // decompiler substitution for a numeric literal. Confirm the value and inline it.
        this.store.setCleanupInterval(DurableSubProcessWithRestartTest.BROKER_RESTART);
        // Small journal files (10 KiB) so that a handful of 8 KiB messages spans several files.
        this.store.setJournalMaxFileLength(10240);
    }

    /**
     * Builds the broker redelivery plugin: a policy for {@value #QUEUE_NAME} allowing three
     * redeliveries, and a default policy with a 5 second initial delay and zero redeliveries.
     *
     * @return the configured plugin
     */
    protected RedeliveryPlugin createRedeliveryPlugin() {
        RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
        queuePolicy.setMaximumRedeliveries(3);
        queuePolicy.setDestination(new ActiveMQQueue(QUEUE_NAME));

        RedeliveryPolicy defaultPolicy = new RedeliveryPolicy();
        defaultPolicy.setInitialRedeliveryDelay(5000L);
        defaultPolicy.setMaximumRedeliveries(0);

        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
        redeliveryPolicyMap.setDefaultEntry(defaultPolicy);
        redeliveryPolicyMap.setRedeliveryPolicyEntries(Arrays.asList(queuePolicy));

        RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
        redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
        return redeliveryPlugin;
    }

    /**
     * Stops the broker started by {@link #setUp()}, if any, and waits for it to finish stopping.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    /**
     * Sends {@value #MESSAGE_COUNT} persistent messages, one every five seconds, to a queue whose
     * transacted consumer rolls back every delivery, then waits until the scheduler store journal
     * consists of exactly one file.
     *
     * @throws Exception on JMS failures or if the test thread is interrupted while waiting
     */
    @Test
    public void testProducerAndRollback() throws Exception {
        final Connection connection = this.cf.createConnection();
        try {
            final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = producerSession.createQueue(QUEUE_NAME);
            final MessageProducer producer = producerSession.createProducer(queue);
            MessageConsumer consumer = consumerSession.createConsumer(queue);
            final CountDownLatch sentLatch = new CountDownLatch(MESSAGE_COUNT);

            connection.start();
            // 2 is the value of javax.jms.DeliveryMode.PERSISTENT.
            producer.setDeliveryMode(2);

            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        LOG.info("Rolling back incoming message");
                        consumerSession.rollback();
                    } catch (JMSException e) {
                        LOG.warn("Failed to Rollback on incoming message", e);
                    }
                }
            });

            final ScheduledExecutorService sender = Executors.newSingleThreadScheduledExecutor();
            try {
                sender.scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BytesMessage message = producerSession.createBytesMessage();
                            message.writeBytes(payload.data, payload.offset, payload.length);
                            producer.send(message);
                            LOG.info("Send next Message to Queue");
                            // Only successful sends count towards the latch.
                            sentLatch.countDown();
                        } catch (JMSException e) {
                            LOG.warn("Send of message did not complete.", e);
                        }
                    }
                }, 0L, 5L, TimeUnit.SECONDS);

                Assert.assertTrue("Should have sent all messages", sentLatch.await(2L, TimeUnit.MINUTES));
                sender.shutdownNow();
                Assert.assertTrue(sender.awaitTermination(30L, TimeUnit.SECONDS));
            } finally {
                // Idempotent; guarantees the sender thread is stopped even when an assertion fails.
                sender.shutdownNow();
            }

            // The connection (and its rolling-back consumer) stays open while waiting for cleanup.
            Assert.assertTrue("Should clean out the scheduler store", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return getNumJournalFiles() == 1;
                }
            }));
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                // Log instead of rethrowing so a close failure cannot mask an assertion failure.
                LOG.warn("Failed to close test connection", e);
            }
        }
    }

    /**
     * Returns the number of entries in the file map of the scheduler store's journal.
     *
     * @return current journal file count
     * @throws IOException if the journal cannot be accessed
     */
    private int getNumJournalFiles() throws IOException {
        return this.store.getJournal().getFileMap().size();
    }
}
