package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/bugs/AMQ2584Test.class */
public class AMQ2584Test extends TestSupport {
    static final Logger LOG = LoggerFactory.getLogger(AMQ2584Test.class);
    ActiveMQTopic topic;
    Session producerSession;
    MessageProducer producer;
    String data;
    private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
    BrokerService broker = null;
    ActiveMQConnection consumerConnection = null;
    ActiveMQConnection producerConnection = null;
    final int minPercentUsageForStore = 3;

    @Parameterized.Parameters(name = "{0}")
    public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
        TestSupport.PersistenceAdapterChoice[] persistenceAdapterChoiceArr = {TestSupport.PersistenceAdapterChoice.KahaDB};
        ArrayList arrayList = new ArrayList();
        arrayList.add(persistenceAdapterChoiceArr);
        return arrayList;
    }

    public AMQ2584Test(TestSupport.PersistenceAdapterChoice persistenceAdapterChoice) {
        this.persistenceAdapterChoice = persistenceAdapterChoice;
    }

    @Test(timeout = 120000)
    public void testSize() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1000 * 3);
        openConsumer(countDownLatch);
        assertEquals(0, this.broker.getAdminView().getStorePercentUsage());
        for (int i = 0; i < 1000; i++) {
            sendMessage(false);
        }
        final BrokerView adminView = this.broker.getAdminView();
        this.broker.getSystemUsage().getStoreUsage().isFull();
        LOG.info("store percent usage: " + adminView.getStorePercentUsage());
        assertTrue("some store in use", this.broker.getAdminView().getStorePercentUsage() > 3);
        assertTrue("redelivery consumer got all it needs", countDownLatch.await(60L, TimeUnit.SECONDS));
        closeConsumer();
        final CountDownLatch countDownLatch2 = new CountDownLatch(1000);
        this.consumerConnection = createConnection();
        this.consumerConnection.createSession(false, 1).createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")).setMessageListener(new MessageListener() { // from class: org.apache.activemq.bugs.AMQ2584Test.1
            public void onMessage(Message message) {
                if (countDownLatch2.getCount() % 500 == 0) {
                    AMQ2584Test.LOG.info("remaining on DLQ: " + countDownLatch2.getCount());
                }
                countDownLatch2.countDown();
            }
        });
        this.consumerConnection.start();
        assertTrue("Not all messages reached the DLQ", countDownLatch2.await(60L, TimeUnit.SECONDS));
        assertTrue("Store usage exceeds expected usage", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ2584Test.2
            public boolean isSatisified() throws Exception {
                AMQ2584Test.this.broker.getSystemUsage().getStoreUsage().isFull();
                AMQ2584Test.LOG.info("store precent usage: " + adminView.getStorePercentUsage());
                return AMQ2584Test.this.broker.getAdminView().getStorePercentUsage() < 3;
            }
        }));
        closeConsumer();
    }

    private void openConsumer(final CountDownLatch countDownLatch) throws Exception {
        this.consumerConnection = createConnection();
        this.consumerConnection.setClientID("cliID");
        this.consumerConnection.start();
        final Session createSession = this.consumerConnection.createSession(false, 1);
        MessageListener messageListener = new MessageListener() { // from class: org.apache.activemq.bugs.AMQ2584Test.3
            public void onMessage(Message message) {
                countDownLatch.countDown();
                try {
                    createSession.recover();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        createSession.createDurableSubscriber(this.topic, "subName1").setMessageListener(messageListener);
        createSession.createDurableSubscriber(this.topic, "subName2").setMessageListener(messageListener);
        createSession.createDurableSubscriber(this.topic, "subName3").setMessageListener(messageListener);
    }

    private void closeConsumer() throws JMSException {
        if (this.consumerConnection != null) {
            this.consumerConnection.close();
        }
        this.consumerConnection = null;
    }

    private void sendMessage(boolean z) throws Exception {
        if (this.producerConnection == null) {
            this.producerConnection = createConnection();
            this.producerConnection.start();
            this.producerSession = this.producerConnection.createSession(false, 1);
            this.producer = this.producerSession.createProducer(this.topic);
        }
        Message createMessage = this.producerSession.createMessage();
        createMessage.setStringProperty("data", this.data);
        this.producer.send(createMessage);
    }

    private void startBroker(boolean z) throws Exception {
        this.broker = new BrokerService();
        this.broker.setAdvisorySupport(false);
        this.broker.setBrokerName("testStoreSize");
        if (z) {
            this.broker.setDeleteAllMessagesOnStartup(true);
        }
        LOG.info("Starting broker with persistenceAdapterChoice " + this.persistenceAdapterChoice.toString());
        setPersistenceAdapter(this.broker, this.persistenceAdapterChoice);
        configurePersistenceAdapter(this.broker.getPersistenceAdapter());
        this.broker.getSystemUsage().getStoreUsage().setLimit(200000000L);
        this.broker.start();
    }

    private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
        Properties properties = new Properties();
        String valueOf = String.valueOf(1048576);
        properties.put("journalMaxFileLength", valueOf);
        properties.put("maxFileLength", valueOf);
        properties.put("cleanupInterval", "2000");
        properties.put("checkpointInterval", "2000");
        IntrospectionSupport.setProperties(persistenceAdapter, properties);
    }

    private void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
        this.broker = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.TestSupport
    public ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=0&jms.closeTimeout=60000&waitForStart=5000&create=false");
    }

    @Before
    public void setUp() throws Exception {
        StringBuilder sb = new StringBuilder(5000);
        for (int i = 0; i < 5000; i++) {
            sb.append('a');
        }
        this.data = sb.toString();
        startBroker(true);
        this.topic = createDestination();
    }

    @After
    public void tearDown() throws Exception {
        stopBroker();
    }
}
