package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.analysis.integration.BaseAbstractUnivariateIntegrator;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.class */
public class MessagePublishBufferThrottleTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void setup() throws Exception {
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    protected void cleanup() throws Exception {
    }

    @Test
    public void testMessagePublishBufferThrottleDisabled() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(-1);
        this.conf.setMessagePublishBufferCheckIntervalInMillis(10);
        super.baseSetup();
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled").producerName("producer-name").create();
        AbstractTopic abstractTopic = (Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled").get();
        Assert.assertNotNull(abstractTopic);
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setMessagePublishBufferSize(4611686018427387903L);
        Thread.sleep(20L);
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.sendAsync(new byte[1048576]));
        }
        FutureUtil.waitForAll(arrayList).get();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertNotNull(((CompletableFuture) it.next()).get());
        }
        Thread.sleep(20L);
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        super.internalCleanup();
    }

    @Test
    public void testMessagePublishBufferThrottleEnable() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        this.conf.setMessagePublishBufferCheckIntervalInMillis(BaseAbstractUnivariateIntegrator.DEFAULT_MAX_ITERATIONS_COUNT);
        super.baseSetup();
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable").producerName("producer-name").create();
        AbstractTopic abstractTopic = (Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable").get();
        Assert.assertNotNull(abstractTopic);
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setMessagePublishBufferSize(4611686018427387903L);
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        create.sendAsync(new byte[1024]).get(1L, TimeUnit.SECONDS);
        getPulsar().getBrokerService().checkMessagePublishBuffer();
        Assert.assertTrue(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setMessagePublishBufferSize(0L);
        getPulsar().getBrokerService().checkMessagePublishBuffer();
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.sendAsync(new byte[1048576]));
        }
        FutureUtil.waitForAll(arrayList).get();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertNotNull(((CompletableFuture) it.next()).get());
        }
        Assert.assertEquals(this.pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
        super.internalCleanup();
    }

    @Test
    public void testBlockByPublishRateLimiting() throws Exception {
        this.conf.setMaxMessagePublishBufferSizeInMB(1);
        this.conf.setMessagePublishBufferCheckIntervalInMillis(BaseAbstractUnivariateIntegrator.DEFAULT_MAX_ITERATIONS_COUNT);
        super.baseSetup();
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        Producer<byte[]> create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testBlockByPublishRateLimiting").producerName("producer-name").create();
        AbstractTopic abstractTopic = (Topic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/testBlockByPublishRateLimiting").get();
        Assert.assertNotNull(abstractTopic);
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setMessagePublishBufferSize(4611686018427387903L);
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        create.sendAsync(new byte[1024]).get(1L, TimeUnit.SECONDS);
        getPulsar().getBrokerService().checkMessagePublishBuffer();
        Assert.assertTrue(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setMessagePublishBufferSize(0L);
        getPulsar().getBrokerService().checkMessagePublishBuffer();
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setAutoReadDisabledRateLimiting(true);
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().disableCnxAutoRead();
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().enableCnxAutoRead();
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().setAutoReadDisabledRateLimiting(false);
        ((Producer) abstractTopic.producers.get("producer-name")).getCnx().enableCnxAutoRead();
        Assert.assertFalse(this.pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.sendAsync(new byte[1048576]));
        }
        FutureUtil.waitForAll(arrayList).get();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertNotNull(((CompletableFuture) it.next()).get());
        }
        Assert.assertEquals(this.pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
        super.internalCleanup();
    }
}
