/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.api.CreateBuilder;
import org.apache.flink.shaded.org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.org.apache.curator.test.TestingServer;
import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class ZooKeeperLeaderElectionTest
extends TestLogger {
    private TestingServer testingServer;
    private static final String TEST_URL = "akka//user/jobmanager";
    private static final FiniteDuration timeout = new FiniteDuration(200L, TimeUnit.SECONDS);
    private static Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);

    @Before
    public void before() {
        try {
            this.testingServer = new TestingServer();
        }
        catch (Exception e) {
            throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
        }
    }

    @After
    public void after() {
        try {
            this.testingServer.stop();
        }
        catch (Exception e) {
            throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
        }
        this.testingServer = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testZooKeeperLeaderElectionRetrieval() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("recovery.zookeeper.quorum", this.testingServer.getConnectString());
        configuration.setString("recovery.mode", "zookeeper");
        ZooKeeperLeaderElectionService leaderElectionService = null;
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        try {
            leaderElectionService = ZooKeeperUtils.createLeaderElectionService((Configuration)configuration);
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            TestingContender contender = new TestingContender(TEST_URL, (LeaderElectionService)leaderElectionService);
            TestingListener listener = new TestingListener();
            leaderElectionService.start((LeaderContender)contender);
            leaderRetrievalService.start((LeaderRetrievalListener)listener);
            contender.waitForLeader(timeout.toMillis());
            Assert.assertTrue((boolean)contender.isLeader());
            Assert.assertEquals((Object)leaderElectionService.getLeaderSessionID(), (Object)contender.getLeaderSessionID());
            listener.waitForNewLeader(timeout.toMillis());
            Assert.assertEquals((Object)TEST_URL, (Object)listener.getAddress());
            Assert.assertEquals((Object)leaderElectionService.getLeaderSessionID(), (Object)listener.getLeaderSessionID());
        }
        finally {
            if (leaderElectionService != null) {
                leaderElectionService.stop();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testZooKeeperReelection() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("recovery.zookeeper.quorum", this.testingServer.getConnectString());
        configuration.setString("recovery.mode", "zookeeper");
        Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();
        int num = 20;
        ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num];
        TestingContender[] contenders = new TestingContender[num];
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        TestingListener listener = new TestingListener();
        try {
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            LOG.debug("Start leader retrieval service for the TestingListener.");
            leaderRetrievalService.start((LeaderRetrievalListener)listener);
            for (int i = 0; i < num; ++i) {
                leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService((Configuration)configuration);
                contenders[i] = new TestingContender("akka//user/jobmanager_" + i, (LeaderElectionService)leaderElectionService[i]);
                LOG.debug("Start leader election service for contender #{}.", (Object)i);
                leaderElectionService[i].start((LeaderContender)contenders[i]);
            }
            String pattern = "akka//user/jobmanager_(\\d+)";
            Pattern regex = Pattern.compile(pattern);
            int numberSeenLeaders = 0;
            while (deadline.hasTimeLeft() && numberSeenLeaders < num) {
                LOG.debug("Wait for new leader #{}.", (Object)numberSeenLeaders);
                String address = listener.waitForNewLeader(deadline.timeLeft().toMillis());
                Matcher m = regex.matcher(address);
                if (m.find()) {
                    int index = Integer.parseInt(m.group(1));
                    TestingContender contender = contenders[index];
                    if (!address.equals(contender.getAddress()) || !listener.getLeaderSessionID().equals(contender.getLeaderSessionID())) continue;
                    LOG.debug("Stop leader election service of contender #{}.", (Object)numberSeenLeaders);
                    leaderElectionService[index].stop();
                    leaderElectionService[index] = null;
                    ++numberSeenLeaders;
                    continue;
                }
                Assert.fail((String)"Did not find the leader's index.");
            }
            Assert.assertFalse((boolean)deadline.isOverdue());
            Assert.assertEquals((long)num, (long)numberSeenLeaders);
        }
        finally {
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (ZooKeeperLeaderElectionService electionService : leaderElectionService) {
                if (electionService == null) continue;
                electionService.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testZooKeeperReelectionWithReplacement() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("recovery.zookeeper.quorum", this.testingServer.getConnectString());
        configuration.setString("recovery.mode", "zookeeper");
        int num = 3;
        int numTries = 30;
        ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num];
        TestingContender[] contenders = new TestingContender[num];
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        TestingListener listener = new TestingListener();
        try {
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            leaderRetrievalService.start((LeaderRetrievalListener)listener);
            for (int i = 0; i < num; ++i) {
                leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService((Configuration)configuration);
                contenders[i] = new TestingContender("akka//user/jobmanager_" + i + "_0", (LeaderElectionService)leaderElectionService[i]);
                leaderElectionService[i].start((LeaderContender)contenders[i]);
            }
            String pattern = "akka//user/jobmanager_(\\d+)_(\\d+)";
            Pattern regex = Pattern.compile(pattern);
            for (int i = 0; i < numTries; ++i) {
                listener.waitForNewLeader(timeout.toMillis());
                String address = listener.getAddress();
                Matcher m = regex.matcher(address);
                if (!m.find()) {
                    throw new Exception("Did not find the leader's index.");
                }
                int index = Integer.parseInt(m.group(1));
                int lastTry = Integer.parseInt(m.group(2));
                Assert.assertEquals((Object)listener.getLeaderSessionID(), (Object)contenders[index].getLeaderSessionID());
                leaderElectionService[index].stop();
                leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService((Configuration)configuration);
                contenders[index] = new TestingContender("akka//user/jobmanager_" + index + "_" + (lastTry + 1), (LeaderElectionService)leaderElectionService[index]);
                leaderElectionService[index].start((LeaderContender)contenders[index]);
            }
        }
        finally {
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (ZooKeeperLeaderElectionService electionService : leaderElectionService) {
                if (electionService == null) continue;
                electionService.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleLeaders() throws Exception {
        String FAULTY_CONTENDER_URL = "faultyContender";
        String leaderPath = "/leader";
        Configuration configuration = new Configuration();
        configuration.setString("recovery.zookeeper.quorum", this.testingServer.getConnectString());
        configuration.setString("recovery.mode", "zookeeper");
        configuration.setString("recovery.zookeeper.path.leader", "/leader");
        ZooKeeperLeaderElectionService leaderElectionService = null;
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        ZooKeeperLeaderRetrievalService leaderRetrievalService2 = null;
        TestingListener listener = new TestingListener();
        TestingListener listener2 = new TestingListener();
        try {
            leaderElectionService = ZooKeeperUtils.createLeaderElectionService((Configuration)configuration);
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            leaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            TestingContender contender = new TestingContender(TEST_URL, (LeaderElectionService)leaderElectionService);
            leaderElectionService.start((LeaderContender)contender);
            leaderRetrievalService.start((LeaderRetrievalListener)listener);
            listener.waitForNewLeader(timeout.toMillis());
            Assert.assertEquals((Object)listener.getLeaderSessionID(), (Object)contender.getLeaderSessionID());
            Assert.assertEquals((Object)TEST_URL, (Object)listener.getAddress());
            CuratorFramework client = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeUTF("faultyContender");
            oos.writeObject(null);
            oos.close();
            boolean dataWritten = false;
            while (!dataWritten) {
                client.delete().forPath("/leader");
                try {
                    client.create().forPath("/leader", baos.toByteArray());
                    dataWritten = true;
                }
                catch (KeeperException.NodeExistsException nodeExistsException) {}
            }
            leaderRetrievalService2.start((LeaderRetrievalListener)listener2);
            listener2.waitForNewLeader(timeout.toMillis());
            if ("faultyContender".equals(listener2.getAddress())) {
                listener2.waitForNewLeader(timeout.toMillis());
            }
            Assert.assertEquals((Object)listener2.getLeaderSessionID(), (Object)contender.getLeaderSessionID());
            Assert.assertEquals((Object)listener2.getAddress(), (Object)contender.getAddress());
        }
        finally {
            if (leaderElectionService != null) {
                leaderElectionService.stop();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (leaderRetrievalService2 != null) {
                leaderRetrievalService2.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExceptionForwarding() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("recovery.zookeeper.quorum", this.testingServer.getConnectString());
        configuration.setString("recovery.mode", "zookeeper");
        ZooKeeperLeaderElectionService leaderElectionService = null;
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        TestingListener listener = new TestingListener();
        final CreateBuilder mockCreateBuilder = (CreateBuilder)Mockito.mock(CreateBuilder.class);
        ProtectACLCreateModePathAndBytesable mockCreateParentsIfNeeded = (ProtectACLCreateModePathAndBytesable)Mockito.mock(ProtectACLCreateModePathAndBytesable.class);
        Exception testException = new Exception("Test exception");
        try {
            CuratorFramework client = (CuratorFramework)Mockito.spy((Object)ZooKeeperUtils.startCuratorFramework((Configuration)configuration));
            Answer<CreateBuilder> answer = new Answer<CreateBuilder>(){
                private int counter = 0;

                public CreateBuilder answer(InvocationOnMock invocation) throws Throwable {
                    ++this.counter;
                    if (this.counter < 2) {
                        return (CreateBuilder)invocation.callRealMethod();
                    }
                    return mockCreateBuilder;
                }
            };
            ((CuratorFramework)Mockito.doAnswer((Answer)answer).when((Object)client)).create();
            Mockito.when((Object)mockCreateBuilder.creatingParentsIfNeeded()).thenReturn((Object)mockCreateParentsIfNeeded);
            Mockito.when((Object)mockCreateParentsIfNeeded.withMode((CreateMode)Matchers.any(CreateMode.class))).thenReturn((Object)mockCreateParentsIfNeeded);
            Mockito.when((Object)mockCreateParentsIfNeeded.forPath((String)Matchers.any(String.class), (byte[])Matchers.any(byte[].class))).thenThrow(new Throwable[]{testException});
            leaderElectionService = new ZooKeeperLeaderElectionService(client, "/latch", "/leader");
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            TestingContender testingContender = new TestingContender(TEST_URL, (LeaderElectionService)leaderElectionService);
            leaderElectionService.start((LeaderContender)testingContender);
            leaderRetrievalService.start((LeaderRetrievalListener)listener);
            testingContender.waitForError(timeout.toMillis());
            Assert.assertNotNull((Object)testingContender.getError());
            Assert.assertEquals((Object)testException, (Object)testingContender.getError().getCause());
        }
        finally {
            if (leaderElectionService != null) {
                leaderElectionService.stop();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEphemeralZooKeeperNodes() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("recovery.zookeeper.quorum", this.testingServer.getConnectString());
        configuration.setString("recovery.mode", "zookeeper");
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        CuratorFramework client = null;
        NodeCache cache = null;
        try {
            ZooKeeperLeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService((Configuration)configuration);
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)configuration);
            TestingContender testingContender = new TestingContender(TEST_URL, (LeaderElectionService)leaderElectionService);
            TestingListener listener = new TestingListener();
            client = ZooKeeperUtils.startCuratorFramework((Configuration)configuration);
            String leaderPath = configuration.getString("recovery.zookeeper.path.leader", "/leader");
            cache = new NodeCache(client, leaderPath);
            ExistsCacheListener existsListener = new ExistsCacheListener(cache);
            DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache);
            cache.getListenable().addListener((Object)existsListener);
            cache.start();
            leaderElectionService.start((LeaderContender)testingContender);
            testingContender.waitForLeader(timeout.toMillis());
            Future<Boolean> existsFuture = existsListener.nodeExists();
            Await.result(existsFuture, (Duration)timeout);
            cache.getListenable().addListener((Object)deletedCacheListener);
            leaderElectionService.stop();
            Future<Boolean> deletedFuture = deletedCacheListener.nodeDeleted();
            Await.result(deletedFuture, (Duration)timeout);
            leaderRetrievalService.start((LeaderRetrievalListener)listener);
            try {
                listener.waitForNewLeader(1000L);
                Assert.fail((String)"TimeoutException was expected because there is no leader registered and thus there shouldn't be any leader information in ZooKeeper.");
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
        }
        finally {
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (cache != null) {
                cache.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    public static class DeletedCacheListener
    implements NodeCacheListener {
        final Promise<Boolean> deletedPromise = new Promise.DefaultPromise();
        final NodeCache cache;

        public DeletedCacheListener(NodeCache cache) {
            this.cache = cache;
        }

        public Future<Boolean> nodeDeleted() {
            return this.deletedPromise.future();
        }

        public void nodeChanged() throws Exception {
            ChildData data = this.cache.getCurrentData();
            if (data == null && !this.deletedPromise.isCompleted()) {
                this.deletedPromise.success((Object)true);
                this.cache.getListenable().removeListener((Object)this);
            }
        }
    }

    public static class ExistsCacheListener
    implements NodeCacheListener {
        final Promise<Boolean> existsPromise = new Promise.DefaultPromise();
        final NodeCache cache;

        public ExistsCacheListener(NodeCache cache) {
            this.cache = cache;
        }

        public Future<Boolean> nodeExists() {
            return this.existsPromise.future();
        }

        public void nodeChanged() throws Exception {
            ChildData data = this.cache.getCurrentData();
            if (data != null && !this.existsPromise.isCompleted()) {
                this.existsPromise.success((Object)true);
                this.cache.getListenable().removeListener((Object)this);
            }
        }
    }
}

