package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.ArrayList;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ3167Test.class */
/**
 * Lost-message test against an embedded broker (the class name refers to issue AMQ-3167).
 *
 * <p>A single producer thread sends messages carrying a monotonically increasing {@code seq}
 * property to a queue whose destination policy has a small memory limit ({@link #MEMORY_LIMIT})
 * and producer flow control disabled. A consumer thread, started after
 * {@link #Consumer_startup_delay_ms}, verifies that the messages arrive with contiguous sequence
 * numbers starting at zero. Any gap, or any exception in either thread, is tallied in
 * {@link #Num_error}, which the test asserts to be zero.
 */
public class AMQ3167Test {
    /** Nanoseconds per second; used to build {@link System#nanoTime()} based deadlines. */
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    /** Broker created in {@link #testPrep()} and stopped in {@link #testCleanup()}. */
    protected BrokerService embeddedBroker;

    /** Per-queue memory limit, in bytes, applied by {@link #configureDestinationPolicy}. */
    protected static final int MEMORY_LIMIT = 16384;

    /** When {@code true}, {@link #log(String)} writes to {@code System.err}. */
    protected static boolean Debug_f = false;

    /** Connection shared by the producer and consumer sessions. */
    protected Connection JMS_conn;

    /** {@link System#nanoTime()} deadline for the producer; values {@code <= 0} disable it. */
    protected long Producer_stop_time = 0;

    /** {@link System#nanoTime()} deadline for the consumer; values {@code <= 0} disable it. */
    protected long Consumer_stop_time = 0;

    /** Delay, in milliseconds, between starting the producer and starting the consumer. */
    protected long Consumer_startup_delay_ms = 2000;

    /** When {@code true}, both worker loops exit as soon as {@link #Num_error} is non-zero. */
    protected boolean Stop_after_error = true;

    /**
     * Tally of failures observed by the worker threads. Declared {@code volatile} because it is
     * polled by one worker while the other may be updating it; increments go through
     * {@link #recordError()} so concurrent updates are not lost.
     */
    protected volatile long Num_error = 0;

    /** Atomically bumps {@link #Num_error}; safe to call from any worker thread. */
    private synchronized void recordError() {
        this.Num_error++;
    }

    /**
     * Worker that receives messages and checks that their {@code seq} property matches the
     * number of messages received so far.
     */
    public class consumerThread extends Thread {
        protected Session msgSess;
        protected MessageConsumer msgCons;
        protected String consumerTag;
        protected int numMsg;
        protected int numPerSess;

        /**
         * @param session         session the consumer was created on
         * @param messageConsumer consumer to receive from
         * @param str             tag identifying this consumer in log and failure messages
         * @param i               number of messages to receive before stopping
         * @param i2              acknowledgement batch size; values {@code > 1} make the loop
         *                        call {@link Message#acknowledge()} once per batch
         */
        consumerThread(Session session, MessageConsumer messageConsumer, String str, int i, int i2) {
            this.msgSess = session;
            this.msgCons = messageConsumer;
            this.consumerTag = str;
            this.numMsg = i;
            this.numPerSess = i2;
        }

        /**
         * Receives up to {@link #numMsg} messages, validating each with
         * {@link #checkMessage(Message, int)}. The loop ends early when the consumer deadline
         * passes or, if {@link AMQ3167Test#Stop_after_error} is set, when an error was recorded.
         *
         * @throws Exception on any JMS failure or sequence mismatch
         */
        public void execTest() throws Exception {
            // Last message actually received; the poll result itself may be null on a receive
            // timeout and therefore must not be used for the trailing acknowledge.
            Message lastReceived = null;
            int ackedCount = 0;
            int receivedCount = 0;
            while (receivedCount < this.numMsg && !didTimeOut()
                    && (!AMQ3167Test.this.Stop_after_error || AMQ3167Test.this.Num_error == 0)) {
                Message message = this.msgCons.receive(1000L);
                if (message == null) {
                    continue;
                }
                checkMessage(message, receivedCount);
                lastReceived = message;
                receivedCount++;
                if (this.numPerSess > 1 && receivedCount - ackedCount >= this.numPerSess) {
                    message.acknowledge();
                    ackedCount = receivedCount;
                }
            }
            // Acknowledge a trailing partial batch. lastReceived is non-null here because
            // receivedCount > ackedCount implies at least one message was received.
            if (this.numPerSess > 1 && receivedCount > ackedCount) {
                lastReceived.acknowledge();
            }
            if (receivedCount < this.numMsg) {
                AMQ3167Test.log("* Consumer " + this.consumerTag + " timed out");
            }
        }

