package org.apache.pulsar.broker.namespace;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/namespace/OwnershipCacheTest.class */
/**
 * Tests for {@link OwnershipCache}, run against a {@link MockZooKeeper} instance with a mocked
 * {@link PulsarService}.
 *
 * <p>Each test builds its own {@link OwnershipCache} and uses a distinct namespace
 * ({@code pulsar/test/ns-N}). The fixture is rebuilt before every test method.
 */
public class OwnershipCacheTest {
    /** Native URL written to ZooKeeper when a test simulates ownership by another broker. */
    private static final String OTHER_BROKER_URL = "pulsar://otherhost:8881";
    /** TLS native URL written to ZooKeeper when a test simulates ownership by another broker. */
    private static final String OTHER_BROKER_URL_TLS = "pulsar://otherhost:8884";

    private PulsarService pulsar;
    private ServiceConfiguration config;
    private String selfBrokerUrl;
    private ZooKeeperCache zkCache;
    private LocalZooKeeperCacheService localCache;
    private NamespaceBundleFactory bundleFactory;
    private NamespaceService nsService;
    private BrokerService brokerService;
    private OrderedScheduler executor;

    /**
     * Builds the mocked broker environment: a mock {@link PulsarService} backed by a
     * {@link LocalZooKeeperCache} over a {@link MockZooKeeper}, a spied
     * {@link LocalZooKeeperCacheService}, and mocked namespace and broker services.
     *
     * @throws Exception if any part of the fixture cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        this.selfBrokerUrl = "tcp://localhost:8080";
        this.pulsar = Mockito.mock(PulsarService.class);
        this.config = Mockito.mock(ServiceConfiguration.class);
        this.executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
        this.zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, this.executor);
        this.localCache = Mockito.spy(new LocalZooKeeperCacheService(this.zkCache, (ConfigurationCacheService) null));

        // These stubs are installed before the bundle factory is constructed, as in the original ordering.
        ZooKeeperDataCache policiesCache = Mockito.mock(ZooKeeperDataCache.class);
        Mockito.when(this.pulsar.getLocalZkCacheService()).thenReturn(this.localCache);
        Mockito.when(this.localCache.policiesCache()).thenReturn(policiesCache);
        ((ZooKeeperDataCache) Mockito.doNothing().when(policiesCache)).registerListener((ZooKeeperCacheListener) Mockito.any());
        this.bundleFactory = new NamespaceBundleFactory(this.pulsar, Hashing.crc32());

        this.nsService = Mockito.mock(NamespaceService.class);
        this.brokerService = Mockito.mock(BrokerService.class);
        Mockito.doReturn(CompletableFuture.completedFuture(1)).when(this.brokerService).unloadServiceUnit((NamespaceBundle) Mockito.any());

        Mockito.doReturn(this.zkCache).when(this.pulsar).getLocalZkCache();
        Mockito.doReturn(this.config).when(this.pulsar).getConfiguration();
        Mockito.doReturn(this.nsService).when(this.pulsar).getNamespaceService();
        Mockito.doReturn(Optional.of(8080)).when(this.config).getBrokerServicePort();
        Mockito.doReturn(Optional.empty()).when(this.config).getWebServicePort();
        Mockito.doReturn(this.brokerService).when(this.pulsar).getBrokerService();
        // webAddress(config) is evaluated here, so the config port stubs above must already be in place.
        Mockito.doReturn(PulsarService.webAddress(this.config)).when(this.pulsar).getSafeWebServiceAddress();
        Mockito.doReturn(this.selfBrokerUrl).when(this.pulsar).getSafeBrokerServiceUrl();
    }

    /**
     * Shuts down the scheduler created in {@link #setup()}.
     *
     * @throws Exception declared for parity with the fixture lifecycle; not thrown by the visible code
     */
    @AfterMethod
    public void teardown() throws Exception {
        this.executor.shutdown();
    }

    /** Returns the owner currently reported by the cache for {@code bundle}, waiting for the lookup. */
    private static Optional<?> currentOwner(OwnershipCache cache, NamespaceBundle bundle) throws Exception {
        return (Optional<?>) cache.getOwnerAsync(bundle).get();
    }

