package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.class */
public class DurableConsumerCloseAndReconnectTest extends org.apache.activemq.test.TestSupport {
    protected static final long RECEIVE_TIMEOUT = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class);
    BrokerService brokerService;
    protected Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private MessageProducer producer;
    private Destination destination;
    private int messageCount;
    private String vmConnectorURI;

    protected void setUp() throws Exception {
        createBroker();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        stopBroker();
        super.tearDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // 
    /* renamed from: createConnectionFactory, reason: merged with bridge method [inline-methods] */
    public ActiveMQConnectionFactory mo911createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.vmConnectorURI);
    }

    protected void createBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setUseJmx(false);
        this.brokerService.setPersistent(false);
        this.brokerService.setPersistenceAdapter(new KahaDBPersistenceAdapter());
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
        this.vmConnectorURI = this.brokerService.getVmConnectorURI().toString();
    }

    protected void stopBroker() throws Exception {
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
    }

    public void testDurableSubscriberReconnectMultipleTimes() throws Exception {
        Connection createConnection = createConnection();
        createConnection.start();
        makeConsumer(1);
        closeConsumer();
        publish(30);
        int i = 1;
        for (int i2 = 0; i2 < 15; i2++) {
            makeConsumer(1);
            assertTrue("Should have received a message!", this.consumer.receive(RECEIVE_TIMEOUT) != null);
            int i3 = i;
            int i4 = i + 1;
            LOG.info("Received message " + i3);
            assertTrue("Should have received a message!", this.consumer.receive(RECEIVE_TIMEOUT) != null);
            i = i4 + 1;
            LOG.info("Received message " + i4);
            closeConsumer();
        }
        createConnection.close();
    }

    public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
        Connection createConnection = createConnection();
        createConnection.start();
        consumeMessagesDeliveredWhileConsumerClosed();
        createConnection.close();
        consumeMessagesDeliveredWhileConsumerClosed();
    }

    protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {
        makeConsumer();
        closeConsumer();
        publish(1);
        Thread.sleep(1000L);
        makeConsumer();
        assertTrue("Should have received a message!", this.consumer.receive(RECEIVE_TIMEOUT) != null);
        closeConsumer();
        LOG.info("Now lets create the consumer again and because we didn't ack, we should get it again");
        makeConsumer();
        Message receive = this.consumer.receive(RECEIVE_TIMEOUT);
        assertTrue("Should have received a message!", receive != null);
        receive.acknowledge();
        closeConsumer();
        LOG.info("Now lets create the consumer again and because we did ack, we should not get it again");
        makeConsumer();
        assertTrue("Should have no more messages left!", this.consumer.receive(2000L) == null);
        closeConsumer();
        LOG.info("Lets publish one more message now");
        publish(1);
        makeConsumer();
        Message receive2 = this.consumer.receive(RECEIVE_TIMEOUT);
        assertTrue("Should have received a message!", receive2 != null);
        receive2.acknowledge();
        closeConsumer();
    }

    protected void publish(int i) throws Exception {
        this.connection = createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 2);
        this.destination = createDestination();
        this.producer = this.session.createProducer(this.destination);
        this.producer.setDeliveryMode(2);
        for (int i2 = 0; i2 < i; i2++) {
            Session session = this.session;
            StringBuilder append = new StringBuilder().append("This is a test: ");
            int i3 = this.messageCount;
            this.messageCount = i3 + 1;
            this.producer.send(session.createTextMessage(append.append(i3).toString()));
        }
        this.producer.close();
        this.producer = null;
        closeSession();
    }

    protected Destination createDestination() throws JMSException {
        return isTopic() ? this.session.createTopic(getSubject()) : this.session.createQueue(getSubject());
    }

    protected boolean isTopic() {
        return true;
    }

    protected void closeConsumer() throws JMSException {
        LOG.info("Closing the consumer");
        this.consumer.close();
        this.consumer = null;
        closeSession();
    }

    protected void closeSession() throws JMSException {
        this.session.close();
        this.session = null;
        this.connection.close();
        this.connection = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void makeConsumer() throws Exception {
        makeConsumer(2);
    }

    protected void makeConsumer(int i) throws Exception {
        String name = getName();
        String subject = getSubject();
        LOG.info("Creating a durable subscriber for clientID: " + subject + " and durable name: " + name);
        createSession(subject, i);
        this.consumer = createConsumer(name);
    }

    private MessageConsumer createConsumer(String str) throws JMSException {
        return this.destination instanceof Topic ? this.session.createDurableSubscriber(this.destination, str) : this.session.createConsumer(this.destination);
    }

    protected void createSession(String str, int i) throws Exception {
        this.connection = createConnection();
        this.connection.setClientID(str);
        this.connection.start();
        this.session = this.connection.createSession(false, i);
        this.destination = createDestination();
    }
}
