package org.apache.pulsar.client.impl;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicDouble;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javassist.bytecode.Opcode;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/MessagePublishThrottlingTest.class */
/**
 * Tests for publish throttling configured at namespace (topic) level via
 * {@code admin.namespaces().setPublishRate(...)} and at broker level via the dynamic
 * configuration keys {@code brokerPublisherThrottlingMaxMessageRate} and
 * {@code brokerPublisherThrottlingMaxByteRate}.
 *
 * <p>Every test follows the same pattern: enable a limit, publish a fixed number of messages
 * synchronously and assert that the rate reported by the broker-side producer stats stays
 * below the amount published; then remove the limit, publish again and assert that the
 * reported rate exceeds that amount.
 */
public class MessagePublishThrottlingTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(MessagePublishThrottlingTest.class);

    /** Namespace created by every test. */
    private static final String NAMESPACE = "my-property/throttling_publish";
    /** Prefix shared by all topic names used in this class. */
    private static final String TOPIC_PREFIX = "persistent://" + NAMESPACE + "/";
    /** Client-side pending-message queue size used by most producers in this class. */
    private static final int MAX_PENDING_MESSAGES = 30000;
    /** Dynamic broker configuration key for the broker-wide message rate limit. */
    private static final String BROKER_MAX_MESSAGE_RATE_KEY = "brokerPublisherThrottlingMaxMessageRate";
    /** Dynamic broker configuration key for the broker-wide byte rate limit. */
    private static final String BROKER_MAX_BYTE_RATE_KEY = "brokerPublisherThrottlingMaxByteRate";
    /** Number of polling attempts passed to {@code retryStrategically}. */
    private static final int RETRY_COUNT = 5;
    /** Sleep interval in milliseconds passed to {@code retryStrategically}. */
    private static final long RETRY_SLEEP_MILLIS = 200L;

    /**
     * Runs the base-class setup and then sets the cluster name and both throttling tick times
     * (1 ms) on {@code conf}.
     *
     * <p>NOTE(review): the configuration is modified after {@code internalSetup()} has been
     * called; whether the broker picks these values up depends on base-class behaviour that
     * is not visible here. The order is kept as found.
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
        this.conf.setClusterName("test");
        this.conf.setTopicPublisherThrottlingTickTimeMillis(1);
        this.conf.setBrokerPublisherThrottlingTickTimeMillis(1);
    }

    /** Runs the base-class cleanup and resets the configuration after every test method. */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
        super.resetConfig();
    }

    /**
     * Namespace-level message-rate limit: with a limit of 10 msg/s the measured inbound message
     * rate must stay below the number of messages sent; with the limit removed ({@code -1}) it
     * must exceed it.
     */
    @Test
    public void testSimplePublishMessageThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = TOPIC_PREFIX + "throttlingMessageBlock";
        final int numMessages = 200;
        final int msgBytes = 80;

        this.admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet("test"));
        PublishRate publishRate = new PublishRate();
        publishRate.publishThrottlingRateInMsg = 10;

        ProducerImpl<byte[]> clientProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName)
                .maxPendingMessages(MAX_PENDING_MESSAGES)
                .create();
        PersistentTopic topic = lookupExistingTopic(topicName);
        // No limit has been configured yet.
        Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        // (1) enable the namespace limit and wait until the topic has picked it up
        this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
        retryStrategically(
                test -> !topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        Assert.assertNotEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        Producer brokerProducer = firstProducerOf(topic);
        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        Assert.assertTrue(brokerProducer.getStats().msgRateIn < (double) numMessages);

        // (2) remove the limit again
        publishRate.publishThrottlingRateInMsg = -1;
        this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
        retryStrategically(
                test -> topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        Assert.assertTrue(brokerProducer.getStats().msgRateIn > (double) numMessages);

        clientProducer.close();
    }

    /**
     * Namespace-level byte-rate limit: with a limit of 400 bytes/s the measured inbound message
     * rate must stay below the number of messages sent; with the limit removed ({@code -1}) it
     * must exceed it.
     */
    @Test
    public void testSimplePublishByteThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = TOPIC_PREFIX + "throttlingRateBlock";
        final int numMessages = 100;
        final int msgBytes = 1;

        this.admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet("test"));
        PublishRate publishRate = new PublishRate();
        publishRate.publishThrottlingRateInByte = 400L;

        ProducerImpl<byte[]> clientProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName)
                .create();
        // This test resolves the topic through getOrCreateTopic rather than getTopicIfExists.
        PersistentTopic topic =
                (PersistentTopic) this.pulsar.getBrokerService().getOrCreateTopic(topicName).get();
        Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        // (1) enable the namespace limit and wait until the topic has picked it up
        this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
        retryStrategically(
                test -> !topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        Assert.assertNotEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        Producer brokerProducer = firstProducerOf(topic);
        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        Assert.assertTrue(brokerProducer.getStats().msgRateIn < (double) numMessages);

        // (2) remove the limit again
        publishRate.publishThrottlingRateInByte = -1L;
        this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
        retryStrategically(
                test -> topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        Assert.assertTrue(brokerProducer.getStats().msgRateIn > (double) numMessages);

        clientProducer.close();
    }

    /**
     * Broker-level message-rate limit: with {@code brokerPublisherThrottlingMaxMessageRate} set
     * to 10 the measured inbound message rate must stay below the number of messages sent; with
     * the value set back to 0 it must exceed it.
     */
    @Test
    public void testBrokerPublishMessageThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = TOPIC_PREFIX + "brokerThrottlingMessageBlock";
        final int numMessages = 100;
        final int msgBytes = 80;

        this.admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet("test"));

        ProducerImpl<byte[]> clientProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .maxPendingMessages(MAX_PENDING_MESSAGES)
                .create();
        PersistentTopic topic = lookupExistingTopic(topicName);
        Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        // (1) enable the broker-wide limit
        this.admin.brokers().updateDynamicConfiguration(BROKER_MAX_MESSAGE_RATE_KEY, Integer.toString(10));
        retryStrategically(
                test -> topic.getBrokerPublishRateLimiter() != PublishRateLimiter.DISABLED_RATE_LIMITER,
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        logBrokerThrottlingConfig(
                "Get broker configuration: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
        Assert.assertNotEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        Producer brokerProducer = firstProducerOf(topic);
        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double throttledRateIn = brokerProducer.getStats().msgRateIn;
        log.info("1-st rate in: {}, total: {} ", throttledRateIn, numMessages);
        Assert.assertTrue(throttledRateIn < (double) numMessages);

        // (2) disable the broker-wide limit
        this.admin.brokers().updateDynamicConfiguration(BROKER_MAX_MESSAGE_RATE_KEY, Integer.toString(0));
        retryStrategically(
                test -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double unthrottledRateIn = brokerProducer.getStats().msgRateIn;
        log.info("2-nd rate in: {}, total: {} ", unthrottledRateIn, numMessages);
        Assert.assertTrue(unthrottledRateIn > (double) numMessages);

        clientProducer.close();
    }

    /**
     * Broker-level byte-rate limit: with {@code brokerPublisherThrottlingMaxByteRate} set to 400
     * the measured inbound throughput must stay below the number of bytes sent; with the value
     * set back to 0 it must exceed it.
     */
    @Test
    public void testBrokerPublishByteThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = TOPIC_PREFIX + "brokerThrottlingByteBlock";
        final int numMessages = 20;
        final int msgBytes = 80;
        final int totalBytes = numMessages * msgBytes;

        this.admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet("test"));

        ProducerImpl<byte[]> clientProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .maxPendingMessages(MAX_PENDING_MESSAGES)
                .create();
        PersistentTopic topic = lookupExistingTopic(topicName);
        Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        // (1) enable the broker-wide limit
        this.admin.brokers().updateDynamicConfiguration(BROKER_MAX_BYTE_RATE_KEY, Long.toString(400L));
        retryStrategically(
                test -> topic.getBrokerPublishRateLimiter() != PublishRateLimiter.DISABLED_RATE_LIMITER,
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        logBrokerThrottlingConfig(
                "Get broker configuration after enable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
        Assert.assertNotEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        Producer brokerProducer = firstProducerOf(topic);
        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double throttledThroughput = brokerProducer.getStats().msgThroughputIn;
        log.info("1-st byte rate in: {}, total: {} ", throttledThroughput, totalBytes);
        Assert.assertTrue(throttledThroughput < (double) totalBytes);

        // (2) disable the broker-wide limit
        this.admin.brokers().updateDynamicConfiguration(BROKER_MAX_BYTE_RATE_KEY, Long.toString(0L));
        retryStrategically(
                test -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        logBrokerThrottlingConfig(
                "Get broker configuration after disable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
        Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double unthrottledThroughput = brokerProducer.getStats().msgThroughputIn;
        log.info("2-nd byte rate in: {}, total: {} ", unthrottledThroughput, totalBytes);
        Assert.assertTrue(unthrottledThroughput > (double) totalBytes);

        clientProducer.close();
    }

    /**
     * Combined broker-level (800 bytes/s) and namespace-level (400 bytes/s) byte-rate limits.
     *
     * <ol>
     *   <li>Both limits enabled, one topic: throughput stays below the bytes sent.</li>
     *   <li>Both limits enabled, three additional topics publishing one after another on a
     *       single-thread executor: the summed throughput exceeds the single-topic value.</li>
     *   <li>Namespace limit removed, broker limit still active: throughput rises but stays
     *       below the bytes sent.</li>
     *   <li>Broker limit removed as well: throughput exceeds the bytes sent.</li>
     * </ol>
     */
    @Test
    public void testBrokerTopicPublishByteThrottling() throws Exception {
        log.info("-- Starting {} test --", this.methodName);

        final String topicName = TOPIC_PREFIX + "brokerTopicThrottlingByteBlock";
        final int numMessages = 40;
        final int msgBytes = 80;
        final int totalBytes = numMessages * msgBytes;
        final int topicNumber = 3;

        this.admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet("test"));
        PublishRate publishRate = new PublishRate();
        publishRate.publishThrottlingRateInByte = 400L;

        ProducerImpl<byte[]> clientProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .maxPendingMessages(MAX_PENDING_MESSAGES)
                .create();
        PersistentTopic topic = lookupExistingTopic(topicName);
        Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
        Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        // (1) enable both the broker-wide and the namespace limit
        this.admin.brokers().updateDynamicConfiguration(BROKER_MAX_BYTE_RATE_KEY, Long.toString(800L));
        this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
        retryStrategically(
                test -> !topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        retryStrategically(
                test -> topic.getBrokerPublishRateLimiter() != PublishRateLimiter.DISABLED_RATE_LIMITER,
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        logBrokerThrottlingConfig(
                "Get broker configuration after enable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
        Assert.assertNotEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
        Assert.assertNotEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        Producer brokerProducer = firstProducerOf(topic);
        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double singleTopicThroughput = brokerProducer.getStats().msgThroughputIn;
        log.info("1-st byte rate in 1: {}, total: {} ", singleTopicThroughput, totalBytes);
        Assert.assertTrue(singleTopicThroughput < (double) totalBytes);

        // (2) several topics in the same namespace, each with its own producer
        List<ProducerImpl<byte[]>> topicProducers = Lists.newArrayListWithExpectedSize(topicNumber);
        List<PersistentTopic> topics = Lists.newArrayListWithExpectedSize(topicNumber);
        for (int i = 0; i < topicNumber; i++) {
            String iTopicName = topicName + i;
            ProducerImpl<byte[]> iProducer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                    .topic(iTopicName)
                    .enableBatching(false)
                    .maxPendingMessages(MAX_PENDING_MESSAGES)
                    .create();
            PersistentTopic iTopic = lookupExistingTopic(iTopicName);
            topicProducers.add(iProducer);
            topics.add(iTopic);
            Assert.assertNotEquals(iTopic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
            this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
            retryStrategically(
                    test -> !iTopic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                    RETRY_COUNT, RETRY_SLEEP_MILLIS);
            Assert.assertNotEquals(iTopic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);
        }

        List<Callable<Void>> topicRatesCounter = Lists.newArrayListWithExpectedSize(topicNumber);
        AtomicDouble topicsRateIn = new AtomicDouble(0.0);
        CountDownLatch latch = new CountDownLatch(topicNumber);
        for (int i = 0; i < topicNumber; i++) {
            // Capture the loop index so that each task addresses list slots 0..topicNumber-1.
            // A shared counter read with incrementAndGet() would yield 1..topicNumber, skipping
            // the first topic and running past the end of the lists on the last task.
            final int id = i;
            topicRatesCounter.add(() -> {
                Producer iBrokerProducer = firstProducerOf(topics.get(id));
                sendAndRefreshRates(topicProducers.get(id), iBrokerProducer, numMessages, msgBytes);
                topicsRateIn.addAndGet(iBrokerProducer.getStats().msgThroughputIn);
                latch.countDown();
                return null;
            });
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // invokeAll blocks until every task has finished but hides task exceptions inside
            // the returned futures; the latch only reaches zero when every task completed
            // normally, so asserting on it surfaces such failures.
            executor.invokeAll(topicRatesCounter);
            Assert.assertTrue(latch.await(2L, TimeUnit.SECONDS),
                    "not all per-topic publish tasks completed successfully");
        } finally {
            executor.shutdownNow();
        }
        log.info("2-nd rate in: {}, total: {} ", topicsRateIn.get(), topicNumber * totalBytes);
        Assert.assertTrue(singleTopicThroughput < topicsRateIn.get());
        // NOTE(review): this compares the single-topic rate from step (1), not the aggregated
        // rate, against the aggregated byte count; kept as found - confirm the intent.
        Assert.assertTrue(singleTopicThroughput < (double) (topicNumber * totalBytes));

        // (3) remove the namespace limit; the broker-wide limit stays active
        publishRate.publishThrottlingRateInByte = -1L;
        this.admin.namespaces().setPublishRate(NAMESPACE, publishRate);
        retryStrategically(
                test -> topic.getTopicPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        Assert.assertEquals(topic.getTopicPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double brokerOnlyThroughput = brokerProducer.getStats().msgThroughputIn;
        log.info("3-rd byte rate in: {}, rate in 2: {},  total: {} ",
                singleTopicThroughput, brokerOnlyThroughput, totalBytes);
        Assert.assertTrue(singleTopicThroughput < brokerOnlyThroughput);
        Assert.assertTrue(brokerOnlyThroughput < (double) totalBytes);

        // (4) remove the broker-wide limit as well
        this.admin.brokers().updateDynamicConfiguration(BROKER_MAX_BYTE_RATE_KEY, Long.toString(0L));
        retryStrategically(
                test -> topic.getBrokerPublishRateLimiter().equals(PublishRateLimiter.DISABLED_RATE_LIMITER),
                RETRY_COUNT, RETRY_SLEEP_MILLIS);
        logBrokerThrottlingConfig(
                "Get broker configuration after disable: brokerTick {},  MaxMessageRate {}, MaxByteRate {}");
        Assert.assertEquals(topic.getBrokerPublishRateLimiter(), PublishRateLimiter.DISABLED_RATE_LIMITER);

        sendAndRefreshRates(clientProducer, brokerProducer, numMessages, msgBytes);
        double unthrottledThroughput = brokerProducer.getStats().msgThroughputIn;
        log.info("4-th byte rate in: {}, total: {} ", unthrottledThroughput, totalBytes);
        Assert.assertTrue(unthrottledThroughput > (double) totalBytes);

        clientProducer.close();
    }

    /**
     * Resolves an already existing topic from the broker service.
     *
     * @param topicName fully qualified topic name
     * @return the topic, cast to {@link PersistentTopic}
     * @throws Exception if the lookup future fails or the returned optional is empty
     */
    private PersistentTopic lookupExistingTopic(String topicName) throws Exception {
        return (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
    }

    /**
     * Returns the first broker-side producer registered on the given topic.
     *
     * @param topic topic with at least one connected producer
     * @return the first element of the topic's producer collection
     */
    private static Producer firstProducerOf(PersistentTopic topic) {
        return (Producer) topic.getProducers().values().iterator().next();
    }

    /**
     * Publishes {@code messageCount} zero-filled messages of {@code messageSize} bytes
     * synchronously, calling {@code updateRates()} on the broker-side producer immediately
     * before and after the burst so that its stats reflect exactly this burst.
     *
     * @param clientProducer client-side producer used for sending
     * @param brokerProducer broker-side producer whose rates are refreshed
     * @param messageCount number of messages to send
     * @param messageSize payload size of every message in bytes
     * @throws Exception if a send fails
     */
    private static void sendAndRefreshRates(ProducerImpl<byte[]> clientProducer, Producer brokerProducer,
                                            int messageCount, int messageSize) throws Exception {
        brokerProducer.updateRates();
        for (int sent = 0; sent < messageCount; sent++) {
            clientProducer.send(new byte[messageSize]);
        }
        brokerProducer.updateRates();
    }

    /**
     * Logs the broker's current throttling tick time, maximum message rate and maximum byte
     * rate, in that order.
     *
     * @param format SLF4J format string containing three {@code {}} placeholders
     */
    private void logBrokerThrottlingConfig(String format) {
        log.info(format,
                this.pulsar.getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(),
                this.pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate(),
                this.pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate());
    }
}
