/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.ResourceAllocationException;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.lang.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that drives a KahaDB-backed broker with a durable topic
 * subscriber and verifies, after a series of forced checkpoints, how many
 * {@code db-N.log} journal files remain in {@code target/data/kahadb} and which
 * one is the newest.
 *
 * <p>The broker is created from {@code activemq.xml} under
 * {@code src/test/resources/org/apache/activemq/bugs/amq7118}; the expected file
 * counts asserted below depend on that configuration, which is not part of this
 * file.
 */
public class AMQ7118Test {
    protected static final Logger LOG = LoggerFactory.getLogger(AMQ7118Test.class);
    protected static Random r = new Random();
    static final String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";

    /** Upper bound, in seconds, that {@link #checkFiles} waits for the journal file count to settle. */
    private static final int FILE_COUNT_WAIT_SECONDS = 30;

    protected BrokerService broker;
    protected Connection producerConnection;
    protected Session pSession;
    protected Connection cConnection;
    protected Session cSession;
    private final String xbean = "xbean:";
    private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq7118";
    int checkpointIndex = 0;
    private static final ActiveMQConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY =
            new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

    /**
     * Wipes {@code target/data}, starts a fresh broker and sets the connection id
     * prefix used by the shared connection factory.
     */
    @Before
    public void setup() throws Exception {
        deleteData(new File("target/data"));
        createBroker();
        ACTIVE_MQ_CONNECTION_FACTORY.setConnectionIDPrefix("bla");
    }