        /**
         * @return {@code true} once the consumer deadline is enabled and has been reached
         */
        protected boolean didTimeOut() {
            final long deadline = AMQ3167Test.this.Consumer_stop_time;
            // nanoTime values must be compared by subtraction to stay correct across overflow.
            return deadline > 0 && System.nanoTime() - deadline >= 0;
        }

        /**
         * Fails if the message's {@code seq} property differs from the expected value.
         *
         * <p>The mismatch is recorded in {@link AMQ3167Test#Num_error} before failing. When
         * invoked via {@link #run()}, the resulting error is recorded a second time there, so
         * callers should only rely on the tally being zero or non-zero.
         *
         * @param message received message
         * @param i       expected sequence number
         * @throws JMSException if the property cannot be read
         */
        protected void checkMessage(Message message, int i) throws JMSException {
            int actualSeq = message.getIntProperty("seq");
            if (i != actualSeq) {
                AMQ3167Test.this.recordError();
                Assert.fail("*** Consumer " + this.consumerTag + " expected seq " + i + "; received " + actualSeq);
            }
        }

        /**
         * Runs {@link #execTest()}; any failure is recorded and rethrown as an
         * {@link AssertionError} that keeps the original cause.
         */
        @Override
        public void run() {
            try {
                AMQ3167Test.log("- running consumer " + this.consumerTag);
                execTest();
                AMQ3167Test.log("- finished running consumer " + this.consumerTag);
            } catch (Throwable th) {
                AMQ3167Test.this.recordError();
                throw new AssertionError("consumer " + this.consumerTag + " failed: " + th.getMessage(), th);
            }
        }

        /** @return the consumer tag */
        @Override
        public String toString() {
            return this.consumerTag;
        }
    }

    /**
     * Worker that sends text messages tagged with an increasing {@code seq} property.
     */
    public class producerThread extends Thread {
        protected Session msgSess;
        protected MessageProducer msgProd;
        protected String producerTag;
        protected int numMsg;
        protected int numPerSess;

        /**
         * Never assigned or read by this class; the deadline actually enforced is
         * {@link AMQ3167Test#Producer_stop_time}. Kept so existing subclasses still compile.
         */
        protected long producer_stop_time = 0;

        /**
         * @param session         session the producer was created on
         * @param messageProducer producer to send with
         * @param str             tag identifying this producer in messages and logs
         * @param i               number of messages to send before stopping
         * @param i2              commit batch size; values {@code > 1} make the loop call
         *                        {@link Session#commit()} once per batch
         */
        producerThread(Session session, MessageProducer messageProducer, String str, int i, int i2) {
            this.msgSess = session;
            this.msgProd = messageProducer;
            this.producerTag = str;
            this.numMsg = i;
            this.numPerSess = i2;
        }

