package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2401Test.class */
public class AMQ2401Test extends TestCase implements MessageListener {
    BrokerService broker;
    private ActiveMQConnectionFactory factory;
    private static final int SEND_COUNT = 500;
    private static final int CONSUMER_COUNT = 50;
    private static final int PRODUCER_COUNT = 1;
    private static final int LOG_INTERVAL = 10;
    private ArrayList<Service> services = new ArrayList<>(51);
    int count = 0;
    CountDownLatch latch;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2401Test$Service.class */
    private interface Service {
        void start() throws Exception;

        void close();
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2401Test$TestConsumer.class */
    private class TestConsumer implements Runnable, Service {
        ActiveMQConnection connection;
        Session session;
        MessageConsumer consumer;
        Thread thread;

        TestConsumer() throws Exception {
            AMQ2401Test.this.factory.setOptimizeAcknowledge(false);
            this.connection = AMQ2401Test.this.factory.createConnection();
            this.session = this.connection.createSession(false, 3);
            this.consumer = this.session.createConsumer(this.session.createQueue("AMQ2401Test"));
            this.consumer.setMessageListener(AMQ2401Test.this);
        }

        @Override // org.apache.activemq.bugs.AMQ2401Test.Service
        public void start() throws Exception {
            this.connection.start();
        }

        @Override // org.apache.activemq.bugs.AMQ2401Test.Service
        public void close() {
            try {
                this.connection.close();
            } catch (JMSException e) {
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (AMQ2401Test.this.latch.getCount() > 0) {
                try {
                    AMQ2401Test.this.onMessage(this.consumer.receive());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2401Test$TestProducer.class */
    private class TestProducer implements Runnable, Service {
        Thread thread = new Thread(this, "TestProducer");
        BytesMessage message;
        Connection connection;
        Session session;
        MessageProducer producer;

        TestProducer() throws Exception {
            this.connection = AMQ2401Test.this.factory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, 3);
            this.producer = this.session.createProducer(this.session.createQueue("AMQ2401Test"));
        }

        @Override // org.apache.activemq.bugs.AMQ2401Test.Service
        public void start() {
            this.thread.start();
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 1; i <= AMQ2401Test.SEND_COUNT; i++) {
                try {
                    if (i % 10 == 0) {
                        System.out.println("Sending: " + i);
                    }
                    this.message = this.session.createBytesMessage();
                    this.message.writeBytes(new byte[1024]);
                    this.producer.send(this.message);
                } catch (JMSException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }

        @Override // org.apache.activemq.bugs.AMQ2401Test.Service
        public void close() {
            try {
                this.connection.close();
            } catch (JMSException e) {
            }
        }
    }

    protected void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.addConnector("tcp://0.0.0.0:2401");
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMemoryLimit(102400L);
        policyEntry.setProducerFlowControl(true);
        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        policyEntry.setQueue(">");
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2401");
        super.setUp();
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
    }

    public void testDupsOk() throws Exception {
        Service service = null;
        Service service2 = null;
        try {
            this.latch = new CountDownLatch(SEND_COUNT);
            for (int i = 0; i < 50; i++) {
                TestConsumer testConsumer = new TestConsumer();
                testConsumer.start();
                this.services.add(testConsumer);
            }
            for (int i2 = 0; i2 < 1; i2++) {
                TestProducer testProducer = new TestProducer();
                testProducer.start();
                this.services.add(testProducer);
            }
            waitForMessageReceipt(3000L);
            if (0 != 0) {
                service.close();
            }
            if (0 != 0) {
                service2.close();
            }
        } catch (Throwable th) {
            if (0 != 0) {
                service.close();
            }
            if (0 != 0) {
                service2.close();
            }
            throw th;
        }
    }

    public void onMessage(Message message) {
        this.latch.countDown();
        int i = this.count + 1;
        this.count = i;
        if (i % 10 == 0) {
            System.out.println("Received message " + this.count);
        }
        try {
            Thread.currentThread();
            Thread.sleep(1L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void waitForMessageReceipt(long j) throws InterruptedException, TimeoutException {
        if (!this.latch.await(j, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Consumner didn't receive messages");
        }
    }
}
