package org.apache.activemq.broker.region;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerStoppedException;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.bugs.AMQ4607Test;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/DestinationGCStressTest.class */
public class DestinationGCStressTest {
    protected static final Logger logger = LoggerFactory.getLogger(DestinationGCStressTest.class);
    private BrokerService brokerService;

    @Before
    public void setUp() throws Exception {
        this.brokerService = createBroker();
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        }
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setSchedulePeriodForDestinationPurge(1);
        brokerService.setMaxPurgedDestinationsPerSweep(100);
        brokerService.setAdvisorySupport(false);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setGcInactiveDestinations(true);
        policyEntry.setInactiveTimeoutBeforeGC(1L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        return brokerService;
    }

    @Test(timeout = 60000)
    public void testClashWithPublishAndGC() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final org.apache.logging.log4j.core.Logger logger2 = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(RegionBroker.class));
        Appender appender = new AbstractAppender("testAppender", new AbstractFilter() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.1
        }, new MessageLayout(), false, new Property[0]) { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.2
            public void append(LogEvent logEvent) {
                if (logEvent.getLevel().equals(Level.ERROR) && logEvent.getMessage().getFormattedMessage().startsWith("Failed to remove inactive")) {
                    logger2.info("received unexpected log message: " + logEvent.getMessage());
                    atomicBoolean.set(true);
                }
            }
        };
        appender.start();
        logger2.get().addAppender(appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.3
        });
        logger2.addAppender(appender);
        try {
            final AtomicInteger atomicInteger = new AtomicInteger(AMQ4607Test.TIMEOUT);
            final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
            activeMQConnectionFactory.setWatchTopicAdvisories(false);
            Connection createConnection = activeMQConnectionFactory.createConnection();
            createConnection.start();
            createConnection.createSession(false, 1).createConsumer(new ActiveMQTopic(">"));
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 1; i++) {
                newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.4
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            Connection createConnection2 = activeMQConnectionFactory.createConnection();
                            createConnection2.start();
                            Session createSession = createConnection2.createSession(false, 1);
                            MessageProducer createProducer = createSession.createProducer((Destination) null);
                            TextMessage createTextMessage = createSession.createTextMessage();
                            while (true) {
                                int decrementAndGet = atomicInteger.decrementAndGet();
                                if (decrementAndGet <= 0) {
                                    createConnection2.close();
                                    return;
                                }
                                createProducer.send(new ActiveMQTopic("A." + decrementAndGet), createTextMessage);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            newCachedThreadPool.shutdown();
            newCachedThreadPool.awaitTermination(60L, TimeUnit.SECONDS);
            logger2.info("Done");
            createConnection.close();
            logger2.removeAppender(appender);
            Assert.assertFalse("failed on unexpected log event", atomicBoolean.get());
        } catch (Throwable th) {
            logger2.removeAppender(appender);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testAddRemoveWildcardWithGc() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final org.apache.logging.log4j.core.Logger logger2 = (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(RegionBroker.class));
        Appender appender = new AbstractAppender("testAppender", new AbstractFilter() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.5
        }, new MessageLayout(), false, new Property[0]) { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.6
            public void append(LogEvent logEvent) {
                if (logEvent.getLevel().equals(Level.ERROR) && logEvent.getMessage().getFormattedMessage().startsWith("Failed to remove inactive")) {
                    if (logEvent.getThrown() == null || !(logEvent.getThrown().getCause() instanceof BrokerStoppedException)) {
                        logger2.info("received unexpected log message: " + logEvent.getMessage());
                        atomicBoolean.set(true);
                    }
                }
            }
        };
        appender.start();
        logger2.get().addAppender(appender, Level.DEBUG, new AbstractFilter() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.7
        });
        logger2.addAppender(appender);
        try {
            final AtomicInteger atomicInteger = new AtomicInteger(10000);
            final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
            activeMQConnectionFactory.setWatchTopicAdvisories(false);
            Connection createConnection = activeMQConnectionFactory.createConnection();
            createConnection.start();
            final Session createSession = createConnection.createSession(false, 1);
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 0; i < 1; i++) {
                newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.8
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            Connection createConnection2 = activeMQConnectionFactory.createConnection();
                            createConnection2.start();
                            Session createSession2 = createConnection2.createSession(false, 1);
                            MessageProducer createProducer = createSession2.createProducer((Destination) null);
                            TextMessage createTextMessage = createSession2.createTextMessage();
                            while (true) {
                                int decrementAndGet = atomicInteger.decrementAndGet();
                                if (decrementAndGet <= 0) {
                                    createConnection2.close();
                                    return;
                                }
                                createProducer.send(new ActiveMQTopic("A." + decrementAndGet), createTextMessage);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.9
                @Override // java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 1000; i2++) {
                        try {
                            createSession.createConsumer(new ActiveMQTopic(">")).close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            newCachedThreadPool.shutdown();
            newCachedThreadPool.awaitTermination(60L, TimeUnit.SECONDS);
            logger2.info("Done");
            Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.10
                public boolean isSatisified() throws Exception {
                    int size = DestinationGCStressTest.this.brokerService.getRegionBroker().getTopicRegion().getDestinationMap().size();
                    logger2.info("Num topics: " + size);
                    return size == 0;
                }
            });
            createConnection.close();
            logger2.removeAppender(appender);
            Assert.assertFalse("failed on unexpected log event", atomicBoolean.get());
        } catch (Throwable th) {
            logger2.removeAppender(appender);
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testAllDestsSeeSub() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicInteger atomicInteger2 = new AtomicInteger(AMQ4607Test.TIMEOUT);
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 1; i++) {
            newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.11
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Connection createConnection2 = activeMQConnectionFactory.createConnection();
                        createConnection2.start();
                        Session createSession2 = createConnection2.createSession(false, 1);
                        MessageProducer createProducer = createSession2.createProducer((Destination) null);
                        TextMessage createTextMessage = createSession2.createTextMessage();
                        while (true) {
                            int decrementAndGet = atomicInteger2.decrementAndGet();
                            if (decrementAndGet <= 0) {
                                createConnection2.close();
                                return;
                            }
                            createProducer.send(new ActiveMQTopic("A." + decrementAndGet), createTextMessage);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        newCachedThreadPool.submit(new Runnable() { // from class: org.apache.activemq.broker.region.DestinationGCStressTest.12
            @Override // java.lang.Runnable
            public void run() {
                MessageConsumer createConsumer;
                for (int i2 = 0; i2 < 1000; i2++) {
                    try {
                        createConsumer = createSession.createConsumer(new ActiveMQTopic(">"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (DestinationGCStressTest.this.destMissingSub(atomicInteger)) {
                        return;
                    }
                    createConsumer.close();
                }
            }
        });
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(60L, TimeUnit.SECONDS);
        createConnection.close();
        Assert.assertEquals("no dests missing sub", 0L, atomicInteger.get());
    }

    private boolean destMissingSub(AtomicInteger atomicInteger) {
        Iterator it = this.brokerService.getRegionBroker().getTopicRegion().getDestinationMap().values().iterator();
        while (it.hasNext()) {
            if (((Destination) it.next()).getConsumers().isEmpty()) {
                atomicInteger.incrementAndGet();
                return true;
            }
        }
        return false;
    }
}
