package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicSubscriber;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.Vector;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.QueryExp;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/TopicDurableConnectStatsTest.class */
public class TopicDurableConnectStatsTest extends org.apache.activemq.TestSupport {
    private BrokerService broker;
    private ActiveMQTopic topic;
    protected MBeanServerConnection mbeanServer;
    private static final Logger LOG = LoggerFactory.getLogger(TopicDurableConnectStatsTest.class);
    private static Session session2 = null;
    private final Vector<Throwable> exceptions = new Vector<>();
    private final int messageSize = 4000;
    protected String domain = "org.apache.activemq";
    private ActiveMQConnectionFactory connectionFactory = null;
    final int numMessages = 20;

    /* loaded from: input_file:org/apache/activemq/usecases/TopicDurableConnectStatsTest$Listener.class */
    public static class Listener implements MessageListener {
        int count = 0;
        String id = null;

        Listener() {
        }

        public void onMessage(Message message) {
            this.count++;
            try {
                TopicDurableConnectStatsTest.session2.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            if (this.id != null) {
                try {
                    TopicDurableConnectStatsTest.LOG.info(this.id + ", " + message.getJMSMessageID());
                } catch (Exception e2) {
                }
            }
            try {
                Thread.sleep(2L);
            } catch (InterruptedException e3) {
                e3.printStackTrace();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        this.connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setAll(10);
        this.connectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        this.connectionFactory.setWatchTopicAdvisories(false);
        return this.connectionFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.TestSupport
    public Connection createConnection() throws Exception {
        return createConnection("cliName");
    }

    protected Connection createConnection(String str) throws Exception {
        Connection createConnection = super.createConnection();
        createConnection.setClientID(str);
        createConnection.start();
        return createConnection;
    }

    public static Test suite() {
        return suite(TopicDurableConnectStatsTest.class);
    }

    protected void setUp() throws Exception {
        this.exceptions.clear();
        this.topic = createDestination();
        createBroker();
        this.mbeanServer = ManagementFactory.getPlatformMBeanServer();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        destroyBroker();
    }

    private void createBroker() throws Exception {
        createBroker(true);
    }

    private void createBroker(boolean z) throws Exception {
        this.broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")");
        this.broker.setBrokerName(getName(true));
        this.broker.setDeleteAllMessagesOnStartup(z);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
    }

    private void destroyBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected ObjectName assertRegisteredObjectName(String str) throws MalformedObjectNameException, NullPointerException {
        ObjectName objectName = new ObjectName(str);
        LOG.info("** Looking for " + str);
        try {
            if (this.mbeanServer.isRegistered(objectName)) {
                LOG.info("Bean Registered: " + objectName);
            } else {
                LOG.info("Couldn't find Mbean! " + objectName);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return objectName;
    }

    public void testPendingTopicStat() throws Exception {
        Connection createConnection = createConnection("cliId1");
        Session createSession = createConnection.createSession(true, 1);
        assertNotNull(createSession.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true));
        DurableSubscriptionViewMBean durableSubscriptionViewMBean = (DurableSubscriptionViewMBean) MBeanServerInvocationHandler.newProxyInstance(this.mbeanServer, (ObjectName) this.mbeanServer.queryNames(new ObjectName(this.domain + ":type=Broker,brokerName=" + getName(true) + ",destinationType=Topic,destinationName=" + this.topic.getTopicName() + ",endpoint=Consumer,clientId=cliId1,consumerId=*"), (QueryExp) null).iterator().next(), DurableSubscriptionViewMBean.class, true);
        LOG.info("Beginning Pending Queue Size count: " + durableSubscriptionViewMBean.getPendingQueueSize());
        LOG.info("Prefetch Limit: " + durableSubscriptionViewMBean.getPrefetchSize());
        assertEquals("no pending", 0, durableSubscriptionViewMBean.getPendingQueueSize());
        assertEquals("Prefetch Limit ", 10, durableSubscriptionViewMBean.getPrefetchSize());
        Connection createConnection2 = createConnection("x");
        Session createSession2 = createConnection2.createSession(true, 1);
        MessageProducer createProducer = createSession2.createProducer(this.topic);
        createProducer.setDeliveryMode(2);
        int i = 0;
        while (i < 20) {
            if (i == 15) {
                LOG.info("Killing consumer at 15");
                createSession.close();
                createConnection.close();
            }
            TextMessage createTextMessage = createSession2.createTextMessage(createMessageText(i));
            createTextMessage.setJMSExpiration(0L);
            createTextMessage.setStringProperty("filter", "true");
            createProducer.send(this.topic, createTextMessage);
            createSession2.commit();
            i++;
        }
        LOG.info("Sent " + i + " messages in total");
        createConnection2.close();
        LOG.info("Pending Queue Size count: " + durableSubscriptionViewMBean.getPendingQueueSize());
        assertEquals("pending as expected", 20, durableSubscriptionViewMBean.getPendingQueueSize());
        LOG.info("Re-connect client and consume messages");
        Connection createConnection3 = createConnection("cliId1");
        session2 = createConnection3.createSession(true, 1);
        TopicSubscriber createDurableSubscriber = session2.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
        final Listener listener = new Listener();
        createDurableSubscriber.setMessageListener(listener);
        assertTrue("received all sent", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.TopicDurableConnectStatsTest.1
            public boolean isSatisified() throws Exception {
                return 20 == listener.count;
            }
        }));
        LOG.info("Received: " + listener.count);
        int pendingQueueSize = durableSubscriptionViewMBean.getPendingQueueSize();
        LOG.info("Pending Queue Size count: " + pendingQueueSize);
        assertEquals("Pending queue after consumed", 0, pendingQueueSize);
        session2.close();
        createConnection3.close();
        LOG.info("FINAL Pending Queue Size count (after consumer close): " + durableSubscriptionViewMBean.getPendingQueueSize());
    }

    private String createMessageText(int i) {
        StringBuffer stringBuffer = new StringBuffer(4000);
        stringBuffer.append("Message: " + i + " sent at: " + new Date());
        if (stringBuffer.length() > 4000) {
            return stringBuffer.substring(0, 4000);
        }
        for (int length = stringBuffer.length(); length < 4000; length++) {
            stringBuffer.append(' ');
        }
        return stringBuffer.toString();
    }
}
