package org.apache.flink.runtime.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.class */
public class ZooKeeperStateHandleStoreTest extends TestLogger {
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest$LongRetrievableStateHandle.class */
    public static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> {
        private static final long serialVersionUID = -3555329254423838912L;
        private static int numberOfGlobalDiscardCalls = 0;
        private final Long state;
        private int numberOfDiscardCalls = 0;

        public LongRetrievableStateHandle(Long l) {
            this.state = l;
        }

        /* renamed from: retrieveState, reason: merged with bridge method [inline-methods] */
        public Long m503retrieveState() {
            return this.state;
        }

        public void discardState() throws Exception {
            numberOfGlobalDiscardCalls++;
            this.numberOfDiscardCalls++;
        }

        public long getStateSize() {
            return 0L;
        }

        int getNumberOfDiscardCalls() {
            return this.numberOfDiscardCalls;
        }

        public static int getNumberOfGlobalDiscardCalls() {
            return numberOfGlobalDiscardCalls;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest$LongStateStorage.class */
    private static class LongStateStorage implements RetrievableStateStorageHelper<Long> {
        private final List<LongRetrievableStateHandle> stateHandles;

        private LongStateStorage() {
            this.stateHandles = new ArrayList();
        }

        public RetrievableStateHandle<Long> store(Long l) throws Exception {
            LongRetrievableStateHandle longRetrievableStateHandle = new LongRetrievableStateHandle(l);
            this.stateHandles.add(longRetrievableStateHandle);
            return longRetrievableStateHandle;
        }

        List<LongRetrievableStateHandle> getStateHandles() {
            return this.stateHandles;
        }
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZOOKEEPER != null) {
            ZOOKEEPER.shutdown();
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    @Test
    public void testAddAndLock() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        zooKeeperStateHandleStore.addAndLock("/testAdd", 1239712317L);
        Assert.assertEquals(1L, zooKeeperStateHandleStore.getAllAndLock().size());
        Assert.assertEquals(1239712317L, zooKeeperStateHandleStore.getAndLock("/testAdd").retrieveState());
        Stat stat = (Stat) ZOOKEEPER.getClient().checkExists().forPath("/testAdd");
        Assert.assertNotNull(stat);
        Assert.assertEquals(0L, stat.getEphemeralOwner());
        List list = (List) ZOOKEEPER.getClient().getChildren().forPath("/testAdd");
        Assert.assertEquals(1L, list.size());
        Stat stat2 = (Stat) ZOOKEEPER.getClient().checkExists().forPath("/testAdd/" + ((String) list.get(0)));
        Assert.assertNotNull(stat2);
        Assert.assertNotEquals(0L, stat2.getEphemeralOwner());
        Assert.assertEquals(1239712317L, (Long) ((RetrievableStateHandle) InstantiationUtil.deserializeObject((byte[]) ZOOKEEPER.getClient().getData().forPath("/testAdd"), ClassLoader.getSystemClassLoader())).retrieveState());
    }

    @Test(expected = Exception.class)
    public void testAddAlreadyExistingPath() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), longStateStorage);
        ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
        zooKeeperStateHandleStore.addAndLock("/testAddAlreadyExistingPath", 1L);
        Assert.assertEquals(1, longStateStorage.getStateHandles());
        Assert.assertEquals(1L, longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls());
    }

    @Test
    public void testAddDiscardStateHandleAfterFailure() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        CuratorFramework curatorFramework = (CuratorFramework) Mockito.spy(ZOOKEEPER.getClient());
        Mockito.when(curatorFramework.inTransaction().create()).thenThrow(new Throwable[]{new RuntimeException("Expected test Exception.")});
        try {
            new ZooKeeperStateHandleStore(curatorFramework, longStateStorage).addAndLock("/testAddDiscardStateHandleAfterFailure", 81282227L);
            Assert.fail("Did not throw expected exception");
        } catch (Exception e) {
        }
        Assert.assertEquals(1L, longStateStorage.getStateHandles().size());
        Assert.assertEquals(81282227L, longStateStorage.getStateHandles().get(0).m503retrieveState());
        Assert.assertEquals(1L, longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls());
    }

    @Test
    public void testReplace() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), longStateStorage);
        zooKeeperStateHandleStore.addAndLock("/testReplace", 30968470898L);
        zooKeeperStateHandleStore.replace("/testReplace", 0, 88383776661L);
        Assert.assertEquals(2L, longStateStorage.getStateHandles().size());
        Assert.assertEquals(30968470898L, longStateStorage.getStateHandles().get(0).m503retrieveState());
        Assert.assertEquals(88383776661L, longStateStorage.getStateHandles().get(1).m503retrieveState());
        Stat stat = (Stat) ZOOKEEPER.getClient().checkExists().forPath("/testReplace");
        Assert.assertNotNull(stat);
        Assert.assertEquals(0L, stat.getEphemeralOwner());
        Assert.assertEquals(88383776661L, (Long) ((RetrievableStateHandle) InstantiationUtil.deserializeObject((byte[]) ZOOKEEPER.getClient().getData().forPath("/testReplace"), ClassLoader.getSystemClassLoader())).retrieveState());
    }

    @Test(expected = Exception.class)
    public void testReplaceNonExistingPath() throws Exception {
        new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage()).replace("/testReplaceNonExistingPath", 0, 1L);
    }

    @Test
    public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        CuratorFramework curatorFramework = (CuratorFramework) Mockito.spy(ZOOKEEPER.getClient());
        Mockito.when(curatorFramework.setData()).thenThrow(new Throwable[]{new RuntimeException("Expected test Exception.")});
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(curatorFramework, longStateStorage);
        zooKeeperStateHandleStore.addAndLock("/testReplaceDiscardStateHandleAfterFailure", 30968470898L);
        try {
            zooKeeperStateHandleStore.replace("/testReplaceDiscardStateHandleAfterFailure", 0, 88383776661L);
            Assert.fail("Did not throw expected exception");
        } catch (Exception e) {
        }
        Assert.assertEquals(2L, longStateStorage.getStateHandles().size());
        Assert.assertEquals(30968470898L, longStateStorage.getStateHandles().get(0).m503retrieveState());
        Assert.assertEquals(88383776661L, longStateStorage.getStateHandles().get(1).m503retrieveState());
        Assert.assertEquals(1L, longStateStorage.getStateHandles().get(1).getNumberOfDiscardCalls());
        Assert.assertEquals(30968470898L, (Long) ((RetrievableStateHandle) InstantiationUtil.deserializeObject((byte[]) ZOOKEEPER.getClient().getData().forPath("/testReplaceDiscardStateHandleAfterFailure"), ClassLoader.getSystemClassLoader())).retrieveState());
    }

    @Test
    public void testGetAndExists() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        Assert.assertEquals(-1L, zooKeeperStateHandleStore.exists("/testGetAndExists"));
        zooKeeperStateHandleStore.addAndLock("/testGetAndExists", 311222268470898L);
        Assert.assertEquals(311222268470898L, zooKeeperStateHandleStore.getAndLock("/testGetAndExists").retrieveState());
        Assert.assertTrue(zooKeeperStateHandleStore.exists("/testGetAndExists") >= 0);
    }

    @Test(expected = Exception.class)
    public void testGetNonExistingPath() throws Exception {
        new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage()).getAndLock("/testGetNonExistingPath");
    }

    @Test
    public void testGetAll() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        HashSet hashSet = new HashSet();
        hashSet.add(311222268470898L);
        hashSet.add(132812888L);
        hashSet.add(27255442L);
        hashSet.add(11122233124L);
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            long longValue = ((Long) it.next()).longValue();
            zooKeeperStateHandleStore.addAndLock("/testGetAll" + longValue, Long.valueOf(longValue));
        }
        Iterator it2 = zooKeeperStateHandleStore.getAllAndLock().iterator();
        while (it2.hasNext()) {
            Assert.assertTrue(hashSet.remove(((RetrievableStateHandle) ((Tuple2) it2.next()).f0).retrieveState()));
        }
        Assert.assertEquals(0L, hashSet.size());
    }

    @Test
    public void testGetAllSortedByName() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        Long[] lArr = {311222268470898L, 132812888L, 27255442L, 11122233124L};
        for (Long l : lArr) {
            long longValue = l.longValue();
            zooKeeperStateHandleStore.addAndLock(String.format("%s%016d", "/testGetAllSortedByName", Long.valueOf(longValue)), Long.valueOf(longValue));
        }
        List allAndLock = zooKeeperStateHandleStore.getAllAndLock();
        Assert.assertEquals(lArr.length, allAndLock.size());
        Arrays.sort(lArr);
        Collections.sort(allAndLock, Comparator.comparing(tuple2 -> {
            return (String) tuple2.f1;
        }));
        for (int i = 0; i < lArr.length; i++) {
            Assert.assertEquals(lArr[i], ((RetrievableStateHandle) ((Tuple2) allAndLock.get(i)).f0).retrieveState());
        }
    }

    @Test
    public void testRemove() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        zooKeeperStateHandleStore.addAndLock("/testRemove", 27255442L);
        int numberOfGlobalDiscardCalls = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
        zooKeeperStateHandleStore.releaseAndTryRemove("/testRemove");
        Assert.assertEquals(0L, ((List) ZOOKEEPER.getClient().getChildren().forPath("/")).size());
        Assert.assertEquals(numberOfGlobalDiscardCalls + 1, LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
    }

    @Test
    public void testReleaseAndTryRemoveAll() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        HashSet hashSet = new HashSet();
        hashSet.add(311222268470898L);
        hashSet.add(132812888L);
        hashSet.add(27255442L);
        hashSet.add(11122233124L);
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            long longValue = ((Long) it.next()).longValue();
            zooKeeperStateHandleStore.addAndLock("/testDiscardAll" + longValue, Long.valueOf(longValue));
        }
        zooKeeperStateHandleStore.releaseAndTryRemoveAll();
        Assert.assertEquals(0L, ((List) ZOOKEEPER.getClient().getChildren().forPath("/")).size());
    }

    @Test
    public void testCorruptedData() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        HashSet<Long> hashSet = new HashSet();
        hashSet.add(1L);
        hashSet.add(2L);
        hashSet.add(3L);
        for (Long l : hashSet) {
            zooKeeperStateHandleStore.addAndLock("/" + l, l);
        }
        ZOOKEEPER.getClient().setData().forPath("/2", new byte[2]);
        List allAndLock = zooKeeperStateHandleStore.getAllAndLock();
        HashSet hashSet2 = new HashSet(hashSet);
        hashSet2.remove(2L);
        HashSet hashSet3 = new HashSet(hashSet2.size());
        Iterator it = allAndLock.iterator();
        while (it.hasNext()) {
            hashSet3.add(((RetrievableStateHandle) ((Tuple2) it.next()).f0).retrieveState());
        }
        Assert.assertEquals(hashSet2, hashSet3);
    }

    @Test
    public void testConcurrentDeleteOperation() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), longStateStorage);
        ZooKeeperStateHandleStore zooKeeperStateHandleStore2 = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), longStateStorage);
        zooKeeperStateHandleStore.addAndLock("/state", 42L);
        RetrievableStateHandle andLock = zooKeeperStateHandleStore2.getAndLock("/state");
        zooKeeperStateHandleStore.releaseAndTryRemove("/state");
        Assert.assertEquals(42L, ((Long) andLock.retrieveState()).longValue());
        Assert.assertNotNull("NodeStat should not be null, otherwise the referenced node does not exist.", (Stat) ZOOKEEPER.getClient().checkExists().forPath("/state"));
        zooKeeperStateHandleStore2.releaseAndTryRemove("/state");
        Assert.assertNull("NodeState should be null, because the referenced node should no longer exist.", (Stat) ZOOKEEPER.getClient().checkExists().forPath("/state"));
    }

    @Test
    public void testLockCleanupWhenGetAndLockFails() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), longStateStorage);
        ZooKeeperStateHandleStore zooKeeperStateHandleStore2 = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), longStateStorage);
        zooKeeperStateHandleStore.addAndLock("/state", 42L);
        ZOOKEEPER.getClient().setData().forPath("/state", new byte[]{1, 2});
        try {
            zooKeeperStateHandleStore2.getAndLock("/state");
            Assert.fail("Should fail because we cannot deserialize the node's data");
        } catch (IOException e) {
        }
        Assert.assertNull("zkStore2 should not have created a lock node.", (Stat) ZOOKEEPER.getClient().checkExists().forPath(zooKeeperStateHandleStore2.getLockPath("/state")));
        Assert.assertEquals(1L, ((Collection) ZOOKEEPER.getClient().getChildren().forPath("/state")).size());
        zooKeeperStateHandleStore.releaseAndTryRemove("/state");
        Assert.assertNull("The state node should have been removed.", (Stat) ZOOKEEPER.getClient().checkExists().forPath("/state"));
    }

    @Test
    public void testLockCleanupWhenClientTimesOut() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString());
        configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 100);
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout");
        CuratorFramework startCuratorFramework = ZooKeeperUtils.startCuratorFramework(configuration);
        Throwable th = null;
        try {
            CuratorFramework startCuratorFramework2 = ZooKeeperUtils.startCuratorFramework(configuration);
            Throwable th2 = null;
            try {
                try {
                    new ZooKeeperStateHandleStore(startCuratorFramework, longStateStorage).addAndLock("/state", 42L);
                    startCuratorFramework.close();
                    Assert.assertNotNull((Stat) startCuratorFramework2.checkExists().forPath("/state"));
                    Assert.assertEquals(0L, ((Collection) startCuratorFramework2.getChildren().forPath("/state")).size());
                    if (startCuratorFramework2 != null) {
                        if (0 != 0) {
                            try {
                                startCuratorFramework2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            startCuratorFramework2.close();
                        }
                    }
                    if (startCuratorFramework != null) {
                        if (0 == 0) {
                            startCuratorFramework.close();
                            return;
                        }
                        try {
                            startCuratorFramework.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (startCuratorFramework2 != null) {
                    if (th2 != null) {
                        try {
                            startCuratorFramework2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        startCuratorFramework2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (startCuratorFramework != null) {
                if (0 != 0) {
                    try {
                        startCuratorFramework.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    startCuratorFramework.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void testRelease() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        zooKeeperStateHandleStore.addAndLock("/state", 42L);
        Assert.assertNotNull("Expected an existing lock", (Stat) ZOOKEEPER.getClient().checkExists().forPath(zooKeeperStateHandleStore.getLockPath("/state")));
        zooKeeperStateHandleStore.release("/state");
        Assert.assertEquals("Expected no lock nodes as children", 0L, ((Stat) ZOOKEEPER.getClient().checkExists().forPath("/state")).getNumChildren());
        zooKeeperStateHandleStore.releaseAndTryRemove("/state");
        Assert.assertNull("State node should have been removed.", (Stat) ZOOKEEPER.getClient().checkExists().forPath("/state"));
    }

    @Test
    public void testReleaseAll() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZOOKEEPER.getClient(), new LongStateStorage());
        List asList = Arrays.asList("/state1", "/state2", "/state3");
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            zooKeeperStateHandleStore.addAndLock((String) it.next(), 42L);
        }
        Iterator it2 = asList.iterator();
        while (it2.hasNext()) {
            Assert.assertNotNull("Expecte and existing lock.", (Stat) ZOOKEEPER.getClient().checkExists().forPath(zooKeeperStateHandleStore.getLockPath((String) it2.next())));
        }
        zooKeeperStateHandleStore.releaseAll();
        Iterator it3 = asList.iterator();
        while (it3.hasNext()) {
            Assert.assertEquals(0L, ((Stat) ZOOKEEPER.getClient().checkExists().forPath((String) it3.next())).getNumChildren());
        }
        zooKeeperStateHandleStore.releaseAndTryRemoveAll();
        Assert.assertEquals(0L, ((Stat) ZOOKEEPER.getClient().checkExists().forPath("/")).getNumChildren());
    }

    @Test
    public void testDeleteAllShouldRemoveAllPaths() throws Exception {
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(ZooKeeperUtils.useNamespaceAndEnsurePath(ZOOKEEPER.getClient(), "/path"), new LongStateStorage());
        zooKeeperStateHandleStore.addAndLock("/state", 1L);
        zooKeeperStateHandleStore.deleteChildren();
        Assert.assertThat(zooKeeperStateHandleStore.getAllPaths(), Matchers.is(Matchers.empty()));
    }
}
