package org.apache.pulsar.zookeeper;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test
/* loaded from: input_file:org/apache/pulsar/zookeeper/ZookeeperCacheTest.class */
public class ZookeeperCacheTest {
    private MockZooKeeper zkClient;

    /**
     * Builds a fresh {@link MockZooKeeper} before each test method, handing it the executor service
     * returned by {@code MoreExecutors.newDirectExecutorService()}.
     */
    @BeforeMethod
    void setup() throws Exception {
        ExecutorService directExecutor = MoreExecutors.newDirectExecutorService();
        zkClient = MockZooKeeper.newInstance(directExecutor);
    }

    /**
     * Shuts down the {@link MockZooKeeper} currently held in {@code zkClient} after each test method.
     */
    @AfterMethod
    void teardown() throws Exception {
        zkClient.shutdown();
    }

    /**
     * Exercises a {@link ZooKeeperDataCache} on top of a {@link LocalZooKeeperCache}.
     *
     * <p>The test reads a node through the cache, updates it and expects the new value, then checks that the
     * already cached value is still returned after an {@code Expired} event and after the mock client has
     * been told to fail with {@code SESSIONEXPIRED}. A read of a path that was never cached is expected to
     * throw in that state.
     */
    @Test
    void testSimpleCache() throws Exception {
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            LocalZooKeeperCache zkCacheService = new LocalZooKeeperCache(this.zkClient, executor, scheduledExecutor);
            ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
                @Override
                public String deserialize(String key, byte[] content) throws Exception {
                    return new String(content);
                }
            };

            this.zkClient.create("/my_test", "test".getBytes(), null, null);
            Assert.assertEquals(zkCache.get("/my_test").get(), "test");

            this.zkClient.setData("/my_test", "test2".getBytes(), -1);
            // Give the cache time to observe the update before reading again.
            Thread.sleep(100L);
            Assert.assertEquals(zkCache.get("/my_test").get(), "test2");

            // The cached value must still be served after the session-expired event ...
            zkCacheService.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
            Assert.assertEquals(zkCache.get("/my_test").get(), "test2");

            // ... and while the mock client is failing with SESSIONEXPIRED.
            this.zkClient.failNow(KeeperException.Code.SESSIONEXPIRED);
            Assert.assertEquals(zkCache.get("/my_test").get(), "test2");

