package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentTopicE2ETest.class */
public class PersistentTopicE2ETest extends BrokerTestBase {
    /**
     * Per-method fixture setup; delegates to {@code super.baseSetup()}.
     *
     * @throws Exception if the base setup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        super.baseSetup();
    }

    /**
     * Per-method fixture teardown; delegates to {@code super.internalCleanup()}.
     *
     * @throws Exception if the base cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Creates a producer, publishes ten messages and checks the topic's producer
     * bookkeeping: one producer registered while connected, a positive inbound message
     * rate after a stats rollover, and no producers left after the client closes.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testSimpleProducerEvents() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic0";

        Producer producer = this.pulsarClient.createProducer(topicName);
        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1L);

        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        rolloverPerIntervalStats();
        // The simple name "Producer" is bound to the imported client API type, so the
        // producer object held by the topic has to be named with its full package here.
        org.apache.pulsar.broker.service.Producer brokerProducer =
                (org.apache.pulsar.broker.service.Producer) topicRef.getProducers().values().iterator().next();
        Assert.assertTrue(brokerProducer.getStats().msgRateIn > 0.0d);

        producer.close();
        // Give the broker a moment to process the close before checking the registry.
        Thread.sleep(100L);
        Assert.assertEquals(topicRef.getProducers().size(), 0L);
    }

    /**
     * Walks an exclusive consumer through subscribe, receive, individual ack,
     * cumulative ack, unsubscribe and close, checking available permits and
     * subscription backlog after each step.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testSimpleConsumerEvents() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic1";
        final String subName = "sub1";

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        // Check the topic reference before dereferencing it so that a missing topic is
        // reported as an assertion failure rather than a NullPointerException.
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), 1000);

        Producer producer = this.pulsarClient.createProducer(topicName);
        for (int i = 0; i < 20; i++) {
            producer.send(("my-message-" + i).getBytes());
        }
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 20L);
        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), 980);

        // First half: receive in order and acknowledge each message individually.
        for (int i = 0; i < 10; i++) {
            Message msg = consumer.receive();
            Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
            consumer.acknowledge(msg);
        }
        rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 10L);

        // Second half: a single cumulative ack on the last message.
        for (int i = 0; i < 10; i++) {
            Message msg = consumer.receive();
            if (i == 9) {
                consumer.acknowledgeCumulative(msg);
            }
        }
        rolloverPerIntervalStats();
        Thread.sleep(100L);
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);

        consumer.unsubscribe();
        consumer.close();
        try {
            consumer.unsubscribe();
            Assert.fail("Should have failed");
        } catch (PulsarClientException.AlreadyClosedException expected) {
            // Expected: unsubscribing on an already closed consumer is rejected.
        }
        Thread.sleep(100L);
        Assert.assertNull(topicRef.getSubscription(subName));

        producer.close();
        Thread.sleep(100L);
    }

    /**
     * Subscribes with a receiver queue of four, consumes half that many messages and
     * checks that the broker-side permit count reads four both before and after, and
     * that the dispatcher reports no connected consumer once the consumer is closed.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testConsumerFlowControl() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic2";
        final String subName = "sub2";
        final int receiverQueueSize = 4;

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        conf.setReceiverQueueSize(receiverQueueSize);

        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
        Producer producer = this.pulsarClient.createProducer(topicName);

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);

        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), receiverQueueSize);

        int published = 0;
        while (published < receiverQueueSize / 2) {
            producer.send(("my-message-" + published).getBytes());
            Message msg = consumer.receive();
            consumer.acknowledge(msg);
            published++;
        }

        Thread.sleep(100L);
        Assert.assertEquals(getAvailablePermits(subRef), receiverQueueSize);

        consumer.close();
        Assert.assertFalse(subRef.getDispatcher().isConsumerConnected());
    }

    /**
     * Checks that the managed ledger lists the subscription's cursor as active and holds
     * a non-empty entry cache while a consumer is connected, and that both the active
     * cursor list and the cache are empty one second after the consumer closes.
     *
     * <p>The entry cache is read reflectively from the private {@code entryCache} field of
     * {@link ManagedLedgerImpl}.
     *
     * @throws Exception on any client, broker or reflection failure
     */
    @Test
    public void testActiveSubscriptionWithCache() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic2";
        final String subName = "sub2";
        final int receiverQueueSize = 4;

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        conf.setReceiverQueueSize(receiverQueueSize);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
        Producer producer = this.pulsarClient.createProducer(topicName);

        for (int i = 0; i < receiverQueueSize / 2; i++) {
            producer.send(("my-message-" + i).getBytes());
            consumer.acknowledge(consumer.receive());
        }

