package org.apache.bookkeeper.stream.storage.impl.cluster;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.class */
public class ZkClusterControllerLeaderSelectorTest extends ZooKeeperClusterTestCase {
    private static final Logger log = LoggerFactory.getLogger(ZkClusterControllerLeaderSelectorTest.class);

    @Rule
    public final TestName runtime = new TestName();
    private CuratorFramework curatorClient;
    private String zkRootPath;
    private ZkClusterControllerLeaderSelector selector;

    @Before
    public void setup() throws Exception {
        this.curatorClient = CuratorFrameworkFactory.newClient(zkServers, new ExponentialBackoffRetry(200, 10, 5000));
        this.curatorClient.start();
        this.zkRootPath = "/" + this.runtime.getMethodName();
        this.curatorClient.create().forPath(this.zkRootPath);
        this.selector = new ZkClusterControllerLeaderSelector(this.curatorClient, this.zkRootPath);
    }

    @After
    public void teardown() {
        if (null != this.selector) {
            this.selector.close();
        }
        this.curatorClient.close();
    }

    @Test(expected = NullPointerException.class)
    public void testStartBeforeInitialize() {
        this.selector.start();
    }

    @Test
    public void testLeaderElection() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.selector.initialize(new ClusterControllerLeader() { // from class: org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelectorTest.1
            public void processAsLeader() throws Exception {
                ZkClusterControllerLeaderSelectorTest.log.info("Become leader");
                countDownLatch.countDown();
                try {
                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    ZkClusterControllerLeaderSelectorTest.log.info("Leadership is interrupted");
                    Thread.currentThread().interrupt();
                }
                ZkClusterControllerLeaderSelectorTest.log.info("Ended leadership");
            }

            public void suspend() {
            }

            public void resume() {
            }
        });
        this.selector.start();
        countDownLatch.await();
        Assert.assertTrue("Should successfully become leader", true);
        log.info("Ended test");
    }

    @Test
    public void testStateChangedToLost() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.selector.initialize(new ClusterControllerLeader() { // from class: org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelectorTest.2
            public void processAsLeader() throws Exception {
                ZkClusterControllerLeaderSelectorTest.log.info("Become leader");
                countDownLatch.countDown();
                try {
                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    ZkClusterControllerLeaderSelectorTest.log.info("Leader is interrupted", e);
                    Thread.currentThread().interrupt();
                    countDownLatch2.countDown();
                }
            }

            public void suspend() {
            }

            public void resume() {
            }
        });
        this.selector.start();
        countDownLatch.await();
        Assert.assertTrue("Should successfully become leader", true);
        this.selector.stateChanged(this.curatorClient, ConnectionState.LOST);
        countDownLatch2.await();
        Assert.assertTrue("Leader should be interrupted", true);
    }

    @Test
    public void testStateChangedToSuspendedResumed() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        this.selector.initialize(new ClusterControllerLeader() { // from class: org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelectorTest.3
            public void processAsLeader() throws Exception {
                ZkClusterControllerLeaderSelectorTest.log.info("Become leader");
                countDownLatch.countDown();
                try {
                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    ZkClusterControllerLeaderSelectorTest.log.info("Leader is interrupted", e);
                    Thread.currentThread().interrupt();
                }
            }

            public void suspend() {
                countDownLatch2.countDown();
            }

            public void resume() {
                countDownLatch3.countDown();
            }
        });
        this.selector.start();
        countDownLatch.await();
        Assert.assertTrue("Should successfully become leader", true);
        this.selector.stateChanged(this.curatorClient, ConnectionState.SUSPENDED);
        countDownLatch2.await();
        Assert.assertTrue("Leader should be suspended", true);
        this.selector.stateChanged(this.curatorClient, ConnectionState.RECONNECTED);
        countDownLatch3.await();
        Assert.assertTrue("Leader should be resumed", true);
    }
}