    /** Attempts to acquire {@code bundle} through the cache and returns the resulting owner data. */
    private static NamespaceEphemeralData acquire(OwnershipCache cache, NamespaceBundle bundle) throws Exception {
        return (NamespaceEphemeralData) cache.tryAcquiringOwnership(bundle).get();
    }

    /** Writes an ownership node for {@code bundle} directly to ZooKeeper on behalf of another broker. */
    private void acquireAsOtherBroker(NamespaceBundle bundle, String httpUrl, String httpsUrl) throws Exception {
        ServiceUnitZkUtils.acquireNameSpace(this.zkCache.getZooKeeper(), ServiceUnitZkUtils.path(bundle),
                new NamespaceEphemeralData(OTHER_BROKER_URL, OTHER_BROKER_URL_TLS, httpUrl, httpsUrl, false));
    }

    /** Asserts that {@code owner} carries the other broker's URLs and is not disabled. */
    private static void assertOwnedByOtherBroker(NamespaceEphemeralData owner) {
        Assert.assertEquals(owner.getNativeUrl(), OTHER_BROKER_URL);
        Assert.assertEquals(owner.getNativeUrlTls(), OTHER_BROKER_URL_TLS);
        Assert.assertFalse(owner.isDisabled());
    }

    /** A freshly constructed cache is non-null and exposes a non-null owned-bundle collection. */
    @Test
    public void testConstructor() {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        Assert.assertNotNull(cache);
        Assert.assertNotNull(cache.getOwnedBundles());
    }

    /** After {@code disableOwnership}, the owner data reported for an acquired bundle is flagged disabled. */
    @Test
    public void testDisableOwnership() throws Exception {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        NamespaceBundle bundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-1"));

        Assert.assertFalse(currentOwner(cache, bundle).isPresent());
        Assert.assertFalse(acquire(cache, bundle).isDisabled());

        cache.disableOwnership(bundle);
        Assert.assertTrue(((NamespaceEphemeralData) currentOwner(cache, bundle).get()).isDisabled());
    }

    /**
     * Acquires a bundle locally, unloads it, then writes another broker's ownership node and checks
     * that a second acquisition attempt reports the other broker as owner.
     */
    @Test
    public void testGetOrSetOwner() throws Exception {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        NamespaceBundle bundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-2"));
        Assert.assertFalse(currentOwner(cache, bundle).isPresent());

        // First acquisition: this broker becomes the owner.
        NamespaceEphemeralData selfOwner = acquire(cache, bundle);
        Assert.assertEquals(selfOwner.getNativeUrl(), this.selfBrokerUrl);
        Assert.assertFalse(selfOwner.isDisabled());

        // Unload the bundle; the namespace service mock must hand back this cache for the unload path.
        OwnedBundle owned = cache.getOwnedBundle(bundle);
        Mockito.doReturn(cache).when(this.nsService).getOwnershipCache();
        owned.handleUnloadRequest(this.pulsar, 5L, TimeUnit.SECONDS);
        Thread.sleep(1000L);

        // Drop the cached owner entry and let another broker take the node.
        this.localCache.ownerInfoCache().invalidate(ServiceUnitZkUtils.path(bundle));
        acquireAsOtherBroker(bundle, "http://localhost:8080", "https://localhost:4443");

        assertOwnedByOtherBroker(acquire(cache, bundle));
    }

    /**
     * When another broker holds the ownership node, both {@code tryAcquiringOwnership} and
     * {@code getOwnerAsync} report that broker; a lookup for an unrelated namespace after an injected
     * NONODE failure reports no owner.
     */
    @Test
    public void testGetOwner() throws Exception {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        NamespaceBundle bundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-3"));
        Assert.assertFalse(currentOwner(cache, bundle).isPresent());

        acquireAsOtherBroker(bundle, "http://otherhost:8080", "https://otherhost:4443");
        NamespaceEphemeralData owner = acquire(cache, bundle);
        assertOwnedByOtherBroker(owner);
        Assert.assertEquals(owner, (NamespaceEphemeralData) currentOwner(cache, bundle).get());

        this.zkCache.getZooKeeper().failNow(KeeperException.Code.NONODE);
        NamespaceBundle unknownBundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-none"));
        Assert.assertFalse(currentOwner(cache, unknownBundle).isPresent());
    }