    /**
     * Releases the client connections and stops the broker. Safe to call when
     * setup failed part-way (fields may still be {@code null}) and when the test
     * body has already stopped the broker itself.
     */
    @After
    public void shutdown() throws Exception {
        closeQuietly(producerConnection);
        closeQuietly(cConnection);
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /** Opens and starts the producer connection and creates its auto-acknowledge session. */
    public void setupProducerConnection() throws Exception {
        producerConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
        producerConnection.start();
        pSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Opens and starts the consumer connection with client id {@code myClient1}
     * (required for the durable subscription) and creates its auto-acknowledge session.
     */
    public void setupConsumerConnection() throws Exception {
        cConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
        cConnection.setClientID("myClient1");
        cConnection.start();
        cSession = cConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** Builds the broker from the xbean configuration and starts it. */
    private void createBroker() throws Exception {
        broker = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
        broker.start();
    }

    /**
     * Sends one small message, consumes it, then sends large messages until a send
     * is rejected, and finally checks the journal file count and newest file name
     * across repeated checkpoints, both before and after the backlog is consumed.
     */
    @Test(timeout=90000L)
    public void testCompaction() throws Exception {
        setupProducerConnection();
        setupConsumerConnection();
        Topic topic = pSession.createTopic("test");
        TopicSubscriber consumer = cSession.createDurableSubscriber(topic, "clientId1");

        LOG.info("Produce message to test topic");
        AMQ7118Test.produce(pSession, topic, 1, 512);

        LOG.info("Consume message from test topic");
        Message msg = consumer.receive(5000L);
        Assert.assertNotNull(msg);

        LOG.info("Produce more messages to test topic and get into PFC");
        // NOTE(review): this publishes through the consumer's session (cSession), not pSession - confirm intended.
        boolean sent = AMQ7118Test.produce(cSession, topic, 20, 524288);
        Assert.assertFalse("Never got to PFC condition", sent);
        LOG.info("PFC hit");

        producerConnection.close();

        checkFiles(false, 21, "db-21.log");
        checkFiles(true, 23, "db-23.log");
        checkFiles(true, 23, "db-23.log");
        checkFiles(true, 23, "db-23.log");

        LOG.info("Consuming the rest of the files...");
        // The results are deliberately not asserted: the send loop above stopped early,
        // so fewer than 20 messages may be available and some receives may time out.
        for (int i = 0; i < 20; ++i) {
            consumer.receive(5000L);
        }
        LOG.info("All messages Consumed.");

        checkFiles(true, 2, "db-30.log");
        checkFiles(true, 3, "db-31.log");
        checkFiles(true, 2, "db-31.log");
        checkFiles(true, 2, "db-31.log");
        checkFiles(true, 2, "db-31.log");

        broker.stop();
        broker.waitUntilStopped();
    }

    /**
     * Publishes {@code messageCount} text messages of {@code messageSize} characters
     * each to {@code topic}.
     *
     * @param session      session used to create the producer and the messages
     * @param topic        destination topic
     * @param messageCount number of messages to attempt
     * @param messageSize  length, in characters, of each message body
     * @return {@code true} if every send succeeded; {@code false} as soon as a send
     *         fails with {@link ResourceAllocationException}
     * @throws JMSException on any other JMS failure
     */
    protected static boolean produce(Session session, Topic topic, int messageCount, int messageSize) throws JMSException {
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i < messageCount; ++i) {
            TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", messageSize));
            try {
                producer.send(helloMessage);
            } catch (ResourceAllocationException e) {
                // The broker refused the message for lack of resources; report it to the caller.
                return false;
            }
        }
        return true;
    }

    /**
     * Recursively deletes the contents of {@code file} and then {@code file} itself.
     * Does nothing when {@code file} cannot be listed (missing, or not a directory).
     * Deletion is best-effort: failures are logged and otherwise ignored.
     */
    private void deleteData(File file) {
        String[] entries = file.list();
        if (entries == null) {
            return;
        }
        for (String entry : entries) {
            File currentFile = new File(file.getPath(), entry);
            if (currentFile.isDirectory()) {
                deleteData(currentFile);
            }
            deleteOrWarn(currentFile);
        }
        deleteOrWarn(file);
    }

    /** Deletes {@code file}, logging a warning if it still exists afterwards. */
    private static void deleteOrWarn(File file) {
        if (!file.delete() && file.exists()) {
            LOG.warn("Could not delete {}", file);
        }
    }

    /** Closes {@code connection} if non-null, logging rather than propagating a close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            // Best-effort cleanup: the broker may already be down when this runs.
            LOG.debug("Ignoring failure while closing connection", e);
        }
    }

    /**
     * Optionally forces a checkpoint, then asserts the number of journal files and
     * the name of the highest-numbered one.
     *
     * <p>If the count does not match immediately, the directory is re-listed once a
     * second for up to {@link #FILE_COUNT_WAIT_SECONDS} seconds before asserting, to
     * give any still-running cleanup a chance to finish.
     *
     * @param doCheckpoint  whether to call {@code checkpoint(true)} on the persistence
     *                      adapter (followed by a 4 second pause) before inspecting files
     * @param expectedCount expected number of {@code db-*.log} files
     * @param lastFileName  expected name of the highest-numbered journal file
     */
    private void checkFiles(boolean doCheckpoint, int expectedCount, String lastFileName) throws Exception {
        if (doCheckpoint) {
            ++checkpointIndex;
            LOG.info("Initiating checkpointUpdate {} ...", checkpointIndex);
            broker.getPersistenceAdapter().checkpoint(true);
            TimeUnit.SECONDS.sleep(4L);
            LOG.info("Checkpoint complete.");
        }

        File[] files = listJournalFiles();
        logfiles(files);

        int waited = 0;
        while (files.length != expectedCount && waited < FILE_COUNT_WAIT_SECONDS) {
            TimeUnit.SECONDS.sleep(1L);
            ++waited;
            // Re-read the directory; without this the loop could never observe a change.
            files = listJournalFiles();
        }
        if (waited > 0) {
            logfiles(files);
        }

        Assert.assertEquals("Unexpected number of journal files", expectedCount, files.length);
        String actualLast = files.length == 0 ? null : files[files.length - 1].getName();
        Assert.assertEquals(lastFileName, actualLast);
    }

    /**
     * Lists the {@code db-*.log} files in {@code target/data/kahadb}, sorted by their
     * numeric suffix. Returns an empty array when the directory cannot be listed.
     */
    private static File[] listJournalFiles() {
        File dbfiles = new File("target/data/kahadb");
        FilenameFilter journalFilter = (dir, name) -> {
            String lower = name.toLowerCase();
            return lower.startsWith("db-") && lower.endsWith("log");
        };
        File[] files = dbfiles.listFiles(journalFilter);
        if (files == null) {
            // listFiles returns null when the path is missing or is not a directory.
            return new File[0];
        }
        Arrays.sort(files, new DBFileComparator());
        return files;
    }

    /** Logs the name of every file in {@code files}. */
    private void logfiles(File[] files) {
        LOG.info("Files found in KahaDB:");
        for (File file : files) {
            LOG.info("    {}", file.getName());
        }
    }

    /**
     * Orders files by the integer found between the first {@code '-'} and the last
     * {@code '.'} of their name (for example {@code 23} in {@code db-23.log}). Names
     * that do not contain such a number sort as {@code 0}.
     */
    private static class DBFileComparator
    implements Comparator<File> {
        private DBFileComparator() {
        }

        @Override
        public int compare(File o1, File o2) {
            // Integer.compare avoids the overflow that plain subtraction can produce.
            return Integer.compare(extractNumber(o1.getName()), extractNumber(o2.getName()));
        }

        /** Returns the numeric part of {@code name}, or {@code 0} when it cannot be parsed. */
        private static int extractNumber(String name) {
            try {
                int start = name.indexOf('-') + 1;
                int end = name.lastIndexOf('.');
                return Integer.parseInt(name.substring(start, end));
            } catch (NumberFormatException | IndexOutOfBoundsException e) {
                // Not of the form "<prefix>-<digits>.<ext>"; fall back to the lowest rank.
                return 0;
            }
        }
    }
}

