package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.net.URL;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/BacklogQuotaManagerTest.class */
public class BacklogQuotaManagerTest {
    /** Backlog quota check interval in seconds; matches the value configured on the broker in setup(). */
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    /** Managed-ledger max entries per ledger; matches the value configured on the broker in setup(). */
    private static final int MAX_ENTRIES_PER_LEDGER = 5;
    private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);

    // Per-test fixtures: created in setup() and released in shutdown().
    LocalBookkeeperEnsemble bkEnsemble;
    ServiceConfiguration config;
    PulsarService pulsar;
    URL adminUrl;
    PulsarAdmin admin;

    /**
     * Starts a local BookKeeper/ZooKeeper ensemble and a broker on ephemeral ports, then creates
     * the cluster, tenant and the three namespaces used by the tests in this class.
     *
     * <p>Any failure is logged and reported through {@code Assert.fail} with the original
     * throwable attached as the cause.
     *
     * @throws Exception declared for TestNG; failures are converted to assertion errors
     */
    @BeforeMethod
    void setup() throws Exception {
        try {
            // Local bookie/ZooKeeper ensemble backing the broker under test.
            this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble.start();

            // Broker configuration: ports set to 0 so the broker picks free ones (read back below).
            this.config = new ServiceConfiguration();
            this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            this.config.setAdvertisedAddress("localhost");
            this.config.setWebServicePort(Optional.of(0));
            this.config.setClusterName("usc");
            this.config.setBrokerServicePort(Optional.of(0));
            this.config.setAuthorizationEnabled(false);
            this.config.setAuthenticationEnabled(false);
            // The tests sleep slightly longer than this interval before asserting on quota effects.
            this.config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            this.config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
            this.config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            this.config.setAllowAutoTopicCreationType("non-partitioned");

            this.pulsar = new PulsarService(this.config);
            this.pulsar.start();

            this.adminUrl = new URL("http://127.0.0.1:" + this.pulsar.getListenPortHTTP().get());
            this.admin = PulsarAdmin.builder().serviceHttpUrl(this.adminUrl.toString()).build();

            this.admin.clusters().createCluster("usc", new ClusterData(this.adminUrl.toString()));
            this.admin.tenants().createTenant("prop", new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("usc")));
            this.admin.namespaces().createNamespace("prop/ns-quota");
            this.admin.namespaces().setNamespaceReplicationClusters("prop/ns-quota", Sets.newHashSet("usc"));
            this.admin.namespaces().createNamespace("prop/quotahold");
            this.admin.namespaces().setNamespaceReplicationClusters("prop/quotahold", Sets.newHashSet("usc"));
            this.admin.namespaces().createNamespace("prop/quotaholdasync");
            this.admin.namespaces().setNamespaceReplicationClusters("prop/quotaholdasync", Sets.newHashSet("usc"));
        } catch (Throwable th) {
            LOG.error("Error setting up broker test", th);
            // Attach the cause so the TestNG report shows why setup failed, not only that it did.
            Assert.fail("Broker test setup failed", th);
        }
    }

    /**
     * Releases the admin client, the broker and the local BookKeeper ensemble, in that order.
     *
     * <p>Each resource is closed independently so that a failure while closing one does not
     * leave the others running. Fields that were never initialised are skipped. The first
     * failure, if any, is reported via {@code Assert.fail} after all closes were attempted.
     *
     * @throws Exception declared for TestNG; failures are converted to assertion errors
     */
    @AfterMethod
    void shutdown() throws Exception {
        Throwable firstFailure = null;

        try {
            if (this.admin != null) {
                this.admin.close();
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up broker test setup state", th);
            firstFailure = th;
        }

        try {
            if (this.pulsar != null) {
                this.pulsar.close();
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up broker test setup state", th);
            if (firstFailure == null) {
                firstFailure = th;
            }
        }

        try {
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up broker test setup state", th);
            if (firstFailure == null) {
                firstFailure = th;
            }
        }

        if (firstFailure != null) {
            Assert.fail("Broker test cleanup failed", firstFailure);
        }
    }

    /**
     * Asks the broker service to update its rates; the tests call this right before reading
     * topic stats through the admin API.
     */
    private void rolloverStats() {
        BrokerService brokerService = this.pulsar.getBrokerService();
        brokerService.updateRates();
    }

    /**
     * Publishes to a topic that only has a reader (non-durable subscription) attached and checks
     * that the reader does not contribute to the topic backlog size, that publishing keeps
     * working under the {@code producer_exception} policy, and that a single ledger remains.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testBacklogQuotaWithReader() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.producer_exception));

        final String topic = "persistent://prop/ns-quota/topic1";
        final int numMsgs = 20;

        // try-with-resources guarantees the client is closed on both success and failure.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            Reader<byte[]> reader = client.newReader().topic(topic).receiverQueueSize(1).startMessageId(MessageId.latest).create();
            Producer<byte[]> producer = client.newProducer().topic(topic).sendTimeout(2, TimeUnit.SECONDS).create();

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                // Bump the first byte so each payload is distinguishable when logged below.
                content[0] = (byte) (content[0] + 1);
                producer.send(content);
            }

            // Wait one quota-check interval (plus a second of slack) before sampling stats.
            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Only the reader's subscription exists, and the topic-level backlog size must be zero.
            Assert.assertEquals(stats.backlogSize, 0L, "backlog size is [" + stats.backlogSize + "]");
            Assert.assertEquals(stats.subscriptions.size(), 1);
            long nonDurableBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
            Assert.assertEquals(nonDurableBacklog, 20L, "non-durable subscription backlog is [" + nonDurableBacklog + "]");

            // A second batch must still be accepted without a client exception.
            for (int i = 0; i < numMsgs; i++) {
                try {
                    content[0] = (byte) (content[0] + 1);
                    producer.send(content);
                } catch (PulsarClientException e) {
                    Assert.fail("Should not have gotten exception: " + e.getMessage(), e);
                }
            }

            // Exactly one ledger is expected to remain, with the id the test hard-codes.
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic);
            Assert.assertEquals(internalStats.ledgers.size(), 1);
            Assert.assertEquals(internalStats.ledgers.get(0).ledgerId, 7L);

            // Drain whatever the reader can still see; stops on the first 5-second read timeout.
            Message<byte[]> msg;
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
            }
        }
    }

    /**
     * With the {@code consumer_backlog_eviction} policy and a 10 KiB limit, publishes 20 messages
     * of 1 KiB that two subscriptions receive but never acknowledge, then checks that the topic
     * backlog size is below the limit after one quota-check interval.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testConsumerBacklogEviction() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final String topic = "persistent://prop/ns-quota/topic1";

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topic).create();

            byte[] content = new byte[1024];
            for (int i = 0; i < 20; i++) {
                producer.send(content);
                // Received but deliberately not acknowledged, so the backlog grows.
                consumer1.receive();
                consumer2.receive();
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Report the value that is actually being checked.
            Assert.assertTrue(stats.backlogSize < 10240, "Backlog size is [" + stats.backlogSize + "]");
        }
    }

    /**
     * Same scenario as the plain eviction test, but one of the two subscriptions acknowledges
     * every message while the other only receives. After one quota-check interval the topic
     * backlog size must not exceed the 10 KiB limit.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testConsumerBacklogEvictionWithAck() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final String topic = "persistent://prop/ns-quota/topic11";

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build()) {
            Consumer<byte[]> ackingConsumer = client.newConsumer().topic(topic).subscriptionName("c11").subscribe();
            Consumer<byte[]> laggingConsumer = client.newConsumer().topic(topic).subscriptionName("c21").subscribe();
            Producer<byte[]> producer = client.newProducer().topic(topic).create();

            byte[] content = new byte[1024];
            for (int i = 0; i < 20; i++) {
                producer.send(content);
                // Only the first subscription acknowledges; the second one accumulates backlog.
                ackingConsumer.acknowledge(ackingConsumer.receive());
                laggingConsumer.receive();
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Report the value that is actually being checked.
            Assert.assertTrue(stats.backlogSize <= 10240, "Backlog size is [" + stats.backlogSize + "]");
        }
    }

    /**
     * Runs a producer thread and a consumer thread concurrently (started together through a
     * barrier) under the {@code consumer_backlog_eviction} policy, then checks that neither
     * thread hit an exception and that the backlog size does not exceed the 10 KiB limit.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testConcurrentAckAndEviction() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final String topic = "persistent://prop/ns-quota/topic12";
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch done = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        // Separate clients for the producing and consuming side; both are always closed.
        try (PulsarClient producerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient consumerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            final Consumer<byte[]> ackingConsumer = consumerClient.newConsumer().topic(topic).subscriptionName("c12").subscribe();
            final Consumer<byte[]> laggingConsumer = consumerClient.newConsumer().topic(topic).subscriptionName("c22").subscribe();

            Thread producerThread = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = producerClient.newProducer().topic(topic).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < 20; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    LOG.error("Producer thread failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            Thread consumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < 20; i++) {
                        // One subscription acks, the other only receives.
                        ackingConsumer.acknowledge(ackingConsumer.receive());
                        laggingConsumer.receive();
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            producerThread.start();
            consumerThread.start();

            // Bounded wait; as in the original test, a timeout here is not treated as a failure.
            done.await(20L, TimeUnit.SECONDS);
            Assert.assertFalse(gotException.get());

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Report the value that is actually being checked.
            Assert.assertTrue(stats.backlogSize <= 10240, "Backlog size is [" + stats.backlogSize + "]");
        }
    }

    /**
     * Runs a producer thread (10 messages of 1 KiB) and a consumer thread in which both
     * subscriptions acknowledge every message, and checks that neither thread hit an exception.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testNoEviction() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final String topic = "persistent://prop/ns-quota/topic13";
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch done = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        // Separate clients for the consuming and producing side; both are always closed.
        try (PulsarClient consumerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient producerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            final Consumer<byte[]> consumer1 = consumerClient.newConsumer().topic(topic).subscriptionName("c13").subscribe();
            final Consumer<byte[]> consumer2 = consumerClient.newConsumer().topic(topic).subscriptionName("c23").subscribe();

            Thread producerThread = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = producerClient.newProducer().topic(topic).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < 10; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    LOG.error("Producer thread failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            Thread consumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < 10; i++) {
                        consumer1.acknowledge(consumer1.receive());
                        consumer2.acknowledge(consumer2.receive());
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            producerThread.start();
            consumerThread.start();

            // Unbounded wait, as in the original: both workers are expected to finish.
            done.await();
            Assert.assertFalse(gotException.get());
        }
    }

    /**
     * Runs two producer threads (10 messages of 1 KiB each) and two acknowledging consumer
     * threads concurrently under the {@code consumer_backlog_eviction} policy with a 15 KiB
     * limit, then checks that no thread hit an exception and that the backlog size does not
     * exceed the limit.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testEvictionMulti() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/ns-quota", new BacklogQuota(15360L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        final String topic = "persistent://prop/ns-quota/topic14";
        final CyclicBarrier barrier = new CyclicBarrier(4);
        final CountDownLatch done = new CountDownLatch(4);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        // One client for the consumers and one per producer thread; all are always closed.
        try (PulsarClient consumerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient producerClient2 = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient producerClient1 = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            final Consumer<byte[]> consumer1 = consumerClient.newConsumer().topic(topic).subscriptionName("c14").subscribe();
            final Consumer<byte[]> consumer2 = consumerClient.newConsumer().topic(topic).subscriptionName("c24").subscribe();

            Thread producerThread1 = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = producerClient1.newProducer().topic(topic).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < 10; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    LOG.error("Producer thread 1 failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            Thread producerThread2 = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = producerClient2.newProducer().topic(topic).create();
                    byte[] content = new byte[1024];
                    for (int i = 0; i < 10; i++) {
                        producer.send(content);
                    }
                    producer.close();
                } catch (Exception e) {
                    LOG.error("Producer thread 2 failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            Thread consumerThread1 = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < 20; i++) {
                        consumer1.acknowledge(consumer1.receive());
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread 1 failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            Thread consumerThread2 = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < 20; i++) {
                        consumer2.acknowledge(consumer2.receive());
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread 2 failed", e);
                    gotException.set(true);
                } finally {
                    done.countDown();
                }
            });

            producerThread1.start();
            producerThread2.start();
            consumerThread1.start();
            consumerThread2.start();

            // Bounded wait; as in the original test, a timeout here is not treated as a failure.
            done.await(20L, TimeUnit.SECONDS);
            Assert.assertFalse(gotException.get());

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Report the value that is actually being checked.
            Assert.assertTrue(stats.backlogSize <= 15360, "Backlog size is [" + stats.backlogSize + "]");
        }
    }

    /**
     * Under the {@code producer_request_hold} policy with a 10 KiB limit, attempts to publish
     * 11 messages of 1 KiB (send timeouts are tolerated), receives 10 messages without
     * acknowledging them, and after one quota-check interval asserts that the topic reports
     * no connected publishers.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testAheadProducerOnHold() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/quotahold", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.producer_request_hold));

        final String topic = "persistent://prop/quotahold/hold";

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("c1hold").subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic).sendTimeout(2, TimeUnit.SECONDS).create();

            // Note the inclusive bound: 11 send attempts in total.
            for (int i = 0; i <= 10; i++) {
                try {
                    producer.send(content);
                    LOG.info("sent [{}]", i);
                } catch (PulsarClientException.TimeoutException e) {
                    // A timeout is an accepted outcome here; only record it.
                    LOG.info("timeout on [{}]", i);
                }
            }

            for (int i = 0; i < 10; i++) {
                consumer.receive();
                LOG.info("received [{}]", i);
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertEquals(stats.publishers.size(), 0, "Number of producers on topic persistent://prop/quotahold/hold are [" + stats.publishers.size() + "]");
        }
    }

    /**
     * Under the {@code producer_request_hold} policy with a 10 KiB limit, fills the backlog with
     * 10 unconsumed messages of 1 KiB, waits one quota-check interval and then expects further
     * sends to fail with a client-side {@code TimeoutException}.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testAheadProducerOnHoldTimeout() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/quotahold", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.producer_request_hold));

        final String topic = "persistent://prop/quotahold/holdtimeout";

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            // The subscription exists only so that published messages count as backlog.
            client.newConsumer().topic(topic).subscriptionName("c1holdtimeout").subscribe();

            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic).sendTimeout(2, TimeUnit.SECONDS).create();
            for (int i = 0; i < 10; i++) {
                producer.send(content);
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            boolean gotTimeout = false;
            try {
                // Two attempts, as in the original; at least one is expected to time out.
                producer.send(content);
                producer.send(content);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException.TimeoutException e) {
                gotTimeout = true;
            }
            Assert.assertTrue(gotTimeout, "timeout did not occur");
        }
    }

    /**
     * Under the {@code producer_exception} policy with a 10 KiB limit, fills the backlog with
     * 10 unconsumed messages of 1 KiB, waits one quota-check interval and then expects further
     * sends to fail with either {@code ProducerBlockedQuotaExceededException} or
     * {@code TimeoutException}.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testProducerException() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/quotahold", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.producer_exception));

        final String topic = "persistent://prop/quotahold/except";

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            // The subscription exists only so that published messages count as backlog.
            client.newConsumer().topic(topic).subscriptionName("c1except").subscribe();

            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic).sendTimeout(2, TimeUnit.SECONDS).create();
            for (int i = 0; i < 10; i++) {
                producer.send(content);
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            boolean gotException = false;
            try {
                // Two attempts, as in the original; at least one is expected to be rejected.
                producer.send(content);
                producer.send(content);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException e) {
                boolean expectedType = (e instanceof PulsarClientException.ProducerBlockedQuotaExceededException)
                        || (e instanceof PulsarClientException.TimeoutException);
                Assert.assertTrue(expectedType, e.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");
        }
    }

    /**
     * Under the {@code producer_exception} policy, first drives the topic over its 10 KiB limit
     * and verifies that publishing is rejected, then drains and acknowledges the subscription's
     * backlog, waits another quota-check interval and verifies that publishing works again.
     *
     * @throws Exception on any broker/client failure
     */
    @Test
    public void testProducerExceptionAndThenUnblock() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), ConfigHelper.backlogQuotaMap(this.config));
        this.admin.namespaces().setBacklogQuota("prop/quotahold", new BacklogQuota(10240L, BacklogQuota.RetentionPolicy.producer_exception));

        final String topic = "persistent://prop/quotahold/exceptandunblock";
        final String subscription = "c1except";

        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            byte[] content = new byte[1024];
            Producer<byte[]> producer = client.newProducer().topic(topic).sendTimeout(2, TimeUnit.SECONDS).create();
            for (int i = 0; i < 10; i++) {
                producer.send(content);
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            // Phase 1: publishing must be rejected while the backlog is over quota.
            boolean gotException = false;
            try {
                producer.send(content);
                producer.send(content);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException e) {
                boolean expectedType = (e instanceof PulsarClientException.ProducerBlockedQuotaExceededException)
                        || (e instanceof PulsarClientException.TimeoutException);
                Assert.assertTrue(expectedType, e.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");

            // Phase 2: drain and acknowledge exactly the backlog the broker reports.
            int backlog = (int) this.admin.topics().getStats(topic).subscriptions.get(subscription).msgBacklog;
            for (int i = 0; i < backlog; i++) {
                consumer.acknowledge(consumer.receive());
            }

            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

            // Phase 3: publishing must succeed again; remember the last failure for the message.
            Exception sendException = null;
            boolean sendFailed = false;
            for (int i = 0; i < 5; i++) {
                try {
                    producer.send(content);
                } catch (Exception e) {
                    sendFailed = true;
                    sendException = e;
                }
            }
            Assert.assertFalse(sendFailed, "unable to publish due to " + sendException);
        }
    }
}
