package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentTopicTest.class */
public class PersistentTopicTest {
    // Spied PulsarService; its getters are stubbed in setup() to hand out the mocks below.
    private PulsarService pulsar;
    // Spied BrokerService created on top of the spied PulsarService in setup().
    private BrokerService brokerService;
    // Mocked managed-ledger factory; its asyncOpen calls are stubbed in setupMLAsyncCallbackMocks().
    private ManagedLedgerFactory mlFactoryMock;
    // Spied server connection; setup() stubs isActive()/isWritable() to true and fixes clientAddress().
    private ServerCnx serverCnx;
    // Mocked managed ledger, recreated by setupMLAsyncCallbackMocks() for every test.
    private ManagedLedger ledgerMock;
    // Mocked cursor, recreated by setupMLAsyncCallbackMocks() for every test.
    private ManagedCursor cursorMock;
    // Mocked configuration cache returned by the spied PulsarService.
    private ConfigurationCacheService configCacheService;
    // Topic name containing "success": matched by the ".*success.*" asyncOpen stub.
    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    // Partition-style variant of the success topic name.
    final String successPartitionTopicName = "persistent://prop/use/ns-abc/successTopic-partition-0";
    // Topic name containing "fail": matched by the ".*fail.*" asyncOpen stub.
    final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    // Subscription names containing "success": matched by the ".*success.*" cursor stubs.
    final String successSubName = "successSub";
    final String successSubName2 = "successSub2";
    final String successSubName3 = "successSub3";
    // Class logger shared by the tests and the callback stubs.
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);

    /**
     * Rebuilds the broker fixture before every test method.
     *
     * <p>A spied {@link PulsarService} is wired to mocked collaborators (managed-ledger factory, ZooKeeper
     * client, configuration cache, local ZooKeeper cache and namespace service). A spied
     * {@link BrokerService} and {@link ServerCnx} are then created on top of it, and finally the
     * managed-ledger callback stubs are installed via {@code setupMLAsyncCallbackMocks()}.
     *
     * @throws Exception if any of the services cannot be constructed
     */
    @BeforeMethod
    public void setup() throws Exception {
        // Service configuration and the spied PulsarService handing it out.
        ServiceConfiguration svcConfig = Mockito.spy(new ServiceConfiguration());
        this.pulsar = Mockito.spy(new PulsarService(svcConfig));
        Mockito.doReturn(svcConfig).when(this.pulsar).getConfiguration();

        // Managed-ledger factory and ZooKeeper client.
        this.mlFactoryMock = Mockito.mock(ManagedLedgerFactory.class);
        Mockito.doReturn(this.mlFactoryMock).when(this.pulsar).getManagedLedgerFactory();
        Mockito.doReturn(MockedPulsarServiceBaseTest.createMockZooKeeper()).when(this.pulsar).getZkClient();

        // Policies caches: every lookup (sync and async) answers "no policies".
        this.configCacheService = Mockito.mock(ConfigurationCacheService.class);
        ZooKeeperDataCache policiesCacheMock = Mockito.mock(ZooKeeperDataCache.class);
        Mockito.doReturn(policiesCacheMock).when(this.configCacheService).policiesCache();
        Mockito.doReturn(this.configCacheService).when(this.pulsar).getConfigurationCache();
        Mockito.doReturn(Optional.empty()).when(policiesCacheMock).get(Matchers.anyString());
        LocalZooKeeperCacheService localZkCacheMock = Mockito.mock(LocalZooKeeperCacheService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(Optional.empty()))
                .when(policiesCacheMock).getAsync((String) Matchers.any());
        Mockito.doReturn(policiesCacheMock).when(localZkCacheMock).policiesCache();
        // Same stubbing as above, repeated as in the original wiring.
        Mockito.doReturn(this.configCacheService).when(this.pulsar).getConfigurationCache();
        Mockito.doReturn(localZkCacheMock).when(this.pulsar).getLocalZkCacheService();

        // Broker service must be created only after the PulsarService getters are stubbed.
        this.brokerService = Mockito.spy(new BrokerService(this.pulsar));
        Mockito.doReturn(this.brokerService).when(this.pulsar).getBrokerService();

        // Server connection that always looks active, writable and local.
        this.serverCnx = Mockito.spy(new ServerCnx(this.brokerService));
        Mockito.doReturn(true).when(this.serverCnx).isActive();
        Mockito.doReturn(true).when(this.serverCnx).isWritable();
        Mockito.doReturn(new InetSocketAddress("localhost", 1234)).when(this.serverCnx).clientAddress();

        // Namespace service that reports every bundle as owned and every destination as active.
        NamespaceService nsServiceMock = Mockito.mock(NamespaceService.class);
        Mockito.doReturn(nsServiceMock).when(this.pulsar).getNamespaceService();
        Mockito.doReturn(true).when(nsServiceMock)
                .isServiceUnitOwned((ServiceUnitId) Matchers.any(NamespaceBundle.class));
        Mockito.doReturn(true).when(nsServiceMock)
                .isServiceUnitActive((DestinationName) Matchers.any(DestinationName.class));

        setupMLAsyncCallbackMocks();
    }

    /**
     * Releases the broker fixture after every test method.
     *
     * <p>The Pulsar service is closed even when clearing or closing the broker service fails, so one
     * failure cannot leak the other resource. The first failure is rethrown; a later one is attached
     * to it as a suppressed exception.
     *
     * @throws Exception the first exception raised while closing the broker or the Pulsar service
     */
    @AfterMethod
    public void teardown() throws Exception {
        Exception failure = null;
        try {
            this.brokerService.getTopics().clear();
            this.brokerService.close();
        } catch (Exception e) {
            // Remember the failure but still attempt to close the Pulsar service below.
            failure = e;
        }
        try {
            this.pulsar.close();
        } catch (Exception e) {
            log.warn("Failed to close pulsar service", e);
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Verifies that {@code brokerService.getTopic} completes within one second with a topic whose
     * string form contains the requested name, when the managed-ledger factory reports a successful open.
     *
     * <p>Any failure (assertion inside the future chain, open error or timeout) is reported with the
     * underlying throwable attached so the root cause is visible in the test report.
     */
    @Test
    public void testCreateTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/topic1";
        final ManagedLedger openedLedger = Mockito.mock(ManagedLedger.class);
        Mockito.doReturn(new ArrayList<>()).when(openedLedger).getCursors();

        // Complete every asyncOpen call synchronously with the mocked ledger.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenLedgerCallback callback =
                    (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerComplete(openedLedger, null);
            return null;
        }).when(this.mlFactoryMock).asyncOpen(Matchers.anyString(),
                (ManagedLedgerConfig) Matchers.any(ManagedLedgerConfig.class),
                (AsyncCallbacks.OpenLedgerCallback) Matchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                Matchers.anyObject());

        try {
            this.brokerService.getTopic(topicName).thenAccept(topic -> {
                Assert.assertTrue(topic.toString().contains(topicName));
            }).exceptionally(failure -> {
                Assert.fail("should not fail", failure);
                return null;
            }).get(1L, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Keep the cause attached instead of discarding it.
            Assert.fail("Should not fail or time out", e);
        }
    }

    /**
     * Verifies that {@code brokerService.getTopic} fails, without timing out, when the managed-ledger
     * factory reports an open failure from a separate thread.
     */
    @Test
    public void testCreateTopicMLFailure() throws Exception {
        // Report the open failure asynchronously, from a freshly started thread.
        Mockito.doAnswer(invocation -> {
            Thread failureReporter = new Thread(() -> {
                AsyncCallbacks.OpenLedgerCallback callback =
                        (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
            });
            failureReporter.start();
            return null;
        }).when(this.mlFactoryMock).asyncOpen(Matchers.anyString(),
                (ManagedLedgerConfig) Matchers.any(ManagedLedgerConfig.class),
                (AsyncCallbacks.OpenLedgerCallback) Matchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                Matchers.anyObject());

        try {
            this.brokerService.getTopic("persistent://prop/use/ns-abc/topic3").get(1L, TimeUnit.SECONDS);
            Assert.fail("should have failed");
        } catch (TimeoutException timedOut) {
            Assert.fail("Should not time out");
        } catch (Exception expected) {
            // Any other exception is the expected outcome of the failed ledger open.
        }
    }

    /**
     * Publishes one message to a topic backed by the mocked ledger and checks that the publish
     * callback is invoked within one second.
     */
    @Test
    public void testPublishMessage() throws Exception {
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
        final CountDownLatch callbackInvoked = new CountDownLatch(1);

        // The callback arguments are irrelevant here; only the invocation itself is observed.
        topic.publishMessage(payload, (exception, ignoredA, ignoredB) -> callbackInvoked.countDown());

        boolean invokedInTime = callbackInvoked.await(1L, TimeUnit.SECONDS);
        Assert.assertTrue(invokedInTime);
    }

    /**
     * Feeds an {@code InvalidCursorPositionException} read failure into a multi-consumer dispatcher
     * and verifies that the dispatcher asked the (spied) topic for its broker service at least once.
     */
    @Test
    public void testDispatcherMultiConsumerReadFailed() throws Exception {
        PersistentTopic spiedTopic = Mockito.spy(
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService));
        ManagedCursor namedCursor = Mockito.mock(ManagedCursor.class);
        Mockito.when(namedCursor.getName()).thenReturn("cursor");

        PersistentDispatcherMultipleConsumers dispatcher =
                new PersistentDispatcherMultipleConsumers(spiedTopic, namedCursor);
        dispatcher.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), (Object) null);

        Mockito.verify(spiedTopic, Mockito.atLeast(1)).getBrokerService();
    }

    /**
     * Feeds an {@code InvalidCursorPositionException} read failure into a single-active-consumer
     * dispatcher and verifies that the dispatcher asked the (spied) topic for its broker service at
     * least once.
     */
    @Test
    public void testDispatcherSingleConsumerReadFailed() throws Exception {
        PersistentTopic spiedTopic = Mockito.spy(
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService));
        ManagedCursor namedCursor = Mockito.mock(ManagedCursor.class);
        Mockito.when(namedCursor.getName()).thenReturn("cursor");

        PersistentDispatcherSingleActiveConsumer dispatcher = new PersistentDispatcherSingleActiveConsumer(
                namedCursor, PulsarApi.CommandSubscribe.SubType.Exclusive, 1, spiedTopic);
        // A mocked consumer is passed as the read context.
        dispatcher.readEntriesFailed(
                new ManagedLedgerException.InvalidCursorPositionException("failed"), Mockito.mock(Consumer.class));

        Mockito.verify(spiedTopic, Mockito.atLeast(1)).getBrokerService();
    }

    /**
     * Publishes one message to a topic whose managed ledger fails every {@code asyncAddEntry} call and
     * checks that the publish callback receives a non-null exception within one second.
     */
    @Test
    public void testPublishMessageMLFailure() throws Exception {
        ManagedLedger failingLedger = Mockito.mock(ManagedLedger.class);
        Mockito.doReturn(new ArrayList<>()).when(failingLedger).getCursors();
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", failingLedger, this.brokerService);
        ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
        final CountDownLatch failureReported = new CountDownLatch(1);

        // Fail every add synchronously, handing the caller's context object back to the callback.
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            ((AsyncCallbacks.AddEntryCallback) args[1])
                    .addFailed(new ManagedLedgerException("Managed ledger failure"), args[2]);
            return null;
        }).when(failingLedger).asyncAddEntry((ByteBuf) Matchers.any(ByteBuf.class),
                (AsyncCallbacks.AddEntryCallback) Matchers.any(AsyncCallbacks.AddEntryCallback.class),
                Matchers.anyObject());

        topic.publishMessage(payload, (exception, ignoredA, ignoredB) -> {
            if (exception == null) {
                Assert.fail("publish should have failed");
            } else {
                failureReported.countDown();
            }
        });
        Assert.assertTrue(failureReported.await(1L, TimeUnit.SECONDS));
    }

    /**
     * Exercises producer registration on a topic: a first add succeeds, a duplicate add is rejected
     * with a {@code NamingException}, a producer built for a different topic is rejected with an
     * {@code IllegalArgumentException}, and removal (including a repeated removal) leaves no producers.
     */
    @Test
    public void testAddRemoveProducer() throws Exception {
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        Producer firstProducer = new Producer(topic, this.serverCnx, 1L, "prod-name", "appid1");

        // 1. simple add
        topic.addProducer(firstProducer);
        Assert.assertEquals(topic.getProducers().size(), 1L);

        // 2. duplicate add
        try {
            topic.addProducer(firstProducer);
            Assert.fail("Should have failed with naming exception because producer 'null' is already connected to the topic");
        } catch (BrokerServiceException duplicate) {
            Assert.assertTrue(duplicate instanceof BrokerServiceException.NamingException);
        }
        Assert.assertEquals(topic.getProducers().size(), 1L);

        // 3. add a producer that was created for a different topic
        try {
            PersistentTopic otherTopic =
                    new PersistentTopic("persistent://prop/use/ns-abc/failTopic", this.ledgerMock, this.brokerService);
            Producer foreignProducer = new Producer(otherTopic, this.serverCnx, 2L, "prod-name", "appid1");
            topic.addProducer(foreignProducer);
            Assert.fail("should have failed");
        } catch (IllegalArgumentException expected) {
            // Expected: the producer does not belong to this topic.
        }

        // 4. simple remove
        topic.removeProducer(firstProducer);
        Assert.assertEquals(topic.getProducers().size(), 0L);

        // 5. removing an already removed producer must not throw
        topic.removeProducer(firstProducer);
    }

    /**
     * Subscribing with an empty subscription name must fail: the returned future is expected to
     * complete with a {@code NamingException} as its cause.
     */
    @Test
    public void testSubscribeFail() throws Exception {
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);

        // Subscribe command carrying an empty subscription name.
        PulsarApi.CommandSubscribe subscribeCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic("persistent://prop/use/ns-abc/successTopic")
                .setSubscription("")
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();

        try {
            topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                    subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                    (MessageId) null).get();
            Assert.fail("should fail with exception");
        } catch (ExecutionException failed) {
            Assert.assertTrue(failed.getCause() instanceof BrokerServiceException.NamingException);
        }
    }

    /**
     * Subscribes once successfully, expects an identical second exclusive subscribe to fail with a
     * {@code ConsumerBusyException}, then unsubscribes and checks the subscription is gone.
     */
    @Test
    public void testSubscribeUnsubscribe() throws Exception {
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe subscribeCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic("persistent://prop/use/ns-abc/successTopic")
                .setSubscription("successSub")
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();

        // First subscribe succeeds.
        topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                (MessageId) null).get();

        // Second, identical subscribe is rejected.
        try {
            topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                    subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                    (MessageId) null).get();
            Assert.fail("should fail with exception");
        } catch (ExecutionException failed) {
            Assert.assertTrue(failed.getCause() instanceof BrokerServiceException.ConsumerBusyException);
        }

        // Unsubscribe removes the subscription from the topic.
        topic.unsubscribe("successSub").get();
        Assert.assertNull(topic.getSubscription("successSub"));
    }

    /**
     * Exercises consumer registration on an exclusive subscription: a first add connects the
     * dispatcher, a duplicate add is rejected with {@code ConsumerBusyException}, a remove disconnects
     * it, and a repeated remove is rejected with {@code ServerMetadataException}.
     */
    @Test
    public void testAddRemoveConsumer() throws Exception {
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription subscription = new PersistentSubscription(topic, "sub-1", this.cursorMock);
        Consumer onlyConsumer = new Consumer(subscription, PulsarApi.CommandSubscribe.SubType.Exclusive,
                topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1");

        // 1. simple add
        subscription.addConsumer(onlyConsumer);
        Assert.assertTrue(subscription.getDispatcher().isConsumerConnected());

        // 2. duplicate add
        try {
            subscription.addConsumer(onlyConsumer);
            Assert.fail("Should fail with ConsumerBusyException");
        } catch (BrokerServiceException busy) {
            Assert.assertTrue(busy instanceof BrokerServiceException.ConsumerBusyException);
        }

        // 3. simple remove
        subscription.removeConsumer(onlyConsumer);
        Assert.assertFalse(subscription.getDispatcher().isConsumerConnected());

        // 4. duplicate remove
        try {
            subscription.removeConsumer(onlyConsumer);
            Assert.fail("Should fail with ServerMetadataException");
        } catch (BrokerServiceException alreadyRemoved) {
            Assert.assertTrue(alreadyRemoved instanceof BrokerServiceException.ServerMetadataException);
        }
    }

    /**
     * Runs {@code doUnsubscribe} on a worker thread against a cursor-delete stub that completes the
     * callback and then sleeps for one second, and afterwards creates a second consumer on the same
     * subscription. If that creation throws a {@link BrokerServiceException}, it must be a
     * {@code SubscriptionFencedException}.
     *
     * <p>The worker executor is shut down once the unsubscribe task has finished so its thread does
     * not outlive the test.
     */
    @Test
    public void testUbsubscribeRaceConditions() throws Exception {
        PersistentTopic topic =
                new PersistentTopic("persistent://prop/use/ns-abc/successTopic", this.ledgerMock, this.brokerService);
        PersistentSubscription subscription = new PersistentSubscription(topic, "sub-1", this.cursorMock);
        Consumer firstConsumer = new Consumer(subscription, PulsarApi.CommandSubscribe.SubType.Exclusive,
                topic.getName(), 1L, 0, "Cons1", 50000, this.serverCnx, "myrole-1");
        subscription.addConsumer(firstConsumer);

        // Complete the cursor deletion first, then keep the calling thread busy for a second.
        Mockito.doAnswer(invocation -> {
            ((AsyncCallbacks.DeleteCursorCallback) invocation.getArguments()[1]).deleteCursorComplete(null);
            Thread.sleep(1000L);
            return null;
        }).when(this.ledgerMock).asyncDeleteCursor(Matchers.matches(".*success.*"),
                (AsyncCallbacks.DeleteCursorCallback) Matchers.any(AsyncCallbacks.DeleteCursorCallback.class),
                Matchers.anyObject());

        ExecutorService unsubscribeExecutor = Executors.newCachedThreadPool();
        try {
            unsubscribeExecutor.submit(() -> {
                subscription.doUnsubscribe(firstConsumer);
                return null;
            }).get();
        } finally {
            // Release the pool thread instead of leaving it to idle out.
            unsubscribeExecutor.shutdownNow();
        }

        try {
            Thread.sleep(10L);
            // NOTE(review): there is no Assert.fail() after this constructor call, so the test also
            // passes when no exception is thrown -- confirm that this is intended.
            new Consumer(subscription, PulsarApi.CommandSubscribe.SubType.Exclusive, topic.getName(), 2L, 0,
                    "Cons2", 50000, this.serverCnx, "myrole-1");
        } catch (BrokerServiceException fenced) {
            Assert.assertTrue(fenced instanceof BrokerServiceException.SubscriptionFencedException);
        }
    }

    /**
     * Checks topic deletion in three situations: an idle topic is deleted and disappears from the
     * broker, a topic with a registered producer cannot be deleted, and a topic with an active
     * subscription cannot be deleted.
     */
    @Test
    public void testDeleteTopic() throws Exception {
        final String topicName = "persistent://prop/use/ns-abc/successTopic";
        final String subscriptionName = "successSub";

        // 1. delete an idle topic
        PersistentTopic idleTopic = (PersistentTopic) this.brokerService.getTopic(topicName).get();
        idleTopic.delete().get();
        Assert.assertNull(this.brokerService.getTopicReference(topicName));

        // 2. delete is rejected while a producer is registered
        PersistentTopic topic = (PersistentTopic) this.brokerService.getTopic(topicName).get();
        Producer producer = new Producer(topic, this.serverCnx, 1L, "prod-name", "appid1");
        topic.addProducer(producer);
        Assert.assertTrue(topic.delete().isCompletedExceptionally());
        topic.removeProducer(producer);

        // 3. delete is rejected while a subscription is active
        PulsarApi.CommandSubscribe subscribeCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic(topicName)
                .setSubscription(subscriptionName)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();
        topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                (MessageId) null).get();
        Assert.assertTrue(topic.delete().isCompletedExceptionally());

        // The unsubscribe result is not awaited.
        topic.unsubscribe(subscriptionName);
    }

    /**
     * Starts a topic delete and an unsubscribe on two threads released together by a barrier, and
     * fails if either thread caught an {@link Exception}.
     *
     * <p>Worker failures are reported through the class logger rather than printed to stderr.
     */
    @Test
    public void testDeleteAndUnsubscribeTopic() throws Exception {
        final PersistentTopic topic =
                (PersistentTopic) this.brokerService.getTopic("persistent://prop/use/ns-abc/successTopic").get();
        PulsarApi.CommandSubscribe subscribeCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic("persistent://prop/use/ns-abc/successTopic")
                .setSubscription("successSub")
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();
        topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                (MessageId) null).get();

        final CyclicBarrier startBarrier = new CyclicBarrier(2);
        final CountDownLatch finished = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        Thread deleter = new Thread(() -> {
            try {
                startBarrier.await();
                // NOTE(review): a failing assertion throws AssertionError, which is not an Exception
                // and is therefore not recorded in gotException -- confirm whether that is intended.
                Assert.assertFalse(topic.delete().isCompletedExceptionally());
            } catch (Exception e) {
                log.error("Topic delete thread failed", e);
                gotException.set(true);
            } finally {
                finished.countDown();
            }
        });

        Thread unsubscriber = new Thread(() -> {
            try {
                startBarrier.await();
                topic.unsubscribe("successSub");
            } catch (Exception e) {
                log.error("Unsubscribe thread failed", e);
                gotException.set(true);
            } finally {
                finished.countDown();
            }
        });

        deleter.start();
        unsubscriber.start();
        // Both threads count down in a finally block, so this wait always terminates.
        finished.await();
        Assert.assertEquals(gotException.get(), false);
    }

    /**
     * Starts a topic delete (delayed by 5 ms) and a {@code doUnsubscribe} of the subscription's first
     * consumer on two threads released together by a barrier, waits for both outcomes, and fails if
     * either thread caught an {@link Exception}.
     *
     * <p>This method carries no {@code @Test} annotation. Worker failures are reported through the
     * class logger rather than printed to stderr.
     */
    public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
        final PersistentTopic topic =
                (PersistentTopic) this.brokerService.getTopic("persistent://prop/use/ns-abc/successTopic").get();
        PulsarApi.CommandSubscribe subscribeCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic("persistent://prop/use/ns-abc/successTopic")
                .setSubscription("successSub")
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();
        topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                (MessageId) null).get();

        final CyclicBarrier startBarrier = new CyclicBarrier(2);
        final CountDownLatch finished = new CountDownLatch(2);
        final AtomicBoolean gotException = new AtomicBoolean(false);

        Thread deleter = new Thread(() -> {
            try {
                startBarrier.await();
                // Short delay before issuing the delete.
                Thread.sleep(5L, 0);
                PersistentTopicTest.log.info("deleter outcome is {}", topic.delete().get());
            } catch (Exception e) {
                PersistentTopicTest.log.error("Topic delete thread failed", e);
                gotException.set(true);
            } finally {
                finished.countDown();
            }
        });

        Thread unsubscriber = new Thread(() -> {
            try {
                startBarrier.await();
                PersistentSubscription subscription =
                        (PersistentSubscription) topic.getSubscriptions().get("successSub");
                Consumer firstConsumer = (Consumer) subscription.getConsumers().get(0);
                PersistentTopicTest.log.info("unsubscriber outcome is {}",
                        subscription.doUnsubscribe(firstConsumer).get());
            } catch (Exception e) {
                PersistentTopicTest.log.error("Unsubscribe thread failed", e);
                gotException.set(true);
            } finally {
                finished.countDown();
            }
        });

        deleter.start();
        unsubscriber.start();
        // Both threads count down in a finally block, so this wait always terminates.
        finished.await();
        Assert.assertEquals(gotException.get(), false);
    }

    /**
     * Runs a topic delete on a worker thread against a ledger-delete stub that sleeps for one second
     * before completing, then expects both adding a producer and subscribing to be rejected with a
     * {@code TopicFencedException}.
     *
     * <p>The worker executor is shut down once the delete task has finished so its thread does not
     * outlive the test.
     */
    @Test
    public void testDeleteTopicRaceConditions() throws Exception {
        PersistentTopic topic =
                (PersistentTopic) this.brokerService.getTopic("persistent://prop/use/ns-abc/successTopic").get();

        // Delay the ledger deletion by a second before reporting it as complete.
        Mockito.doAnswer(invocation -> {
            Thread.sleep(1000L);
            ((AsyncCallbacks.DeleteLedgerCallback) invocation.getArguments()[0]).deleteLedgerComplete(null);
            return null;
        }).when(this.ledgerMock).asyncDelete(
                (AsyncCallbacks.DeleteLedgerCallback) Matchers.any(AsyncCallbacks.DeleteLedgerCallback.class),
                Matchers.anyObject());

        ExecutorService deleteExecutor = Executors.newCachedThreadPool();
        try {
            deleteExecutor.submit(() -> {
                topic.delete();
                return null;
            }).get();
        } finally {
            // Release the pool thread instead of leaving it to idle out.
            deleteExecutor.shutdownNow();
        }

        // Adding a producer after the delete must be fenced.
        try {
            Thread.sleep(10L);
            topic.addProducer(new Producer(topic, this.serverCnx, 1L, "prod-name", "appid1"));
            Assert.fail("Should have failed");
        } catch (BrokerServiceException fenced) {
            Assert.assertTrue(fenced instanceof BrokerServiceException.TopicFencedException);
        }

        // Subscribing after the delete must be fenced as well.
        PulsarApi.CommandSubscribe subscribeCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic("persistent://prop/use/ns-abc/successTopic")
                .setSubscription("successSub")
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();
        try {
            topic.subscribe(this.serverCnx, subscribeCmd.getSubscription(), subscribeCmd.getConsumerId(),
                    subscribeCmd.getSubType(), 0, subscribeCmd.getConsumerName(), subscribeCmd.getDurable(),
                    (MessageId) null).get();
            Assert.fail("should have failed");
        } catch (ExecutionException failed) {
            Assert.assertTrue(failed.getCause() instanceof BrokerServiceException.TopicFencedException);
        }
    }

    /**
     * Creates fresh ledger and cursor mocks and installs the stubs the tests rely on. Each stubbed
     * asynchronous call invokes its callback synchronously:
     * <ul>
     *   <li>{@code asyncOpen} succeeds with the ledger mock for names matching {@code .*success.*}
     *       and fails for names matching {@code .*fail.*};</li>
     *   <li>{@code asyncAddEntry} completes at position (1, 1);</li>
     *   <li>{@code asyncOpenCursor} and {@code asyncDeleteCursor} succeed for names matching
     *       {@code .*success.*};</li>
     *   <li>{@code asyncDelete} always completes.</li>
     * </ul>
     */
    void setupMLAsyncCallbackMocks() {
        this.ledgerMock = Mockito.mock(ManagedLedger.class);
        this.cursorMock = Mockito.mock(ManagedCursor.class);
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        Mockito.doReturn(new ArrayList<>()).when(this.ledgerMock).getCursors();
        Mockito.doReturn("mockCursor").when(this.cursorMock).getName();

        // Cursor close: stubbed for this specific callback instance and a null context only.
        AsyncCallbacks.CloseCallback closeCallback = new AsyncCallbacks.CloseCallback() {
            public void closeComplete(Object ctx) {
                PersistentTopicTest.log.info("[{}] Successfully closed cursor ledger", "mockCursor");
                closeFuture.complete(null);
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                PersistentTopicTest.log.error("Error closing cursor for subscription", exception);
                closeFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
            }
        };
        Mockito.doAnswer(invocation -> {
            return Boolean.valueOf(closeFuture.complete(null));
        }).when(this.cursorMock).asyncClose(closeCallback, (Object) null);

        // Ledger open: success for ".*success.*" names.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenLedgerCallback callback =
                    (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerComplete(this.ledgerMock, null);
            return null;
        }).when(this.mlFactoryMock).asyncOpen(Matchers.matches(".*success.*"),
                (ManagedLedgerConfig) Matchers.any(ManagedLedgerConfig.class),
                (AsyncCallbacks.OpenLedgerCallback) Matchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                Matchers.anyObject());

        // Ledger open: failure for ".*fail.*" names.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenLedgerCallback callback =
                    (AsyncCallbacks.OpenLedgerCallback) invocation.getArguments()[2];
            callback.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(this.mlFactoryMock).asyncOpen(Matchers.matches(".*fail.*"),
                (ManagedLedgerConfig) Matchers.any(ManagedLedgerConfig.class),
                (AsyncCallbacks.OpenLedgerCallback) Matchers.any(AsyncCallbacks.OpenLedgerCallback.class),
                Matchers.anyObject());

        // Add entry: completes at a fixed position, echoing the caller's context object.
        Mockito.doAnswer(invocation -> {
            Object[] args = invocation.getArguments();
            ((AsyncCallbacks.AddEntryCallback) args[1]).addComplete(new PositionImpl(1L, 1L), args[2]);
            return null;
        }).when(this.ledgerMock).asyncAddEntry((ByteBuf) Matchers.any(ByteBuf.class),
                (AsyncCallbacks.AddEntryCallback) Matchers.any(AsyncCallbacks.AddEntryCallback.class),
                Matchers.anyObject());

        // Cursor open: success for ".*success.*" names, handing out the cursor mock.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.OpenCursorCallback callback =
                    (AsyncCallbacks.OpenCursorCallback) invocation.getArguments()[1];
            callback.openCursorComplete(this.cursorMock, null);
            return null;
        }).when(this.ledgerMock).asyncOpenCursor(Matchers.matches(".*success.*"),
                (AsyncCallbacks.OpenCursorCallback) Matchers.any(AsyncCallbacks.OpenCursorCallback.class),
                Matchers.anyObject());

        // Ledger delete: always completes.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.DeleteLedgerCallback callback =
                    (AsyncCallbacks.DeleteLedgerCallback) invocation.getArguments()[0];
            callback.deleteLedgerComplete(null);
            return null;
        }).when(this.ledgerMock).asyncDelete(
                (AsyncCallbacks.DeleteLedgerCallback) Matchers.any(AsyncCallbacks.DeleteLedgerCallback.class),
                Matchers.anyObject());

        // Cursor delete: success for ".*success.*" names.
        Mockito.doAnswer(invocation -> {
            AsyncCallbacks.DeleteCursorCallback callback =
                    (AsyncCallbacks.DeleteCursorCallback) invocation.getArguments()[1];
            callback.deleteCursorComplete(null);
            return null;
        }).when(this.ledgerMock).asyncDeleteCursor(Matchers.matches(".*success.*"),
                (AsyncCallbacks.DeleteCursorCallback) Matchers.any(AsyncCallbacks.DeleteCursorCallback.class),
                Matchers.anyObject());
    }

    /**
     * Walks a Failover subscription through its consumer lifecycle and checks the consumer
     * list exposed by the dispatcher after every change.
     *
     * <p>Scenario, in order:
     * <ol>
     *   <li>an unnamed consumer subscribes on a plain topic;</li>
     *   <li>consumers "C1" (id 1) and "C2" (id 2) subscribe on a "-partition-0" topic;</li>
     *   <li>a second consumer named "C1" (id 3) joins and is expected between the two;</li>
     *   <li>an Exclusive subscribe on the same subscription must fail with
     *       {@code SubscriptionBusyException};</li>
     *   <li>a separate Exclusive subscription is created and unsubscribed;</li>
     *   <li>consumers are removed from the head of the list one at a time, then the
     *       Failover subscription itself is unsubscribed.</li>
     * </ol>
     *
     * @throws Exception if any subscribe/unsubscribe future fails unexpectedly
     */
    @Test
    public void testFailoverSubscription() throws Exception {
        final String plainTopicName = "persistent://prop/use/ns-abc/successTopic";
        final String partitionTopicName = "persistent://prop/use/ns-abc/successTopic-partition-0";
        final String failoverSub = "successSub";
        final String exclusiveSub = "successSub2";

        // 1. Plain topic: Failover subscribe without an explicit consumer name.
        PersistentTopic plainTopic = new PersistentTopic(plainTopicName, this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe unnamedCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setTopic(plainTopicName)
                .setSubscription(failoverSub)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Failover)
                .build();
        subscribeAndWait(plainTopic, unnamedCmd);

        // 2. Partition topic: two named Failover consumers.
        PersistentTopic partitionTopic =
                new PersistentTopic(partitionTopicName, this.ledgerMock, this.brokerService);
        PulsarApi.CommandSubscribe firstC1Cmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(1L)
                .setConsumerName("C1")
                .setTopic(partitionTopicName)
                .setSubscription(failoverSub)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Failover)
                .build();
        subscribeAndWait(partitionTopic, firstC1Cmd);

        PulsarApi.CommandSubscribe c2Cmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(2L)
                .setConsumerName("C2")
                .setTopic(partitionTopicName)
                .setSubscription(failoverSub)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Failover)
                .build();
        subscribeAndWait(partitionTopic, c2Cmd);

        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerId(), 1L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerName(), "C1");
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 1).consumerId(), 2L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 1).consumerName(), "C2");

        // 3. Another consumer named "C1" (id 3): expected after the first "C1" and before "C2".
        PulsarApi.CommandSubscribe secondC1Cmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(3L)
                .setConsumerName("C1")
                .setTopic(partitionTopicName)
                .setSubscription(failoverSub)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Failover)
                .build();
        subscribeAndWait(partitionTopic, secondC1Cmd);

        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerId(), 1L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerName(), "C1");
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 1).consumerId(), 3L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 1).consumerName(), "C1");
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 2).consumerId(), 2L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 2).consumerName(), "C2");

        // 4. Exclusive subscribe on the Failover subscription that already has consumers must be rejected.
        PulsarApi.CommandSubscribe exclusiveOnBusySubCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(2L)
                .setConsumerName("C1")
                .setTopic(partitionTopicName)
                .setSubscription(failoverSub)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();
        try {
            subscribeAndWait(partitionTopic, exclusiveOnBusySubCmd);
            Assert.fail("should fail with exception");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof BrokerServiceException.SubscriptionBusyException);
        }

        // 5. A different subscription name can be taken exclusively and then dropped.
        PulsarApi.CommandSubscribe exclusiveCmd = PulsarApi.CommandSubscribe.newBuilder()
                .setConsumerId(4L)
                .setConsumerName("C3")
                .setTopic(partitionTopicName)
                .setSubscription(exclusiveSub)
                .setRequestId(1L)
                .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive)
                .build();
        subscribeAndWait(partitionTopic, exclusiveCmd);
        partitionTopic.unsubscribe(exclusiveSub).get();
        Assert.assertNull(partitionTopic.getSubscription(exclusiveSub));

        // 6. Remove the head consumer twice; the remaining consumers move up each time.
        PersistentSubscription subscription = partitionTopic.getSubscription(failoverSub);
        subscription.removeConsumer((Consumer) subscription.getDispatcher().getConsumers().get(0));
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerId(), 3L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerName(), "C1");

        subscription.removeConsumer((Consumer) subscription.getDispatcher().getConsumers().get(0));
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerId(), 2L);
        Assert.assertEquals(consumerAt(partitionTopic, failoverSub, 0).consumerName(), "C2");

        partitionTopic.unsubscribe(failoverSub).get();
        Assert.assertNull(partitionTopic.getSubscription(failoverSub));
    }

    /**
     * Issues the subscribe described by {@code cmd} on {@code topic} through the test's server
     * connection and blocks until the returned future completes.
     */
    private void subscribeAndWait(PersistentTopic topic, PulsarApi.CommandSubscribe cmd) throws Exception {
        topic.subscribe(this.serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0,
                cmd.getConsumerName(), cmd.getDurable(), (MessageId) null).get();
    }

    /**
     * Looks up, afresh on every call, the consumer at {@code index} in the dispatcher's consumer
     * list for the given subscription.
     */
    private static Consumer consumerAt(PersistentTopic topic, String subscriptionName, int index) {
        return (Consumer) topic.getSubscription(subscriptionName).getDispatcher().getConsumers().get(index);
    }

    /**
     * Checks that a replicator whose removal has been requested, but whose cursor-delete callback
     * has not yet fired, is not restarted by {@code startReplProducers()}.
     *
     * <p>The managed ledger is a mock whose {@code asyncDeleteCursor} does nothing, so the delete
     * callback only runs when the test invokes it explicitly at the end.
     *
     * @throws Exception on reflection or client-creation failures
     */
    @Test
    public void testAtomicReplicationRemoval() throws Exception {
        final String topicName = "persistent://prop/global/ns-abc/successTopic";
        final String remoteCluster = "remote";

        // Ledger mock: cursor deletion is swallowed (no callback) and there are no existing cursors.
        ManagedLedger ledger = Mockito.mock(ManagedLedger.class);
        Mockito.doNothing().when(ledger).asyncDeleteCursor((String) Matchers.anyObject(),
                (AsyncCallbacks.DeleteCursorCallback) Matchers.anyObject(), Matchers.anyObject());
        Mockito.doReturn(new ArrayList<Object>()).when(ledger).getCursors();

        PersistentTopic topic = new PersistentTopic(topicName, ledger, this.brokerService);
        String replicatorName = topic.replicatorPrefix + "." + remoteCluster;
        ConcurrentOpenHashMap replicatorMap = topic.getReplicators();

        URL brokerUrl = new URL("http://" + this.pulsar.getAdvertisedAddress() + ":"
                + this.pulsar.getConfiguration().getBrokerServicePort());
        PulsarClient client = PulsarClient.create(brokerUrl.toString());

        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
        Mockito.doReturn(remoteCluster).when(cursor).getName();
        this.brokerService.getReplicationClients().put(remoteCluster, client);

        // Register a spied replicator so later interactions with it can be verified.
        PersistentReplicator replicator = Mockito.spy(
                new PersistentReplicator(topic, cursor, "local", remoteCluster, this.brokerService));
        replicatorMap.put(replicatorName, replicator);

        // Step 1: trigger removal through the non-public removeReplicator(String) method.
        Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
        removeReplicator.setAccessible(true);
        removeReplicator.invoke(topic, replicatorName);

        // Step 2: with default (empty) policies for the namespace, ask the topic to start its
        // replication producers and verify the spied replicator was never started.
        Mockito.when(this.pulsar.getConfigurationCache().policiesCache().get(
                AdminResource.path(new String[]{"policies", DestinationName.get(topicName).getNamespace()})))
                .thenReturn(Optional.of(new Policies()));
        topic.startReplProducers();
        Mockito.verify(replicator, Mockito.times(0)).startProducer();

        // Step 3: grab the callback handed to asyncDeleteCursor and complete it by hand.
        ArgumentCaptor<AsyncCallbacks.DeleteCursorCallback> callbackCaptor =
                ArgumentCaptor.forClass(AsyncCallbacks.DeleteCursorCallback.class);
        Mockito.verify(ledger).asyncDeleteCursor((String) Matchers.anyObject(), callbackCaptor.capture(),
                Matchers.anyObject());
        callbackCaptor.getValue().deleteCursorComplete((Object) null);
    }

    /**
     * Verifies that a replicator can start its producer again after {@code disconnect(false)} has
     * been called on it twice in a row.
     *
     * <p>The Pulsar client is a Mockito spy whose {@code createProducerAsync} is stubbed to return
     * a future that this test never completes. The test then asserts that
     * {@code createProducerAsync} is invoked once by the first {@code startProducer()} and a second
     * time by the {@code startProducer()} issued after the two disconnects.
     *
     * @throws Exception on client-creation failures
     */
    @Test
    public void testClosingReplicationProducerTwice() throws Exception {
        final String topicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice";
        final String remoteCluster = "remote";

        // Ledger mock: cursor deletion is a no-op and there are no existing cursors.
        ManagedLedger ledger = Mockito.mock(ManagedLedger.class);
        Mockito.doNothing().when(ledger).asyncDeleteCursor((String) Matchers.anyObject(),
                (AsyncCallbacks.DeleteCursorCallback) Matchers.anyObject(), Matchers.anyObject());
        Mockito.doReturn(new ArrayList<Object>()).when(ledger).getCursors();

        PersistentTopic topic = new PersistentTopic(topicName, ledger, this.brokerService);

        URL brokerUrl = new URL("http://" + this.pulsar.getAdvertisedAddress() + ":"
                + this.pulsar.getConfiguration().getBrokerServicePort());
        // Hold the spy through the PulsarClient interface and downcast explicitly for the
        // implementation-typed view; both variables refer to the same spy instance.
        PulsarClient client = Mockito.spy(PulsarClient.create(brokerUrl.toString()));
        PulsarClientImpl clientImpl = (PulsarClientImpl) client;

        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
        Mockito.doReturn(remoteCluster).when(cursor).getName();
        this.brokerService.getReplicationClients().put(remoteCluster, client);

        PersistentReplicator replicator =
                new PersistentReplicator(topic, cursor, "local", remoteCluster, this.brokerService);

        // Producer creation returns a future that is never completed within this test.
        Mockito.doReturn(new CompletableFuture<Object>()).when(clientImpl)
                .createProducerAsync(topicName, replicator.getProducerConfiguration());

        // First start: exactly one producer-creation request so far.
        replicator.startProducer();
        Mockito.verify(clientImpl).createProducerAsync(topicName, replicator.getProducerConfiguration());

        // Disconnect twice back-to-back, then start again.
        replicator.disconnect(false);
        replicator.disconnect(false);
        replicator.startProducer();

        // The restart must have issued a second producer-creation request (two in total).
        Mockito.verify(clientImpl, Mockito.times(2))
                .createProducerAsync(topicName, replicator.getProducerConfiguration());
    }
}
