/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicProducerFlowControlTest
extends TestCase
implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(TopicProducerFlowControlTest.class);
    private static final String brokerName = "testBroker";
    private static final String brokerUrl = "vm://testBroker";
    protected static final int destinationMemLimit = 0x200000;
    private static final AtomicLong produced = new AtomicLong();
    private static final AtomicLong consumed = new AtomicLong();
    private static final int numMessagesToSend = 50000;
    private BrokerService broker;

    protected void setUp() throws Exception {
        produced.set(0L);
        consumed.set(0L);
        this.broker = new BrokerService();
        this.broker.setBrokerName(brokerName);
        this.broker.setPersistent(false);
        this.broker.setSchedulerSupport(false);
        this.broker.setUseJmx(false);
        this.broker.setUseShutdownHook(false);
        this.broker.addConnector(brokerUrl);
        PolicyMap pm = new PolicyMap();
        PolicyEntry tpe = new PolicyEntry();
        tpe.setTopic(">");
        tpe.setMemoryLimit(0x200000L);
        tpe.setProducerFlowControl(true);
        tpe.setAdvisoryWhenFull(true);
        tpe.setBlockedProducerWarningInterval(2000L);
        pm.setPolicyEntries(Arrays.asList(tpe));
        this.setDestinationPolicy(this.broker, pm);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
        broker.setDestinationPolicy(pm);
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testTopicProducerFlowControl() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        connectionFactory.setAlwaysSyncSend(true);
        connectionFactory.setProducerWindowSize(1024);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(5000);
        connectionFactory.setPrefetchPolicy(prefetchPolicy);
        Connection c = connectionFactory.createConnection();
        c.start();
        Session listenerSession = c.createSession(false, 1);
        Destination destination = this.createDestination(listenerSession);
        listenerSession.createConsumer(destination).setMessageListener((MessageListener)new TopicProducerFlowControlTest());
        final AtomicInteger blockedCounter = new AtomicInteger(0);
        listenerSession.createConsumer((Destination)new ActiveMQTopic("ActiveMQ.Advisory.FULL.>")).setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                try {
                    if (blockedCounter.get() % 100 == 0) {
                        LOG.info("Got full advisory, usageName: " + message.getStringProperty("usageName") + ", usageCount: " + message.getLongProperty("usageCount") + ", blockedCounter: " + blockedCounter.get());
                    }
                    blockedCounter.incrementAndGet();
                }
                catch (Exception error) {
                    error.printStackTrace();
                    LOG.error("missing advisory property", (Throwable)error);
                }
            }
        });
        final AtomicInteger warnings = new AtomicInteger();
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(Topic.class));
        AbstractAppender appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                if (event.getLevel().equals((Object)Level.WARN) && event.getMessage().getFormattedMessage().contains("Usage Manager memory limit reached")) {
                    LOG.info("received  log message: " + String.valueOf(event.getMessage()));
                    warnings.incrementAndGet();
                }
            }
        };
        appender.start();
        logger.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        logger.addAppender((Appender)appender);
        try {
            final Session session = connectionFactory.createConnection().createSession(false, 1);
            final MessageProducer producer = session.createProducer(destination);
            Thread producingThread = new Thread("Producing Thread"){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        for (long i = 0L; i < 50000L; ++i) {
                            producer.send((Message)session.createTextMessage("test"));
                            long count = produced.incrementAndGet();
                            if (count % 10000L != 0L) continue;
                            LOG.info("Produced " + count + " messages");
                        }
                    }
                    catch (Throwable ex) {
                        ex.printStackTrace();
                    }
                    finally {
                        try {
                            producer.close();
                            session.close();
                        }
                        catch (Exception i) {}
                    }
                }
            };
            producingThread.start();
            Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return consumed.get() == 50000L;
                }
            }, (long)300000L);
            TopicProducerFlowControlTest.assertEquals((String)"Didn't produce all messages", (long)50000L, (long)produced.get());
            TopicProducerFlowControlTest.assertEquals((String)"Didn't consume all messages", (long)50000L, (long)consumed.get());
            TopicProducerFlowControlTest.assertTrue((String)"Producer got blocked", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return blockedCounter.get() > 0;
                }
            }, (long)5000L));
            LOG.info("BlockedCount: " + blockedCounter.get() + ", Warnings:" + warnings.get());
            TopicProducerFlowControlTest.assertTrue((String)"got a few warnings", (warnings.get() > 1 ? 1 : 0) != 0);
            TopicProducerFlowControlTest.assertTrue((String)"warning limited", (warnings.get() < blockedCounter.get() ? 1 : 0) != 0);
        }
        finally {
            logger.removeAppender((Appender)appender);
        }
    }

    public void testTransactedProducerBlockedAndClosedWillRelease() throws Exception {
        this.doTestTransactedProducerBlockedAndClosedWillRelease(false);
    }

    public void testTransactedSyncSendProducerBlockedAndClosedWillRelease() throws Exception {
        this.doTestTransactedProducerBlockedAndClosedWillRelease(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doTestTransactedProducerBlockedAndClosedWillRelease(boolean alwaysSyncSend) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        connectionFactory.setWatchTopicAdvisories(false);
        connectionFactory.setAlwaysSyncSend(alwaysSyncSend);
        Connection c = connectionFactory.createConnection();
        c.start();
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(5000);
        connectionFactory.setPrefetchPolicy(prefetchPolicy);
        Session listenerSession = c.createSession(false, 1);
        Destination destination = this.createDestination(listenerSession);
        final AtomicInteger warnings = new AtomicInteger();
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(Topic.class));
        AbstractAppender appender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                if (event.getLevel().equals((Object)Level.WARN) && event.getMessage().getFormattedMessage().contains("Usage Manager memory limit reached")) {
                    LOG.info("received  log message: " + String.valueOf(event.getMessage()));
                    warnings.incrementAndGet();
                }
            }
        };
        appender.start();
        logger.get().addAppender((Appender)appender, Level.DEBUG, (Filter)new AbstractFilter(){});
        logger.addAppender((Appender)appender);
        try {
            final Session session = connectionFactory.createConnection().createSession(true, 0);
            final MessageProducer producer = session.createProducer(destination);
            Thread producingThread = new Thread("Producing Thread"){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        for (long i = 0L; i < 50000L; ++i) {
                            producer.send((Message)session.createTextMessage("test"));
                            long count = produced.incrementAndGet();
                            if (count % 10000L != 0L) continue;
                            LOG.info("Produced " + count + " messages");
                        }
                    }
                    catch (Throwable ex) {
                        ex.printStackTrace();
                    }
                    finally {
                        try {
                            producer.close();
                            session.close();
                        }
                        catch (Exception i) {}
                    }
                }
            };
            producingThread.start();
            TopicProducerFlowControlTest.assertTrue((String)"Producer got blocked", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return warnings.get() > 0;
                }
            }, (long)5000L));
            LOG.info("Produced: " + produced.get() + ", Warnings:" + warnings.get());
            TopicProducerFlowControlTest.assertTrue((String)"Producer got blocked", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return warnings.get() > 0;
                }
            }, (long)5000L));
            long enqueueCountWhenBlocked = this.broker.getDestination(ActiveMQDestination.transform((Destination)destination)).getDestinationStatistics().getEnqueues().getCount();
            for (TransportConnection transportConnection : ((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnections()) {
                transportConnection.serviceException((Throwable)new IOException("forcing close for hung connection"));
            }
            TopicProducerFlowControlTest.assertTrue((String)"Usage gets released on close", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    LOG.info("Usage: " + TopicProducerFlowControlTest.this.broker.getSystemUsage().getMemoryUsage().getUsage());
                    return TopicProducerFlowControlTest.this.broker.getSystemUsage().getMemoryUsage().getUsage() == 0L;
                }
            }, (long)5000L));
            c.close();
            if (!ActiveMQDestination.transform((Destination)destination).isTemporary()) {
                TopicProducerFlowControlTest.assertEquals((String)"nothing sent during close", (long)enqueueCountWhenBlocked, (long)this.broker.getDestination(ActiveMQDestination.transform((Destination)destination)).getDestinationStatistics().getEnqueues().getCount());
            }
        }
        finally {
            logger.removeAppender((Appender)appender);
        }
    }

    protected Destination createDestination(Session listenerSession) throws Exception {
        return new ActiveMQTopic("test");
    }

    public void onMessage(Message message) {
        long count = consumed.incrementAndGet();
        if (count % 100L == 0L) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        if (count % 10000L == 0L) {
            LOG.info("\tConsumed " + count + " messages");
        }
    }
}