        ManagedLedgerImpl ledger = this.pulsar.getBrokerService().getTopicReference(topicName).getManagedLedger();
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);

        ManagedCursor cursor = (ManagedCursor) ledger.getActiveCursors().iterator().next();
        Assert.assertNotNull(cursor);
        // TestNG's assertEquals takes (actual, expected); keep that order so a failure
        // message reports the two values the right way round.
        Assert.assertEquals(cursor.getName(), subName);
        Assert.assertTrue(entryCache.getSize() != 0);

        consumer.close();
        Thread.sleep(1000L);

        Assert.assertFalse(ledger.getActiveCursors().iterator().hasNext());
        Assert.assertTrue(entryCache.getSize() == 0);
    }

    /**
     * (Disabled.) Starts ten tasks that rendezvous on a barrier, subscribe to the same
     * subscription and each receive and acknowledge ten messages, while the test thread
     * publishes 1000 messages; afterwards checks the backlog and the permit count.
     *
     * <p>The worker pool is shut down in a {@code finally} block so its threads do not
     * outlive the test.
     *
     * @throws Exception on any client or broker failure
     */
    @Test(enabled = false)
    public void testConcurrentConsumerThreads() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic3";
        final String subName = "sub3";

        final ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        conf.setReceiverQueueSize(100);

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Ten workers plus the test thread.
            final CyclicBarrier barrier = new CyclicBarrier(11);
            for (int i = 0; i < 10; i++) {
                executor.submit(() -> {
                    barrier.await();
                    Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
                    for (int received = 0; received < 10; received++) {
                        consumer.acknowledge(consumer.receive());
                    }
                    return null;
                });
            }

            Producer producer = this.pulsarClient.createProducer(topicName);
            for (int i = 0; i < 1000; i++) {
                producer.send(("my-message-" + i).getBytes());
            }

            barrier.await();
            Thread.sleep(100L);

            PersistentSubscription subRef =
                    this.pulsar.getBrokerService().getTopicReference(topicName).getSubscription(subName);
            Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);
            Thread.sleep(100L);
            Assert.assertEquals(getAvailablePermits(subRef), 100);
        } finally {
            // Interrupt any worker still blocked so the pool's threads are released.
            executor.shutdownNow();
        }
    }

    /**
     * (Disabled.) Publishes ten messages from a background task while the producer is being
     * closed, then subscribes, receives the ten messages, expects a first
     * {@code close()} to be rejected with {@link IllegalStateException}, acknowledges
     * cumulatively and closes again.
     *
     * <p>The worker pool is shut down in a {@code finally} block so its thread does not
     * outlive the test.
     *
     * @throws Exception on any client or broker failure
     */
    @Test(enabled = false)
    public void testGracefulClose() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic4";
        final String subName = "sub4";

        Producer producer = this.pulsarClient.createProducer(topicName);
        Thread.sleep(100L);
        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertNotNull(topicRef);

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch publishDone = new CountDownLatch(1);
            executor.submit(() -> {
                for (int i = 0; i < 10; i++) {
                    producer.send(("my-message-" + i).getBytes());
                }
                publishDone.countDown();
                return null;
            });

            producer.close();
            // The simple name "Producer" is bound to the imported client API type, so the
            // producer object held by the topic has to be named with its full package here.
            org.apache.pulsar.broker.service.Producer brokerProducer =
                    (org.apache.pulsar.broker.service.Producer) topicRef.getProducers().values().iterator().next();
            Assert.assertEquals(brokerProducer.getPendingPublishAcks(), 0L);
            publishDone.await();

            ConsumerConfiguration conf = new ConsumerConfiguration();
            conf.setSubscriptionType(SubscriptionType.Exclusive);
            Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
            PersistentSubscription subRef = topicRef.getSubscription(subName);
            Assert.assertNotNull(subRef);

            Message lastMsg = null;
            for (int i = 0; i < 10; i++) {
                lastMsg = consumer.receive();
            }

            try {
                consumer.close();
                Assert.fail("should have failed");
            } catch (IllegalStateException expected) {
                // Expected: the test requires this first close to be rejected.
            }

            consumer.acknowledgeCumulative(lastMsg);
            Thread.sleep(100L);
            consumer.close();
            Thread.sleep(100L);
            Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Publishes and consumes ten messages one at a time, closes producer and consumer,
     * closes the topic itself and checks the broker no longer returns a reference to it.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testSimpleCloseTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic5";
        final String subName = "sub5";

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
        Producer producer = this.pulsarClient.createProducer(topicName);

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);

        int round = 0;
        while (round < 10) {
            producer.send(("my-message-" + round).getBytes());
            Message msg = consumer.receive();
            consumer.acknowledge(msg);
            round++;
        }

        producer.close();
        consumer.close();

        // Block until the asynchronous topic close has completed.
        topicRef.close().get();
        Assert.assertNull(this.pulsar.getBrokerService().getTopicReference(topicName));
    }

    /**
     * Checks that a second exclusive subscribe on the same topic and subscription, issued
     * from the same client, is rejected with the "already connected" error message.
     *
     * @throws Exception on any unexpected client failure
     */
    @Test
    public void testSingleClientMultipleSubscriptions() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic6";
        final String subName = "sub6";

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);

        this.pulsarClient.subscribe(topicName, subName, conf);
        this.pulsarClient.createProducer(topicName);

        try {
            this.pulsarClient.subscribe(topicName, subName, conf);
            Assert.fail("Should have thrown an exception since one consumer is already connected");
        } catch (PulsarClientException rejected) {
            String reason = rejected.getMessage();
            Assert.assertTrue(reason.contains("Exclusive consumer is already connected"));
        }
    }

    /**
     * Checks that a second exclusive subscribe on the same topic and subscription, issued
     * from a different client, is rejected with the "already connected" error message.
     *
     * <p>Both clients are shut down in a {@code finally} block, so a shutdown failure is
     * never mistaken for the expected subscribe rejection.
     *
     * @throws Exception on any unexpected client failure
     */
    @Test
    public void testMultipleClientsMultipleSubscriptions() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic7";
        final String subName = "sub7";

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);

        PulsarClient client1 = PulsarClient.create(this.brokerUrl.toString());
        PulsarClient client2 = PulsarClient.create(this.brokerUrl.toString());
        try {
            client1.subscribe(topicName, subName, conf);
            client1.createProducer(topicName);
            client2.createProducer(topicName);
            client2.subscribe(topicName, subName, conf);
            Assert.fail("Should have thrown an exception since one consumer is already connected");
        } catch (PulsarClientException e) {
            Assert.assertTrue(e.getMessage().contains("Exclusive consumer is already connected"));
        } finally {
            // Shut down in the original order; the nested finally ensures the first client
            // is released even if shutting down the second one throws.
            try {
                client2.shutdown();
            } finally {
                client1.shutdown();
            }
        }
    }

    /**
     * Subscribes, closes the consumer so the subscription has no connected consumer, and
     * then deletes the topic through the admin API.
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test
    public void testTopicDeleteWithDisconnectedSubscription() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic8";
        final String subName = "sub1";

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        // Check the topic reference before dereferencing it so that a missing topic is
        // reported as an assertion failure rather than a NullPointerException.
        Assert.assertNotNull(topicRef);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertNotNull(subRef);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());

        consumer.close();
        Assert.assertFalse(subRef.getDispatcher().isConsumerConnected());

        this.admin.persistentTopics().delete(topicName);

        try {
            this.admin.persistentTopics().getStats(topicName);
        } catch (PulsarAdminException ignored) {
            // Deliberately tolerated: the stats lookup on the deleted topic is allowed to
            // fail, and the test does not require it to.
        }
    }

    /**
     * Returns the available-permit count of the first consumer attached to the given
     * subscription's dispatcher.
     *
     * @param persistentSubscription subscription whose dispatcher is inspected; it must have
     *                               at least one consumer attached
     * @return the permits reported by that consumer
     */
    int getAvailablePermits(PersistentSubscription persistentSubscription) {
        // The simple name "Consumer" is bound to the imported client API type, so the
        // consumer object held by the dispatcher has to be named with its full package.
        org.apache.pulsar.broker.service.Consumer firstConsumer =
                (org.apache.pulsar.broker.service.Consumer) persistentSubscription.getDispatcher().getConsumers().get(0);
        return firstConsumer.getAvailablePermits();
    }

    /**
     * (Disabled.) Creates a topic, unloads its namespace through the admin API and polls
     * for up to thirty one-second rounds until the broker drops the topic reference, then
     * checks that the managed ledger is gone from the factory as well.
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test(enabled = false)
    public void testUnloadNamespace() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-9";
        final int maxPolls = 30;

        DestinationName destination = DestinationName.get(topicName);
        this.pulsarClient.createProducer(topicName);
        this.pulsarClient.close();

        Assert.assertTrue(this.pulsar.getBrokerService().getTopicReference(topicName) != null);
        Assert.assertTrue(this.pulsar.getManagedLedgerFactory().getManagedLedgers()
                .containsKey(destination.getPersistenceNamingEncoding()));

        this.admin.namespaces().unload("prop/use/ns-abc");

        int polls;
        for (polls = 0;
                polls < maxPolls && this.pulsar.getBrokerService().getTopicReference(topicName) != null;
                polls++) {
            Thread.sleep(1000L);
        }
        if (polls == maxPolls) {
            Assert.fail("The topic reference should be null");
        }

        Assert.assertFalse(this.pulsar.getManagedLedgerFactory().getManagedLedgers()
                .containsKey(destination.getPersistenceNamingEncoding()));
    }

    /**
     * Checks when a GC run removes a topic: it is removed once its only producer has
     * closed, kept while a subscription exists (consumer connected or not), and removed
     * again after the subscription is deleted.
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test
    public void testGC() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-10";
        final String subName = "sub1";
        BrokerService broker = this.pulsar.getBrokerService();

        // Topic with no producers, consumers or subscriptions is collected.
        Producer producer = this.pulsarClient.createProducer(topicName);
        producer.close();
        Assert.assertNotNull(broker.getTopicReference(topicName));
        runGC();
        Assert.assertNull(broker.getTopicReference(topicName));

        // Topic with a connected consumer survives.
        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
        runGC();
        Assert.assertNotNull(broker.getTopicReference(topicName));

        // Topic with a subscription but no consumer survives.
        consumer.close();
        runGC();
        Assert.assertNotNull(broker.getTopicReference(topicName));

        // Once the subscription is deleted the topic is collected.
        this.admin.persistentTopics().deleteSubscription(topicName, subName);
        runGC();
        Assert.assertNull(broker.getTopicReference(topicName));
    }

    /**
     * Repeats the GC scenario with a namespace retention policy: with retention
     * {@code (10, 10)} an idle topic survives a GC run; after switching to
     * {@code (0, 10)} the topic is kept while a subscription exists and removed once the
     * subscription is deleted.
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test
    public void testGcAndRetentionPolicy() throws Exception {
        final String namespace = "prop/use/ns-abc";
        final String topicName = "persistent://prop/use/ns-abc/topic-10";
        final String subName = "sub1";
        BrokerService broker = this.pulsar.getBrokerService();

        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));

        Producer producer = this.pulsarClient.createProducer(topicName);
        producer.close();
        Assert.assertNotNull(broker.getTopicReference(topicName));
        runGC();
        Assert.assertNotNull(broker.getTopicReference(topicName));

        this.admin.namespaces().setRetention(namespace, new RetentionPolicies(0, 10));
        // Pause after the policy change before continuing.
        Thread.sleep(300L);

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);
        runGC();
        Assert.assertNotNull(broker.getTopicReference(topicName));

        consumer.close();
        runGC();
        Assert.assertNotNull(broker.getTopicReference(topicName));

        this.admin.persistentTopics().deleteSubscription(topicName, subName);
        runGC();
        Assert.assertNull(broker.getTopicReference(topicName));
    }

    /**
     * Sets a one-second message TTL on a fresh namespace, publishes ten messages to a
     * subscription with no connected consumer, waits out the TTL, runs the expiry check
     * and asserts the backlog has dropped from ten to zero. Cleans up the subscription,
     * topic and namespace afterwards.
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test
    public void testMessageExpiry() throws Exception {
        final String namespace = "prop/use/expiry-check";
        final String topicName = "persistent://prop/use/expiry-check/topic1";
        final String subName = "sub1";
        final int messageTtlSecs = 1;

        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceMessageTTL(namespace, messageTtlSecs);

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        Consumer consumer = this.pulsarClient.subscribe(topicName, subName, conf);

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        PersistentSubscription subRef = topicRef.getSubscription(subName);

        consumer.close();
        Assert.assertFalse(subRef.getDispatcher().isConsumerConnected());

        Producer producer = this.pulsarClient.createProducer(topicName);
        int sent = 0;
        while (sent < 10) {
            producer.send(("my-message-" + sent).getBytes());
            sent++;
        }

        rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 10L);

        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTtlSecs));
        runMessageExpiryCheck();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);

        producer.close();
        consumer.close();
        this.admin.persistentTopics().deleteSubscription(topicName, subName);
        this.admin.persistentTopics().delete(topicName);
        this.admin.namespaces().deleteNamespace(namespace);
    }

    /**
     * Sets a ten-second message TTL on a fresh namespace, publishes ten messages to a
     * subscription with a connected consumer and runs the expiry check twice: after a
     * ten-second wait the backlog is asserted to still be ten, and after a further
     * five-second wait it is asserted to be zero.
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test
    public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
        final String namespace = "prop/use/expiry-check-1";
        final String topicName = "persistent://prop/use/expiry-check-1/topic1";
        final String subName = "sub1";
        final int messageTtlSecs = 10;

        this.admin.namespaces().createNamespace(namespace);
        this.admin.namespaces().setNamespaceMessageTTL(namespace, messageTtlSecs);

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        this.pulsarClient.subscribe(topicName, subName, conf);

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        PersistentSubscription subRef = topicRef.getSubscription(subName);
        Assert.assertTrue(subRef.getDispatcher().isConsumerConnected());

        Producer producer = this.pulsarClient.createProducer(topicName);
        int sent = 0;
        while (sent < 10) {
            producer.send(("my-message-" + sent).getBytes());
            sent++;
        }

        rolloverPerIntervalStats();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 10L);

        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTtlSecs));
        runMessageExpiryCheck();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 10L);

        Thread.sleep(TimeUnit.SECONDS.toMillis(messageTtlSecs / 2));
        runMessageExpiryCheck();
        Assert.assertEquals(subRef.getNumberOfEntriesInBacklog(), 0L);
    }

    /**
     * Checks that one subscription can change type (Exclusive, then Shared, then Failover,
     * then Exclusive again), but only after the previous consumer has closed: while a
     * consumer of one type is connected, subscribing with another type is rejected with
     * "Subscription is of different type".
     *
     * @throws Exception on any client, broker or admin failure
     */
    @Test
    public void testSubscriptionTypeTransitions() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/shared-topic2";
        final String subName = "sub2";

        ConsumerConfiguration exclusiveConf = new ConsumerConfiguration();
        exclusiveConf.setSubscriptionType(SubscriptionType.Exclusive);
        ConsumerConfiguration sharedConf = new ConsumerConfiguration();
        sharedConf.setSubscriptionType(SubscriptionType.Shared);
        ConsumerConfiguration failoverConf = new ConsumerConfiguration();
        failoverConf.setSubscriptionType(SubscriptionType.Failover);

        Consumer exclusiveConsumer = this.pulsarClient.subscribe(topicName, subName, exclusiveConf);
        Consumer sharedConsumer = null;
        Consumer failoverConsumer = null;

        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        PersistentSubscription subRef = topicRef.getSubscription(subName);

        // While the exclusive consumer is connected, Shared and Failover are both refused.
        try {
            sharedConsumer = this.pulsarClient.subscribe(topicName, subName, sharedConf);
            Assert.fail("should have failed");
        } catch (PulsarClientException refused) {
            Assert.assertTrue(refused.getMessage().contains("Subscription is of different type"));
        }
        try {
            failoverConsumer = this.pulsarClient.subscribe(topicName, subName, failoverConf);
            Assert.fail("should have failed");
        } catch (PulsarClientException refused) {
            Assert.assertTrue(refused.getMessage().contains("Subscription is of different type"));
        }

        // Exclusive -> Shared.
        exclusiveConsumer.close();
        try {
            sharedConsumer = this.pulsarClient.subscribe(topicName, subName, sharedConf);
            Assert.assertEquals(subRef.getDispatcher().getType(), PulsarApi.CommandSubscribe.SubType.Shared);
        } catch (PulsarClientException unexpected) {
            Assert.fail("should not fail");
        }

        // While the shared consumer is connected, Exclusive is refused.
        try {
            exclusiveConsumer = this.pulsarClient.subscribe(topicName, subName, exclusiveConf);
            Assert.fail("should have failed");
        } catch (PulsarClientException refused) {
            Assert.assertTrue(refused.getMessage().contains("Subscription is of different type"));
        }

        // Shared -> Failover.
        sharedConsumer.close();
        try {
            failoverConsumer = this.pulsarClient.subscribe(topicName, subName, failoverConf);
            Assert.assertEquals(subRef.getDispatcher().getType(), PulsarApi.CommandSubscribe.SubType.Failover);
        } catch (PulsarClientException unexpected) {
            Assert.fail("should not fail");
        }

        // Failover -> Exclusive.
        failoverConsumer.close();
        try {
            exclusiveConsumer = this.pulsarClient.subscribe(topicName, subName, exclusiveConf);
            Assert.assertEquals(subRef.getDispatcher().getType(), PulsarApi.CommandSubscribe.SubType.Exclusive);
        } catch (PulsarClientException unexpected) {
            Assert.fail("should not fail");
        }

        exclusiveConsumer.close();
        this.admin.persistentTopics().delete(topicName);
    }

    /**
     * Checks {@code receive(timeout)} and the client-side permit counter: a timed-out
     * receive returns {@code null} and leaves the counter unchanged, while a successful
     * receive raises it by one.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testReceiveWithTimeout() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-receive-timeout";
        final String subName = "sub";

        ConsumerConfiguration conf = new ConsumerConfiguration();
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        conf.setReceiverQueueSize(1000);

        // subscribe() is used elsewhere in this class as returning the Consumer interface;
        // the permit counter is only reachable through ConsumerImpl, so downcast explicitly.
        ConsumerImpl consumer = (ConsumerImpl) this.pulsarClient.subscribe(topicName, subName, conf);
        Producer producer = this.pulsarClient.createProducer(topicName);

        Assert.assertEquals(consumer.getAvailablePermits(), 0);

        // Nothing published yet: the receive times out and permits are untouched.
        Message msg = consumer.receive(10, TimeUnit.MILLISECONDS);
        Assert.assertNull(msg);
        Assert.assertEquals(consumer.getAvailablePermits(), 0);

        producer.send("test".getBytes());
        Thread.sleep(100L);
        Assert.assertEquals(consumer.getAvailablePermits(), 0);

        // One message is available: receiving it bumps the permit counter to one.
        msg = consumer.receive(10, TimeUnit.MILLISECONDS);
        Assert.assertNotNull(msg);
        Assert.assertEquals(consumer.getAvailablePermits(), 1);

        msg = consumer.receive(10, TimeUnit.MILLISECONDS);
        Assert.assertNull(msg);
        Assert.assertEquals(consumer.getAvailablePermits(), 1);
    }

    /**
     * Checks the message ids handed back to the producer: ten synchronous sends followed by
     * ten asynchronous sends must yield ids on the topic's first ledger with consecutive
     * entry ids 0..19 and partition index -1.
     *
     * <p>Each asynchronous send counts the latch down exactly once, whether its check
     * passes or not, and the first failure is recorded and reported after the latch is
     * released, so a failed check cannot leave the test waiting forever.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testProducerReturnedMessageId() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-xyz";

        Producer producer = this.pulsarClient.createProducer(topicName);
        PersistentTopic topicRef = this.pulsar.getBrokerService().getTopicReference(topicName);
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1L);

        long ledgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo) topicRef.getManagedLedger().getLedgersInfoAsList().get(0)).getLedgerId();

        // Synchronous sends: the id is returned directly.
        for (int i = 0; i < 10; i++) {
            Assert.assertEquals(producer.send(("my-message-" + i).getBytes()), new MessageIdImpl(ledgerId, i, -1));
        }

        // Asynchronous sends: the id is read from the message once the send completes.
        CountDownLatch latch = new CountDownLatch(10);
        // Single-slot holder for the first failure; guarded by its own monitor on write and
        // read only after latch.await(), which orders it after every countDown().
        final Throwable[] firstFailure = new Throwable[1];
        for (int i = 10; i < 20; i++) {
            Message msg = MessageBuilder.create().setContent(("my-message-" + i).getBytes()).build();
            int entryId = i;
            producer.sendAsync(msg).thenRun(() -> {
                Assert.assertEquals(msg.getMessageId(), new MessageIdImpl(ledgerId, entryId, -1));
                latch.countDown();
            }).exceptionally(failure -> {
                // Reached when the send fails or the assertion above throws; in both cases
                // the latch was not yet counted down for this message.
                synchronized (firstFailure) {
                    if (firstFailure[0] == null) {
                        firstFailure[0] = failure;
                    }
                }
                latch.countDown();
                return null;
            });
        }

        latch.await();
        Assert.assertNull(firstFailure[0], "asynchronous send or message id check failed: " + firstFailure[0]);
        producer.close();
    }

    /**
     * Checks a producer configured with a ten-message pending queue, block-if-full and a
     * one-second send timeout after {@code cleanup()} has been called: ten async sends
     * return within a second and fill the queue; the eleventh blocks for between 0.5 and
     * 1.5 seconds and afterwards the queue holds one message.
     *
     * <p>{@code setup()} is invoked at the end to re-create the test fixture.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testProducerQueueFullBlocking() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-xyzx";
        final int messages = 10;

        PulsarClient client = PulsarClient.create(this.brokerUrl.toString());
        ProducerConfiguration producerConf = new ProducerConfiguration()
                .setMaxPendingMessages(messages)
                .setBlockIfQueueFull(true)
                .setSendTimeout(1, TimeUnit.SECONDS);
        // createProducer() is used elsewhere in this class as returning the Producer
        // interface; the pending-queue size is only reachable through ProducerImpl.
        ProducerImpl producer = (ProducerImpl) client.createProducer(topicName, producerConf);

        // Tear the fixture down while the producer is still open.
        cleanup();

        long startNanos = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            producer.sendAsync("msg".getBytes());
        }
        Assert.assertTrue(System.nanoTime() - startNanos < TimeUnit.SECONDS.toNanos(1L));
        Assert.assertEquals(producer.getPendingQueueSize(), messages);

        // The queue is full: the next send blocks for roughly the one-second send timeout.
        startNanos = System.nanoTime();
        producer.sendAsync("msg".getBytes());
        long blockedNanos = System.nanoTime() - startNanos;
        Assert.assertTrue(blockedNanos > TimeUnit.MILLISECONDS.toNanos(500L));
        Assert.assertTrue(blockedNanos < TimeUnit.MILLISECONDS.toNanos(1500L));
        Assert.assertEquals(producer.getPendingQueueSize(), 1);

        producer.close();
        client.close();
        setup();
    }

    /**
     * Checks a producer configured with a ten-message pending queue and block-if-full
     * disabled after {@code cleanup()} has been called: ten async sends return within a
     * second and fill the queue; a further synchronous send fails within a second with
     * {@code ProducerQueueIsFullError} and leaves the queue size at ten.
     *
     * <p>{@code setup()} is invoked at the end to re-create the test fixture.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testProducerQueueFullNonBlocking() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic-xyzx";
        final int messages = 10;

        PulsarClient client = PulsarClient.create(this.brokerUrl.toString());
        ProducerConfiguration producerConf = new ProducerConfiguration()
                .setMaxPendingMessages(messages)
                .setBlockIfQueueFull(false)
                .setSendTimeout(1, TimeUnit.SECONDS);
        // createProducer() is used elsewhere in this class as returning the Producer
        // interface; the pending-queue size is only reachable through ProducerImpl.
        ProducerImpl producer = (ProducerImpl) client.createProducer(topicName, producerConf);

        // Tear the fixture down while the producer is still open.
        cleanup();

        long startNanos = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            producer.sendAsync("msg".getBytes());
        }
        Assert.assertTrue(System.nanoTime() - startNanos < TimeUnit.SECONDS.toNanos(1L));
        Assert.assertEquals(producer.getPendingQueueSize(), messages);

        // The queue is full: the next send must be rejected promptly instead of blocking.
        startNanos = System.nanoTime();
        try {
            producer.send("msg".getBytes());
            Assert.fail("Send should have failed");
        } catch (PulsarClientException.ProducerQueueIsFullError expected) {
            // Expected: the pending queue is full and blocking is disabled.
        }
        Assert.assertTrue(System.nanoTime() - startNanos < TimeUnit.SECONDS.toNanos(1L));
        Assert.assertEquals(producer.getPendingQueueSize(), messages);

        producer.close();
        client.close();
        setup();
    }

    /**
     * Checks the broker's bundle stats around a topic deletion: with producers on two topics of
     * {@code prop/use/ns-abc} there is exactly one bundle entry, and after the first topic is
     * deleted there is still exactly one entry under the same bundle key.
     *
     * @throws Exception if creating producers, deleting the topic, or reading stats fails
     */
    @Test
    public void testDeleteTopics() throws Exception {
        final String bundleKey = "prop/use/ns-abc/0x00000000_0xffffffff";
        final BrokerService broker = this.pulsar.getBrokerService();

        Producer firstProducer = this.pulsarClient.createProducer("persistent://prop/use/ns-abc/topic-1");
        // The second producer is only created; it is never used or closed in this test.
        this.pulsarClient.createProducer("persistent://prop/use/ns-abc/topic-2");

        broker.updateRates();
        Map<String, NamespaceBundleStats> statsBeforeDelete = broker.getBundleStats();
        Assert.assertEquals(statsBeforeDelete.size(), 1);
        Assert.assertNotNull(statsBeforeDelete.get(bundleKey));

        // The producer is closed before the delete request is issued.
        firstProducer.close();
        this.admin.persistentTopics().delete("persistent://prop/use/ns-abc/topic-1");

        broker.updateRates();
        Map<String, NamespaceBundleStats> statsAfterDelete = broker.getBundleStats();
        Assert.assertEquals(statsAfterDelete.size(), 1);
        Assert.assertNotNull(statsAfterDelete.get(bundleKey));
    }

    /**
     * Supplies the compression codecs for tests declared with {@code dataProvider = "codec"}.
     *
     * @return one single-element row per codec, in the order {@code NONE}, {@code LZ4},
     *         {@code ZLIB}
     */
    @DataProvider(name = "codec")
    public Object[][] codecProvider() {
        // Must be a true two-dimensional array: a plain Object[] does not convert to Object[][].
        return new Object[][] {
            {CompressionType.NONE},
            {CompressionType.LZ4},
            {CompressionType.ZLIB},
        };
    }

    /**
     * Round-trips ten messages through a topic using the given compression codec and checks that
     * the consumer receives the same payloads in the same order.
     *
     * @param compressionType codec configured on the producer; also appended to the topic name
     * @throws Exception if producing, consuming, or closing fails
     */
    @Test(dataProvider = "codec")
    public void testCompression(CompressionType compressionType) throws Exception {
        final int messageCount = 10;
        final String topic = "persistent://prop/use/ns-abc/topic0" + compressionType;

        ProducerConfiguration producerConf = new ProducerConfiguration();
        producerConf.setCompressionType(compressionType);
        Producer producer = this.pulsarClient.createProducer(topic, producerConf);
        Consumer consumer = this.pulsarClient.subscribe(topic, "my-sub");

        // Explicit downcast: the lookup is assigned to the persistent implementation type.
        PersistentTopic topicRef = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic);
        Assert.assertNotNull(topicRef);
        Assert.assertEquals(topicRef.getProducers().size(), 1L);

        for (int i = 0; i < messageCount; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        for (int i = 0; i < messageCount; i++) {
            Message received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(received);
            Assert.assertEquals(received.getData(), ("my-message-" + i).getBytes());
        }

        producer.close();
        consumer.close();
    }

    /**
     * Publishes ten messages, triggers a manual rate update, and checks that the destination
     * metrics entry for namespace {@code prop/use/ns-abc} reports a positive {@code brk_in_rate}.
     *
     * @throws Exception if reflection, publishing, or the rate update fails
     */
    @Test
    public void testBrokerTopicStats() throws Exception {
        final String namespace = "prop/use/ns-abc";
        BrokerService brokerService = this.pulsar.getBrokerService();

        // Shut down the broker's private "statsUpdater" executor via reflection (presumably so
        // that only the explicit updateRates() call below computes rates; confirm in BrokerService).
        Field statsUpdaterField = BrokerService.class.getDeclaredField("statsUpdater");
        statsUpdaterField.setAccessible(true);
        ((ScheduledExecutorService) statsUpdaterField.get(brokerService)).shutdown();

        Producer producer = this.pulsarClient.createProducer("persistent://prop/use/ns-abc/topic0", new ProducerConfiguration());
        for (int i = 0; i < 10; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        // Let one second elapse before the rates are computed.
        Thread.sleep(1000L);
        brokerService.updateRates();

        List<Metrics> destinationMetrics = brokerService.getDestinationMetrics();
        Metrics namespaceMetrics = null;
        for (Metrics candidate : destinationMetrics) {
            if (candidate.getDimension("namespace").equalsIgnoreCase(namespace)) {
                namespaceMetrics = candidate;
                break;
            }
        }
        Assert.assertNotNull(namespaceMetrics);

        // Assert on the entry matched above rather than on whichever entry happens to be first.
        double inRate = ((Double) namespaceMetrics.getMetrics().get("brk_in_rate")).doubleValue();
        Assert.assertTrue(inRate > 0.0d);
    }

    /**
     * Sends one intact message, then a second message whose last payload byte is overwritten
     * after {@code sendAsync} was called while the broker is stopped. After the broker is
     * started again, the first send must complete, the second must fail, and the consumer must
     * only ever see {@code "message-1"}.
     *
     * @throws Exception if the broker helpers, the first send, or the first receive fail
     */
    @Test
    public void testPayloadCorruptionDetection() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/topic1";
        Producer producer = this.pulsarClient.createProducer(topic);
        Consumer consumer = this.pulsarClient.subscribe(topic, "my-sub");

        Message intact = MessageBuilder.create().setContent("message-1".getBytes()).build();
        CompletableFuture<?> intactSend = producer.sendAsync(intact);

        stopBroker();

        // Queue the second message while the broker is down, then overwrite its last byte.
        Message tampered = MessageBuilder.create().setContent("message-2".getBytes()).build();
        CompletableFuture<?> tamperedSend = producer.sendAsync(tampered);
        byte[] payload = tampered.getData();
        payload[payload.length - 1] = (byte) '3';

        startBroker();

        intactSend.get();
        try {
            tamperedSend.get();
            Assert.fail("since we corrupted the message, it should be rejected by the broker");
        } catch (Exception expected) {
            // Expected: the second send must not complete successfully.
        }

        // Every message that arrives, however many there are, has to be the intact one.
        Message received = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertEquals(new String(received.getData()), "message-1");
        while ((received = consumer.receive(1, TimeUnit.SECONDS)) != null) {
            Assert.assertEquals(new String(received.getData()), "message-1");
        }
    }

    /**
     * On a {@code Shared} subscription, receives ten messages, acknowledges all but the first,
     * requests redelivery of unacknowledged messages, and expects exactly that first message to
     * arrive again and nothing else afterwards.
     *
     * @throws Exception if subscribing, producing, or receiving fails
     */
    @Test
    public void testMessageRedelivery() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/topic2";

        ConsumerConfiguration sharedConf = new ConsumerConfiguration();
        sharedConf.setSubscriptionType(SubscriptionType.Shared);
        Consumer consumer = this.pulsarClient.subscribe(topic, "sub2", sharedConf);
        Producer producer = this.pulsarClient.createProducer(topic);

        for (int n = 0; n < 10; n++) {
            producer.send(("my-message-" + n).getBytes());
        }

        // Hold on to the first message without acknowledging it; acknowledge the other nine.
        Message unacked = consumer.receive();
        for (int n = 1; n < 10; n++) {
            consumer.acknowledge(consumer.receive());
        }

        consumer.redeliverUnacknowledgedMessages();

        try {
            Message redelivered = consumer.receive(1, TimeUnit.SECONDS);
            String actual = new String(redelivered.getData());
            String expected = new String(unacked.getData());
            Assert.assertEquals(actual, expected);
        } catch (Exception e) {
            Assert.fail("msg should be redelivered ", e);
        }

        // Nothing beyond the single unacknowledged message may be redelivered.
        Assert.assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));

        consumer.close();
        producer.close();
    }

    /**
     * Exercises the dispatcher's {@code messagesToReplay} set with positions that were already
     * acknowledged. The set is injected via reflection, and the test checks that:
     * <ol>
     *   <li>after {@code redeliverUnacknowledgedMessages} the injected set is empty, and</li>
     *   <li>with one acknowledged position injected again, a newly published message is still
     *       delivered to the consumer after {@code consumerFlow}.</li>
     * </ol>
     *
     * @throws Exception if reflection, producing, or receiving fails
     */
    @Test
    public void testMessageReplay() throws Exception {
        final String topic = "persistent://prop/use/ns-abc/topic2";
        final String subscriptionName = "sub2";
        final int totalMessages = 10;
        final int replayCount = totalMessages / 2;

        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
        consumerConf.setSubscriptionType(SubscriptionType.Shared);
        consumerConf.setReceiverQueueSize(1);
        Consumer consumer = this.pulsarClient.subscribe(topic, subscriptionName, consumerConf);
        Producer producer = this.pulsarClient.createProducer(topic);

        PersistentTopic topicRef = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference(topic);
        Assert.assertNotNull(topicRef);
        PersistentSubscription subscription = topicRef.getSubscription(subscriptionName);
        // Check for null before the subscription is dereferenced, not afterwards.
        Assert.assertNotNull(subscription);
        PersistentDispatcherMultipleConsumers dispatcher =
                (PersistentDispatcherMultipleConsumers) subscription.getDispatcher();

        Field replayField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
        replayField.setAccessible(true);
        ConcurrentLongPairSet replaySet = new ConcurrentLongPairSet(64, 1);

        for (int i = 0; i < totalMessages; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        // Acknowledge everything; remember the first id and collect the first half of the ids.
        MessageIdImpl firstAckedId = null;
        for (int i = 0; i < totalMessages; i++) {
            Message received = consumer.receive();
            consumer.acknowledge(received);
            MessageIdImpl id = (MessageIdImpl) received.getMessageId();
            if (i == 0) {
                firstAckedId = id;
            }
            if (i < replayCount) {
                replaySet.add(id.getLedgerId(), id.getEntryId());
            }
        }

        // Presumably gives the acknowledgements time to reach the broker; confirm if tuning.
        Thread.sleep(1000L);

        replayField.set(dispatcher, replaySet);
        // Fully qualified: the simple name Consumer is bound to the imported client API type,
        // while the dispatcher works with the broker-side consumer from this package.
        org.apache.pulsar.broker.service.Consumer brokerConsumer =
                (org.apache.pulsar.broker.service.Consumer) dispatcher.getConsumers().get(0);
        dispatcher.redeliverUnacknowledgedMessages(brokerConsumer);
        Assert.assertEquals(replaySet.size(), 0L);

        // Inject one acknowledged position again, then publish a new message and grant a permit.
        replaySet.add(firstAckedId.getLedgerId(), firstAckedId.getEntryId());
        replayField.set(dispatcher, replaySet);
        producer.send("testMsg".getBytes());
        dispatcher.consumerFlow(brokerConsumer, 1);

        Message fresh = consumer.receive(1, TimeUnit.SECONDS);
        Assert.assertNotNull(fresh);
        Assert.assertEquals(fresh.getData(), "testMsg".getBytes());

        consumer.close();
        producer.close();
    }
}
