package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopicTest.class */
public class PersistentTopicTest extends BrokerTestBase {
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testCleanFailedUnloadTopic() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/failedUnload").enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/failedUnload").get();
        Assert.assertNotNull(persistentTopic);
        ManagedLedger managedLedger = persistentTopic.ledger;
        LedgerHandle ledgerHandle = (LedgerHandle) Mockito.mock(LedgerHandle.class);
        Field declaredField = managedLedger.getClass().getDeclaredField("currentLedger");
        declaredField.setAccessible(true);
        declaredField.set(managedLedger, ledgerHandle);
        ((LedgerHandle) Mockito.doNothing().when(ledgerHandle)).asyncClose((AsyncCallback.CloseCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        this.pulsar.getNamespaceService().unloadNamespaceBundle(this.pulsar.getNamespaceService().getBundle(TopicName.get("persistent://prop/ns-abc/failedUnload")), 5L, TimeUnit.SECONDS).get();
        retryStrategically(r4 -> {
            return !this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/failedUnload").isPresent();
        }, 5, 500L);
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/failedUnload").isPresent());
        create.close();
    }

    @Test
    public void testUnblockStuckSubscription() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-abc/stuckSubscriptionTopic").create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/stuckSubscriptionTopic").get();
        PersistentSubscription subscription = persistentTopic.getSubscription("shared");
        PersistentSubscription subscription2 = persistentTopic.getSubscription("failOver");
        PersistentDispatcherMultipleConsumers dispatcher = subscription.getDispatcher();
        PersistentDispatcherSingleActiveConsumer dispatcher2 = subscription2.getDispatcher();
        subscribe.close();
        subscribe2.close();
        dispatcher.havePendingRead = true;
        dispatcher2.havePendingRead = true;
        create.newMessage().value("test").eventTime(5L).send();
        create.newMessage().value("test").eventTime(5L).send();
        Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe4 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/stuckSubscriptionTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Assert.assertNull(subscribe3.receive(2, TimeUnit.SECONDS));
        Assert.assertNull(subscribe4.receive(2, TimeUnit.SECONDS));
        dispatcher.havePendingRead = false;
        dispatcher2.havePendingRead = false;
        subscription.checkAndUnblockIfStuck();
        dispatcher2.checkAndUnblockIfStuck();
        Assert.assertTrue(subscription.checkAndUnblockIfStuck());
        Assert.assertTrue(dispatcher2.checkAndUnblockIfStuck());
        Assert.assertNotNull(subscribe3.receive(5, TimeUnit.SECONDS));
        Assert.assertNotNull(subscribe4.receive(5, TimeUnit.SECONDS));
    }

    @Test
    public void testDeleteNamespaceInfiniteRetry() throws Exception {
        String str = "prop/ns" + UUID.randomUUID();
        this.admin.namespaces().createNamespace(str, Sets.newHashSet(new String[]{"test"}));
        String str2 = "persistent://" + str + "/testDeleteNamespaceInfiniteRetry";
        this.conf.setForceDeleteNamespaceAllowed(true);
        this.pulsarClient.newProducer().topic(str2).create().close();
        this.admin.namespaces().setMaxConsumersPerTopic(str, 0);
        Awaitility.await().atMost(3L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.admin.namespaces().getMaxConsumersPerTopic(str).intValue() == 0);
        });
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.spy((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str2).get()).get());
        Policies policies = new Policies();
        policies.deleted = true;
        persistentTopic.onPoliciesUpdate(policies);
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.times(0))).checkReplicationAndRetryOnFailure();
        policies.deleted = false;
        persistentTopic.onPoliciesUpdate(policies);
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.times(1))).checkReplicationAndRetryOnFailure();
    }

    @Test
    public void testAccumulativeStats() throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-abc/aTopic").create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/aTopic").get();
        TopicStatsImpl stats = persistentTopic.getStats(false, false, false);
        Assert.assertEquals(stats.getBytesInCounter(), 0L);
        Assert.assertEquals(stats.getMsgInCounter(), 0L);
        Assert.assertEquals(stats.getBytesOutCounter(), 0L);
        Assert.assertEquals(stats.getMsgOutCounter(), 0L);
        create.newMessage().value("test").eventTime(5L).send();
        Assert.assertNotNull(subscribe.receive());
        Assert.assertNotNull(subscribe2.receive());
        TopicStatsImpl stats2 = persistentTopic.getStats(false, false, false);
        Assert.assertTrue(stats2.getBytesInCounter() > 0);
        Assert.assertTrue(stats2.getMsgInCounter() > 0);
        Assert.assertTrue(stats2.getBytesOutCounter() > 0);
        Assert.assertTrue(stats2.getMsgOutCounter() > 0);
        subscribe.unsubscribe();
        subscribe2.unsubscribe();
        create.close();
        Collection values = persistentTopic.getProducers().values();
        Objects.requireNonNull(persistentTopic);
        values.forEach(persistentTopic::removeProducer);
        Assert.assertEquals(persistentTopic.getProducers().size(), 0);
        TopicStatsImpl stats3 = persistentTopic.getStats(false, false, false);
        Assert.assertEquals(stats3.getBytesInCounter(), stats2.getBytesInCounter());
        Assert.assertEquals(stats3.getMsgInCounter(), stats2.getMsgInCounter());
        Assert.assertEquals(stats3.getBytesOutCounter(), stats2.getBytesOutCounter());
        Assert.assertEquals(stats3.getMsgOutCounter(), stats2.getMsgOutCounter());
    }

    @Test
    public void testPersistentPartitionedTopicUnload() throws Exception {
        this.admin.namespaces().createNamespace("prop/ns", 2);
        this.admin.topics().createPartitionedTopic("persistent://prop/ns/failedUnload", 5);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 1; i++) {
            arrayList.add(this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns/failedUnload").create());
        }
        Assert.assertFalse(this.pulsar.getBrokerService().getTopics().containsKey("persistent://prop/ns/failedUnload"));
        this.pulsar.getBrokerService().getTopicIfExists("persistent://prop/ns/failedUnload").get();
        Assert.assertFalse(this.pulsar.getBrokerService().getTopics().containsKey("persistent://prop/ns/failedUnload"));
        Assert.assertFalse(this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns/failedUnload").isPresent());
        this.pulsar.getNamespaceService().unloadNamespaceBundle(this.pulsar.getNamespaceService().getBundle(TopicName.get("persistent://prop/ns/failedUnload")), 5L, TimeUnit.SECONDS).get();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Producer) it.next()).close();
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicAndMetricsLevel")
    public Object[][] indexPatternTestData() {
        return new Object[]{new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", true}, new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", false}};
    }

    @Test(dataProvider = "topicAndMetricsLevel")
    public void testDelayedDeliveryTrackerMemoryUsageMetric(String str, boolean z) throws Exception {
        PulsarClient client = this.pulsar.getClient();
        String namespace = TopicName.get(str).getNamespace();
        this.admin.namespaces().createNamespace(namespace);
        CountDownLatch countDownLatch = new CountDownLatch(100);
        Producer create = client.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
        try {
            Consumer subscribe = client.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("test_sub").subscriptionType(SubscriptionType.Shared).messageListener((consumer, message) -> {
                try {
                    countDownLatch.countDown();
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }).subscribe();
            for (int i = 0; i < 100; i++) {
                try {
                    create.newMessage().value(UUID.randomUUID().toString()).deliverAfter(30L, TimeUnit.SECONDS).sendAsync();
                } catch (Throwable th) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th;
                }
            }
            create.flush();
            countDownLatch.await(10L, TimeUnit.SECONDS);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            PrometheusMetricsGenerator.generate(this.pulsar, z, true, true, byteArrayOutputStream);
            Collection<PrometheusMetricsTest.Metric> collection = PrometheusMetricsTest.parseMetrics(byteArrayOutputStream.toString(StandardCharsets.UTF_8)).get("pulsar_delayed_message_index_size_bytes");
            Assert.assertTrue(collection.size() > 0);
            int i2 = 0;
            int i3 = 0;
            int i4 = 0;
            for (PrometheusMetricsTest.Metric metric : collection) {
                if (z && metric.tags.get("topic").equals(str)) {
                    Assert.assertTrue(metric.value > 0.0d);
                    i2++;
                    if ("test_sub".equals(metric.tags.get("subscription"))) {
                        i4++;
                    }
                } else if (!z && metric.tags.get("namespace").equals(namespace)) {
                    Assert.assertTrue(metric.value > 0.0d);
                    i3++;
                }
            }
            if (z) {
                Assert.assertTrue(i2 > 0);
                Assert.assertTrue(i4 > 0);
                Assert.assertEquals(0, i3);
            } else {
                Assert.assertTrue(i3 > 0);
                Assert.assertEquals(i2, 0);
            }
            TopicStats stats = this.admin.topics().getStats(str);
            Assert.assertTrue(((SubscriptionStats) stats.getSubscriptions().get("test_sub")).getDelayedMessageIndexSizeInBytes() > 0);
            Assert.assertTrue(stats.getDelayedMessageIndexSizeInBytes() > 0);
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testUpdateCursorLastActive() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        Thread.sleep(1L);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Shared).subscriptionName("shared").acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-abc/aTopic"}).subscriptionType(SubscriptionType.Failover).subscriptionName("failOver").acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-abc/aTopic").create();
        PersistentTopic persistentTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicReference("persistent://prop/ns-abc/aTopic").get();
        PersistentSubscription subscription = persistentTopic.getSubscription("shared");
        PersistentSubscription subscription2 = persistentTopic.getSubscription("failOver");
        Assert.assertTrue(subscription.getCursor().getLastActive() > currentTimeMillis);
        Assert.assertTrue(subscription2.getCursor().getLastActive() > currentTimeMillis);
        long currentTimeMillis2 = System.currentTimeMillis();
        Thread.sleep(1L);
        create.newMessage().value("test").send();
        Message receive = subscribe.receive();
        Assert.assertNotNull(receive);
        subscribe.acknowledge(receive);
        Message receive2 = subscribe2.receive();
        Assert.assertNotNull(receive2);
        subscribe2.acknowledge(receive2);
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(subscription.getCursor().getLastActive() > currentTimeMillis2);
        });
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(subscription2.getCursor().getLastActive() > currentTimeMillis2);
        });
        Assert.assertTrue(subscription.getCursor().getLastActive() > currentTimeMillis2);
        Assert.assertTrue(subscription2.getCursor().getLastActive() > currentTimeMillis2);
        long currentTimeMillis3 = System.currentTimeMillis();
        Thread.sleep(1L);
        subscribe.unsubscribe();
        subscribe2.unsubscribe();
        create.close();
        Assert.assertTrue(subscription.getCursor().getLastActive() > currentTimeMillis3);
        Assert.assertTrue(subscription2.getCursor().getLastActive() > currentTimeMillis3);
    }

    @Test
    public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException {
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testCreateNonExistentPartitions", 4);
        try {
            Producer create = this.pulsarClient.newProducer().topic(TopicName.get("persistent://prop/ns-abc/testCreateNonExistentPartitions").getPartition(4).toString()).create();
            try {
                Assert.fail("unexpected behaviour");
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th;
            }
        } catch (PulsarClientException.NotAllowedException e) {
        }
        Assert.assertEquals(this.admin.topics().getPartitionedTopicMetadata("persistent://prop/ns-abc/testCreateNonExistentPartitions").partitions, 4);
    }

    @Test
    public void testCompatibilityWithPartitionKeyword() throws PulsarAdminException, PulsarClientException {
        String topicName = TopicName.get("persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword").getPartition(2).toString();
        Producer create = this.pulsarClient.newProducer().topic(topicName).create();
        List list = this.admin.topics().getList("prop/ns-abc");
        create.close();
        this.conf.setAllowAutoTopicCreation(false);
        Assert.assertTrue(list.contains(topicName));
        Assert.assertThrows(PulsarAdminException.NotFoundException.class, () -> {
            this.admin.topics().getPartitionedTopicMetadata("persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword");
        });
        this.pulsarClient.newProducer().topic(topicName).create().close();
        Assert.assertTrue(list.contains(topicName));
        Assert.assertThrows(PulsarAdminException.NotFoundException.class, () -> {
            this.admin.topics().getPartitionedTopicMetadata("persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword");
        });
    }

    @Test
    public void testDeleteTopicFail() throws Exception {
        String str = "persistent://prop/ns-abc/tp_" + UUID.randomUUID().toString().replaceAll("-", "");
        BrokerService brokerService = (BrokerService) Mockito.spy(this.pulsar.getBrokerService());
        ((PulsarService) Mockito.doReturn(brokerService).when(this.pulsar)).getBrokerService();
        this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub1").subscribe().close();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).create();
        create.send("1");
        create.close();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) brokerService.getTopic(str, false).get()).get();
        ((BrokerService) Mockito.doAnswer(invocationOnMock -> {
            CompletableFuture completableFuture = (CompletableFuture) invocationOnMock.getArguments()[1];
            if (atomicBoolean.get()) {
                completableFuture.completeExceptionally(new RuntimeException("mock ex for test"));
                return null;
            }
            completableFuture.complete(null);
            return null;
        }).when(brokerService)).deleteTopicAuthenticationWithRetry((String) ArgumentMatchers.any(String.class), (CompletableFuture) ArgumentMatchers.any(CompletableFuture.class), ArgumentMatchers.anyInt());
        try {
            persistentTopic.delete().get();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof ExecutionException);
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
            Assert.assertEquals(e.getCause().getMessage(), "mock ex for test");
        }
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{str}).subscriptionName("sub1").subscribe();
        Assert.assertEquals("1", subscribe.receive(2, TimeUnit.SECONDS).getValue());
        subscribe.close();
        atomicBoolean.set(false);
        persistentTopic.delete().get();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "topicLevelPolicy")
    public static Object[][] topicLevelPolicy() {
        return new Object[]{new Object[]{true}, new Object[]{false}};
    }

    @Test(dataProvider = "topicLevelPolicy")
    public void testCreateTopicWithZombieReplicatorCursor(boolean z) throws Exception {
        String str = "persistent://prop/ns-abc/testCreateTopicWithZombieReplicatorCursor" + z;
        this.admin.topics().createNonPartitionedTopic(str);
        this.admin.topics().createSubscription(str, this.conf.getReplicatorPrefix() + ".remote", MessageId.earliest, true);
        this.admin.clusters().createCluster("remote", ClusterData.builder().serviceUrl("http://localhost:11112").brokerServiceUrl("pulsar://localhost:11111").build());
        TenantInfo tenantInfo = this.admin.tenants().getTenantInfo("prop");
        tenantInfo.getAllowedClusters().add("remote");
        this.admin.tenants().updateTenant("prop", tenantInfo);
        if (z) {
            this.admin.topics().setReplicationClusters(str, Arrays.asList("test", "remote"));
        } else {
            this.admin.namespaces().setNamespaceReplicationClustersAsync("prop/ns-abc", Sets.newHashSet(new String[]{"test", "remote"})).get();
        }
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get(3L, TimeUnit.SECONDS)).orElse(null);
        Assert.assertNotNull(persistentTopic);
        Supplier supplier = () -> {
            HashSet hashSet = new HashSet();
            persistentTopic.getManagedLedger().getCursors().forEach(managedCursor -> {
                hashSet.add(managedCursor.getName());
            });
            return hashSet;
        };
        Assert.assertEquals((Set) supplier.get(), Collections.singleton(this.conf.getReplicatorPrefix() + ".remote"));
        Thread.sleep(100L);
        if (z) {
            this.admin.topics().setReplicationClusters(str, Collections.singletonList("test"));
        } else {
            this.admin.namespaces().setNamespaceReplicationClustersAsync("prop/ns-abc", Collections.singleton("test")).get();
        }
        this.admin.clusters().deleteCluster("remote");
        Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
            log.info("Before initialize...");
            try {
                persistentTopic.initialize().get(3L, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                log.warn("Failed to initialize: {}", e.getCause().getMessage());
            }
            return Boolean.valueOf(!persistentTopic.getManagedLedger().getCursors().iterator().hasNext());
        });
    }

    @Test
    public void testCheckPersistencePolicies() throws Exception {
        this.admin.namespaces().createNamespace("prop/ns", Sets.newHashSet(new String[]{"test"}));
        String str = "persistent://prop/ns/testConfig" + UUID.randomUUID();
        this.conf.setForceDeleteNamespaceAllowed(true);
        this.pulsarClient.newProducer().topic(str).create().close();
        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1L);
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.spy((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get());
        TopicPoliciesService topicPoliciesService = (TopicPoliciesService) Mockito.spy(this.pulsar.getTopicPoliciesService());
        ((PulsarService) Mockito.doReturn(topicPoliciesService).when(this.pulsar)).getTopicPoliciesService();
        TopicPolicies topicPolicies = new TopicPolicies();
        topicPolicies.setRetentionPolicies(retentionPolicies);
        ((TopicPoliciesService) Mockito.doReturn(CompletableFuture.completedFuture(Optional.of(topicPolicies))).when(topicPoliciesService)).getTopicPoliciesAsync(TopicName.get(str));
        persistentTopic.onUpdate(topicPolicies);
        ((PersistentTopic) Mockito.verify(persistentTopic, Mockito.times(1))).checkPersistencePolicies();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
            Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1L));
        });
        ((PersistentTopic) Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(persistentTopic)).checkPersistencePolicies();
        topicPolicies.setRetentionPolicies(new RetentionPolicies(2, 2L));
        persistentTopic.onUpdate(topicPolicies);
        Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L);
        Assert.assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1L));
    }

    @Test
    public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exception {
        this.pulsar.getConfiguration().setAutoSkipNonRecoverableData(false);
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testAutoSkipNonRecoverableData"}).subscriptionName("test_sub").subscribe();
        PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testAutoSkipNonRecoverableData", false).join()).get();
        PersistentSubscription subscription = persistentTopic.getSubscription("test_sub");
        Assert.assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
        Assert.assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
        String str = "autoSkipNonRecoverableData";
        this.admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((String) this.admin.brokers().getAllDynamicConfigurations().get(str), "true");
        });
        Assert.assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData());
        Assert.assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData());
        subscribe.close();
        this.admin.topics().delete("persistent://prop/ns-abc/testAutoSkipNonRecoverableData");
    }

    @Test
    public void testAddWaitingCursorsForNonDurable() throws Exception {
        this.admin.namespaces().createNamespace("prop/ns-test", 2);
        this.admin.topics().createNonPartitionedTopic("persistent://prop/ns-test/testAddWaitingCursors");
        Optional optional = (Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-test/testAddWaitingCursors", false).join();
        Assert.assertNotNull(optional.get());
        ManagedLedgerImpl managedLedger = ((PersistentTopic) optional.get()).getManagedLedger();
        ManagedCursor managedCursor = (ManagedCursor) Mockito.spy(managedLedger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
        ((ManagedCursor) Mockito.doAnswer(invocationOnMock -> {
            Thread.sleep(10000L);
            invocationOnMock.callRealMethod();
            return null;
        }).when(managedCursor)).asyncReadEntriesOrWait(((Integer) ArgumentMatchers.any(Integer.TYPE)).intValue(), ((Long) ArgumentMatchers.any(Long.TYPE)).longValue(), (AsyncCallbacks.ReadEntriesCallback) ArgumentMatchers.any(AsyncCallbacks.ReadEntriesCallback.class), ArgumentMatchers.any(Object.class), (PositionImpl) ArgumentMatchers.any(PositionImpl.class));
        Field declaredField = ManagedLedgerImpl.class.getDeclaredField("cursors");
        declaredField.setAccessible(true);
        ManagedCursorContainer managedCursorContainer = (ManagedCursorContainer) declaredField.get(managedLedger);
        managedCursorContainer.removeCursor("sub-2");
        managedCursorContainer.add(managedCursor, (Position) null);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).topic(new String[]{"persistent://prop/ns-test/testAddWaitingCursors"}).subscriptionMode(SubscriptionMode.NonDurable).subscriptionType(SubscriptionType.Exclusive).subscriptionName("sub-2").subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic("persistent://prop/ns-test/testAddWaitingCursors").create();
        create.send("test");
        create.close();
        Assert.assertEquals("test", (String) subscribe.receive().getValue());
        subscribe.close();
        Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(managedLedger.getWaitingCursorsCount(), 0);
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1930177981:
                if (implMethodName.equals("lambda$testDelayedDeliveryTrackerMemoryUsageMetric$cca17657$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/persistent/PersistentTopicTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            countDownLatch.countDown();
                            consumer.acknowledge(message);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