        /**
         * Sends up to {@link #numMsg} messages. The loop ends early when the producer deadline
         * passes or, if {@link AMQ3167Test#Stop_after_error} is set, when an error was recorded.
         *
         * @throws Exception on any JMS failure
         */
        public void execTest() throws Exception {
            int committedCount = 0;
            int sentCount = 0;
            while (sentCount < this.numMsg && !didTimeOut()
                    && (!AMQ3167Test.this.Stop_after_error || AMQ3167Test.this.Num_error == 0)) {
                // Held as the JMS interface type: createTextMessage does not return
                // ActiveMQMessage, so the provider-specific call below needs a checked cast.
                Message outgoing = this.msgSess.createTextMessage("test message from " + this.producerTag);
                outgoing.setStringProperty("testprodtag", this.producerTag);
                outgoing.setIntProperty("seq", sentCount);
                if (outgoing instanceof ActiveMQMessage) {
                    ((ActiveMQMessage) outgoing).setResponseRequired(true);
                }
                this.msgProd.send(outgoing);
                sentCount++;
                if (this.numPerSess > 1 && sentCount - committedCount >= this.numPerSess) {
                    this.msgSess.commit();
                    committedCount = sentCount;
                }
            }
            // Commit a trailing partial batch.
            if (this.numPerSess > 1 && sentCount > committedCount) {
                this.msgSess.commit();
            }
            if (sentCount < this.numMsg) {
                AMQ3167Test.log("* Producer " + this.producerTag + " timed out at " + System.nanoTime()
                        + " (stop time " + AMQ3167Test.this.Producer_stop_time + ")");
            }
        }

        /**
         * @return {@code true} once the producer deadline is enabled and has been reached
         */
        protected boolean didTimeOut() {
            final long deadline = AMQ3167Test.this.Producer_stop_time;
            // nanoTime values must be compared by subtraction to stay correct across overflow.
            return deadline > 0 && System.nanoTime() - deadline >= 0;
        }

        /**
         * Runs {@link #execTest()}; any failure is recorded and rethrown as an
         * {@link AssertionError} that keeps the original cause.
         */
        @Override
        public void run() {
            try {
                AMQ3167Test.log("- running producer " + this.producerTag);
                execTest();
                AMQ3167Test.log("- finished running producer " + this.producerTag);
            } catch (Throwable th) {
                AMQ3167Test.this.recordError();
                throw new AssertionError("producer " + this.producerTag + " failed: " + th.getMessage(), th);
            }
        }

        /** @return the producer tag */
        @Override
        public String toString() {
            return this.producerTag;
        }
    }

