package org.apache.flink.runtime.checkpoint;

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.class */
public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final String CheckpointsPath = "/checkpoints";

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZooKeeper != null) {
            ZooKeeper.shutdown();
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest
    protected CompletedCheckpointStore createCompletedCheckpoints(int i, ClassLoader classLoader) throws Exception {
        return new ZooKeeperCompletedCheckpointStore(i, classLoader, ZooKeeper.createClient(), CheckpointsPath, new StateStorageHelper<CompletedCheckpoint>() { // from class: org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStoreITCase.1
            public StateHandle<CompletedCheckpoint> store(CompletedCheckpoint completedCheckpoint) throws Exception {
                return new LocalStateHandle(completedCheckpoint);
            }
        });
    }

    @Test
    public void testRecover() throws Exception {
        CompletedCheckpointStore createCompletedCheckpoints = createCompletedCheckpoints(3, ClassLoader.getSystemClassLoader());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint[] testCompletedCheckpointArr = {createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)};
        createCompletedCheckpoints.addCheckpoint(testCompletedCheckpointArr[0]);
        createCompletedCheckpoints.addCheckpoint(testCompletedCheckpointArr[1]);
        createCompletedCheckpoints.addCheckpoint(testCompletedCheckpointArr[2]);
        Assert.assertEquals(3L, ((List) ZooKeeper.getClient().getChildren().forPath(CheckpointsPath)).size());
        Assert.assertEquals(3L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
        createCompletedCheckpoints.recover();
        Deadline fromNow = new FiniteDuration(1L, TimeUnit.MINUTES).fromNow();
        while (fromNow.hasTimeLeft() && ((List) ZooKeeper.getClient().getChildren().forPath(CheckpointsPath)).size() != 1) {
            Thread.sleep(Math.min(100L, fromNow.timeLeft().toMillis()));
        }
        Assert.assertEquals(1L, ((List) ZooKeeper.getClient().getChildren().forPath(CheckpointsPath)).size());
        Assert.assertEquals(1L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
        Assert.assertEquals(testCompletedCheckpointArr[2], createCompletedCheckpoints.getLatestCheckpoint());
    }

    @Test
    public void testShutdownDiscardsCheckpoints() throws Exception {
        CuratorFramework client = ZooKeeper.getClient();
        CompletedCheckpointStore createCompletedCheckpoints = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint createCheckpoint = createCheckpoint(0);
        createCompletedCheckpoints.addCheckpoint(createCheckpoint);
        Assert.assertEquals(1L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull(client.checkExists().forPath("/checkpoints/" + createCheckpoint.getCheckpointID()));
        createCompletedCheckpoints.shutdown();
        Assert.assertEquals(0L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
        Assert.assertNull(client.checkExists().forPath("/checkpoints/" + createCheckpoint.getCheckpointID()));
        createCompletedCheckpoints.recover();
        Assert.assertEquals(0L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
    }

    @Test
    public void testSuspendKeepsCheckpoints() throws Exception {
        CuratorFramework client = ZooKeeper.getClient();
        CompletedCheckpointStore createCompletedCheckpoints = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
        CompletedCheckpointStoreTest.TestCompletedCheckpoint createCheckpoint = createCheckpoint(0);
        createCompletedCheckpoints.addCheckpoint(createCheckpoint);
        Assert.assertEquals(1L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull(client.checkExists().forPath("/checkpoints/" + createCheckpoint.getCheckpointID()));
        createCompletedCheckpoints.suspend();
        Assert.assertEquals(0L, createCompletedCheckpoints.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull(client.checkExists().forPath("/checkpoints/" + createCheckpoint.getCheckpointID()));
        createCompletedCheckpoints.recover();
        Assert.assertEquals(createCheckpoint, createCompletedCheckpoints.getLatestCheckpoint());
    }
}
