/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.jmx;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcurrentMoveTest
extends EmbeddedBrokerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentMoveTest.class);
    protected MBeanServer mbeanServer;
    protected String domain = "org.apache.activemq";
    protected Connection connection;
    protected boolean transacted;
    protected int authMode = 1;
    protected int messageCount = 2000;

    public void testConcurrentMove() throws Exception {
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        Session session = this.connection.createSession(this.transacted, this.authMode);
        this.destination = this.createDestination();
        MessageProducer producer = session.createProducer((Destination)this.destination);
        for (int i = 0; i < this.messageCount; ++i) {
            TextMessage message = session.createTextMessage("Message: " + i);
            producer.send((Message)message);
        }
        long usageBeforMove = this.broker.getPersistenceAdapter().size();
        LOG.info("Store usage:" + usageBeforMove);
        Object objectNameStr = this.broker.getBrokerObjectName().toString();
        objectNameStr = (String)objectNameStr + ",destinationType=Queue,destinationName=" + this.getDestinationString();
        ObjectName queueViewMBeanName = this.assertRegisteredObjectName((String)objectNameStr);
        final QueueViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(this.mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
        final ActiveMQQueue to = new ActiveMQQueue("TO");
        ((RegionBroker)this.broker.getRegionBroker()).addDestination(this.broker.getAdminConnectionContext(), (ActiveMQDestination)to, false);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 50; ++i) {
            executorService.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        proxy.moveMatchingMessagesTo(null, to.getPhysicalName());
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.MINUTES);
        long count = proxy.getQueueSize();
        ConcurrentMoveTest.assertEquals((String)"Queue size", (long)count, (long)0L);
        ConcurrentMoveTest.assertEquals((String)"Browse size", (int)proxy.browseMessages().size(), (int)0);
        objectNameStr = this.broker.getBrokerObjectName().toString();
        objectNameStr = (String)objectNameStr + ",destinationType=Queue,destinationName=" + to.getQueueName();
        queueViewMBeanName = this.assertRegisteredObjectName((String)objectNameStr);
        QueueViewMBean toProxy = MBeanServerInvocationHandler.newProxyInstance(this.mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
        count = toProxy.getQueueSize();
        ConcurrentMoveTest.assertEquals((String)"Queue size", (long)count, (long)this.messageCount);
        long usageAfterMove = this.broker.getPersistenceAdapter().size();
        LOG.info("Store usage, before: " + usageBeforMove + ", after:" + usageAfterMove);
        LOG.info("Store size increase:" + FileUtils.byteCountToDisplaySize((long)(usageAfterMove - usageBeforMove)));
        ConcurrentMoveTest.assertTrue((String)"Usage not more than doubled", (usageAfterMove < usageBeforMove * 3L ? 1 : 0) != 0);
        producer.close();
    }

    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
        ObjectName objectName = new ObjectName(name);
        if (this.mbeanServer.isRegistered(objectName)) {
            LOG.info("Bean Registered: " + String.valueOf(objectName));
        } else {
            ConcurrentMoveTest.fail((String)("Could not find MBean!: " + String.valueOf(objectName)));
        }
        return objectName;
    }

    @Override
    protected void setUp() throws Exception {
        this.bindAddress = "tcp://localhost:0";
        this.useTopic = false;
        super.setUp();
        this.mbeanServer = this.broker.getManagementContext().getMBeanServer();
    }

    @Override
    protected void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
        super.tearDown();
    }

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setUseJmx(true);
        answer.setEnableStatistics(true);
        answer.addConnector(this.bindAddress);
        ((KahaDBPersistenceAdapter)answer.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
        answer.deleteAllMessages();
        return answer;
    }

    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
    }

    @Override
    protected String getDestinationString() {
        return ((Object)((Object)this)).getClass().getName() + "." + this.getName(true);
    }
}

