package org.apache.activemq.bugs;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.SocketProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/bugs/DurableConsumerTest.class */
public class DurableConsumerTest extends TestCase {
    private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class);
    private static int COUNT = 10240;
    private static String CONSUMER_NAME = "DURABLE_TEST";
    protected BrokerService broker;
    protected String bindAddress = NetworkedSyncTest.broker1URL;
    protected byte[] payload = new byte[16384];
    protected ConnectionFactory factory;

    public void testConsumer() throws Exception {
        this.factory = createConnectionFactory();
        Connection createConnection = this.factory.createConnection();
        createConnection.setClientID(CONSUMER_NAME);
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic(getClass().getName());
        createSession.createDurableSubscriber(createTopic, CONSUMER_NAME);
        createConnection.start();
        createConnection.close();
        this.broker.stop();
        this.broker = createBroker(false);
        Connection createConnection2 = this.factory.createConnection();
        Session createSession2 = createConnection2.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer(createTopic);
        createConnection2.start();
        for (int i = 0; i < COUNT; i++) {
            BytesMessage createBytesMessage = createSession2.createBytesMessage();
            createBytesMessage.writeBytes(this.payload);
            createProducer.send(createBytesMessage);
            if (i != 0 && i % SocketProxy.ACCEPT_TIMEOUT_MILLIS == 0) {
                LOG.info("Sent msg " + i);
            }
        }
        createConnection2.close();
        this.broker.stop();
        this.broker = createBroker(false);
        Connection createConnection3 = this.factory.createConnection();
        createConnection3.setClientID(CONSUMER_NAME);
        TopicSubscriber createDurableSubscriber = createConnection3.createSession(false, 1).createDurableSubscriber(createTopic, CONSUMER_NAME);
        createConnection3.start();
        for (int i2 = 0; i2 < COUNT; i2++) {
            assertNotNull("Missing message: " + i2, createDurableSubscriber.receive(5000L));
            if (i2 != 0 && i2 % SocketProxy.ACCEPT_TIMEOUT_MILLIS == 0) {
                LOG.info("Received msg " + i2);
            }
        }
        createConnection3.close();
    }

    protected void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = createBroker(true);
        }
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.broker != null) {
            this.broker.stop();
            this.broker = null;
        }
    }

    protected Topic creatTopic(Session session, String str) throws JMSException {
        return session.createTopic(str);
    }

    protected BrokerService createBroker(boolean z) throws Exception {
        BrokerService brokerService = new BrokerService();
        configureBroker(brokerService, z);
        brokerService.start();
        return brokerService;
    }

    protected void configureBroker(BrokerService brokerService, boolean z) throws Exception {
        brokerService.setDeleteAllMessagesOnStartup(z);
        brokerService.addConnector(this.bindAddress);
        brokerService.setUseShutdownHook(false);
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.bindAddress);
    }
}