    /**
     * {@code getOwnedBundle} is expected to yield {@code null} while the bundle is unowned or owned by
     * another broker, and a non-null value once this broker acquires it.
     */
    @Test
    public void testGetOwnedServiceUnit() throws Exception {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        NamespaceBundle bundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-5"));
        String bundlePath = ServiceUnitZkUtils.path(bundle);

        Assert.assertFalse(currentOwner(cache, bundle).isPresent());
        Assert.assertNull(cache.getOwnedBundle(bundle), "Unowned bundle must not be reported as owned");

        acquireAsOtherBroker(bundle, "http://otherhost:8080", "https://otherhost:4443");
        Assert.assertNull(cache.getOwnedBundle(bundle), "Bundle owned by another broker must not be reported as owned");

        Thread.sleep(500L);
        assertOwnedByOtherBroker(acquire(cache, bundle));
        Assert.assertNull(cache.getOwnedBundle(bundle), "Failed acquisition must not make the bundle owned");

        // Release the other broker's node and clear the cached owner so this broker can acquire it.
        this.zkCache.getZooKeeper().delete(bundlePath, -1);
        this.localCache.ownerInfoCache().invalidate(bundlePath);

        NamespaceEphemeralData selfOwner = acquire(cache, bundle);
        Assert.assertEquals(selfOwner.getNativeUrl(), this.selfBrokerUrl);
        Assert.assertFalse(selfOwner.isDisabled());
        Assert.assertNotNull(cache.getOwnedBundle(bundle));
    }

    /**
     * {@code getOwnedBundles} stays empty while another broker owns the bundle and contains exactly
     * one entry after this broker acquires it.
     */
    @Test
    public void testGetOwnedServiceUnits() throws Exception {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        NamespaceBundle bundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-6"));
        String bundlePath = ServiceUnitZkUtils.path(bundle);

        Assert.assertFalse(currentOwner(cache, bundle).isPresent());
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        acquireAsOtherBroker(bundle, "http://otherhost:8080", "https://otherhost:4443");
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        Thread.sleep(500L);
        assertOwnedByOtherBroker(acquire(cache, bundle));
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        // Release the other broker's node and clear the cached owner so this broker can acquire it.
        this.zkCache.getZooKeeper().delete(bundlePath, -1);
        this.localCache.ownerInfoCache().invalidate(bundlePath);

        NamespaceEphemeralData selfOwner = acquire(cache, bundle);
        Assert.assertEquals(selfOwner.getNativeUrl(), this.selfBrokerUrl);
        Assert.assertFalse(selfOwner.isDisabled());
        Assert.assertEquals(cache.getOwnedBundles().size(), 1);
    }

    /**
     * Removing ownership of an unowned bundle completes without error; removing ownership of an owned
     * bundle empties the owned set and deletes the ownership node from ZooKeeper.
     */
    @Test
    public void testRemoveOwnership() throws Exception {
        OwnershipCache cache = new OwnershipCache(this.pulsar, this.bundleFactory);
        NamespaceBundle bundle = this.bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-7"));
        Assert.assertFalse(currentOwner(cache, bundle).isPresent());

        // Removing something that was never owned must still complete.
        cache.removeOwnership(bundle).get();
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());

        NamespaceEphemeralData selfOwner = acquire(cache, bundle);
        Assert.assertEquals(selfOwner.getNativeUrl(), this.selfBrokerUrl);
        Assert.assertFalse(selfOwner.isDisabled());
        Assert.assertEquals(cache.getOwnedBundles().size(), 1);

        // Wait for the removal itself so a failure surfaces here with its cause; the sleeps are kept
        // in case cache invalidation lags behind completion of the future.
        cache.removeOwnership(bundle).get();
        Thread.sleep(500L);
        Assert.assertTrue(cache.getOwnedBundles().isEmpty());
        Thread.sleep(500L);

        try {
            this.zkCache.getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), (Watcher) null, (Stat) null);
            Assert.fail("Should have failed");
        } catch (KeeperException.NoNodeException ignored) {
            // Expected: the ownership node must be gone after removal.
        }
    }
}
