package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.CreateBuilder;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.class */
public class ZooKeeperLeaderElectionTest extends TestLogger {
    private TestingServer testingServer;
    private Configuration configuration;
    private CuratorFramework client;
    private static final String TEST_URL = "akka//user/jobmanager";
    private static final long timeout = 200000;
    private static Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest$DeletedCacheListener.class */
    private static class DeletedCacheListener implements NodeCacheListener {
        final CompletableFuture<Boolean> deletedPromise = new CompletableFuture<>();
        final NodeCache cache;

        public DeletedCacheListener(NodeCache nodeCache) {
            this.cache = nodeCache;
        }

        public Future<Boolean> nodeDeleted() {
            return this.deletedPromise;
        }

        public void nodeChanged() throws Exception {
            if (this.cache.getCurrentData() != null || this.deletedPromise.isDone()) {
                return;
            }
            this.deletedPromise.complete(true);
            this.cache.getListenable().removeListener(this);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest$ExistsCacheListener.class */
    private static class ExistsCacheListener implements NodeCacheListener {
        final CompletableFuture<Boolean> existsPromise = new CompletableFuture<>();
        final NodeCache cache;

        public ExistsCacheListener(NodeCache nodeCache) {
            this.cache = nodeCache;
        }

        public Future<Boolean> nodeExists() {
            return this.existsPromise;
        }

        public void nodeChanged() throws Exception {
            if (this.cache.getCurrentData() == null || this.existsPromise.isDone()) {
                return;
            }
            this.existsPromise.complete(true);
            this.cache.getListenable().removeListener(this);
        }
    }

    @Before
    public void before() {
        try {
            this.testingServer = new TestingServer();
            this.configuration = new Configuration();
            this.configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.testingServer.getConnectString());
            this.configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
            this.client = ZooKeeperUtils.startCuratorFramework(this.configuration);
        } catch (Exception e) {
            throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
        }
    }

    @After
    public void after() throws IOException {
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
        if (this.testingServer != null) {
            this.testingServer.stop();
            this.testingServer = null;
        }
    }

