package org.apache.distributedlog;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/TestZooKeeperClient.class */
public class TestZooKeeperClient extends ZooKeeperClusterTestCase {
    /** Logger used by the tests in this class. */
    static final Logger LOG = LoggerFactory.getLogger(TestZooKeeperClient.class);
    /** Session timeout, in milliseconds, used by {@link #clientBuilder()} and as the wait bound when expiring sessions. */
    private static final int sessionTimeoutMs = 2000;
    /** Client assigned in {@link #setup()} and closed in {@link #teardown()}. */
    private ZooKeeperClient zkc;

    /* loaded from: input_file:org/apache/distributedlog/TestZooKeeperClient$FailingCredentials.class */
    class FailingCredentials implements ZooKeeperClient.Credentials {
        boolean shouldFail = true;

        FailingCredentials() {
        }

        public void authenticate(ZooKeeper zooKeeper) {
            if (this.shouldFail) {
                throw new RuntimeException("authfailed");
            }
        }

        public void setShouldFail(boolean z) {
            this.shouldFail = z;
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/TestZooKeeperClient$TestWatcher.class */
    static class TestWatcher implements Watcher {
        final List<WatchedEvent> receivedEvents = new ArrayList();
        CountDownLatch latch = new CountDownLatch(0);

        TestWatcher() {
        }

        public TestWatcher setLatch(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
            return this;
        }

        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                synchronized (this.receivedEvents) {
                    this.receivedEvents.add(watchedEvent);
                }
                this.latch.countDown();
            }
        }
    }

    /**
     * Creates the client shared by the tests through the {@code zkc} field.
     *
     * @throws Exception if the client cannot be built
     */
    @Before
    public void setup() throws Exception {
        final ZooKeeperClient freshClient = buildClient();
        zkc = freshClient;
    }

    /**
     * Closes the client created in {@link #setup()}.
     *
     * <p>JUnit 4 runs {@code @After} methods even when a {@code @Before} method threw,
     * in which case {@code zkc} may still be {@code null}; the guard avoids adding a
     * {@link NullPointerException} on top of the original setup failure.
     *
     * @throws Exception if closing the client fails
     */
    @After
    public void teardown() throws Exception {
        if (this.zkc != null) {
            this.zkc.close();
        }
    }

    /**
     * Returns a client builder configured with the class-wide session timeout.
     *
     * @return the builder produced by {@link #clientBuilder(int)} for {@code sessionTimeoutMs}
     * @throws Exception if the builder cannot be prepared
     */
    private ZooKeeperClientBuilder clientBuilder() throws Exception {
        final ZooKeeperClientBuilder defaultBuilder = clientBuilder(sessionTimeoutMs);
        return defaultBuilder;
    }

    /**
     * Returns a client builder named {@code "zkc"} that targets the test cluster.
     *
     * @param i the session timeout, in milliseconds, to configure on the builder
     * @return a builder with name, URI, session timeout, servers and retry policy set
     * @throws Exception if the URI for the test cluster cannot be created
     */
    private ZooKeeperClientBuilder clientBuilder(int i) throws Exception {
        final RetryPolicy boundedBackoff = new BoundExponentialBackoffRetryPolicy(100L, 200L, 2);
        return ZooKeeperClientBuilder.newBuilder()
                .name("zkc")
                .uri(DLMTestUtil.createDLMURI(zkPort, "/"))
                .sessionTimeoutMs(i)
                .zkServers(zkServers)
                .retryPolicy(boundedBackoff);
    }

    /**
     * Builds a client from the default builder with a {@code null} ACL id.
     *
     * @return the newly built client
     * @throws Exception if the client cannot be built
     */
    private ZooKeeperClient buildClient() throws Exception {
        final String noAclId = null;
        final ZooKeeperClientBuilder builder = clientBuilder().zkAclId(noAclId);
        return builder.build();
    }

