/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.bugs.MessageSender;
import org.apache.activemq.bugs.Receiver;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MissingDataFileTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);
    private static int counter = 500;
    private static int hectorToHaloCtr;
    private static int xenaToHaloCtr;
    private static int troyToHaloCtr;
    private static int haloToHectorCtr;
    private static int haloToXenaCtr;
    private static int haloToTroyCtr;
    private final String hectorToHalo = "hectorToHalo";
    private final String xenaToHalo = "xenaToHalo";
    private final String troyToHalo = "troyToHalo";
    private final String haloToHector = "haloToHector";
    private final String haloToXena = "haloToXena";
    private final String haloToTroy = "haloToTroy";
    private BrokerService broker;
    private Connection hectorConnection;
    private Connection xenaConnection;
    private Connection troyConnection;
    private Connection haloConnection;
    private final Object lock = new Object();
    final boolean useTopic = false;
    final boolean useSleep = true;
    protected static final String payload;

    public Connection createConnection() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        return factory.createConnection();
    }

    public Session createSession(Connection connection, boolean transacted) throws JMSException {
        return connection.createSession(transacted, 1);
    }

    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(true);
        this.broker.setUseJmx(true);
        this.broker.addConnector("tcp://localhost:61616").setName("Default");
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(0xA00000L);
        this.broker.setSystemUsage(systemUsage);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setJournalMaxFileLength(16384);
        kahaDBPersistenceAdapter.setCleanupInterval(500L);
        this.broker.setPersistenceAdapter((PersistenceAdapter)kahaDBPersistenceAdapter);
        this.broker.start();
        LOG.info("Starting broker..");
    }

    public void tearDown() throws Exception {
        this.hectorConnection.close();
        this.xenaConnection.close();
        this.troyConnection.close();
        this.haloConnection.close();
        this.broker.stop();
    }

    public void testForNoDataFoundError() throws Exception {
        this.startBroker();
        this.hectorConnection = this.createConnection();
        Thread hectorThread = this.buildProducer(this.hectorConnection, "hectorToHalo", false, false);
        Receiver hHectorReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                if (++haloToHectorCtr >= counter) {
                    Object object = MissingDataFileTest.this.lock;
                    synchronized (object) {
                        MissingDataFileTest.this.lock.notifyAll();
                    }
                }
                MissingDataFileTest.this.possiblySleep(haloToHectorCtr);
            }
        };
        this.buildReceiver(this.hectorConnection, "haloToHector", false, hHectorReceiver, false);
        this.troyConnection = this.createConnection();
        Thread troyThread = this.buildProducer(this.troyConnection, "troyToHalo");
        Receiver hTroyReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                if (++haloToTroyCtr >= counter) {
                    Object object = MissingDataFileTest.this.lock;
                    synchronized (object) {
                        MissingDataFileTest.this.lock.notifyAll();
                    }
                }
                MissingDataFileTest.this.possiblySleep(haloToTroyCtr);
            }
        };
        this.buildReceiver(this.hectorConnection, "haloToTroy", false, hTroyReceiver, false);
        this.xenaConnection = this.createConnection();
        Thread xenaThread = this.buildProducer(this.xenaConnection, "xenaToHalo");
        Receiver hXenaReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                if (++haloToXenaCtr >= counter) {
                    Object object = MissingDataFileTest.this.lock;
                    synchronized (object) {
                        MissingDataFileTest.this.lock.notifyAll();
                    }
                }
                MissingDataFileTest.this.possiblySleep(haloToXenaCtr);
            }
        };
        this.buildReceiver(this.xenaConnection, "haloToXena", false, hXenaReceiver, false);
        this.haloConnection = this.createConnection();
        final MessageSender hectorSender = this.buildTransactionalProducer("haloToHector", this.haloConnection, false);
        final MessageSender troySender = this.buildTransactionalProducer("haloToTroy", this.haloConnection, false);
        final MessageSender xenaSender = this.buildTransactionalProducer("haloToXena", this.haloConnection, false);
        Receiver hectorReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                troySender.send(payload);
                if (++hectorToHaloCtr >= counter) {
                    Object object = MissingDataFileTest.this.lock;
                    synchronized (object) {
                        MissingDataFileTest.this.lock.notifyAll();
                    }
                    MissingDataFileTest.this.possiblySleep(hectorToHaloCtr);
                }
            }
        };
        Receiver xenaReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                hectorSender.send(payload);
                if (++xenaToHaloCtr >= counter) {
                    Object object = MissingDataFileTest.this.lock;
                    synchronized (object) {
                        MissingDataFileTest.this.lock.notifyAll();
                    }
                }
                MissingDataFileTest.this.possiblySleep(xenaToHaloCtr);
            }
        };
        Receiver troyReceiver = new Receiver(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void receive(String s) throws Exception {
                xenaSender.send(payload);
                if (++troyToHaloCtr >= counter) {
                    Object object = MissingDataFileTest.this.lock;
                    synchronized (object) {
                        MissingDataFileTest.this.lock.notifyAll();
                    }
                }
            }
        };
        this.buildReceiver(this.haloConnection, "hectorToHalo", true, hectorReceiver, false);
        this.buildReceiver(this.haloConnection, "xenaToHalo", true, xenaReceiver, false);
        this.buildReceiver(this.haloConnection, "troyToHalo", true, troyReceiver, false);
        this.haloConnection.start();
        this.troyConnection.start();
        troyThread.start();
        this.xenaConnection.start();
        xenaThread.start();
        this.hectorConnection.start();
        hectorThread.start();
        this.waitForMessagesToBeDelivered();
        MissingDataFileTest.assertEquals((int)hectorToHaloCtr, (int)counter);
        LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages");
        MissingDataFileTest.assertEquals((int)xenaToHaloCtr, (int)counter);
        LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages");
        MissingDataFileTest.assertEquals((int)troyToHaloCtr, (int)counter);
        LOG.info("troyToHalo received " + troyToHaloCtr + " messages");
        MissingDataFileTest.assertEquals((int)haloToHectorCtr, (int)counter);
        LOG.info("haloToHector received " + haloToHectorCtr + " messages");
        MissingDataFileTest.assertEquals((int)haloToXenaCtr, (int)counter);
        LOG.info("haloToXena received " + haloToXenaCtr + " messages");
        MissingDataFileTest.assertEquals((int)haloToTroyCtr, (int)counter);
        LOG.info("haloToTroy received " + haloToTroyCtr + " messages");
    }

    protected void possiblySleep(int count) throws InterruptedException {
        if (count % 100 == 0) {
            Thread.sleep(5000L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForMessagesToBeDelivered() {
        long maxWaitTime;
        long waitTime = maxWaitTime = (long)(counter * 1000);
        long start = maxWaitTime <= 0L ? 0L : System.currentTimeMillis();
        Object object = this.lock;
        synchronized (object) {
            boolean hasMessages = true;
            while (hasMessages && waitTime >= 0L) {
                try {
                    this.lock.wait(200L);
                }
                catch (InterruptedException e) {
                    LOG.error(e.toString());
                }
                hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter || haloToTroyCtr < counter;
                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
            }
        }
    }

    public MessageSender buildTransactionalProducer(String queueName, Connection connection, boolean isTopic) throws Exception {
        return new MessageSender(queueName, connection, true, isTopic);
    }

    public Thread buildProducer(Connection connection, String queueName) throws Exception {
        return this.buildProducer(connection, queueName, false, false);
    }

    public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception {
        final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic);
        Thread thread = new Thread(){

            @Override
            public synchronized void run() {
                for (int i = 0; i < counter; ++i) {
                    try {
                        producer.send(payload);
                        continue;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("on " + queueName + " send", e);
                    }
                }
            }
        };
        return thread;
    }

    public void buildReceiver(Connection connection, String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
        final Session session = transacted ? connection.createSession(true, 0) : connection.createSession(false, 1);
        MessageConsumer inputMessageConsumer = session.createConsumer((Destination)(isTopic ? session.createTopic(queueName) : session.createQueue(queueName)));
        MessageListener messageListener = new MessageListener(){

            public void onMessage(Message message) {
                try {
                    ObjectMessage objectMessage = (ObjectMessage)message;
                    String s = (String)((Object)objectMessage.getObject());
                    receiver.receive(s);
                    if (session.getTransacted()) {
                        session.commit();
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        inputMessageConsumer.setMessageListener(messageListener);
    }

    static {
        payload = new String(new byte[500]);
    }
}