    @Test
    public void testZooKeeperLeaderElectionRetrieval() throws Exception {
        ZooKeeperLeaderElectionService zooKeeperLeaderElectionService = null;
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = null;
        try {
            zooKeeperLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
            zooKeeperLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.client, this.configuration);
            TestingContender testingContender = new TestingContender(TEST_URL, zooKeeperLeaderElectionService);
            TestingListener testingListener = new TestingListener();
            zooKeeperLeaderElectionService.start(testingContender);
            zooKeeperLeaderRetrievalService.start(testingListener);
            testingContender.waitForLeader(timeout);
            Assert.assertTrue(testingContender.isLeader());
            Assert.assertEquals(zooKeeperLeaderElectionService.getLeaderSessionID(), testingContender.getLeaderSessionID());
            testingListener.waitForNewLeader(timeout);
            Assert.assertEquals(TEST_URL, testingListener.getAddress());
            Assert.assertEquals(zooKeeperLeaderElectionService.getLeaderSessionID(), testingListener.getLeaderSessionID());
            if (zooKeeperLeaderElectionService != null) {
                zooKeeperLeaderElectionService.stop();
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
        } catch (Throwable th) {
            if (zooKeeperLeaderElectionService != null) {
                zooKeeperLeaderElectionService.stop();
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            throw th;
        }
    }

    @Test
    public void testZooKeeperReelection() throws Exception {
        Deadline fromNow = Deadline.fromNow(Duration.ofMinutes(5L));
        ZooKeeperLeaderElectionService[] zooKeeperLeaderElectionServiceArr = new ZooKeeperLeaderElectionService[10];
        TestingContender[] testingContenderArr = new TestingContender[10];
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = null;
        TestingListener testingListener = new TestingListener();
        try {
            zooKeeperLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.client, this.configuration);
            LOG.debug("Start leader retrieval service for the TestingListener.");
            zooKeeperLeaderRetrievalService.start(testingListener);
            for (int i = 0; i < 10; i++) {
                zooKeeperLeaderElectionServiceArr[i] = ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
                testingContenderArr[i] = new TestingContender("akka//user/jobmanager_" + i, zooKeeperLeaderElectionServiceArr[i]);
                LOG.debug("Start leader election service for contender #{}.", Integer.valueOf(i));
                zooKeeperLeaderElectionServiceArr[i].start(testingContenderArr[i]);
            }
            Pattern compile = Pattern.compile("akka//user/jobmanager_(\\d+)");
            int i2 = 0;
            while (fromNow.hasTimeLeft() && i2 < 10) {
                LOG.debug("Wait for new leader #{}.", Integer.valueOf(i2));
                String waitForNewLeader = testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                Matcher matcher = compile.matcher(waitForNewLeader);
                if (matcher.find()) {
                    int parseInt = Integer.parseInt(matcher.group(1));
                    TestingContender testingContender = testingContenderArr[parseInt];
                    if (waitForNewLeader.equals(testingContender.getAddress()) && testingListener.getLeaderSessionID().equals(testingContender.getLeaderSessionID())) {
                        LOG.debug("Stop leader election service of contender #{}.", Integer.valueOf(i2));
                        zooKeeperLeaderElectionServiceArr[parseInt].stop();
                        zooKeeperLeaderElectionServiceArr[parseInt] = null;
                        i2++;
                    }
                } else {
                    Assert.fail("Did not find the leader's index.");
                }
            }
            Assert.assertFalse("Did not complete the leader reelection in time.", fromNow.isOverdue());
            Assert.assertEquals(10, i2);
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            for (ZooKeeperLeaderElectionService zooKeeperLeaderElectionService : zooKeeperLeaderElectionServiceArr) {
                if (zooKeeperLeaderElectionService != null) {
                    zooKeeperLeaderElectionService.stop();
                }
            }
        } catch (Throwable th) {
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            for (ZooKeeperLeaderElectionService zooKeeperLeaderElectionService2 : zooKeeperLeaderElectionServiceArr) {
                if (zooKeeperLeaderElectionService2 != null) {
                    zooKeeperLeaderElectionService2.stop();
                }
            }
            throw th;
        }
    }

    @Test
    public void testZooKeeperReelectionWithReplacement() throws Exception {
        ZooKeeperLeaderElectionService[] zooKeeperLeaderElectionServiceArr = new ZooKeeperLeaderElectionService[3];
        TestingContender[] testingContenderArr = new TestingContender[3];
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = null;
        TestingListener testingListener = new TestingListener();
        try {
            zooKeeperLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.client, this.configuration);
            zooKeeperLeaderRetrievalService.start(testingListener);
            for (int i = 0; i < 3; i++) {
                zooKeeperLeaderElectionServiceArr[i] = ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
                testingContenderArr[i] = new TestingContender("akka//user/jobmanager_" + i + "_0", zooKeeperLeaderElectionServiceArr[i]);
                zooKeeperLeaderElectionServiceArr[i].start(testingContenderArr[i]);
            }
            Pattern compile = Pattern.compile("akka//user/jobmanager_(\\d+)_(\\d+)");
            for (int i2 = 0; i2 < 30; i2++) {
                testingListener.waitForNewLeader(timeout);
                Matcher matcher = compile.matcher(testingListener.getAddress());
                if (!matcher.find()) {
                    throw new Exception("Did not find the leader's index.");
                }
                int parseInt = Integer.parseInt(matcher.group(1));
                int parseInt2 = Integer.parseInt(matcher.group(2));
                Assert.assertEquals(testingListener.getLeaderSessionID(), testingContenderArr[parseInt].getLeaderSessionID());
                zooKeeperLeaderElectionServiceArr[parseInt].stop();
                zooKeeperLeaderElectionServiceArr[parseInt] = ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
                testingContenderArr[parseInt] = new TestingContender("akka//user/jobmanager_" + parseInt + "_" + (parseInt2 + 1), zooKeeperLeaderElectionServiceArr[parseInt]);
                zooKeeperLeaderElectionServiceArr[parseInt].start(testingContenderArr[parseInt]);
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            for (ZooKeeperLeaderElectionService zooKeeperLeaderElectionService : zooKeeperLeaderElectionServiceArr) {
                if (zooKeeperLeaderElectionService != null) {
                    zooKeeperLeaderElectionService.stop();
                }
            }
        } catch (Throwable th) {
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            for (ZooKeeperLeaderElectionService zooKeeperLeaderElectionService2 : zooKeeperLeaderElectionServiceArr) {
                if (zooKeeperLeaderElectionService2 != null) {
                    zooKeeperLeaderElectionService2.stop();
                }
            }
            throw th;
        }
    }

