package org.apache.activemq.bugs;

import java.io.InterruptedIOException;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ5426Test.class */
public class AMQ5426Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ5426Test.class);
    private BrokerService brokerService;
    private String connectionUri;
    private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false);
    private Thread producerThread;
    private AtomicBoolean hasErrorInLogger;
    private Appender errorDetectorAppender;

    protected ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        activeMQConnectionFactory.setOptimizeAcknowledge(true);
        return activeMQConnectionFactory;
    }

    @Before
    public void setUp() throws Exception {
        this.hasFailureInProducer = new AtomicBoolean(false);
        this.hasErrorInLogger = new AtomicBoolean(false);
        this.brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setTopicPrefetch(100);
        new PolicyMap().setDefaultEntry(policyEntry);
        this.brokerService.addConnector("tcp://0.0.0.0:0");
        this.brokerService.start();
        this.connectionUri = this.brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
        this.errorDetectorAppender = new AppenderSkeleton() { // from class: org.apache.activemq.bugs.AMQ5426Test.1
            public void close() {
            }

            public boolean requiresLayout() {
                return false;
            }

            protected void append(LoggingEvent loggingEvent) {
                if (loggingEvent.getLevel().isGreaterOrEqual(Level.ERROR)) {
                    AMQ5426Test.this.hasErrorInLogger.set(true);
                }
            }
        };
        org.apache.log4j.Logger.getRootLogger().addAppender(this.errorDetectorAppender);
        this.producerThread = new Thread(new Runnable() { // from class: org.apache.activemq.bugs.AMQ5426Test.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Connection createConnection = AMQ5426Test.this.createConnectionFactory().createConnection();
                    createConnection.start();
                    Session createSession = createConnection.createSession(false, 1);
                    Topic createTopic = createSession.createTopic("test.AMQ5426");
                    AMQ5426Test.LOG.debug("Created topic: {}", createTopic);
                    MessageProducer createProducer = createSession.createProducer(createTopic);
                    createProducer.setDeliveryMode(1);
                    createProducer.setTimeToLive(1000L);
                    AMQ5426Test.LOG.debug("Created producer: {}", createProducer);
                    int i = 1;
                    while (!Thread.interrupted()) {
                        try {
                            createProducer.send(createSession.createTextMessage(" testMessage " + i));
                            try {
                                Thread.sleep(0L, 100);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            AMQ5426Test.LOG.debug("message sent: {}", Integer.valueOf(i));
                            i++;
                        } catch (JMSException e2) {
                            if (e2.getCause() == null || !(e2.getCause() instanceof InterruptedIOException)) {
                                throw e2;
                            }
                        }
                    }
                    createProducer.close();
                    createSession.close();
                    createConnection.close();
                } catch (Exception e3) {
                    AMQ5426Test.LOG.error(e3.getMessage(), e3);
                    AMQ5426Test.this.hasFailureInProducer.set(true);
                }
            }
        });
        this.producerThread.start();
    }

    @Test(timeout = 120000)
    public void testConsumerProperlyClosedWithoutError() throws Exception {
        Random random = new Random();
        for (int i = 0; i < 1000; i++) {
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            LOG.info("Starting run {} of {}", Integer.valueOf(i), 1000);
            Connection createConnection = createConnectionFactory().createConnection();
            createConnection.start();
            Session createSession = createConnection.createSession(false, 3);
            Topic createTopic = createSession.createTopic("test.AMQ5426");
            LOG.debug("Created topic: {}", createTopic);
            MessageConsumer createConsumer = createSession.createConsumer(createTopic);
            createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ5426Test.3
                public void onMessage(Message message) {
                    AMQ5426Test.LOG.debug("Received message");
                    atomicInteger.getAndIncrement();
                }
            });
            LOG.debug("Created consumer: {}", createConsumer);
            try {
                Thread.sleep(random.nextInt(5) + 1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            LOG.debug("Closing consumer");
            createConsumer.close();
            createSession.close();
            createConnection.close();
            Assert.assertFalse("Exception in Producer Thread", this.hasFailureInProducer.get());
            Assert.assertFalse("Error detected in Logger", this.hasErrorInLogger.get());
            LOG.info("Run {} of {} completed, message received: {}", new Object[]{Integer.valueOf(i), 1000, Integer.valueOf(atomicInteger.get())});
        }
    }

    @After
    public void tearDown() throws Exception {
        LOG.info("Shutdown producer thread");
        this.producerThread.interrupt();
        this.producerThread.join();
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        Assert.assertFalse("Exception in Producer Thread", this.hasFailureInProducer.get());
        Assert.assertFalse("Error detected in Logger", this.hasErrorInLogger.get());
    }
}
