package org.apache.activemq.usecases;

import java.util.LinkedList;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/ConsumeTopicPrefetchTest.class */
public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumeTopicPrefetchTest.class);
    protected String[] messageTexts;
    protected int prefetchSize = 100;
    protected long consumerTimeout = DurableSubProcessWithRestartTest.BROKER_RESTART;

    public void testSendPrefetchSize() throws JMSException {
        testWithMessageCount(this.prefetchSize);
    }

    public void testSendDoublePrefetchSize() throws JMSException {
        testWithMessageCount(this.prefetchSize * 2);
    }

    public void testSendPrefetchSizePlusOne() throws JMSException {
        testWithMessageCount(this.prefetchSize + 1);
    }

    protected void testWithMessageCount(int i) throws JMSException {
        makeMessages(i);
        LOG.info("About to send and receive: " + i + " on destination: " + this.destination + " of type: " + this.destination.getClass().getName());
        for (int i2 = 0; i2 < i; i2++) {
            this.producer.send(this.session.createTextMessage(this.messageTexts[i2]));
        }
        validateConsumerPrefetch(getSubject(), this.prefetchSize);
        LinkedList linkedList = new LinkedList();
        int i3 = i / 2;
        for (int i4 = 0; i4 < i3; i4++) {
            linkedList.add(consumeMessge(i4));
        }
        validateConsumerPrefetchGreaterOrEqual(getSubject(), (long) Math.min(i, 1.5d * this.prefetchSize));
        for (int i5 = 0; i5 < i3; i5++) {
            ((TextMessage) linkedList.remove()).acknowledge();
        }
        for (int i6 = i3; i6 < i; i6++) {
            consumeMessge(i6).acknowledge();
        }
        validateConsumerPrefetch(getSubject(), 0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.usecases.TestSupport
    public Connection createConnection() throws Exception {
        ActiveMQConnection createConnection = super.createConnection();
        createConnection.getPrefetchPolicy().setQueuePrefetch(this.prefetchSize);
        createConnection.getPrefetchPolicy().setTopicPrefetch(this.prefetchSize);
        return createConnection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TextMessage consumeMessge(int i) throws JMSException {
        TextMessage receive = this.consumer.receive(this.consumerTimeout);
        assertTrue("Should have received a message by now for message: " + i, receive != null);
        assertTrue("Should be a TextMessage: " + receive, receive instanceof TextMessage);
        TextMessage textMessage = receive;
        assertEquals("Message content", this.messageTexts[i], textMessage.getText());
        return textMessage;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void makeMessages(int i) {
        this.messageTexts = new String[i];
        for (int i2 = 0; i2 < i; i2++) {
            this.messageTexts[i2] = "Message for test: + " + getName() + " = " + i2;
        }
    }

    private void validateConsumerPrefetchGreaterOrEqual(String str, long j) throws JMSException {
        doValidateConsumerPrefetch(str, j, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void validateConsumerPrefetch(String str, long j) throws JMSException {
        doValidateConsumerPrefetch(str, j, false);
    }

    protected void doValidateConsumerPrefetch(String str, final long j, final boolean z) throws JMSException {
        for (final Destination destination : BrokerRegistry.getInstance().lookup("localhost").getRegionBroker().getTopicRegion().getDestinationMap().values()) {
            if (destination.getName().equals(str)) {
                try {
                    Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.ConsumeTopicPrefetchTest.1
                        public boolean isSatisified() throws Exception {
                            DestinationStatistics destinationStatistics = destination.getDestinationStatistics();
                            ConsumeTopicPrefetchTest.LOG.info("inflight for : " + destination.getName() + ": " + destinationStatistics.getInflight().getCount());
                            return z ? destinationStatistics.getInflight().getCount() >= j : destinationStatistics.getInflight().getCount() == j;
                        }
                    });
                    DestinationStatistics destinationStatistics = destination.getDestinationStatistics();
                    LOG.info("inflight for : " + destination.getName() + ": " + destinationStatistics.getInflight().getCount());
                    if (z) {
                        String name = destination.getName();
                        long count = destinationStatistics.getInflight().getCount();
                        destinationStatistics.getInflight().getCount();
                        assertTrue("inflight for: " + name + ": " + count + " > " + name, destinationStatistics.getInflight().getCount() >= j);
                    } else {
                        assertEquals("inflight for: " + destination.getName() + ": " + destinationStatistics.getInflight().getCount() + " matches", j, destinationStatistics.getInflight().getCount());
                    }
                } catch (Exception e) {
                    throw new JMSException(e.toString());
                }
            }
        }
    }
}
