package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IdGenerator;

/* loaded from: input_file:org/apache/activemq/usecases/ReliableReconnectTest.class */
/**
 * Verifies that a durable topic subscriber keeps receiving messages after the
 * broker it is connected to is stopped and started again.
 *
 * <p>Flow of {@link #testReconnect()}: a durable subscription is registered,
 * {@link #MESSAGE_COUNT} messages are published while the subscriber is
 * offline, a background consumer reads the first {@link #firstBatch} messages,
 * the broker is restarted (without wiping its store), and the consumer then
 * tries to read the remainder.
 *
 * <p>Thread coordination uses the intrinsic monitors of {@link #closeBroker}
 * and {@link #messagesReceived}; those fields must therefore not be reassigned
 * while a test is running.
 */
public class ReliableReconnectTest extends org.apache.activemq.TestSupport {
    protected static final int MESSAGE_COUNT = 100;
    protected static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
    /** Per-message receive timeout, in milliseconds, used by the background consumer. */
    private static final int RECEIVE_TIMEOUT = 10000;
    /** Pause, in milliseconds, the consumer takes after the first batch so the broker can be restarted. */
    private static final long BROKER_RESTART_PAUSE_MILLIS = 2000L;
    /** Upper bound, in milliseconds, the test waits for the consumer to finish after the restart. */
    private static final long CONSUMER_WAIT_MILLIS = 60000L;

    protected String consumerClientId;
    protected Destination destination;
    protected BrokerService broker;
    // 2 is the JMS DeliveryMode.PERSISTENT constant; the field is not read in this class.
    protected int deliveryMode = 2;
    /** Set to true (and notified) by the consumer once the broker may be restarted. */
    protected AtomicBoolean closeBroker = new AtomicBoolean(false);
    /** Number of messages the background consumer has received so far. */
    protected AtomicInteger messagesReceived = new AtomicInteger(0);
    /** Number of receive attempts the consumer makes before asking for the broker restart. */
    protected int firstBatch = 10;
    private final IdGenerator idGen = new IdGenerator();
    /** True once the consumer thread has finished (normally or not); guarded by the messagesReceived monitor. */
    private boolean consumerDone;

    /**
     * Enables auto-fail, generates a fresh client id for the durable subscriber
     * and creates the topic destination named after this class.
     */
    @Override
    protected void setUp() throws Exception {
        setAutoFail(true);
        this.consumerClientId = this.idGen.generateId();
        super.setUp();
        this.topic = true;
        this.destination = createDestination(getClass().getName());
    }

