package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Session;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the broker's handling of inactive durable topic subscribers.
 *
 * <p>The broker is configured with an offline-durable-subscriber task schedule of 1000 and a
 * timeout of 2004 (see {@link #createBroker(boolean)}). The test repeatedly creates a durable
 * subscriber, disconnects it, and waits for the broker's admin view to report first one and then
 * zero inactive durable topic subscribers. While doing so it watches the
 * {@link ManagedRegionBroker} and {@link Topic} loggers and fails if an unexpected message at
 * WARN level or above was logged.
 */
public class OfflineDurableSubscriberTimeoutTest extends org.apache.activemq.TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(OfflineDurableSubscriberTimeoutTest.class);

    /** Client id used by {@link #createConnection()}. */
    private static final String DEFAULT_CLIENT_ID = "id";

    /** Name of the durable subscription created by the test. */
    private static final String SUBSCRIPTION_NAME = "sub1";

    private BrokerService broker;

    /**
     * Creates a connection factory for the in-VM broker named after the current test, with
     * topic-advisory watching disabled.
     *
     * @return the configured connection factory
     * @throws Exception if the factory cannot be created
     */
    @Override
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + getName(true));
        factory.setWatchTopicAdvisories(false);
        return factory;
    }

    /**
     * Creates and starts a connection using the default client id.
     *
     * @return a started connection
     * @throws Exception if the connection cannot be created or started
     */
    @Override
    public Connection createConnection() throws Exception {
        return createConnection(DEFAULT_CLIENT_ID);
    }

    /**
     * Creates and starts a connection with the given client id.
     *
     * @param str the JMS client id to assign to the connection
     * @return a started connection
     * @throws Exception if the connection cannot be created, configured or started
     */
    protected Connection createConnection(String str) throws Exception {
        Connection connection = getConnectionFactory().createConnection();
        connection.setClientID(str);
        connection.start();
        return connection;
    }

    /**
     * Builds the JUnit 3 suite for this class.
     *
     * @return the test suite
     */
    public static Test suite() {
        return suite(OfflineDurableSubscriberTimeoutTest.class);
    }

    /** Starts the broker before delegating to the parent set-up. */
    @Override
    protected void setUp() throws Exception {
        createBroker();
        super.setUp();
    }

    /** Runs the parent tear-down and always stops the broker afterwards, even if it fails. */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            destroyBroker();
        }
    }

    /** Creates and starts a broker that deletes all messages on startup. */
    private void createBroker() throws Exception {
        createBroker(true);
    }

    /**
     * Creates, configures and starts the broker used by the test and stores it in
     * {@link #broker}.
     *
     * @param z value passed to {@code setDeleteAllMessagesOnStartup}; {@code false} is used when
     *          the broker is restarted mid-test
     * @throws Exception if the broker cannot be created or started
     */
    private void createBroker(boolean z) throws Exception {
        final String brokerName = getName(true);
        BrokerService service = BrokerFactory.createBroker("broker:(vm://" + brokerName + ")");
        // Publish the reference before start() so tearDown can still stop a half-started broker.
        this.broker = service;

        service.setBrokerName(brokerName);
        service.setDeleteAllMessagesOnStartup(z);
        service.getManagementContext().setCreateConnector(false);
        service.setAdvisorySupport(false);
        setDefaultPersistenceAdapter(service);
        service.getPersistenceAdapter().getStore().getPageFile().setPageSize(1024);

        PolicyEntry allTopicsPolicy = new PolicyEntry();
        allTopicsPolicy.setExpireMessagesPeriod(1000L);
        allTopicsPolicy.setProducerFlowControl(true);
        PolicyMap policies = new PolicyMap();
        policies.put(new ActiveMQTopic(">"), allTopicsPolicy);
        service.setDestinationPolicy(policies);

        service.setOfflineDurableSubscriberTaskSchedule(1000L);
        service.setOfflineDurableSubscriberTimeout(2004L);
        service.setDestinations(new ActiveMQDestination[]{new ActiveMQTopic("topic1")});
        service.start();
    }

    /** Stops the current broker, if any, waits for it to stop, and clears the reference. */
    private void destroyBroker() throws Exception {
        BrokerService current = this.broker;
        if (current == null) {
            return;
        }
        this.broker = null;
        current.stop();
        current.waitUntilStopped();
    }

    /**
     * Creates and disconnects a durable subscriber three times (twice on {@code topic_new},
     * across a broker restart, and once on the pre-configured {@code topic1}) and each time
     * waits for the inactive durable subscriber count to reach one and then drop back to zero.
     * Fails if the monitored loggers emitted an unexpected message at WARN level or above.
     *
     * @throws Exception on any broker, JMS or wait failure
     */
    public void testOfflineDurableSubscriberTimeout() throws Exception {
        final AtomicBoolean unexpectedLogSeen = new AtomicBoolean(false);
        final org.apache.logging.log4j.core.Logger regionBrokerLogger =
                (org.apache.logging.log4j.core.Logger) LogManager.getLogger(ManagedRegionBroker.class);
        final org.apache.logging.log4j.core.Logger topicLogger =
                (org.apache.logging.log4j.core.Logger) LogManager.getLogger(Topic.class);

        final Appender appender = new AbstractAppender("testAppender", new AbstractFilter() {
        }, new MessageLayout(), false, new Property[0]) {
            @Override
            public void append(LogEvent logEvent) {
                if (!logEvent.getLevel().isMoreSpecificThan(Level.WARN)) {
                    return;
                }
                final String message = logEvent.getMessage().getFormattedMessage();
                // These two warnings are tolerated by the test and do not count as failures.
                if (message.contains("Store limit") || message.contains("resetting to 70% of maximum available")) {
                    return;
                }
                LOG.info("** received unexpected log message: {} [{}] ({})",
                        message, logEvent.getLoggerName(), logEvent.getLevel());
                unexpectedLogSeen.set(true);
            }
        };
        appender.start();

        try {
            // Attach inside the try so a failure on the second logger still detaches the first.
            attachAppender(regionBrokerLogger, appender);
            attachAppender(topicLogger, appender);

            createOfflineDurableSubscribers("topic_new");
            awaitInactiveDurableSubscribers(1);
            awaitInactiveDurableSubscribers(0);

            // Restart the broker without wiping the store and repeat.
            this.broker.stop();
            this.broker.waitUntilStopped();
            createBroker(false);
            this.broker.waitUntilStarted();

            createOfflineDurableSubscribers("topic_new");
            awaitInactiveDurableSubscribers(1);
            awaitInactiveDurableSubscribers(0);

            LOG.info("Create Consumer for topic1");
            createOfflineDurableSubscribers("topic1");
            awaitInactiveDurableSubscribers(1);
            awaitInactiveDurableSubscribers(0);

            assertFalse("have not found any log warn/error", unexpectedLogSeen.get());
        } finally {
            regionBrokerLogger.removeAppender(appender);
            topicLogger.removeAppender(appender);
        }
    }

    /**
     * Registers the appender on the logger's configuration at DEBUG level and on the logger
     * itself, in that order.
     *
     * @param logger the Log4j core logger to monitor
     * @param appender the appender to attach
     */
    private static void attachAppender(org.apache.logging.log4j.core.Logger logger, Appender appender) {
        logger.get().addAppender(appender, Level.DEBUG, new AbstractFilter() {
        });
        logger.addAppender(appender);
    }

    /**
     * Waits until the broker's admin view reports exactly {@code expected} inactive durable
     * topic subscribers and fails the test if the wait gives up first.
     *
     * @param expected the inactive durable topic subscriber count to wait for
     * @throws Exception if polling the admin view fails
     */
    private void awaitInactiveDurableSubscribers(final int expected) throws Exception {
        boolean reached = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Read once so the logged value is the value being compared.
                int actual = broker.getAdminView().getInactiveDurableTopicSubscribers().length;
                LOG.info("broker.getAdminView().getInactiveDurableTopicSubscribers():{}", actual);
                return actual == expected;
            }
        });
        assertTrue("expected " + expected + " inactive durable topic subscriber(s)", reached);
    }

    /**
     * Creates a durable subscriber named {@code sub1} on the given destination and immediately
     * closes the session and connection, leaving the subscription inactive on the broker.
     *
     * @param str the destination name passed to {@code createDestination}
     * @throws Exception if the connection, session or subscriber cannot be created
     */
    private void createOfflineDurableSubscribers(String str) throws Exception {
        // try-with-resources closes the session, then the connection, even when creation fails.
        try (Connection connection = createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            session.createDurableSubscriber(createDestination(str), SUBSCRIPTION_NAME, null, true);
        }
    }
}