    /**
     * Opens a connection to the embedded broker over its VM connector.
     *
     * @param str  user name passed to {@code createConnection}
     * @param str2 password passed to {@code createConnection}
     * @return a new, not yet started, connection
     * @throws JMSException if the connection cannot be created
     */
    protected Connection createUnsecuredConnection(String str, String str2) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.embeddedBroker.getVmConnectorURI());
        return factory.createConnection(str, str2);
    }

    /** Starts the embedded broker and opens and starts the shared connection. */
    @Before
    public void testPrep() throws Exception {
        this.embeddedBroker = new BrokerService();
        configureBroker(this.embeddedBroker);
        this.embeddedBroker.start();
        this.embeddedBroker.waitUntilStarted();
        this.JMS_conn = createUnsecuredConnection(null, null);
        this.JMS_conn.start();
    }

    /**
     * Closes the connection and stops the broker.
     *
     * <p>Both fields are null-checked because {@link #testPrep()} may have failed before
     * assigning them, and the broker is stopped even if closing the connection throws.
     */
    @After
    public void testCleanup() throws Exception {
        try {
            if (this.JMS_conn != null) {
                // close() releases the connection and its sessions; stop() only pauses delivery.
                this.JMS_conn.close();
            }
        } finally {
            if (this.embeddedBroker != null) {
                this.embeddedBroker.stop();
                this.embeddedBroker.waitUntilStopped();
            }
        }
    }

    /**
     * Applies the broker settings used by this test: fixed name, JMX off, persistent store under
     * {@code target/AMQ3167Test}, and the destination policy from
     * {@link #configureDestinationPolicy(BrokerService)}.
     *
     * @param brokerService broker to configure; must not be started yet
     */
    protected void configureBroker(BrokerService brokerService) throws Exception {
        brokerService.setBrokerName("testbroker1");
        brokerService.setUseJmx(false);
        brokerService.setPersistent(true);
        brokerService.setDataDirectory("target/AMQ3167Test");
        // The data directory is fixed, so messages left behind by an earlier run that timed
        // out would otherwise be delivered first and break the sequence check.
        brokerService.setDeleteAllMessagesOnStartup(true);
        configureDestinationPolicy(brokerService);
    }

    /**
     * Installs a policy for all queues ({@code ">"}) with a {@link #MEMORY_LIMIT} memory limit
     * and producer flow control disabled.
     *
     * @param brokerService broker to receive the policy map
     */
    protected void configureDestinationPolicy(BrokerService brokerService) {
        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setQueue(">");
        queuePolicy.setMemoryLimit(MEMORY_LIMIT);
        queuePolicy.setProducerFlowControl(false);

        ArrayList<PolicyEntry> entries = new ArrayList<>();
        entries.add(queuePolicy);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);
        brokerService.setDestinationPolicy(policyMap);
    }

    /**
     * Sends up to 1,000,000 messages to {@code lostmsgtest.queue} with a 10 second producer
     * deadline and a consumer deadline 5 seconds later, then asserts that no error was recorded.
     */
    @Test
    public void testQueueLostMessage() throws Exception {
        ActiveMQDestination queue =
                ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
        this.Producer_stop_time = System.nanoTime() + 10 * NANOS_PER_SECOND;
        this.Consumer_stop_time = this.Producer_stop_time + 5 * NANOS_PER_SECOND;
        runLostMsgTest(queue, 1000000, 1, 1, false);
        Assert.assertEquals("errors recorded by the producer/consumer threads", 0L, this.Num_error);
    }

    /**
     * Writes {@code str} to {@code System.err} when {@link #Debug_f} is set.
     *
     * @param str line to log
     */
    protected static void log(String str) {
        if (Debug_f) {
            System.err.println(str);
        }
    }

    /**
     * Starts one producer, waits {@link #Consumer_startup_delay_ms}, starts one consumer, and
     * joins both threads. The sessions created here are closed before returning.
     *
     * @param destination destination to send to and receive from
     * @param i           number of messages to send and to expect
     * @param i2          producer commit batch size; {@code > 1} makes the producer session
     *                    transacted
     * @param i3          consumer acknowledge batch size; {@code > 1} selects
     *                    {@link Session#CLIENT_ACKNOWLEDGE}, otherwise
     *                    {@link Session#AUTO_ACKNOWLEDGE}
     * @param z           unused
     * @throws Exception on JMS failure or if the calling thread is interrupted
     */
    protected void runLostMsgTest(Destination destination, int i, int i2, int i3, boolean z) throws Exception {
        final String producerTag = "prod";
        final String consumerTag = "cons";
        Session producerSession = null;
        Session consumerSession = null;
        try {
            log(">> Starting producer " + producerTag);
            producerSession = this.JMS_conn.createSession(i2 > 1, Session.AUTO_ACKNOWLEDGE);
            producerThread producer = new producerThread(
                    producerSession, producerSession.createProducer(destination), producerTag, i, i2);
            producer.start();
            log("Started producer " + producerTag);

            log("Waiting before starting consumers");
            Thread.sleep(this.Consumer_startup_delay_ms);

            log(">> Starting consumer");
            int consumerAckMode = i3 > 1 ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE;
            consumerSession = this.JMS_conn.createSession(false, consumerAckMode);
            consumerThread consumer = new consumerThread(
                    consumerSession, consumerSession.createConsumer(destination), consumerTag, i, i3);
            consumer.start();
            log("Started consumer " + consumerTag);

            log("< waiting for producer.");
            producer.join();
            log("< waiting for consumer.");
            consumer.join();
            log("Shutting down");
        } finally {
            // Session.close() is the one session call permitted from another thread, so this is
            // safe even on an exceptional exit where a worker may still be running.
            try {
                if (consumerSession != null) {
                    consumerSession.close();
                }
            } finally {
                if (producerSession != null) {
                    producerSession.close();
                }
            }
        }
    }
}