    /**
     * Builds a client from the default builder using the given ACL id.
     *
     * @param str the ACL id handed to the builder; may be {@code null}
     * @return the newly built client
     * @throws Exception if the client cannot be built
     */
    private ZooKeeperClient buildAuthdClient(String str) throws Exception {
        final ZooKeeperClientBuilder builder = clientBuilder();
        builder.zkAclId(str);
        return builder.zkAclId(str).build();
    }

    /**
     * Recursively deletes the node at {@code str} and everything below it, children first.
     *
     * @param zooKeeperClient the client used to list and delete nodes
     * @param str the absolute path of the subtree root to remove
     * @throws Exception if listing or deleting any node fails
     */
    private void rmAll(ZooKeeperClient zooKeeperClient, String str) throws Exception {
        for (String child : zooKeeperClient.get().getChildren(str, false)) {
            rmAll(zooKeeperClient, str + "/" + child);
        }
        // Expected version 0: the delete succeeds only for nodes whose data was never
        // modified after creation, which holds for every node the callers create.
        zooKeeperClient.get().delete(str, 0);
    }

    /**
     * Verifies that a client built with a {@code null} ACL id can create children under
     * an open node but gets {@code NoAuthException} under a node created with
     * {@code EVERYONE_READ_CREATOR_ALL} by a client built with ACL id {@code "test"}.
     *
     * <p>Both clients are closed and {@code /test} is removed even when an assertion
     * fails, so a failure here does not leave state behind for the tests that reuse
     * the same path.
     */
    @Test(timeout = 60000)
    public void testAclCreatePerms() throws Exception {
        final ZooKeeperClient ownerClient = buildAuthdClient("test");
        try {
            ownerClient.get().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            try {
                ownerClient.get().create("/test/key1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                ownerClient.get().create("/test/key2", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
                final ZooKeeperClient otherClient = buildClient();
                try {
                    otherClient.get().create("/test/key1/key1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    try {
                        otherClient.get().create("/test/key2/key1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        Assert.fail("create should fail on acl protected key");
                    } catch (KeeperException.NoAuthException e) {
                        LOG.info("caught exception writing to protected key", e);
                    }
                } finally {
                    otherClient.close();
                }
            } finally {
                rmAll(ownerClient, "/test");
            }
        } finally {
            ownerClient.close();
        }
    }

    /**
     * Verifies that a client built with a {@code null} ACL id can create open nodes but
     * gets {@code InvalidACLException} when creating a node with
     * {@code EVERYONE_READ_CREATOR_ALL}.
     *
     * <p>The client is closed and {@code /test} is removed even when an assertion fails.
     */
    @Test(timeout = 60000)
    public void testAclNullIdDisablesAuth() throws Exception {
        final ZooKeeperClient nullIdClient = buildAuthdClient(null);
        try {
            nullIdClient.get().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            try {
                nullIdClient.get().create("/test/key1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                try {
                    nullIdClient.get().create("/test/key2", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
                    Assert.fail("create should fail because we're not authenticated");
                } catch (KeeperException.InvalidACLException e) {
                    LOG.info("caught exception writing to protected key", e);
                }
            } finally {
                rmAll(nullIdClient, "/test");
            }
        } finally {
            nullIdClient.close();
        }
    }

    /**
     * Verifies that nodes created with {@code EVERYONE_READ_CREATOR_ALL} by a client
     * built with ACL id {@code "test"} can be listed both by a client built with a
     * {@code null} ACL id and by a client built with a different ACL id ({@code "test2"}).
     *
     * <p>The second read goes through the {@code "test2"} client; previously that client
     * was built and then discarded, so the second assertion merely repeated the first.
     * All clients are closed and {@code /test} is removed even when an assertion fails.
     */
    @Test(timeout = 60000)
    public void testAclAllowsReadsForNoAuth() throws Exception {
        final ZooKeeperClient ownerClient = buildAuthdClient("test");
        try {
            ownerClient.get().create("/test", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
            try {
                ownerClient.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
                ownerClient.get().create("/test/key1/key2", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);

                final ZooKeeperClient noAuthClient = buildClient();
                try {
                    Assert.assertEquals("key2", firstGrandchildOfTestRoot(noAuthClient));
                } finally {
                    noAuthClient.close();
                }

                final ZooKeeperClient otherAuthClient = buildAuthdClient("test2");
                try {
                    Assert.assertEquals("key2", firstGrandchildOfTestRoot(otherAuthClient));
                } finally {
                    otherAuthClient.close();
                }
            } finally {
                rmAll(ownerClient, "/test");
            }
        } finally {
            ownerClient.close();
        }
    }

    /** Lists {@code /test}, then lists its first child, and returns that child's first child name. */
    private String firstGrandchildOfTestRoot(ZooKeeperClient reader) throws Exception {
        final String firstChild = reader.get().getChildren("/test", false).get(0);
        final List<String> grandchildren = reader.get().getChildren("/test/" + firstChild, false);
        return grandchildren.get(0);
    }

    /**
     * Verifies that creating a node with {@code EVERYONE_READ_CREATOR_ALL} fails on a
     * client built with a {@code null} ACL id, and succeeds on the same handle after
     * {@code DigestCredentials("test", "test")} has authenticated it.
     *
     * <p>The expected failure is pinned to {@code InvalidACLException}, the exception
     * {@link #testAclNullIdDisablesAuth()} expects for the same operation, instead of
     * silently accepting any exception. The client is closed and {@code /test} is
     * removed even when an assertion fails.
     */
    @Test(timeout = 60000)
    public void testAclDigestCredentialsBasics() throws Exception {
        final ZooKeeperClient client = buildClient();
        try {
            client.get().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            try {
                try {
                    client.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
                    Assert.fail("should have failed");
                } catch (KeeperException.InvalidACLException expected) {
                    LOG.info("caught expected exception creating a creator-only node before authenticating", expected);
                }
                new ZooKeeperClient.DigestCredentials("test", "test").authenticate(client.get());
                client.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
            } finally {
                rmAll(client, "/test");
            }
        } finally {
            client.close();
        }
    }

    /**
     * Verifies that {@code Credentials.NONE} can authenticate a {@code null} handle
     * without throwing.
     */
    @Test(timeout = 60000)
    public void testAclNoopCredentialsDoesNothing() throws Exception {
        final ZooKeeperClient.Credentials noopCredentials = ZooKeeperClient.Credentials.NONE;
        final ZooKeeper absentHandle = null;
        noopCredentials.authenticate(absentHandle);
    }

    /**
     * Verifies that a client whose credentials throw on the first {@code get()} can
     * still be used once the credentials stop failing.
     *
     * <p>The hand-constructed client is closed in a {@code finally} block so it is
     * released whether or not the test passes.
     */
    @Test(timeout = 60000)
    public void testAclFailedAuthenticationCanBeRecovered() throws Exception {
        final FailingCredentials credentials = new FailingCredentials();
        final ZooKeeperClient client = new ZooKeeperClient("test", sessionTimeoutMs, sessionTimeoutMs, zkServers, (RetryPolicy) null, NullStatsLogger.INSTANCE, 1, 10000.0d, credentials);
        try {
            try {
                client.get();
                Assert.fail("should have failed on auth");
            } catch (Exception e) {
                // Assert.fail throws an Error, so only the authentication failure lands here.
                Assert.assertEquals("authfailed", e.getMessage());
            }
            credentials.setShouldFail(false);
            client.get().create("/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            rmAll(client, "/test");
        } finally {
            client.close();
        }
    }

    /**
     * Opens a second handle that reuses the session id and password of {@code zooKeeper},
     * waits for it to report {@code SyncConnected}, and closes it again. Closing a handle
     * that shares the session is presumably what expires it for the original handle.
     *
     * <p>The second handle is closed in a {@code finally} block so it is not leaked
     * when the wait times out or is interrupted.
     *
     * @param zooKeeper the handle whose session should be expired
     * @param i the session timeout for the second handle and the maximum wait, in milliseconds
     * @throws IOException if the second handle cannot be created
     * @throws InterruptedException if interrupted while waiting or closing
     * @throws KeeperException with code {@code CONNECTIONLOSS} if the second handle does
     *         not connect within {@code i} milliseconds
     */
    private void expireZooKeeperSession(ZooKeeper zooKeeper, int i) throws IOException, InterruptedException, KeeperException {
        final CountDownLatch connected = new CountDownLatch(1);
        final ZooKeeper duplicateHandle = new ZooKeeper(zkServers, i, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.None && watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        }, zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
        try {
            if (!connected.await(i, TimeUnit.MILLISECONDS)) {
                throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
            }
        } finally {
            duplicateHandle.close();
        }
    }

    /**
     * Registers a watcher on the client that releases a latch the first time a
     * connection-level event (event type {@code None}) with the given state arrives.
     *
     * @param keeperState the connection state to wait for
     * @param zooKeeperClient the client on which the watcher is registered
     * @return a latch with count 1 that is counted down when a matching event is seen
     */
    private CountDownLatch awaitConnectionEvent(final Watcher.Event.KeeperState keeperState, ZooKeeperClient zooKeeperClient) {
        final CountDownLatch stateSeen = new CountDownLatch(1);
        final Watcher stateWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() != Watcher.Event.EventType.None) {
                    return;
                }
                if (event.getState() != keeperState) {
                    return;
                }
                stateSeen.countDown();
            }
        };
        zooKeeperClient.register(stateWatcher);
        return stateSeen;
    }

    /**
     * Checks that a client built with ACL id {@code "test"} can still create a node with
     * {@code EVERYONE_READ_CREATOR_ALL} after its session has been expired. Currently
     * disabled via {@code @Ignore}.
     *
     * <p>The client is closed and {@code /test} is removed even when the test fails.
     */
    @Test(timeout = 60000)
    @Ignore
    public void testAclAuthSpansExpiration() throws Exception {
        final ZooKeeperClient authdClient = buildAuthdClient("test");
        try {
            authdClient.get().create("/test", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
            try {
                final CountDownLatch expired = awaitConnectionEvent(Watcher.Event.KeeperState.Expired, authdClient);
                final CountDownLatch reconnected = awaitConnectionEvent(Watcher.Event.KeeperState.SyncConnected, authdClient);
                expireZooKeeperSession(authdClient.get(), sessionTimeoutMs);
                // Bounded waits whose results are not checked: the create below is the
                // actual verification, whether or not both events were observed in time.
                expired.await(2L, TimeUnit.SECONDS);
                reconnected.await(2L, TimeUnit.SECONDS);
                authdClient.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
            } finally {
                rmAll(authdClient, "/test");
            }
        } finally {
            authdClient.close();
        }
    }

    /**
     * Same scenario as the session-expiration ACL test, but with a client built with a
     * {@code null} retry policy. Currently disabled via {@code @Ignore}.
     *
     * <p>The client is closed and {@code /test} is removed even when the test fails.
     */
    @Test(timeout = 60000)
    @Ignore
    public void testAclAuthSpansExpirationNonRetryableClient() throws Exception {
        final ZooKeeperClient nonRetryingClient = clientBuilder().retryPolicy((RetryPolicy) null).zkAclId("test").build();
        try {
            nonRetryingClient.get().create("/test", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
            try {
                final CountDownLatch expired = awaitConnectionEvent(Watcher.Event.KeeperState.Expired, nonRetryingClient);
                final CountDownLatch reconnected = awaitConnectionEvent(Watcher.Event.KeeperState.SyncConnected, nonRetryingClient);
                expireZooKeeperSession(nonRetryingClient.get(), sessionTimeoutMs);
                // Bounded waits whose results are not checked: the create below is the
                // actual verification, whether or not both events were observed in time.
                expired.await(2L, TimeUnit.SECONDS);
                reconnected.await(2L, TimeUnit.SECONDS);
                nonRetryingClient.get().create("/test/key1", new byte[0], DistributedLogConstants.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
            } finally {
                rmAll(nonRetryingClient, "/test");
            }
        } finally {
            nonRetryingClient.close();
        }
    }

    /**
     * Verifies that watchers registered on the client receive data-change events and
     * that an unregistered watcher stops receiving them.
     *
     * <p>Node payloads are encoded explicitly as UTF-8 rather than with the platform
     * default charset.
     */
    @Test(timeout = 60000)
    public void testRegisterUnregisterWatchers() throws Exception {
        final String path = "/test-register-unregister-watchers";
        final TestWatcher keptWatcher = new TestWatcher();
        final TestWatcher removedWatcher = new TestWatcher();

        // Phase 1: both watchers are registered and share one latch of count 2.
        final CountDownLatch bothNotified = new CountDownLatch(2);
        keptWatcher.setLatch(bothNotified);
        removedWatcher.setLatch(bothNotified);
        this.zkc.register(keptWatcher);
        this.zkc.register(removedWatcher);
        Assert.assertEquals(2L, this.zkc.watchers.size());
        this.zkc.get().create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Set a watch through the handle's default watcher before changing the data.
        this.zkc.get().getData(path, true, new Stat());
        this.zkc.get().setData(path, "first-set".getBytes(StandardCharsets.UTF_8), -1);
        bothNotified.await();
        Assert.assertEquals(1L, keptWatcher.receivedEvents.size());
        Assert.assertEquals(path, keptWatcher.receivedEvents.get(0).getPath());
        Assert.assertEquals(Watcher.Event.EventType.NodeDataChanged, keptWatcher.receivedEvents.get(0).getType());
        Assert.assertEquals(1L, removedWatcher.receivedEvents.size());
        Assert.assertEquals(path, removedWatcher.receivedEvents.get(0).getPath());
        Assert.assertEquals(Watcher.Event.EventType.NodeDataChanged, removedWatcher.receivedEvents.get(0).getType());

        // Phase 2: after unregistering, only the remaining watcher may be notified.
        final CountDownLatch keptNotified = new CountDownLatch(1);
        final CountDownLatch removedNotified = new CountDownLatch(1);
        keptWatcher.setLatch(keptNotified);
        removedWatcher.setLatch(removedNotified);
        this.zkc.unregister(removedWatcher);
        Assert.assertEquals(1L, this.zkc.watchers.size());
        this.zkc.get().getData(path, true, new Stat());
        this.zkc.get().setData(path, "second-set".getBytes(StandardCharsets.UTF_8), -1);
        keptNotified.await();
        Assert.assertEquals(2L, keptWatcher.receivedEvents.size());
        Assert.assertEquals(path, keptWatcher.receivedEvents.get(1).getPath());
        Assert.assertEquals(Watcher.Event.EventType.NodeDataChanged, keptWatcher.receivedEvents.get(1).getType());
        // The unregistered watcher's latch must still be closed after two seconds.
        Assert.assertFalse(removedNotified.await(2L, TimeUnit.SECONDS));
        Assert.assertEquals(1L, removedWatcher.receivedEvents.size());
    }

    /**
     * Verifies that a registered watcher which throws from {@code process} does not
     * prevent the other registered watchers from receiving the data-change event.
     *
     * <p>The node payload is encoded explicitly as UTF-8 rather than with the platform
     * default charset.
     */
    @Test(timeout = 60000)
    public void testExceptionOnWatchers() throws Exception {
        final String path = "/test-exception-on-watchers";
        final TestWatcher firstWatcher = new TestWatcher();
        final TestWatcher secondWatcher = new TestWatcher();
        final CountDownLatch bothNotified = new CountDownLatch(2);
        firstWatcher.setLatch(bothNotified);
        secondWatcher.setLatch(bothNotified);
        this.zkc.register(firstWatcher);
        this.zkc.register(secondWatcher);
        // Third watcher throws on every event it is given.
        this.zkc.register(new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                throw new NullPointerException("bad watcher returning null");
            }
        });
        Assert.assertEquals(3L, this.zkc.watchers.size());
        this.zkc.get().create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.zkc.get().getData(path, true, new Stat());
        this.zkc.get().setData(path, "first-set".getBytes(StandardCharsets.UTF_8), -1);
        bothNotified.await();
        Assert.assertEquals(1L, firstWatcher.receivedEvents.size());
        Assert.assertEquals(path, firstWatcher.receivedEvents.get(0).getPath());
        Assert.assertEquals(Watcher.Event.EventType.NodeDataChanged, firstWatcher.receivedEvents.get(0).getType());
        Assert.assertEquals(1L, secondWatcher.receivedEvents.size());
        Assert.assertEquals(path, secondWatcher.receivedEvents.get(0).getPath());
        Assert.assertEquals(Watcher.Event.EventType.NodeDataChanged, secondWatcher.receivedEvents.get(0).getType());
    }

    /**
     * Verifies that after its session is expired the client hands out the same
     * {@link ZooKeeper} handle object, reconnected under a different session id.
     *
     * <p>The client built for this test is closed in a {@code finally} block.
     */
    @Test(timeout = 60000)
    public void testZooKeeperReconnection() throws Exception {
        final int shortTimeoutMs = 100;
        final ZooKeeperClient client = clientBuilder(shortTimeoutMs).zkAclId((String) null).build();
        try {
            final ZooKeeper handleBefore = client.get();
            final long sessionBefore = handleBefore.getSessionId();
            ZooKeeperClientUtils.expireSession(client, zkServers, 2 * shortTimeoutMs);
            final ZooKeeper handleAfter = client.get();
            // Poll until the handle reports CONNECTED; bounded by the test timeout.
            while (!ZooKeeper.States.CONNECTED.equals(handleAfter.getState())) {
                TimeUnit.MILLISECONDS.sleep(shortTimeoutMs / 2);
            }
            final long sessionAfter = handleAfter.getSessionId();
            Assert.assertSame("client should keep handing out the same handle", handleBefore, handleAfter);
            Assert.assertFalse("session id should change after expiration", sessionBefore == sessionAfter);
        } finally {
            client.close();
        }
    }

    /**
     * Verifies that when the underlying handle's {@code connectExecutor} is occupied by
     * a blocked task, expiring the session makes the client hand out a different,
     * connected {@link ZooKeeper} handle.
     *
     * <p>The blocking task is released and the client closed in a {@code finally}
     * block; previously the latch was never counted down, so the executor thread
     * stayed parked after the test finished.
     */
    @Test(timeout = 60000)
    public void testZooKeeperReconnectionBlockingRetryThread() throws Exception {
        final int shortTimeoutMs = 100;
        final ZooKeeperClient client = clientBuilder(shortTimeoutMs).zkAclId((String) null).build();
        final CountDownLatch releaseBlockedTask = new CountDownLatch(1);
        try {
            final ZooKeeper originalHandle = client.get();
            Assert.assertTrue(originalHandle instanceof org.apache.bookkeeper.zookeeper.ZooKeeperClient);
            final org.apache.bookkeeper.zookeeper.ZooKeeperClient bkHandle = (org.apache.bookkeeper.zookeeper.ZooKeeperClient) originalHandle;
            // The executor is not exposed, so it is fetched reflectively by field name.
            final Field executorField = bkHandle.getClass().getDeclaredField("connectExecutor");
            executorField.setAccessible(true);
            final ExecutorService connectExecutor = (ExecutorService) executorField.get(bkHandle);
            connectExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        releaseBlockedTask.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            ZooKeeperClientUtils.expireSession(client, zkServers, 2 * shortTimeoutMs);
            // Poll until the client returns a different handle; bounded by the test timeout.
            ZooKeeper currentHandle = client.get();
            while (currentHandle == originalHandle) {
                TimeUnit.MILLISECONDS.sleep(shortTimeoutMs / 2);
                currentHandle = client.get();
            }
            Assert.assertEquals(ZooKeeper.States.CONNECTED, currentHandle.getState());
        } finally {
            releaseBlockedTask.countDown();
            client.close();
        }
    }
}
