package org.apache.pulsar.broker.service;

import com.beust.jcommander.internal.Maps;
import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/BacklogQuotaManagerTest.class */
public class BacklogQuotaManagerTest {
    // Broker under test; created and started in setup(), closed in shutdown().
    PulsarService pulsar;
    // Configuration the broker is started with; individual tests toggle
    // preciseTimeBasedBacklogQuotaCheck on it, and createNamespaces() resets that flag.
    ServiceConfiguration config;
    // HTTP URL of the broker's web service; also passed as the service URL of test clients.
    URL adminUrl;
    // Admin client used to manage clusters, tenants, namespaces and to read topic stats.
    PulsarAdmin admin;
    // Local BookKeeper ensemble (with its ZooKeeper) backing the broker.
    LocalBookkeeperEnsemble bkEnsemble;
    // NOTE(review): presumably the backlog quota check interval in seconds; setup() passes the
    // same value (2) to setBacklogQuotaCheckIntervalInSeconds — confirm.
    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2;
    // NOTE(review): presumably the managed-ledger rollover threshold; setup() passes the same
    // value (5) to setManagedLedgerMaxEntriesPerLedger — confirm.
    private static final int MAX_ENTRIES_PER_LEDGER = 5;
    private static final Logger LOG = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);

    /**
     * Supplies the two boolean variants ({@code true} and {@code false}) under the
     * data-provider name {@code backlogQuotaSizeGB}.
     *
     * @return one single-element row per boolean value
     */
    @DataProvider(name = "backlogQuotaSizeGB")
    public Object[][] backlogQuotaSizeGB() {
        // Built explicitly as Object[][]: a plain Object[] literal is not assignable to the
        // declared return type.
        return new Object[][]{{true}, {false}};
    }

    /**
     * Starts the local BookKeeper ensemble and a broker configured for fast backlog-quota
     * checks and small ledgers, then creates the admin client, the {@code usc} cluster and
     * the {@code prop} tenant used by every test.
     *
     * <p>Any failure is logged and turned into a test failure that keeps the original cause.
     */
    @BeforeClass
    void setup() throws Exception {
        try {
            this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            this.bkEnsemble.start();

            this.config = new ServiceConfiguration();
            this.config.setZookeeperServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            this.config.setAdvertisedAddress("localhost");
            // Port 0 for both the web service and the broker service.
            this.config.setWebServicePort(Optional.of(0));
            this.config.setClusterName("usc");
            this.config.setBrokerShutdownTimeoutMs(0L);
            this.config.setBrokerServicePort(Optional.of(0));
            this.config.setAuthorizationEnabled(false);
            this.config.setAuthenticationEnabled(false);
            // Use the named constants rather than repeating their literal values.
            this.config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            this.config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
            this.config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
            this.config.setAllowAutoTopicCreationType("non-partitioned");
            this.config.setSystemTopicEnabled(false);
            this.config.setTopicLevelPoliciesEnabled(false);
            this.config.setForceDeleteNamespaceAllowed(true);

            this.pulsar = new PulsarService(this.config);
            this.pulsar.start();

            this.adminUrl = new URL("http://127.0.0.1:" + this.pulsar.getListenPortHTTP().get());
            this.admin = PulsarAdmin.builder().serviceHttpUrl(this.adminUrl.toString()).build();
            this.admin.clusters().createCluster("usc",
                    ClusterData.builder().serviceUrl(this.adminUrl.toString()).build());
            this.admin.tenants().createTenant("prop",
                    new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("usc")));
        } catch (Throwable th) {
            LOG.error("Error setting up broker test", th);
            // Attach the cause so the test report shows why setup failed, not just that it did.
            Assert.fail("Broker test setup failed", th);
        }
    }

    /**
     * Closes the admin client, the broker and the BookKeeper ensemble, in that order.
     *
     * <p>Each resource is released independently so that a failure closing one does not leave
     * the others running. Every failure is logged; the first one is reported as the cause of
     * the resulting test failure.
     */
    @AfterClass(alwaysRun = true)
    void shutdown() throws Exception {
        Throwable firstFailure = null;

        try {
            if (this.admin != null) {
                this.admin.close();
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up broker test setup state", th);
            firstFailure = th;
        } finally {
            this.admin = null;
        }

        try {
            if (this.pulsar != null) {
                this.pulsar.close();
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up broker test setup state", th);
            if (firstFailure == null) {
                firstFailure = th;
            }
        } finally {
            this.pulsar = null;
        }

        try {
            if (this.bkEnsemble != null) {
                this.bkEnsemble.stop();
            }
        } catch (Throwable th) {
            LOG.error("Error cleaning up broker test setup state", th);
            if (firstFailure == null) {
                firstFailure = th;
            }
        } finally {
            this.bkEnsemble = null;
        }

        if (firstFailure != null) {
            Assert.fail("Broker test cleanup failed", firstFailure);
        }
    }

    /**
     * Runs before every test: turns the precise time-based backlog quota check off again and
     * creates the three namespaces used by the tests, each with {@code usc} as its
     * replication cluster.
     *
     * @throws PulsarAdminException if any admin call fails
     */
    @BeforeMethod(alwaysRun = true)
    void createNamespaces() throws PulsarAdminException {
        config.setPreciseTimeBasedBacklogQuotaCheck(false);
        final String[] namespaces = {"prop/ns-quota", "prop/quotahold", "prop/quotaholdasync"};
        for (String namespace : namespaces) {
            admin.namespaces().createNamespace(namespace);
            admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("usc"));
        }
    }

    /**
     * Runs after every test: force-deletes the three test namespaces.
     *
     * @throws PulsarAdminException if any deletion fails
     */
    @AfterMethod(alwaysRun = true)
    void clearNamespaces() throws PulsarAdminException {
        final String[] namespaces = {"prop/ns-quota", "prop/quotahold", "prop/quotaholdasync"};
        for (String namespace : namespaces) {
            // Second argument is the force flag.
            admin.namespaces().deleteNamespace(namespace, true);
        }
    }

    /** Asks the broker service to update its rates; tests call this before reading topic stats. */
    private void rolloverStats() {
        pulsar.getBrokerService().updateRates();
    }

    /**
     * Publishes to a topic that only has a reader attached, under a
     * {@code producer_exception} quota (10240 bytes / 2 seconds), and checks that:
     * <ul>
     *   <li>after the first batch the topic backlog size is 0 while the reader's subscription
     *       reports a backlog of 5 messages;</li>
     *   <li>a second batch can be published without a producer exception;</li>
     *   <li>eventually exactly one ledger remains and it is the ledger of the last published
     *       message.</li>
     * </ul>
     * Finally the reader is drained until a 5-second read times out.
     */
    @Test
    public void testBacklogQuotaWithReader() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic1";
        final int numMsgs = 20;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitSize(10240L)
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                .build());

        // try-with-resources closes the client exactly once, on success and on failure.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Reader<byte[]> reader = client.newReader()
                    .topic(topic)
                    .receiverQueueSize(1)
                    .startMessageId(MessageId.latest)
                    .create();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                // First byte acts as a counter; it is logged when the message is read back.
                content[0] = (byte) (content[0] + 1);
                producer.send(content);
            }

            Thread.sleep(3000L);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertEquals(stats.getBacklogSize(), 0L, "backlog size is [" + stats.getBacklogSize() + "]");
            Assert.assertEquals(stats.getSubscriptions().size(), 1);
            long readerBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
            Assert.assertEquals(readerBacklog, 5L, "non-durable subscription backlog is [" + readerBacklog + "]");

            MessageIdImpl lastMessageId = null;
            for (int i = 0; i < numMsgs; i++) {
                try {
                    content[0] = (byte) (content[0] + 1);
                    lastMessageId = (MessageIdImpl) producer.send(content);
                } catch (PulsarClientException e) {
                    // Keep the exception as the cause so the stack trace is not lost.
                    Assert.fail("Should not have gotten exception: " + e.getMessage(), e);
                }
            }

            final MessageIdImpl expectedLast = lastMessageId;
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic, false);
                Assert.assertEquals(internalStats.ledgers.size(), 1);
                Assert.assertEquals(internalStats.ledgers.get(0).ledgerId, expectedLast.getLedgerId());
            });

            // Drain whatever the reader can still see.
            Message<byte[]> msg;
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
            }
        }
    }

    /**
     * Same scenario as {@code testBacklogQuotaWithReader}, but on a uniquely named topic and
     * with the backlog quota check triggered explicitly through the admin API instead of
     * waiting for the periodic check.
     *
     * <p>Asserts a topic backlog size of 0 and a reader backlog of 5 messages after the first
     * batch, that a second batch is accepted, and that eventually one ledger remains: the
     * ledger of the last published message.
     */
    @Test
    public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
        final int numMsgs = 20;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitSize(10240L)
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                .build());

        // try-with-resources closes the client exactly once, on success and on failure.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Reader<byte[]> reader = client.newReader()
                    .topic(topic)
                    .receiverQueueSize(1)
                    .startMessageId(MessageId.latest)
                    .create();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                content[0] = (byte) (content[0] + 1);
                producer.send(content);
            }

            Thread.sleep(2000L);
            // Run the quota check now rather than waiting for the scheduled one.
            this.admin.brokers().backlogQuotaCheck();
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertEquals(stats.getBacklogSize(), 0L, "backlog size is [" + stats.getBacklogSize() + "]");
            Assert.assertEquals(stats.getSubscriptions().size(), 1);
            long readerBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
            Assert.assertEquals(readerBacklog, 5L, "non-durable subscription backlog is [" + readerBacklog + "]");

            MessageIdImpl lastMessageId = null;
            for (int i = 0; i < numMsgs; i++) {
                try {
                    content[0] = (byte) (content[0] + 1);
                    lastMessageId = (MessageIdImpl) producer.send(content);
                } catch (PulsarClientException e) {
                    // Keep the exception as the cause so the stack trace is not lost.
                    Assert.fail("Should not have gotten exception: " + e.getMessage(), e);
                }
            }

            final MessageIdImpl expectedLast = lastMessageId;
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic, false);
                Assert.assertEquals(internalStats.ledgers.size(), 1);
                Assert.assertEquals(internalStats.ledgers.get(0).ledgerId, expectedLast.getLedgerId());
            });

            // Drain whatever the reader can still see.
            Message<byte[]> msg;
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
            }

            producer.close();
            reader.close();
        }
    }

    /**
     * Publishes 9 messages to a uniquely named topic that only has a reader attached, under a
     * {@code producer_exception} quota (10240 bytes / 2 seconds), triggers the quota check
     * explicitly, and checks that:
     * <ul>
     *   <li>the topic backlog size is 0 while the reader's subscription reports 9 messages;</li>
     *   <li>the topic eventually has 2 ledgers;</li>
     *   <li>another 9 messages can be published without a producer exception.</li>
     * </ul>
     * Finally the reader is drained until a 5-second read times out.
     */
    @Test
    public void testTriggerBacklogTimeQuotaWithReader() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
        final int numMsgs = 9;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitSize(10240L)
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                .build());

        // try-with-resources closes the client exactly once, on success and on failure.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Reader<byte[]> reader = client.newReader()
                    .topic(topic)
                    .receiverQueueSize(1)
                    .startMessageId(MessageId.latest)
                    .create();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                content[0] = (byte) (content[0] + 1);
                producer.send(content);
            }

            Thread.sleep(2000L);
            // Run the quota check now rather than waiting for the scheduled one.
            this.admin.brokers().backlogQuotaCheck();
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertEquals(stats.getBacklogSize(), 0L, "backlog size is [" + stats.getBacklogSize() + "]");
            Assert.assertEquals(stats.getSubscriptions().size(), 1);
            long readerBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
            Assert.assertEquals(readerBacklog, 9L, "non-durable subscription backlog is [" + readerBacklog + "]");

            Awaitility.await()
                    .pollDelay(Duration.ofSeconds(2L))
                    .pollInterval(Duration.ofSeconds(1L))
                    .untilAsserted(() ->
                            Assert.assertEquals(this.admin.topics().getInternalStats(topic, false).ledgers.size(), 2));

            for (int i = 0; i < numMsgs; i++) {
                try {
                    content[0] = (byte) (content[0] + 1);
                    producer.send(content);
                } catch (PulsarClientException e) {
                    // Keep the exception as the cause so the stack trace is not lost.
                    Assert.fail("Should not have gotten exception: " + e.getMessage(), e);
                }
            }

            // Drain whatever the reader can still see.
            Message<byte[]> msg;
            while ((msg = reader.readNext(5, TimeUnit.SECONDS)) != null) {
                LOG.info("msg read: {} - {}", msg.getMessageId(), msg.getData()[0]);
            }

            producer.close();
            reader.close();
        }
    }

    /**
     * With a 10240-byte {@code consumer_backlog_eviction} quota, publishes 20 messages of
     * 1024 bytes to a topic with two subscriptions that receive but never acknowledge, then
     * asserts that the topic backlog size ends up below the quota.
     */
    @Test
    public void testConsumerBacklogEvictionSizeQuota() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic2";

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitSize(10240L)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build());

        // try-with-resources: a failing close() is attached as suppressed instead of replacing
        // the assertion error that caused the test to fail.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < 20; i++) {
                producer.send(content);
                // Received but intentionally not acknowledged, so the backlog grows.
                consumer1.receive();
                consumer2.receive();
            }

            Thread.sleep(3000L);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Report the value actually being checked, and keep the storage size for context.
            Assert.assertTrue(stats.getBacklogSize() < 10240,
                    "Backlog size is [" + stats.getBacklogSize() + "], storage size is ["
                            + stats.getStorageSize() + "]");
        }
    }

    /**
     * With a 2-second {@code message_age} eviction quota and the precise time-based check
     * enabled, publishes 9 messages to a topic with two non-acknowledging subscriptions and
     * asserts that both backlogs go from 9 to 0 after a 4-second wait.
     */
    @Test
    public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic3";
        final int numMsgs = 9;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build(), BacklogQuota.BacklogQuotaType.message_age);
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);

        // try-with-resources so the client is also released when an assertion fails.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
                // Received but not acknowledged, so the messages stay in the backlog.
                consumer1.receive();
                consumer2.receive();
            }

            TopicStats before = this.admin.topics().getStats(topic);
            Assert.assertEquals(before.getSubscriptions().get("c1").getMsgBacklog(), 9L);
            Assert.assertEquals(before.getSubscriptions().get("c2").getMsgBacklog(), 9L);

            Thread.sleep(4000L);
            rolloverStats();

            TopicStats after = this.admin.topics().getStats(topic);
            Assert.assertEquals(after.getSubscriptions().get("c1").getMsgBacklog(), 0L);
            Assert.assertEquals(after.getSubscriptions().get("c2").getMsgBacklog(), 0L);
        }
    }

    /**
     * With a 2-second {@code message_age} eviction quota (non-precise check), publishes 14
     * messages to a topic with two non-acknowledging subscriptions and asserts that, after
     * eviction, each subscription's backlog equals the number of entries in the managed
     * ledger's current ledger, and that the slowest consumer's read position is unchanged.
     */
    @Test
    public void testConsumerBacklogEvictionTimeQuota() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic3";
        final int numMsgs = 14;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build(), BacklogQuota.BacklogQuotaType.message_age);

        // try-with-resources so the client is also released when an assertion fails.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
                // Received but not acknowledged, so the messages stay in the backlog.
                consumer1.receive();
                consumer2.receive();
            }

            TopicStats before = this.admin.topics().getStats(topic);
            Assert.assertEquals(before.getSubscriptions().get("c1").getMsgBacklog(), 14L);
            Assert.assertEquals(before.getSubscriptions().get("c2").getMsgBacklog(), 14L);

            PersistentTopic topicRef =
                    (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic).get();
            // Explicit cast: getCurrentLedgerEntries() is read from the ManagedLedgerImpl type.
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topicRef.getManagedLedger();
            Position readPositionBefore = managedLedger.getSlowestConsumer().getReadPosition();

            Thread.sleep(4000L);
            rolloverStats();

            // Fetch fresh stats on every poll; asserting on a snapshot taken before the wait
            // would make the retries meaningless.
            Awaitility.await().untilAsserted(() -> {
                TopicStats after = this.admin.topics().getStats(topic);
                Assert.assertEquals(after.getSubscriptions().get("c1").getMsgBacklog(),
                        managedLedger.getCurrentLedgerEntries());
                Assert.assertEquals(after.getSubscriptions().get("c2").getMsgBacklog(),
                        managedLedger.getCurrentLedgerEntries());
            });

            Assert.assertEquals(managedLedger.getSlowestConsumer().getReadPosition(), readPositionBefore);
        }
    }

    /**
     * With a 5-second {@code message_age} eviction quota, publishes 5 messages, waits 5
     * seconds, publishes 9 more, and asserts that both non-acknowledging subscriptions end up
     * with a backlog of 9, i.e. only the first batch was evicted.
     */
    @Test(timeOut = 60000)
    public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitTime(5)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build(), BacklogQuota.BacklogQuotaType.message_age);

        // try-with-resources so the client is also released when an assertion fails.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            // First batch.
            for (int i = 0; i < 5; i++) {
                producer.send(content);
                consumer1.receive();
                consumer2.receive();
            }

            TopicStats before = this.admin.topics().getStats(topic);
            Assert.assertEquals(before.getSubscriptions().get("c1").getMsgBacklog(), 5L);
            Assert.assertEquals(before.getSubscriptions().get("c2").getMsgBacklog(), 5L);

            // Wait as long as the configured time limit before publishing the second batch.
            Thread.sleep(5000L);
            for (int i = 0; i < 9; i++) {
                producer.send(content);
                consumer1.receive();
                consumer2.receive();
            }

            Thread.sleep(2000L);
            rolloverStats();

            // Fetch fresh stats on every poll; asserting on a snapshot taken before the wait
            // would make the retries meaningless.
            Awaitility.await().untilAsserted(() -> {
                TopicStats after = this.admin.topics().getStats(topic);
                Assert.assertEquals(after.getSubscriptions().get("c1").getMsgBacklog(), 9L);
                Assert.assertEquals(after.getSubscriptions().get("c2").getMsgBacklog(), 9L);
            });
        }
    }

    /**
     * With a 2-second {@code message_age} eviction quota and the precise check enabled,
     * publishes one message, unloads the topic, waits until the topic has two ledgers of which
     * the second is empty, and asserts that the subscription backlog goes from 1 to 0 while
     * that ledger layout stays in place.
     */
    @Test
    public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic4";

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build(), BacklogQuota.BacklogQuotaType.message_age);
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);

        // try-with-resources so the client is also released when an assertion fails.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS)
                .build()) {
            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);
            producer.send(new byte[1024]);
            // Received but not acknowledged, so the message stays in the backlog.
            consumer.receive();

            this.admin.topics().unload(topic);

            // One wait is enough here; the original repeated this identical block twice in a row.
            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic);
                Assert.assertEquals(internalStats.ledgers.size(), 2);
                Assert.assertEquals(internalStats.ledgers.get(1).entries, 0L);
            });

            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertEquals(stats.getSubscriptions().get("c1").getMsgBacklog(), 1L);

            rolloverStats();

            Awaitility.await().untilAsserted(() -> {
                PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(topic);
                Assert.assertEquals(internalStats.ledgers.size(), 2);
                Assert.assertEquals(internalStats.ledgers.get(1).entries, 0L);
                Assert.assertEquals(
                        this.admin.topics().getStats(topic).getSubscriptions().get("c1").getMsgBacklog(), 0L);
            });
        }
    }

    /**
     * With a {@code consumer_backlog_eviction} quota (10240 bytes / 2 seconds), publishes 20
     * messages of 1024 bytes to a topic where one subscription acknowledges every message and
     * the other never does, then asserts that the topic backlog size does not exceed the
     * size limit.
     */
    @Test
    public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic11";

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitSize(10240L)
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build());

        // The client was never closed before; try-with-resources releases it on every path.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build()) {
            Consumer<byte[]> ackingConsumer =
                    client.newConsumer().topic(topic).subscriptionName("c11").subscribe();
            Consumer<byte[]> silentConsumer =
                    client.newConsumer().topic(topic).subscriptionName("c21").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < 20; i++) {
                producer.send(content);
                ackingConsumer.acknowledge(ackingConsumer.receive());
                // Received but not acknowledged, so this subscription keeps a backlog.
                silentConsumer.receive();
            }

            Thread.sleep(3000L);
            rolloverStats();

            TopicStats stats = this.admin.topics().getStats(topic);
            // Report the value actually being checked, and keep the storage size for context.
            Assert.assertTrue(stats.getBacklogSize() <= 10240,
                    "Backlog size is [" + stats.getBacklogSize() + "], storage size is ["
                            + stats.getStorageSize() + "]");
        }
    }

    /**
     * With a 2-second {@code message_age} eviction quota and the precise check enabled,
     * publishes 9 messages to a topic with two subscriptions. After subscription {@code c11}
     * has the messages redelivered and acknowledges them, asserts that its backlog is 0 while
     * {@code c21} still reports 9, and that {@code c21} drops to 0 after a further 4-second
     * wait.
     */
    @Test
    public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic12";
        final int numMsgs = 9;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                .limitTime(2)
                .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                .build(), BacklogQuota.BacklogQuotaType.message_age);
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);

        // try-with-resources so the client is also released when an assertion fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build()) {
            Consumer<byte[]> ackingConsumer =
                    client.newConsumer().topic(topic).subscriptionName("c11").subscribe();
            Consumer<byte[]> silentConsumer =
                    client.newConsumer().topic(topic).subscriptionName("c21").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
                ackingConsumer.receive();
                silentConsumer.receive();
            }

            TopicStats initial = this.admin.topics().getStats(topic);
            Assert.assertEquals(initial.getSubscriptions().get("c11").getMsgBacklog(), 9L);
            Assert.assertEquals(initial.getSubscriptions().get("c21").getMsgBacklog(), 9L);

            // Have the unacknowledged messages redelivered, then acknowledge each of them.
            ackingConsumer.redeliverUnacknowledgedMessages();
            for (int i = 0; i < numMsgs; i++) {
                ackingConsumer.acknowledge(ackingConsumer.receive());
            }

            Thread.sleep(1000L);
            rolloverStats();

            TopicStats afterAck = this.admin.topics().getStats(topic);
            Assert.assertEquals(afterAck.getSubscriptions().get("c11").getMsgBacklog(), 0L);
            Assert.assertEquals(afterAck.getSubscriptions().get("c21").getMsgBacklog(), 9L);

            Thread.sleep(4000L);
            rolloverStats();

            TopicStats afterEviction = this.admin.topics().getStats(topic);
            Assert.assertEquals(afterEviction.getSubscriptions().get("c21").getMsgBacklog(), 0L);
        }
    }

    /**
     * Creates a producer on the given topic with batching disabled and a 2-second send timeout.
     *
     * @param pulsarClient client used to build the producer
     * @param str          name of the topic to publish to
     * @return the newly created producer
     * @throws PulsarClientException if the producer cannot be created
     */
    public Producer<byte[]> createProducer(PulsarClient pulsarClient, String str) throws PulsarClientException {
        return pulsarClient.newProducer()
                .topic(str)
                .sendTimeout(2, TimeUnit.SECONDS)
                .enableBatching(false)
                .create();
    }

    /**
     * Publishes 14 messages to a topic with two subscriptions while no quota is set yet.
     * Subscription {@code c11} acknowledges everything (pausing 2 seconds before the 11th
     * acknowledgement), after which its backlog must be 0 and {@code c21}'s must still be 14.
     * A 4-second {@code message_age} eviction quota is then applied and, within 8 seconds,
     * {@code c21}'s backlog must be within 1 of 4.
     */
    @Test
    public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception {
        final String namespace = "prop/ns-quota";
        final String topic = "persistent://prop/ns-quota/topic12";
        final int numMsgs = 14;

        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap(namespace), Maps.newHashMap());

        // try-with-resources: a failing close() is attached as suppressed instead of replacing
        // the assertion error that caused the test to fail.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString()).build()) {
            Consumer<byte[]> ackingConsumer =
                    client.newConsumer().topic(topic).subscriptionName("c11").subscribe();
            Consumer<byte[]> silentConsumer =
                    client.newConsumer().topic(topic).subscriptionName("c21").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);

            byte[] content = new byte[1024];
            ArrayList<Message<byte[]>> received = new ArrayList<>();
            for (int i = 0; i < numMsgs; i++) {
                producer.send(content);
                // Keep c11's messages so they can be acknowledged later.
                received.add(ackingConsumer.receive());
                silentConsumer.receive();
            }

            TopicStats initial = this.admin.topics().getStats(topic);
            Assert.assertEquals(initial.getSubscriptions().get("c11").getMsgBacklog(), 14L);
            Assert.assertEquals(initial.getSubscriptions().get("c21").getMsgBacklog(), 14L);

            for (int i = 0; i < numMsgs; i++) {
                if (i == 10) {
                    // Pause once, before acknowledging the 11th message.
                    Thread.sleep(2000L);
                }
                ackingConsumer.acknowledge(received.get(i));
            }

            Awaitility.await().pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
                rolloverStats();
                TopicStats afterAck = this.admin.topics().getStats(topic);
                Assert.assertEquals(afterAck.getSubscriptions().get("c11").getMsgBacklog(), 0L);
                Assert.assertEquals(afterAck.getSubscriptions().get("c21").getMsgBacklog(), 14L);
            });

            // Only now apply the time-based eviction quota.
            this.admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder()
                    .limitTime(4)
                    .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                    .build(), BacklogQuota.BacklogQuotaType.message_age);

            Awaitility.await()
                    .pollInterval(Duration.ofSeconds(1L))
                    .atMost(Duration.ofSeconds(8L))
                    .untilAsserted(() -> {
                        long backlog = this.admin.topics().getStats(topic)
                                .getSubscriptions().get("c21").getMsgBacklog();
                        // Float comparison with a delta of 1: accepts a backlog of 3, 4 or 5.
                        Assert.assertEquals((float) backlog, 4.0f, 1.0f);
                    });
        }
    }

    /**
     * Runs a producer and a consumer thread concurrently against a topic with a 10240-byte
     * {@code consumer_backlog_eviction} quota and checks that the reported backlog size ends up
     * within the quota.
     *
     * <p>The producer thread publishes 20 messages of 1024 bytes. The consumer thread receives 20
     * messages on each of two subscriptions, acknowledging only those of {@code c12}. Separate
     * clients are used for producing and consuming, and each is closed exactly once.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testConcurrentAckAndEviction() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota",
                BacklogQuota.builder()
                        .limitSize(10240L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                        .build());
        final String topic = "persistent://prop/ns-quota/topic12";
        final int numMsgs = 20;
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch finished = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        try (PulsarClient producerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient consumerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            final Consumer<byte[]> ackingConsumer =
                    consumerClient.newConsumer().topic(topic).subscriptionName("c12").subscribe();
            final Consumer<byte[]> idleConsumer =
                    consumerClient.newConsumer().topic(topic).subscriptionName("c22").subscribe();

            Thread producerThread = new Thread(() -> {
                try {
                    // Both workers start together once each has reached the barrier.
                    barrier.await();
                    Producer<byte[]> producer = createProducer(producerClient, topic);
                    byte[] payload = new byte[1024];
                    for (int i = 0; i < numMsgs; i++) {
                        producer.send(payload);
                    }
                    producer.close();
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });
            Thread consumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < numMsgs; i++) {
                        ackingConsumer.acknowledge(ackingConsumer.receive());
                        idleConsumer.receive();
                    }
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });

            producerThread.start();
            consumerThread.start();
            // The wait is bounded; its result is deliberately not asserted, so the test proceeds
            // after 20 seconds even if a worker is still running.
            finished.await(20L, TimeUnit.SECONDS);
            Assert.assertFalse(gotException.get());
            Thread.sleep(3000L);
            rolloverStats();
            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertTrue(stats.getBacklogSize() <= 10240, "Storage size is [" + stats.getStorageSize() + "]");
        }
    }

    /**
     * Runs a producer and a consumer thread concurrently where both subscriptions acknowledge
     * every message, and checks that neither thread hits an exception.
     *
     * <p>The namespace has a 10240-byte {@code consumer_backlog_eviction} quota; the producer
     * publishes 10 messages of 1024 bytes. Separate clients are used for consuming and producing,
     * and each is closed exactly once.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testNoEviction() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota",
                BacklogQuota.builder()
                        .limitSize(10240L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                        .build());
        final String topic = "persistent://prop/ns-quota/topic13";
        final int numMsgs = 10;
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch finished = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        try (PulsarClient consumerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient producerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            final Consumer<byte[]> firstConsumer =
                    consumerClient.newConsumer().topic(topic).subscriptionName("c13").subscribe();
            final Consumer<byte[]> secondConsumer =
                    consumerClient.newConsumer().topic(topic).subscriptionName("c23").subscribe();

            Thread producerThread = new Thread(() -> {
                try {
                    // Both workers start together once each has reached the barrier.
                    barrier.await();
                    Producer<byte[]> producer = createProducer(producerClient, topic);
                    byte[] payload = new byte[1024];
                    for (int i = 0; i < numMsgs; i++) {
                        producer.send(payload);
                    }
                    producer.close();
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });
            Thread consumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < numMsgs; i++) {
                        firstConsumer.acknowledge(firstConsumer.receive());
                        secondConsumer.acknowledge(secondConsumer.receive());
                    }
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });

            producerThread.start();
            consumerThread.start();
            // Unbounded wait: both workers always count down in their finally blocks.
            finished.await();
            Assert.assertFalse(gotException.get());
        }
    }

    /**
     * Runs two producer threads and two consumer threads concurrently against a topic with a
     * 15360-byte {@code consumer_backlog_eviction} quota and checks that the reported backlog
     * size ends up within the quota.
     *
     * <p>Each producer publishes 10 messages of 1024 bytes through its own client; each consumer
     * thread receives and acknowledges 20 messages on its own subscription. All three clients are
     * closed exactly once.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testEvictionMulti() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/ns-quota",
                BacklogQuota.builder()
                        .limitSize(15360L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                        .build());
        final String topic = "persistent://prop/ns-quota/topic14";
        final int msgsPerProducer = 10;
        final int msgsPerConsumer = 20;
        final CyclicBarrier barrier = new CyclicBarrier(4);
        final CountDownLatch finished = new CountDownLatch(4);
        final AtomicBoolean gotException = new AtomicBoolean(false);
        try (PulsarClient consumerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient secondProducerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build();
             PulsarClient firstProducerClient = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                     .statsInterval(0L, TimeUnit.SECONDS).build()) {
            final Consumer<byte[]> firstConsumer =
                    consumerClient.newConsumer().topic(topic).subscriptionName("c14").subscribe();
            final Consumer<byte[]> secondConsumer =
                    consumerClient.newConsumer().topic(topic).subscriptionName("c24").subscribe();

            Thread firstProducerThread = new Thread(() -> {
                try {
                    // All four workers start together once each has reached the barrier.
                    barrier.await();
                    Producer<byte[]> producer = createProducer(firstProducerClient, topic);
                    byte[] payload = new byte[1024];
                    for (int i = 0; i < msgsPerProducer; i++) {
                        producer.send(payload);
                    }
                    producer.close();
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });
            Thread secondProducerThread = new Thread(() -> {
                try {
                    barrier.await();
                    Producer<byte[]> producer = createProducer(secondProducerClient, topic);
                    byte[] payload = new byte[1024];
                    for (int i = 0; i < msgsPerProducer; i++) {
                        producer.send(payload);
                    }
                    producer.close();
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });
            Thread firstConsumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < msgsPerConsumer; i++) {
                        firstConsumer.acknowledge(firstConsumer.receive());
                    }
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });
            Thread secondConsumerThread = new Thread(() -> {
                try {
                    barrier.await();
                    for (int i = 0; i < msgsPerConsumer; i++) {
                        secondConsumer.acknowledge(secondConsumer.receive());
                    }
                } catch (Exception e) {
                    gotException.set(true);
                } finally {
                    finished.countDown();
                }
            });

            firstProducerThread.start();
            secondProducerThread.start();
            firstConsumerThread.start();
            secondConsumerThread.start();
            // The wait is bounded; its result is deliberately not asserted, so the test proceeds
            // after 20 seconds even if a worker is still running.
            finished.await(20L, TimeUnit.SECONDS);
            Assert.assertFalse(gotException.get());
            Thread.sleep(3000L);
            rolloverStats();
            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertTrue(stats.getBacklogSize() <= 15360, "Storage size is [" + stats.getStorageSize() + "]");
        }
    }

    /**
     * Publishes past a 10240-byte {@code producer_request_hold} quota and checks that the topic
     * reports no connected publishers afterwards.
     *
     * <p>Eleven 1024-byte messages are attempted; send timeouts are logged and tolerated. Ten
     * messages are then received (without acknowledgement) before the stats are inspected.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testAheadProducerOnHold() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold",
                BacklogQuota.builder()
                        .limitSize(10240L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
                        .build());
        final String topic = "persistent://prop/quotahold/hold";
        final int numMsgs = 10;
        // try-with-resources closes the client exactly once, on both the success and failure paths.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("c1hold").subscribe();
            byte[] payload = new byte[1024];
            Producer<byte[]> producer = createProducer(client, topic);
            // Inclusive bound: one message more than numMsgs is attempted.
            for (int i = 0; i <= numMsgs; i++) {
                try {
                    producer.send(payload);
                    LOG.info("sent [{}]", i);
                } catch (PulsarClientException.TimeoutException e) {
                    // A timed-out send is an accepted outcome here; only record it.
                    LOG.info("timeout on [{}]", i);
                }
            }
            for (int i = 0; i < numMsgs; i++) {
                consumer.receive();
                LOG.info("received [{}]", i);
            }
            Thread.sleep(3000L);
            rolloverStats();
            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertEquals(stats.getPublishers().size(), 0,
                    "Number of producers on topic persistent://prop/quotahold/hold are ["
                            + stats.getPublishers().size() + "]");
        }
    }

    /**
     * Fills a topic up to a 10240-byte {@code producer_request_hold} quota and checks that a
     * further send fails with a {@link PulsarClientException.TimeoutException}.
     *
     * <p>Ten 1024-byte messages are published, followed by a 3-second pause; up to two more sends
     * are then attempted, and the test fails if both succeed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testAheadProducerOnHoldTimeout() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold",
                BacklogQuota.builder()
                        .limitSize(10240L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
                        .build());
        final String topic = "persistent://prop/quotahold/holdtimeout";
        // try-with-resources closes the client exactly once, on both the success and failure paths.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotTimeout = false;
            // The subscription only needs to exist; the consumer itself is never read from.
            client.newConsumer().topic(topic).subscriptionName("c1holdtimeout").subscribe();
            byte[] payload = new byte[1024];
            Producer<byte[]> producer = createProducer(client, topic);
            for (int i = 0; i < 10; i++) {
                producer.send(payload);
            }
            Thread.sleep(3000L);
            try {
                producer.send(payload);
                producer.send(payload);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException.TimeoutException e) {
                gotTimeout = true;
            }
            Assert.assertTrue(gotTimeout, "timeout did not occur");
        }
    }

    /**
     * Fills a topic up to a 10240-byte {@code producer_exception} quota and checks that a further
     * send fails with either a quota-exceeded or a timeout exception.
     *
     * <p>Ten 1024-byte messages are published, followed by a 3-second pause; up to two more sends
     * are then attempted, and the test fails if both succeed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testProducerException() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold",
                BacklogQuota.builder()
                        .limitSize(10240L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                        .build());
        final String topic = "persistent://prop/quotahold/except";
        // try-with-resources closes the client exactly once, on both the success and failure paths.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;
            // The subscription only needs to exist; the consumer itself is never read from.
            client.newConsumer().topic(topic).subscriptionName("c1except").subscribe();
            byte[] payload = new byte[1024];
            Producer<byte[]> producer = createProducer(client, topic);
            for (int i = 0; i < 10; i++) {
                producer.send(payload);
            }
            Thread.sleep(3000L);
            try {
                producer.send(payload);
                producer.send(payload);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException e) {
                boolean expectedType = e instanceof PulsarClientException.ProducerBlockedQuotaExceededException
                        || e instanceof PulsarClientException.TimeoutException;
                Assert.assertTrue(expectedType, e.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");
        }
    }

    /**
     * Checks that a producer rejected under a 10240-byte {@code producer_exception} quota can
     * publish again once the subscription's backlog has been acknowledged.
     *
     * <p>Flow: publish ten 1024-byte messages, wait 3 seconds, expect a further send to fail;
     * acknowledge as many messages as the stats report in the backlog; wait 3 seconds, recreate
     * the producer and verify five sends succeed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap());
        this.admin.namespaces().setBacklogQuota("prop/quotahold",
                BacklogQuota.builder()
                        .limitSize(10240L)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                        .build());
        final String topic = "persistent://prop/quotahold/exceptandunblock";
        final String subscription = "c1except";
        // try-with-resources closes the client exactly once, on both the success and failure paths.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;
            Consumer<byte[]> consumer =
                    client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            byte[] payload = new byte[1024];
            Producer<byte[]> producer = createProducer(client, topic);
            for (int i = 0; i < 10; i++) {
                producer.send(payload);
            }
            Thread.sleep(3000L);
            try {
                producer.send(payload);
                producer.send(payload);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException e) {
                boolean expectedType = e instanceof PulsarClientException.ProducerBlockedQuotaExceededException
                        || e instanceof PulsarClientException.TimeoutException;
                Assert.assertTrue(expectedType, e.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");

            // Drain exactly the number of messages the broker reports as outstanding.
            int backlog = (int) this.admin.topics().getStats(topic)
                    .getSubscriptions().get(subscription).getMsgBacklog();
            for (int i = 0; i < backlog; i++) {
                consumer.acknowledge(consumer.receive());
            }
            Thread.sleep(3000L);

            // Replace the producer before retrying.
            producer.close();
            Producer<byte[]> freshProducer = createProducer(client, topic);
            Exception lastFailure = null;
            boolean sendFailed = false;
            for (int i = 0; i < 5; i++) {
                try {
                    freshProducer.send(payload);
                } catch (Exception e) {
                    // Keep going so every send is attempted; report the last failure below.
                    sendFailed = true;
                    lastFailure = e;
                }
            }
            Assert.assertFalse(sendFailed, "unable to publish due to " + lastFailure);
        }
    }

    /**
     * Checks that a producer rejected under a {@code message_age} {@code producer_exception} quota
     * can publish again after the backlog is acknowledged, with the precise time-based quota check
     * enabled in the broker configuration.
     *
     * <p>Flow: publish nine 1024-byte messages, wait 4 seconds, expect the next send to fail and
     * the backlog to be 9; acknowledge all nine, wait 4 seconds, expect the backlog to be 0 and
     * five further sends on the same producer to succeed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap());
        // limitTime(2): presumably seconds - TODO confirm against BacklogQuota documentation.
        this.admin.namespaces().setBacklogQuota("prop/quotahold",
                BacklogQuota.builder()
                        .limitTime(2)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                        .build(),
                BacklogQuota.BacklogQuotaType.message_age);
        // NOTE(review): this flag is set on the shared config and is not restored in this method;
        // confirm that a setup/teardown hook resets it for the other tests.
        this.config.setPreciseTimeBasedBacklogQuotaCheck(true);
        final String topic = "persistent://prop/quotahold/exceptandunblock2";
        final String subscription = "c1except";
        final int numMsgs = 9;
        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;
            Consumer<byte[]> consumer =
                    client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            byte[] payload = new byte[1024];
            Producer<byte[]> producer = createProducer(client, topic);
            for (int i = 0; i < numMsgs; i++) {
                producer.send(payload);
            }
            Thread.sleep(4000L);
            try {
                producer.send(payload);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException e) {
                boolean expectedType = e instanceof PulsarClientException.ProducerBlockedQuotaExceededException
                        || e instanceof PulsarClientException.TimeoutException;
                Assert.assertTrue(expectedType, e.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");
            Assert.assertEquals(
                    this.admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(),
                    9L);

            for (int i = 0; i < numMsgs; i++) {
                consumer.acknowledge(consumer.receive());
            }
            Thread.sleep(4000L);
            rolloverStats();
            Assert.assertEquals(
                    this.admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(),
                    0L);

            Exception lastFailure = null;
            boolean sendFailed = false;
            for (int i = 0; i < 5; i++) {
                try {
                    producer.send(payload);
                } catch (Exception e) {
                    // Keep going so every send is attempted; report the last failure below.
                    sendFailed = true;
                    lastFailure = e;
                }
            }
            Assert.assertFalse(sendFailed, "unable to publish due to " + lastFailure);
        }
    }

    /**
     * Checks that a producer rejected under a {@code message_age} {@code producer_exception} quota
     * can publish again after the backlog is acknowledged.
     *
     * <p>Flow: publish fourteen 1024-byte messages, wait 4 seconds, expect the next send to fail
     * and the backlog to be 14; acknowledge all fourteen, wait 4 seconds, expect the backlog to be
     * 0 and five further sends on the same producer to succeed.
     *
     * @throws Exception if any admin or client call fails
     */
    @Test
    public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception {
        Assert.assertEquals(this.admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap());
        // limitTime(2): presumably seconds - TODO confirm against BacklogQuota documentation.
        this.admin.namespaces().setBacklogQuota("prop/quotahold",
                BacklogQuota.builder()
                        .limitTime(2)
                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                        .build(),
                BacklogQuota.BacklogQuotaType.message_age);
        final String topic = "persistent://prop/quotahold/exceptandunblock2";
        final String subscription = "c1except";
        final int numMsgs = 14;
        // try-with-resources closes the client even when an assertion below fails.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.adminUrl.toString())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            boolean gotException = false;
            Consumer<byte[]> consumer =
                    client.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
            byte[] payload = new byte[1024];
            Producer<byte[]> producer = createProducer(client, topic);
            for (int i = 0; i < numMsgs; i++) {
                producer.send(payload);
            }
            Thread.sleep(4000L);
            try {
                producer.send(payload);
                Assert.fail("backlog quota did not exceed");
            } catch (PulsarClientException e) {
                boolean expectedType = e instanceof PulsarClientException.ProducerBlockedQuotaExceededException
                        || e instanceof PulsarClientException.TimeoutException;
                Assert.assertTrue(expectedType, e.getMessage());
                gotException = true;
            }
            Assert.assertTrue(gotException, "backlog exceeded exception did not occur");
            Assert.assertEquals(
                    this.admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(),
                    14L);

            for (int i = 0; i < numMsgs; i++) {
                consumer.acknowledge(consumer.receive());
            }
            Thread.sleep(4000L);
            rolloverStats();
            Assert.assertEquals(
                    this.admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(),
                    0L);

            Exception lastFailure = null;
            boolean sendFailed = false;
            for (int i = 0; i < 5; i++) {
                try {
                    producer.send(payload);
                } catch (Exception e) {
                    // Keep going so every send is attempted; report the last failure below.
                    sendFailed = true;
                    lastFailure = e;
                }
            }
            Assert.assertFalse(sendFailed, "unable to publish due to " + lastFailure);
        }
    }

    /**
     * Restarts the broker with a default backlog quota of 10240 bytes, expressed either in GB or
     * in bytes, and checks that the reported backlog size stays below that limit.
     *
     * <p>The default retention policy is set to {@code consumer_backlog_eviction}. Twenty
     * 1024-byte messages are published and received (unacknowledged) on two subscriptions before
     * the stats are inspected.
     *
     * @param z when {@code true} the limit is configured through the GB setting, otherwise
     *          through the bytes setting
     * @throws Exception if the broker restart or any admin or client call fails
     */
    @Test(dataProvider = "backlogQuotaSizeGB", priority = 1)
    public void testBacklogQuotaInGB(boolean z) throws Exception {
        final long limitBytes = 10240L;
        final double bytesPerGb = 1024.0 * 1024.0 * 1024.0;
        this.pulsar.close();
        if (z) {
            // Same limit as the bytes branch, expressed as a fraction of a GB.
            this.config.setBacklogQuotaDefaultLimitGB(limitBytes / bytesPerGb);
        } else {
            this.config.setBacklogQuotaDefaultLimitBytes(limitBytes);
        }
        this.config.setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        this.pulsar = new PulsarService(this.config);
        this.pulsar.start();
        final String topic = "persistent://prop/ns-quota/topic2";
        // try-with-resources closes the client exactly once, on both the success and failure paths.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsar.getBrokerServiceUrl())
                .statsInterval(0L, TimeUnit.SECONDS).build()) {
            Consumer<byte[]> firstConsumer = client.newConsumer().topic(topic).subscriptionName("c1").subscribe();
            Consumer<byte[]> secondConsumer = client.newConsumer().topic(topic).subscriptionName("c2").subscribe();
            Producer<byte[]> producer = createProducer(client, topic);
            byte[] payload = new byte[1024];
            for (int i = 0; i < 20; i++) {
                producer.send(payload);
                firstConsumer.receive();
                secondConsumer.receive();
            }
            Thread.sleep(3000L);
            rolloverStats();
            TopicStats stats = this.admin.topics().getStats(topic);
            Assert.assertTrue(stats.getBacklogSize() < 10240, "Storage size is [" + stats.getStorageSize() + "]");
        }
    }
}
