/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ2512Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2512Test.class);
    private final String QUEUE_NAME = "dee.q";
    private final int INITIAL_MESSAGES_CNT = 1000;
    private final int WORKER_INTERNAL_ITERATIONS = 100;
    private final int TOTAL_MESSAGES_CNT = 101000;
    private final byte[] payload = new byte[5120];
    private final String TEXT = new String(this.payload);
    private final String PRP_INITIAL_ID = "initial-id";
    private final String PRP_WORKER_ID = "worker-id";
    private final CountDownLatch LATCH = new CountDownLatch(101000);
    private final AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
    private BrokerService brokerService;
    private Connection connection;
    private String connectionURI;

    @Test(timeout=300000L)
    public void testKahaDBFailure() throws Exception {
        int i;
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(this.connectionURI);
        this.connection = fac.createConnection();
        Session session = this.connection.createSession(false, 1);
        Queue queue = session.createQueue("dee.q");
        MessageProducer producer = session.createProducer((Destination)queue);
        producer.setDeliveryMode(2);
        this.connection.start();
        long startTime = System.nanoTime();
        ArrayList<Consumer> consumers = new ArrayList<Consumer>();
        for (i = 0; i < 20; ++i) {
            consumers.add(new Consumer("worker-" + i));
        }
        for (i = 0; i < 1000; ++i) {
            TextMessage msg = session.createTextMessage(this.TEXT);
            msg.setStringProperty("initial-id", "initial-" + i);
            producer.send((Message)msg);
        }
        this.LATCH.await();
        long endTime = System.nanoTime();
        LOG.info("Total execution time = " + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms].");
        LOG.info("Rate = " + 101000L / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s].");
        for (Consumer c : consumers) {
            c.close();
        }
        this.connection.close();
    }

    @Before
    public void setUp() throws Exception {
        this.brokerService = this.createBroker();
        this.brokerService.start();
        this.connectionURI = this.brokerService.getTransportConnectorByName("openwire").getPublishableConnectString();
    }

    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    protected BrokerService createBroker() throws Exception {
        File dataFileDir = new File("target/test-amq-2512/datadb");
        IOHelper.mkdirs((File)dataFileDir);
        IOHelper.deleteChildren((File)dataFileDir);
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.NEVER.name());
        BrokerService answer = new BrokerService();
        answer.setPersistenceAdapter((PersistenceAdapter)kaha);
        answer.setDataDirectoryFile(dataFileDir);
        answer.setUseJmx(false);
        answer.addConnector("tcp://localhost:0").setName("openwire");
        return answer;
    }

    private final class Consumer
    implements MessageListener {
        private final String name;
        private final Session session;
        private final MessageProducer producer;

        private Consumer(String name) {
            this.name = name;
            try {
                this.session = AMQ2512Test.this.connection.createSession(false, 2);
                Queue queue = this.session.createQueue("dee.q?consumer.prefetchSize=10");
                this.producer = this.session.createProducer((Destination)queue);
                this.producer.setDeliveryMode(2);
                MessageConsumer consumer = this.session.createConsumer((Destination)queue);
                consumer.setMessageListener((MessageListener)this);
            }
            catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        public void onMessage(Message message) {
            TextMessage msg = (TextMessage)message;
            try {
                if (!msg.propertyExists("worker-id")) {
                    for (int i = 0; i < 100; ++i) {
                        TextMessage newMsg = this.session.createTextMessage(msg.getText());
                        newMsg.setStringProperty("worker-id", this.name + "-" + i);
                        newMsg.setStringProperty("initial-id", msg.getStringProperty("initial-id"));
                        this.producer.send((Message)newMsg);
                    }
                }
                msg.acknowledge();
            }
            catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
            finally {
                int onMsgCounter = AMQ2512Test.this.ON_MSG_COUNTER.getAndIncrement();
                if (onMsgCounter % 1000 == 0) {
                    LOG.info("message received: " + onMsgCounter);
                }
                AMQ2512Test.this.LATCH.countDown();
            }
        }

        private void close() {
            if (this.session != null) {
                try {
                    this.session.close();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