    @Test
    public void testMultipleLeaders() throws Exception {
        this.configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, "/leader");
        ZooKeeperLeaderElectionService zooKeeperLeaderElectionService = null;
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = null;
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService2 = null;
        TestingListener testingListener = new TestingListener();
        TestingListener testingListener2 = new TestingListener();
        try {
            zooKeeperLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
            zooKeeperLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(this.client, this.configuration);
            zooKeeperLeaderRetrievalService2 = ZooKeeperUtils.createLeaderRetrievalService(this.client, this.configuration);
            TestingContender testingContender = new TestingContender(TEST_URL, zooKeeperLeaderElectionService);
            zooKeeperLeaderElectionService.start(testingContender);
            zooKeeperLeaderRetrievalService.start(testingListener);
            testingListener.waitForNewLeader(timeout);
            Assert.assertEquals(testingListener.getLeaderSessionID(), testingContender.getLeaderSessionID());
            Assert.assertEquals(TEST_URL, testingListener.getAddress());
            CuratorFramework startCuratorFramework = ZooKeeperUtils.startCuratorFramework(this.configuration);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeUTF("faultyContender");
            objectOutputStream.writeObject(null);
            objectOutputStream.close();
            boolean z = false;
            while (!z) {
                startCuratorFramework.delete().forPath("/leader");
                try {
                    startCuratorFramework.create().forPath("/leader", byteArrayOutputStream.toByteArray());
                    z = true;
                } catch (KeeperException.NodeExistsException e) {
                }
            }
            zooKeeperLeaderRetrievalService2.start(testingListener2);
            testingListener2.waitForNewLeader(timeout);
            if ("faultyContender".equals(testingListener2.getAddress())) {
                testingListener2.waitForNewLeader(timeout);
            }
            Assert.assertEquals(testingListener2.getLeaderSessionID(), testingContender.getLeaderSessionID());
            Assert.assertEquals(testingListener2.getAddress(), testingContender.getAddress());
            if (zooKeeperLeaderElectionService != null) {
                zooKeeperLeaderElectionService.stop();
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            if (zooKeeperLeaderRetrievalService2 != null) {
                zooKeeperLeaderRetrievalService2.stop();
            }
        } catch (Throwable th) {
            if (zooKeeperLeaderElectionService != null) {
                zooKeeperLeaderElectionService.stop();
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            if (zooKeeperLeaderRetrievalService2 != null) {
                zooKeeperLeaderRetrievalService2.stop();
            }
            throw th;
        }
    }

    @Test
    public void testExceptionForwarding() throws Exception {
        ZooKeeperLeaderElectionService zooKeeperLeaderElectionService = null;
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = null;
        TestingListener testingListener = new TestingListener();
        final CreateBuilder createBuilder = (CreateBuilder) Mockito.mock(CreateBuilder.class, Mockito.RETURNS_DEEP_STUBS);
        Exception exc = new Exception("Test exception");
        try {
            CuratorFramework curatorFramework = (CuratorFramework) Mockito.spy(ZooKeeperUtils.startCuratorFramework(this.configuration));
            ((CuratorFramework) Mockito.doAnswer(new Answer<CreateBuilder>() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.1
                private int counter = 0;

                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public CreateBuilder m119answer(InvocationOnMock invocationOnMock) throws Throwable {
                    this.counter++;
                    return this.counter < 2 ? (CreateBuilder) invocationOnMock.callRealMethod() : createBuilder;
                }
            }).when(curatorFramework)).create();
            Mockito.when(((ACLBackgroundPathAndBytesable) createBuilder.creatingParentsIfNeeded().withMode((CreateMode) Matchers.any(CreateMode.class))).forPath(Mockito.anyString(), (byte[]) Mockito.any(byte[].class))).thenThrow(new Throwable[]{exc});
            zooKeeperLeaderElectionService = new ZooKeeperLeaderElectionService(curatorFramework, "/latch", "/leader");
            zooKeeperLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(curatorFramework, this.configuration);
            TestingContender testingContender = new TestingContender(TEST_URL, zooKeeperLeaderElectionService);
            zooKeeperLeaderElectionService.start(testingContender);
            zooKeeperLeaderRetrievalService.start(testingListener);
            testingContender.waitForError(timeout);
            Assert.assertNotNull(testingContender.getError());
            Assert.assertEquals(exc, testingContender.getError().getCause());
            if (zooKeeperLeaderElectionService != null) {
                zooKeeperLeaderElectionService.stop();
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
        } catch (Throwable th) {
            if (zooKeeperLeaderElectionService != null) {
                zooKeeperLeaderElectionService.stop();
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            throw th;
        }
    }

    @Test
    public void testEphemeralZooKeeperNodes() throws Exception {
        ZooKeeperLeaderRetrievalService zooKeeperLeaderRetrievalService = null;
        CuratorFramework curatorFramework = null;
        NodeCache nodeCache = null;
        try {
            CuratorFramework startCuratorFramework = ZooKeeperUtils.startCuratorFramework(this.configuration);
            curatorFramework = ZooKeeperUtils.startCuratorFramework(this.configuration);
            ZooKeeperLeaderElectionService createLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(startCuratorFramework, this.configuration);
            zooKeeperLeaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(curatorFramework, this.configuration);
            TestingContender testingContender = new TestingContender(TEST_URL, createLeaderElectionService);
            TestingListener testingListener = new TestingListener();
            nodeCache = new NodeCache(curatorFramework, this.configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH));
            ExistsCacheListener existsCacheListener = new ExistsCacheListener(nodeCache);
            DeletedCacheListener deletedCacheListener = new DeletedCacheListener(nodeCache);
            nodeCache.getListenable().addListener(existsCacheListener);
            nodeCache.start();
            createLeaderElectionService.start(testingContender);
            testingContender.waitForLeader(timeout);
            existsCacheListener.nodeExists().get(timeout, TimeUnit.MILLISECONDS);
            nodeCache.getListenable().addListener(deletedCacheListener);
            createLeaderElectionService.stop();
            startCuratorFramework.close();
            deletedCacheListener.nodeDeleted().get(timeout, TimeUnit.MILLISECONDS);
            zooKeeperLeaderRetrievalService.start(testingListener);
            try {
                testingListener.waitForNewLeader(1000L);
                Assert.fail("TimeoutException was expected because there is no leader registered and thus there shouldn't be any leader information in ZooKeeper.");
            } catch (TimeoutException e) {
            }
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            if (nodeCache != null) {
                nodeCache.close();
            }
            if (curatorFramework != null) {
                curatorFramework.close();
            }
        } catch (Throwable th) {
            if (zooKeeperLeaderRetrievalService != null) {
                zooKeeperLeaderRetrievalService.stop();
            }
            if (nodeCache != null) {
                nodeCache.close();
            }
            if (curatorFramework != null) {
                curatorFramework.close();
            }
            throw th;
        }
    }
}
