package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.util.Date;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

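/**
 * JUnit 3 test that connects a durable topic subscriber several times against an embedded
 * broker while non-persistent messages are pending, then checks through JMX that the
 * subscription's {@code PendingQueueSize} and {@code CursorMemoryUsage} both reach zero.
 *
 * <p>Decompiled from: org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.class
 */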
public class DurableSubscriberNonPersistentMessageTest extends TestCase {
    private final Logger LOG;
    private String brokerURL;
    private String consumerBrokerURL;
    int initialMaxMsgs;
    int cleanupMsgCount;
    int totalMsgCount;
    int totalMsgReceived;
    int sleep;
    int reconnectSleep;
    int messageTimeout;
    int messageSize;
    long ttl;
    static String clientId = "Jason";
    MBeanServer mbeanServer;
    BrokerService broker;

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest$Consumer.class */
    /**
     * Durable topic subscriber used by the test.
     *
     * <p>Each instance owns its own connection, a non-transacted auto-acknowledge session and a
     * durable subscriber registered under the subscription name {@code "MyDurableTopic"}.
     */
    public class Consumer {
        private final ConnectionFactory factory;
        private final ActiveMQConnection connection;
        private final Session session;
        private final MessageConsumer messageConsumer;

        /**
         * Connects to the broker and creates the durable subscriber.
         *
         * @param str  broker URL handed to {@link ActiveMQConnectionFactory}
         * @param str2 name of the topic to subscribe to
         * @param str3 JMS client id set on the connection
         * @throws JMSException if any step of the setup fails; the connection opened so far is
         *                      closed before the exception is rethrown
         */
        public Consumer(String str, String str2, String str3) throws JMSException {
            this.factory = new ActiveMQConnectionFactory(str);
            // createConnection() is declared to return jakarta.jms.Connection; the ActiveMQ type
            // is required below for getPrefetchPolicy(), hence the explicit cast.
            this.connection = (ActiveMQConnection) this.factory.createConnection();
            try {
                this.connection.setClientID(str3);
                this.connection.start();
                // Applied after start(): sets every prefetch limit of this connection to 15.
                this.connection.getPrefetchPolicy().setAll(15);
                this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                this.messageConsumer = this.session.createDurableSubscriber(this.session.createTopic(str2), "MyDurableTopic");
            } catch (JMSException | RuntimeException e) {
                // Do not leak the half-initialised connection when setup fails.
                try {
                    this.connection.close();
                } catch (JMSException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        /**
         * Drains the subscription by receiving until a receive call times out.
         *
         * <p>NOTE(review): the timeout comes from a constant declared on an unrelated test class
         * (likely a decompiler substitution for a literal); its value is not visible in this file.
         *
         * @throws JMSException if a receive fails
         */
        public void deleteAllMessages() throws JMSException {
            while (getMessage(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP) != null) {
                // discard the message and keep draining
            }
        }

        /**
         * Receives the next message, waiting at most the given time.
         *
         * @param i receive timeout passed to {@link MessageConsumer#receive(long)}
         * @return the received message, or {@code null} if none arrived in time
         * @throws JMSException if the receive fails
         */
        public Message getMessage(int i) throws JMSException {
            return this.messageConsumer.receive(i);
        }

        /**
         * Closes the subscriber, the session and the connection, in that order. Each later
         * resource is still closed when closing an earlier one throws.
         *
         * @throws JMSException if closing any of the resources fails
         */
        public void close() throws JMSException {
            try {
                try {
                    if (this.messageConsumer != null) {
                        this.messageConsumer.close();
                    }
                } finally {
                    if (this.session != null) {
                        this.session.close();
                    }
                }
            } finally {
                if (this.connection != null) {
                    this.connection.close();
                }
            }
        }

        /**
         * @return the session backing this consumer
         */
        public Session getSession() {
            return this.session;
        }
    }

    /* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest$Producer.class */
    public class Producer {
        protected ConnectionFactory factory;
        protected transient Connection connection;
        protected transient Session session;
        protected transient MessageProducer producer;
        protected static final int messageSize = 1024;

        public Producer(String str, String str2, int i, long j) throws JMSException {
            this.factory = new ActiveMQConnectionFactory(str);
            this.connection = this.factory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, 1);
            this.producer = this.session.createProducer(this.session.createTopic(str2));
            this.producer.setDeliveryMode(1);
            if (j > 0) {
                this.producer.setTimeToLive(j);
            }
        }

        public void close() throws JMSException {
            if (this.connection != null) {
                this.connection.close();
            }
        }

        protected void sendMessage() throws JMSException {
            this.producer.send(this.session.createTextMessage("test message"));
        }

        protected void sendMessages(int i) throws JMSException {
            for (int i2 = 0; i2 < i; i2++) {
                this.producer.send(this.session.createTextMessage(createMessageText(i2)));
            }
        }

        private String createMessageText(int i) {
            StringBuffer stringBuffer = new StringBuffer(messageSize);
            stringBuffer.append("Message: " + i + " sent at: " + new Date());
            if (stringBuffer.length() > messageSize) {
                return stringBuffer.substring(0, messageSize);
            }
            for (int length = stringBuffer.length(); length < messageSize; length++) {
                stringBuffer.append(' ');
            }
            return stringBuffer.toString();
        }

        protected void commitTransaction() throws JMSException {
            this.session.commit();
        }
    }

    /**
     * Starts an embedded broker backed by a KahaDB store in the relative directory
     * {@code "data"}, then derives the failover URLs used by producers and consumers and
     * looks up the platform MBean server.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    protected void setUp() throws Exception {
        super.setUp();

        this.broker = new BrokerService();
        final TransportConnector connector = this.broker.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);

        final KahaDBStore store = new KahaDBStore();
        store.setDirectory(new File("data"));
        this.broker.setPersistenceAdapter(store);

        this.broker.start();

        // The connect string is read only after start().
        final String connectUri = connector.getPublishableConnectString();
        this.brokerURL = "failover:(" + connectUri + ")";
        // Consumers use the same URL plus a connection-level prefetch option.
        this.consumerBrokerURL = this.brokerURL + "?jms.prefetchPolicy.all=100";

        this.mbeanServer = ManagementFactory.getPlatformMBeanServer();
    }

    /**
     * Stops the embedded broker, if one was created, and always runs the superclass tear-down
     * even when stopping the broker throws.
     *
     * @throws Exception if stopping the broker or the superclass tear-down fails
     */
    protected void tearDown() throws Exception {
        try {
            if (this.broker != null) {
                this.broker.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Creates the test case and initialises its logger, message counts, timing values and
     * message parameters.
     *
     * @param str test name passed to the JUnit {@code TestCase} constructor
     */
    public DurableSubscriberNonPersistentMessageTest(String str) {
        super(str);
        this.LOG = LoggerFactory.getLogger(DurableSubscriberNonPersistentMessageTest.class);

        // Message counts: the total sent is the sum of the two consumer batches.
        this.initialMaxMsgs = 10;
        this.cleanupMsgCount = 10;
        this.totalMsgCount = this.initialMaxMsgs + this.cleanupMsgCount;
        this.totalMsgReceived = 0;

        // Timing values used for Thread.sleep() and receive timeouts.
        this.sleep = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP;
        this.reconnectSleep = 2000;
        this.messageTimeout = 1000;

        // Message parameters handed to the Producer.
        this.messageSize = 1024;
        this.ttl = 0L;
    }

    /**
     * Builds the JUnit 3 suite for this test class.
     *
     * @return a {@link TestSuite} constructed from this class
     */
    public static Test suite() {
        final TestSuite allTests = new TestSuite(DurableSubscriberNonPersistentMessageTest.class);
        return allTests;
    }

    /**
     * Connects the durable subscriber once without receiving, publishes {@code totalMsgCount}
     * messages, then reconnects twice to receive them in two batches. Afterwards it polls the
     * subscription MBean until {@code PendingQueueSize} and {@code CursorMemoryUsage} are zero
     * and checks that every message sent was received.
     *
     * <p>Any exception is logged and turned into a test failure.
     */
    public void testDurableSubscriberNonPersistentMessage() {
        this.LOG.info("Starting DurableSubscriberNonPersistentMessageTest");
        try {
            // First connect: zero receive attempts, the consumer is opened and closed again.
            createConsumer("TEST", 0);
            Thread.sleep(1000L);

            Producer producer = new Producer(this.brokerURL, "TEST", this.messageSize, this.ttl);
            try {
                producer.sendMessages(this.totalMsgCount);
            } finally {
                // Release the producer connection even when sending fails.
                producer.close();
            }
            this.LOG.info(this.totalMsgCount + " messages sent");

            createConsumer("TEST", this.initialMaxMsgs);
            Thread.sleep(this.reconnectSleep);
            createConsumer("TEST", this.cleanupMsgCount);

            this.LOG.info("Test run on: " + ((String) this.mbeanServer.getAttribute(new ObjectName("org.apache.activemq:brokerName=localhost,type=Broker"), "BrokerVersion")));

            // One name for both checks, derived from clientId so it cannot drift from the
            // id the consumers actually connect with.
            final ObjectName subscriptionName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,endpoint=Consumer,destinationType=Topic,destinationName=TEST,clientId=" + clientId + ",consumerId=Durable(" + clientId + "_MyDurableTopic)");

            assertTrue("pendingQueueSize should be zero", Wait.waitFor(new Wait.Condition() {
                public boolean isSatisified() throws Exception {
                    Integer pending = (Integer) mbeanServer.getAttribute(subscriptionName, "PendingQueueSize");
                    LOG.info("pendingQueueSize = " + pending);
                    return pending.intValue() == 0;
                }
            }));
            assertTrue("cursorMemoryUsage should be zero", Wait.waitFor(new Wait.Condition() {
                public boolean isSatisified() throws Exception {
                    Long usage = (Long) mbeanServer.getAttribute(subscriptionName, "CursorMemoryUsage");
                    LOG.info("cursorMemoryUsage = " + usage);
                    return usage.longValue() == 0;
                }
            }));

            // assertEquals reports both values on failure, unlike assertTrue(a == b).
            assertEquals("all sent messages should have been received", this.initialMaxMsgs + this.cleanupMsgCount, this.totalMsgReceived);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the test runner before failing.
            Thread.currentThread().interrupt();
            this.LOG.error("Exception Executing DurableSubscriberNonPersistentMessageTest", e);
            fail("Should not throw any exceptions: " + e);
        } catch (Exception e) {
            this.LOG.error("Exception Executing DurableSubscriberNonPersistentMessageTest", e);
            fail("Should not throw any exceptions: " + e);
        }
    }

    /**
     * Opens a durable {@link Consumer} on the given topic, makes up to {@code i} receive
     * attempts (pausing {@code sleep} milliseconds after each one) and closes the consumer.
     * Every message received increments {@code totalMsgReceived}.
     *
     * <p>A {@link JMSException} is logged and not rethrown. If the thread is interrupted while
     * pausing, the loop stops early and the interrupt flag is restored once the consumer has
     * been closed.
     *
     * @param str name of the topic to subscribe to
     * @param i   number of receive attempts to make
     */
    public void createConsumer(String str, int i) {
        int received = 0;
        int missed = 0;
        boolean interrupted = false;
        this.LOG.info("Starting DurableSubscriber");
        try {
            Consumer consumer = new Consumer(this.consumerBrokerURL, str, clientId);
            try {
                for (int attempt = 0; attempt < i; attempt++) {
                    Message message = consumer.getMessage(this.messageTimeout);
                    if (message != null) {
                        this.LOG.debug("Received Message: {}", message);
                        received++;
                        this.totalMsgReceived++;
                    } else {
                        this.LOG.debug("message {} not received", attempt);
                        missed++;
                    }
                    try {
                        Thread.sleep(this.sleep);
                    } catch (InterruptedException e) {
                        this.LOG.debug("Exception: " + e);
                        // Stop receiving; the flag is restored after the consumer is closed.
                        interrupted = true;
                        break;
                    }
                }
            } finally {
                // Always release the connection, including when a receive throws.
                consumer.close();
            }
            this.LOG.info("Consumer Finished");
            this.LOG.info("Received " + received);
            this.LOG.info("Not Received " + missed);
        } catch (JMSException e2) {
            this.LOG.error("Exception Executing SimpleConsumer", e2);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Renders a throwable's stack trace as text.
     *
     * @param th the throwable to render
     * @return the output of {@link Throwable#printStackTrace(PrintWriter)} as a string
     */
    public String getStackTrace(Throwable th) {
        final StringWriter buffer = new StringWriter();
        final PrintWriter sink = new PrintWriter(buffer);
        th.printStackTrace(sink);
        sink.flush();
        return buffer.toString();
    }
}
