package org.apache.pulsar.broker.stats;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for the per-consumer statistics ({@code unackedMessages},
 * {@code blockedConsumerOnUnackedMsgs}) reported through the admin topic-stats API.
 *
 * <p>Every test runs against a broker configured with
 * {@code maxUnackedMessagesPerConsumer = 0} (see {@link #setup()}).
 */
public class ConsumerStatsTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsTest.class);

    /** Number of messages published and consumed by each test. */
    private static final int NUM_MESSAGES = 10;

    /** Number of partitions created for the partitioned-topic test. */
    private static final int NUM_PARTITIONS = 3;

    /**
     * Pause between the last acknowledgement and reading the stats.
     * NOTE(review): presumably gives the broker time to process the acks before
     * the stats are queried -- confirm; a polling wait would be less timing-sensitive.
     */
    private static final long STATS_SETTLE_MILLIS = 2000L;

    /**
     * Sets the per-consumer unacked-message limit to 0 on the broker configuration,
     * then runs the base-class broker and producer setup.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        // Must be applied before internalSetup() so the test sees the value on the running service.
        this.conf.setMaxUnackedMessagesPerConsumer(0);
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Delegates to the base-class cleanup.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks that with {@code maxUnackedMessagesPerConsumer = 0} a Shared consumer holding
     * unacknowledged messages is not reported as blocked, that {@code unackedMessages}
     * equals the number of received-but-unacked messages, and that it drops to 0 once
     * the messages have been acknowledged.
     */
    @Test
    public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws PulsarClientException, InterruptedException, PulsarAdminException {
        Assert.assertEquals(this.pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), 0);
        final String topicName = "persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer";

        // The consumer is created before the producer, as in the original ordering.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionType(SubscriptionType.Shared)
                     .ackTimeout(1L, TimeUnit.SECONDS)
                     .subscriptionName("sub")
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .create()) {
            publishMessages(producer);

            // First pass: receive everything but deliberately do not acknowledge.
            int received = 0;
            for (int i = 0; i < NUM_MESSAGES; i++) {
                consumer.receive();
                received++;
            }
            Assert.assertEquals(received, NUM_MESSAGES);

            ConsumerStats beforeAck = soleConsumerStats(this.admin.topics().getStats(topicName));
            Assert.assertFalse(beforeAck.blockedConsumerOnUnackedMsgs);
            Assert.assertEquals(beforeAck.unackedMessages, NUM_MESSAGES);

            // Second pass: only NUM_MESSAGES were published, so these receives depend on the
            // unacked messages being delivered again (presumably via the 1s ack timeout above).
            Assert.assertEquals(receiveAndAcknowledge(consumer), NUM_MESSAGES);

            Thread.sleep(STATS_SETTLE_MILLIS);

            ConsumerStats afterAck = soleConsumerStats(this.admin.topics().getStats(topicName));
            Assert.assertFalse(afterAck.blockedConsumerOnUnackedMsgs);
            Assert.assertEquals(afterAck.unackedMessages, 0);
        }
    }

    /**
     * Checks that, for an Exclusive subscription on a partitioned topic, every partition
     * reports exactly one subscription with one consumer and zero unacked messages after
     * all published messages have been received and acknowledged.
     */
    @Test
    public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
        final String topicName = "persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription";
        this.admin.topics().createPartitionedTopic(topicName, NUM_PARTITIONS);

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topicName)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("sub")
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topicName)
                     .create()) {
            publishMessages(producer);

            // TestNG's assertEquals takes (actual, expected).
            Assert.assertEquals(receiveAndAcknowledge(consumer), NUM_MESSAGES);

            Thread.sleep(STATS_SETTLE_MILLIS);

            for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
                TopicStats partitionStats = this.admin.topics().getStats(topicName + "-partition-" + partition);
                Assert.assertEquals(soleConsumerStats(partitionStats).unackedMessages, 0);
            }
        }
    }

    /** Sends {@link #NUM_MESSAGES} messages with payloads "message-0" .. "message-9". */
    private static void publishMessages(Producer<byte[]> producer) throws PulsarClientException {
        for (int i = 0; i < NUM_MESSAGES; i++) {
            producer.send(("message-" + i).getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Receives and acknowledges {@link #NUM_MESSAGES} messages; returns how many were acknowledged. */
    private static int receiveAndAcknowledge(Consumer<byte[]> consumer) throws PulsarClientException {
        int acknowledged = 0;
        for (int i = 0; i < NUM_MESSAGES; i++) {
            consumer.acknowledge(consumer.receive());
            acknowledged++;
        }
        return acknowledged;
    }

    /** Asserts the topic has exactly one subscription with exactly one consumer and returns that consumer's stats. */
    private static ConsumerStats soleConsumerStats(TopicStats stats) {
        Map<String, SubscriptionStats> subscriptions = stats.subscriptions;
        Assert.assertEquals(subscriptions.size(), 1);
        SubscriptionStats subscription = subscriptions.values().iterator().next();
        Assert.assertEquals(subscription.consumers.size(), 1);
        return subscription.consumers.get(0);
    }
}