    /**
     * Stops the broker (if one was started) and then runs the superclass
     * tear-down.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (this.broker != null) {
                this.broker.stop();
            }
        } finally {
            // setUp() delegates to super.setUp(), so the matching clean-up must run too,
            // even when stopping the broker fails.
            // NOTE(review): presumably this also stops the auto-fail watchdog enabled in
            // setUp(); the superclass is not visible here - confirm.
            super.tearDown();
        }
    }

    /**
     * Returns a connection factory that uses the ActiveMQ default settings.
     */
    @Override // org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory();
    }

    /**
     * Creates and starts a broker listening on {@link #DEFAULT_BROKER_URL},
     * storing it in {@link #broker}.
     *
     * @param deleteAllMessages whether the broker should wipe its message store on startup
     * @throws JMSException if the broker cannot be created or started; the
     *         underlying failure is attached as both cause and linked exception
     */
    protected void startBroker(boolean deleteAllMessages) throws JMSException {
        try {
            this.broker = BrokerFactory.createBroker(new URI("broker://()/localhost"));
            this.broker.setUseShutdownHook(false);
            this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
            this.broker.setUseJmx(false);
            this.broker.addConnector(DEFAULT_BROKER_URL);
            this.broker.start();
        } catch (Exception e) {
            // Fail fast: continuing without a running broker only produces a
            // confusing timeout or assertion failure later on.
            JMSException failure = new JMSException("Failed to start broker: " + e);
            failure.setLinkedException(e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Opens and starts a connection that carries the durable subscriber's client id.
     */
    protected Connection createConsumerConnection() throws Exception {
        Connection connection = getConnectionFactory().createConnection();
        connection.setClientID(this.consumerClientId);
        connection.start();
        return connection;
    }

    /**
     * Creates the durable subscriber named "TestFred" on {@link #destination}
     * using a non-transacted, auto-acknowledge session.
     */
    protected MessageConsumer createConsumer(Connection connection) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return session.createDurableSubscriber(this.destination, "TestFred");
    }

    /**
     * Starts the background consumer thread; see {@link #runConsumer()}.
     */
    protected void spawnConsumer() {
        new Thread(this::runConsumer, "ReliableReconnectTest-consumer").start();
    }

    /**
     * Body of the consumer thread: reads the first batch, asks the main thread
     * to restart the broker, pauses, then reads the remaining messages.
     * Whatever happens, both waiters are signalled so the main thread never
     * blocks on a consumer that has already died.
     */
    private void runConsumer() {
        Connection connection = null;
        try {
            connection = createConsumerConnection();
            MessageConsumer consumer = createConsumer(connection);
            receiveUpTo(consumer, this.firstBatch);
            signalCloseBroker();
            Thread.sleep(BROKER_RESTART_PAUSE_MILLIS);
            receiveUpTo(consumer, MESSAGE_COUNT - this.firstBatch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Throwable th) {
            th.printStackTrace();
        } finally {
            closeQuietly(connection);
            // No-op on the normal path; unblocks the main thread if the first batch failed.
            signalCloseBroker();
            signalConsumerDone();
        }
    }

    /**
     * Performs {@code attempts} receive calls, counting every non-null message
     * in {@link #messagesReceived}. A receive that times out is simply skipped.
     */
    private void receiveUpTo(MessageConsumer consumer, int attempts) throws JMSException {
        for (int i = 0; i < attempts; i++) {
            // Uses the locally declared timeout instead of a constant borrowed from
            // an unrelated test class.
            if (consumer.receive(RECEIVE_TIMEOUT) != null) {
                this.messagesReceived.incrementAndGet();
            }
        }
    }

    /** Marks the broker as ready to be restarted and wakes the waiting main thread. */
    private void signalCloseBroker() {
        synchronized (this.closeBroker) {
            this.closeBroker.set(true);
            this.closeBroker.notifyAll();
        }
    }

    /** Records that the consumer thread is finished and wakes the waiting main thread. */
    private void signalConsumerDone() {
        synchronized (this.messagesReceived) {
            this.consumerDone = true;
            this.messagesReceived.notifyAll();
        }
    }

    /** Closes the connection if present; a close failure is reported but not propagated. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Blocks until the consumer has signalled that the broker may be restarted. */
    private void awaitCloseBrokerSignal() throws InterruptedException {
        synchronized (this.closeBroker) {
            // Loop rather than 'if': Object.wait() may return spuriously.
            while (!this.closeBroker.get()) {
                this.closeBroker.wait();
            }
        }
    }

    /**
     * Waits until the consumer has finished or has received every message,
     * but no longer than {@link #CONSUMER_WAIT_MILLIS}.
     */
    private void awaitConsumer() throws InterruptedException {
        long deadline = System.currentTimeMillis() + CONSUMER_WAIT_MILLIS;
        synchronized (this.messagesReceived) {
            // consumerDone is checked so that a notification sent before this thread
            // started waiting is not lost.
            while (!this.consumerDone && this.messagesReceived.get() < MESSAGE_COUNT) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                this.messagesReceived.wait(remaining);
            }
        }
    }

    /**
     * Publishes {@link #MESSAGE_COUNT} messages to an offline durable
     * subscription, restarts the broker after the subscriber has consumed the
     * first batch, and asserts that more than {@link #firstBatch} messages were
     * received in total.
     */
    public void testReconnect() throws Exception {
        startBroker(true);

        // Register the durable subscription, then go offline again.
        Connection subscriberConnection = createConsumerConnection();
        try {
            createConsumer(subscriberConnection);
        } finally {
            subscriberConnection.close();
        }

        // Publish all messages while the subscriber is disconnected.
        Connection producerConnection = createConnection();
        try {
            producerConnection.setClientID(this.idGen.generateId());
            producerConnection.start();
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                message.setText("msg: " + i);
                producer.send(message);
            }
        } finally {
            producerConnection.close();
        }

        spawnConsumer();
        awaitCloseBrokerSignal();

        // Restart the broker, keeping its stored messages.
        this.broker.stop();
        startBroker(false);

        awaitConsumer();
        int received = this.messagesReceived.get();
        assertTrue("Not enough messages received: " + received, received > this.firstBatch);
    }
}