            try {
                zkCache.get("/other");
                Assert.fail("shuld have thrown exception");
            } catch (Exception ignored) {
                // Expected: "/other" was never cached and the client is failing.
            }
        } finally {
            // Always release the worker threads, even when an assertion above fails.
            executor.shutdown();
            scheduledExecutor.shutdown();
        }
    }

    /**
     * Exercises a {@link ZooKeeperChildrenCache} rooted at {@code /test}.
     *
     * <p>The same listener is registered twice and unregistered once; the assertions below expect exactly
     * one notification per child change (two creates, then one delete). After an {@code Expired} event and
     * with the mock client failing with {@code SESSIONEXPIRED}, reading the children is expected to throw
     * and no further notification is expected.
     *
     * <p>The test carries a timeout because it busy-waits on the notification counter; without one a missed
     * notification would hang the run forever.
     */
    @Test(timeOut = 10000)
    void testChildrenCache() throws Exception {
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            this.zkClient.create("/test", new byte[0], null, null);
            LocalZooKeeperCache zkCacheService = new LocalZooKeeperCache(this.zkClient, executor, scheduledExecutor);
            ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");

            AtomicInteger notificationCount = new AtomicInteger(0);
            ZooKeeperCacheListener counter = (path, children, stat) -> notificationCount.incrementAndGet();
            cache.registerListener(counter);
            cache.registerListener(counter);
            cache.unregisterListener(counter);

            Assert.assertEquals(notificationCount.get(), 0);
            Assert.assertEquals(cache.get(), Sets.newTreeSet());

            this.zkClient.create("/test/z1", new byte[0], null, null);
            this.zkClient.create("/test/z2", new byte[0], null, null);

            // Wait for both create notifications (bounded by the test timeout).
            while (notificationCount.get() < 2) {
                Thread.sleep(1L);
            }

            Assert.assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1", "z2")));
            Assert.assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1", "z2")));
            Assert.assertEquals(notificationCount.get(), 2);

            this.zkClient.delete("/test/z2", -1);

            // Wait for the delete notification (bounded by the test timeout).
            while (notificationCount.get() < 3) {
                Thread.sleep(1L);
            }

            // Check both accessors, mirroring the assertions made after the creates.
            Assert.assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
            Assert.assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1")));

            zkCacheService.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
            this.zkClient.failNow(KeeperException.Code.SESSIONEXPIRED);

            try {
                cache.get();
                Assert.fail("shuld have thrown exception");
            } catch (Exception ignored) {
                // Expected: the read is supposed to fail in this state.
            }

            Assert.assertEquals(notificationCount.get(), 3);
        } finally {
            // Always release the worker threads, even when an assertion above fails.
            executor.shutdown();
            scheduledExecutor.shutdown();
        }
    }

    /**
     * Checks that {@code LocalZooKeeperCache.exists} returns {@code true} for a node that has been created
     * and {@code false} after that node has been deleted.
     */
    @Test
    void testExistsCache() throws Exception {
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            this.zkClient.create("/test", new byte[0], null, null);
            Thread.sleep(20L);

            LocalZooKeeperCache zkCacheService = new LocalZooKeeperCache(this.zkClient, executor, scheduledExecutor);
            boolean existsAfterCreate = zkCacheService.exists("/test");
            Assert.assertTrue(existsAfterCreate, "/test should exists in the cache");

            this.zkClient.delete("/test", -1);
            Thread.sleep(20L);

            boolean existsAfterDelete = zkCacheService.exists("/test");
            Assert.assertFalse(existsAfterDelete, "/test should not exist in the cache");
        } finally {
            // Always release the worker threads, even when an assertion above fails.
            executor.shutdown();
            scheduledExecutor.shutdown();
        }
    }

    /**
     * Walks through the invalidation entry points of {@link LocalZooKeeperCache}
     * ({@code invalidateAllChildren}, {@code invalidateData}, {@code invalidateAll} and
     * {@code invalidateRoot}). After each call it checks that the matching {@code get...IfPresent} lookup
     * returns {@code null} until the entry has been loaded again.
     */
    @Test
    void testInvalidateCache() throws Exception {
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            this.zkClient.create("/test", new byte[0], null, null);
            this.zkClient.create("/test/c1", new byte[0], null, null);
            this.zkClient.create("/test/c2", new byte[0], null, null);
            Thread.sleep(20L);

            LocalZooKeeperCache zkCacheService = new LocalZooKeeperCache(this.zkClient, executor, scheduledExecutor);
            Assert.assertTrue(zkCacheService.exists("/test"), "/test should exists in the cache");

            // Children are absent until the first getChildren() loads them.
            AssertJUnit.assertNull(zkCacheService.getChildrenIfPresent("/test"));
            AssertJUnit.assertNotNull(zkCacheService.getChildren("/test"));
            AssertJUnit.assertNotNull(zkCacheService.getChildrenIfPresent("/test"));

            zkCacheService.invalidateAllChildren();
            AssertJUnit.assertNull(zkCacheService.getChildrenIfPresent("/test"));

            // Data is absent until the first getData() loads it.
            AssertJUnit.assertNull(zkCacheService.getDataIfPresent("/test"));
            AssertJUnit.assertNotNull(zkCacheService.getData("/test", Deserializers.STRING_DESERIALIZER));
            AssertJUnit.assertNotNull(zkCacheService.getDataIfPresent("/test"));

            zkCacheService.invalidateData("/test");
            AssertJUnit.assertNull(zkCacheService.getDataIfPresent("/test"));

            // Load children and data again, then drop everything at once.
            AssertJUnit.assertNotNull(zkCacheService.getChildren("/test"));
            AssertJUnit.assertNotNull(zkCacheService.getData("/test", Deserializers.STRING_DESERIALIZER));
            zkCacheService.invalidateAll();
            AssertJUnit.assertNull(zkCacheService.getChildrenIfPresent("/test"));
            AssertJUnit.assertNull(zkCacheService.getDataIfPresent("/test"));

            AssertJUnit.assertNotNull(zkCacheService.getChildren("/test"));
            zkCacheService.invalidateRoot("/test");
            AssertJUnit.assertNull(zkCacheService.getChildrenIfPresent("/test"));
        } finally {
            // Always release the worker threads, even when an assertion above fails.
            executor.shutdown();
            scheduledExecutor.shutdown();
        }
    }

    /**
     * Exercises a {@link ZooKeeperDataCache} on top of a {@link GlobalZooKeeperCache} whose client factory
     * always returns the same {@link MockZooKeeper}.
     *
     * <p>The same listener is registered twice and unregistered once; the assertions expect exactly one
     * notification over the whole test. The test also checks that cached values keep being served after an
     * {@code Expired} event and while the mock client fails with {@code SESSIONEXPIRED}, that an uncached
     * path cannot be read in that state, and that reads work again after {@code SyncConnected} events.
     *
     * <p>The test uses its own local mock client instead of overwriting the {@code zkClient} field, so the
     * instance created by {@code setup()} is still the one {@code teardown()} shuts down. It carries a
     * timeout because it busy-waits on the notification counter.
     */
    @Test(timeOut = 10000)
    void testGlobalZooKeeperCache() throws Exception {
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1);
        final MockZooKeeper factoryZk = MockZooKeeper.newInstance();
        GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(new ZooKeeperClientFactory() {
            @Override
            public CompletableFuture<ZooKeeper> create(String ignoredServers, ZooKeeperClientFactory.SessionType ignoredType, int ignoredTimeout) {
                return CompletableFuture.completedFuture(factoryZk);
            }
        }, -1, "", executor, scheduledExecutor);
        AtomicInteger notificationCount = new AtomicInteger(0);
        try {
            zkCacheService.start();
            MockZooKeeper zk = (MockZooKeeper) zkCacheService.getZooKeeper();
            ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
                @Override
                public String deserialize(String key, byte[] content) throws Exception {
                    return new String(content);
                }
            };

            ZooKeeperCacheListener counter = (path, data, stat) -> notificationCount.incrementAndGet();
            zkCache.registerListener(counter);
            zkCache.registerListener(counter);
            zkCache.unregisterListener(counter);

            zk.create("/my_test", "test".getBytes(), null, null);
            Assert.assertEquals(zkCache.get("/my_test").get(), "test");
            Assert.assertEquals(notificationCount.get(), 0);

            zk.setData("/my_test", "test2".getBytes(), -1);
            zk.create("/my_test2", "test".getBytes(), null, null);

            // Wait for the update notification (bounded by the test timeout).
            while (notificationCount.get() < 1) {
                Thread.sleep(1L);
            }

            Assert.assertEquals(zkCache.get("/my_test").get(), "test2");
            Assert.assertEquals(zkCache.get("/my_test2").get(), "test");
            Assert.assertEquals(notificationCount.get(), 1);

            // Cached entries must survive the session-expired event.
            zkCacheService.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
            Assert.assertEquals(zkCache.get("/my_test").get(), "test2");
            Assert.assertEquals(zkCache.get("/my_test2").get(), "test");

            // Create a node that is not cached yet, then make the client fail.
            zk.create("/other", "test2".getBytes(), null, null);
            zk.failNow(KeeperException.Code.SESSIONEXPIRED);
            Assert.assertEquals(zkCache.get("/my_test").get(), "test2");
            Assert.assertEquals(zkCache.get("/my_test2").get(), "test");
            try {
                zkCache.get("/other");
                Assert.fail("shuld have thrown exception");
            } catch (Exception ignored) {
                // Expected: "/other" was never cached and the client is failing.
            }

            // Reset the injected failure and signal a reconnect; reads are expected to work again.
            zk.failAfter(-1, KeeperException.Code.OK);
            zk.delete("/my_test2", -1);
            zkCacheService.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, null));
            Assert.assertEquals(zkCache.get("/other").get(), "test2");
            Assert.assertFalse(zkCache.get("/my_test2").isPresent());

            zkCacheService.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null));
            zk.create("/other2", "test2".getBytes(), null, null);
            zkCacheService.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, null));
            Assert.assertEquals(zkCache.get("/other2").get(), "test2");
        } finally {
            // Release everything even when an assertion above fails; keep going if close() itself throws.
            try {
                zkCacheService.close();
            } finally {
                executor.shutdown();
                scheduledExecutor.shutdown();
                factoryZk.shutdown();
            }
        }
        // Checked after close(), as before: exactly one notification over the whole test.
        Assert.assertEquals(notificationCount.get(), 1);
    }

    /**
     * Issues a blocking {@code get} for a second key from inside the completion callback of a
     * {@code getAsync} on a first key, and requires the whole chain to finish within the test timeout.
     *
     * <p>NOTE(review): going by the test name, this presumably guards against the blocking read stalling the
     * thread that delivers ZooKeeper callbacks — confirm against the cache implementation.
     *
     * <p>The test waits on the future returned by {@code thenAccept} rather than on a latch, so a failure
     * inside the callback (or of the initial {@code getAsync}) is rethrown here with its cause instead of
     * surfacing only as a timeout.
     */
    @Test(timeOut = 2000)
    void testZkCallbackThreadStuck() throws Exception {
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
        ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
        try {
            MockZooKeeper mockZk = MockZooKeeper.newInstance(zkExecutor, 100);
            ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(new LocalZooKeeperCache(mockZk, executor, scheduledExecutor)) {
                @Override
                public String deserialize(String key, byte[] content) throws Exception {
                    return new String(content);
                }
            };

            String key = "/" + UUID.randomUUID().toString().substring(0, 8);
            String key1 = "/" + UUID.randomUUID().toString().substring(0, 8);
            String key2 = "/" + UUID.randomUUID().toString().substring(0, 8);
            mockZk.create(key, "test".getBytes(), null, null);
            mockZk.create(key1, "test".getBytes(), null, null);
            mockZk.create(key2, "test".getBytes(), null, null);

            CompletableFuture<Void> nestedRead = zkCache.getAsync(key).thenAccept(ignored -> {
                try {
                    // Blocking read issued from within the async callback.
                    zkCache.get(key1);
                } catch (Exception e) {
                    // Report the key that was actually being read.
                    Assert.fail("failed to get " + key1, e);
                }
            });
            // Blocks until the callback has run; rethrows anything it threw as an ExecutionException.
            nestedRead.get();
        } finally {
            // Always release the worker threads, even when the test fails.
            executor.shutdown();
            zkExecutor.shutdown();
            scheduledExecutor.shutdown();
        }
    }

    /**
     * Checks that a failed load does not stay in the cache.
     *
     * <p>Two failures are provoked: the deserializer throws a {@link NullPointerException} on its first
     * invocation, and later the cache's {@code zkSession} reference is temporarily set to {@code null}.
     * In both cases the first {@code getAsync(...).get()} must fail with an NPE as its cause, and after a
     * pause a subsequent read of {@code /zkDesrializationExceptionTest} must return the stored value.
     */
    @Test
    public void testInvalidateCacheOnFailure() throws Exception {
        ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            MockZooKeeper mockZk = MockZooKeeper.newInstance(zkExecutor, 100);
            LocalZooKeeperCache zkCacheService = new LocalZooKeeperCache(mockZk, executor, scheduledExecutor);
            final AtomicInteger deserializeCalls = new AtomicInteger(0);
            ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
                @Override
                public String deserialize(String key, byte[] content) throws Exception {
                    // Fail only on the very first invocation.
                    if (deserializeCalls.getAndIncrement() == 0) {
                        throw new NullPointerException("data is null");
                    }
                    return new String(content);
                }
            };

            String deserializationKey = "/zkDesrializationExceptionTest";
            String sessionKey = "/zkSessionExceptionTest";
            mockZk.create(deserializationKey, "test".getBytes(), null, null);
            mockZk.create(sessionKey, "test".getBytes(), null, null);

            // (1) The first read fails inside the deserializer.
            try {
                zkCache.getAsync(deserializationKey).get();
                Assert.fail("it should have failed with NPE");
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof NullPointerException);
            }

            // Pause before retrying; the retry must then succeed.
            Thread.sleep(1000L);
            Assert.assertEquals(zkCache.getAsync(deserializationKey).get().get(), "test");

            // (2) Reads fail while the cache has no session.
            ZooKeeperCache cacheInternals = zkCacheService;
            ZooKeeper savedSession = cacheInternals.zkSession.get();
            cacheInternals.zkSession.set(null);
            try {
                try {
                    zkCache.getAsync(sessionKey).get();
                    Assert.fail("it should have failed with NPE");
                } catch (Exception e) {
                    Assert.assertTrue(e.getCause() instanceof NullPointerException);
                }
            } finally {
                // Put the session back even if the assertions above fail.
                cacheInternals.zkSession.set(savedSession);
            }

            Thread.sleep(1000L);
            Assert.assertEquals(zkCache.getAsync(deserializationKey).get().get(), "test");
        } finally {
            // Always release the worker threads, even when an assertion above fails.
            zkExecutor.shutdown();
            executor.shutdown();
            scheduledExecutor.shutdown();
        }
    }
}
