package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.class */
public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
    /** ZooKeeper path handed to the state handle store and queried directly by the tests. */
    private static final String CHECKPOINT_PATH = "/checkpoints";
    /**
     * ZooKeeper test environment shared by all tests in this class.
     * NOTE(review): the argument {@code 1} is presumably the number of ZooKeeper servers — confirm.
     */
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
    /** Utility passed to every store built here; also used to turn checkpoint IDs into node names. */
    private static final ZooKeeperCheckpointStoreUtil checkpointStoreUtil = ZooKeeperCheckpointStoreUtil.INSTANCE;

    /**
     * {@link RetrievableStateHandle} that keeps the wrapped value in a static, in-memory map.
     *
     * <p>Each handle draws a unique integer key from a shared counter when it is constructed and
     * stores its value under that key. Only the key is an instance field; the value itself lives in
     * the static map and is removed again by {@link #discardState()}.
     *
     * @param <T> type of the value held by the handle
     */
    static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
        private static final long serialVersionUID = -268548467968932L;

        /** Source of unique keys, shared by all handles. Never reassigned, hence final. */
        private static final AtomicInteger nextKey = new AtomicInteger(0);

        /** Values of all live handles, indexed by handle key. Never reassigned, hence final. */
        private static final HashMap<Integer, Object> stateMap = new HashMap<>();

        /** Key under which this handle's value is stored in {@link #stateMap}. */
        private final int key = nextKey.getAndIncrement();

        /**
         * Creates a handle and stores the given value in the shared map.
         *
         * @param t value to hold; stored as-is
         */
        public HeapRetrievableStateHandle(T t) {
            stateMap.put(key, t);
        }

        /**
         * Returns the value stored for this handle.
         *
         * @return the stored value, or {@code null} if no entry exists for this handle's key
         *     (for example after {@link #discardState()})
         */
        @SuppressWarnings("unchecked") // the entry for this key is only ever written by this handle's constructor with a T
        public T retrieveState() {
            return (T) stateMap.get(key);
        }

        /** Removes this handle's value from the shared map. */
        public void discardState() throws Exception {
            stateMap.remove(key);
        }

        /**
         * Reports the state size.
         *
         * @return always {@code 0}
         */
        public long getStateSize() {
            return 0L;
        }
    }

    /**
     * Shuts down the shared ZooKeeper test environment once all tests of this class have run.
     *
     * @throws Exception if the shutdown call fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        ZOOKEEPER.shutdown();
    }

    /**
     * Runs before every test and calls {@code deleteAll()} on the shared ZooKeeper environment.
     * NOTE(review): presumably this wipes all nodes so tests start from an empty tree — confirm.
     *
     * @throws Exception if the delete call fails
     */
    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    /**
     * Builds a {@link DefaultCompletedCheckpointStore} on top of a ZooKeeper state handle store
     * rooted at {@code CHECKPOINT_PATH}.
     *
     * <p>The store is seeded with whatever
     * {@code DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints} returns for that
     * handle store, and gets a fresh shared state registry created with a direct executor, an empty
     * list and {@code RestoreMode.DEFAULT}.
     *
     * @param i first constructor argument of the store (the tests use it as the number of
     *     checkpoints to retain — presumably the retention limit; confirm against the store)
     * @param executor executor handed to the store
     * @return the newly created store
     * @throws Exception if creating the handle store or retrieving the checkpoints fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest
    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(int i, Executor executor) throws Exception {
        final CuratorFramework client = ZOOKEEPER.getClient();
        final ZooKeeperStateHandleStore checkpointsInZooKeeper =
                ZooKeeperUtils.createZooKeeperStateHandleStore(
                        client, CHECKPOINT_PATH, new TestingRetrievableStateStorageHelper());

        // Arguments are evaluated in declaration order: the stored checkpoints are read before the
        // shared state registry is created.
        return new DefaultCompletedCheckpointStore(
                i,
                checkpointsInZooKeeper,
                checkpointStoreUtil,
                DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                        checkpointsInZooKeeper, checkpointStoreUtil),
                SharedStateRegistry.DEFAULT_FACTORY.create(
                        Executors.directExecutor(), Collections.emptyList(), RestoreMode.DEFAULT),
                executor);
    }

    /**
     * Adds three checkpoints to a store that retains three, checks that all of them are registered
     * and present in ZooKeeper, then adds a fourth one and verifies that the store now holds
     * checkpoints 1, 2 and 3 (the oldest one having been subsumed).
     */
    @Test
    public void testRecover() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(3);

        CompletedCheckpointStoreTest.TestCompletedCheckpoint[] expected = {
            createCheckpoint(0L, sharedStateRegistry),
            createCheckpoint(1L, sharedStateRegistry),
            createCheckpoint(2L, sharedStateRegistry)
        };

        // Add all checkpoints first, then verify registration, in the same order as before.
        for (CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint : expected) {
            checkpoints.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {
            });
        }
        for (CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint : expected) {
            verifyCheckpointRegistered(checkpoint.getOperatorStates().values(), sharedStateRegistry);
        }

        // All three should be in ZooKeeper
        Assert.assertEquals(3L, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
        Assert.assertEquals(3L, checkpoints.getNumberOfRetainedCheckpoints());

        // Switch to a fresh registry; the store itself is kept.
        sharedStateRegistry.close();
        SharedStateRegistryImpl newSharedStateRegistry = new SharedStateRegistryImpl();

        Assert.assertEquals(3L, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
        Assert.assertEquals(3L, checkpoints.getNumberOfRetainedCheckpoints());
        Assert.assertEquals(expected[2], checkpoints.getLatestCheckpoint());

        // Adding checkpoint 3 to a store retaining three must push out checkpoint 0.
        CompletedCheckpointStoreTest.TestCompletedCheckpoint newCheckpoint = createCheckpoint(3L, newSharedStateRegistry);
        List<CompletedCheckpoint> expectedCheckpoints = new ArrayList<>(3);
        expectedCheckpoints.add(expected[1]);
        expectedCheckpoints.add(expected[2]);
        expectedCheckpoints.add(newCheckpoint);

        checkpoints.addCheckpointAndSubsumeOldestOne(newCheckpoint, new CheckpointsCleaner(), () -> {
        });

        List<CompletedCheckpoint> actualCheckpoints = checkpoints.getAllCheckpoints();
        Assert.assertEquals(expectedCheckpoints, actualCheckpoints);

        for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
            verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), newSharedStateRegistry);
        }
    }

    /**
     * Verifies that shutting the store down with {@code JobStatus.FINISHED} leaves no retained
     * checkpoints, removes the checkpoint node from ZooKeeper, and that a store created afterwards
     * is empty as well.
     */
    @Test
    public void testShutdownDiscardsCheckpoints() throws Exception {
        final CuratorFramework zkClient = ZOOKEEPER.getClient();
        final SharedStateRegistryImpl registry = new SharedStateRegistryImpl();
        final CompletedCheckpointStore store = createRecoveredCompletedCheckpointStore(1);
        final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = createCheckpoint(0L, registry);

        store.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {
        });
        Assert.assertEquals(1L, store.getNumberOfRetainedCheckpoints());

        // Node that represents the checkpoint in ZooKeeper.
        final String checkpointPath =
                CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        Assert.assertNotNull(zkClient.checkExists().forPath(checkpointPath));

        store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());

        Assert.assertEquals(0L, store.getNumberOfRetainedCheckpoints());
        Assert.assertNull(zkClient.checkExists().forPath(checkpointPath));

        // A store built after the shutdown must not find any checkpoint.
        registry.close();
        final CompletedCheckpointStore recoveredStore = createRecoveredCompletedCheckpointStore(1);
        Assert.assertEquals(0L, recoveredStore.getNumberOfRetainedCheckpoints());
    }

    /**
     * Verifies that shutting the store down with {@code JobStatus.SUSPENDED} empties the store
     * instance but keeps the checkpoint in ZooKeeper: the checkpoint node still has exactly one
     * child, that child has no children of its own, and a newly created store returns the same
     * checkpoint as its latest one.
     */
    @Test
    public void testSuspendKeepsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore store = createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = createCheckpoint(0L, sharedStateRegistry);

        store.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {
        });
        Assert.assertEquals(1L, store.getNumberOfRetainedCheckpoints());
        Assert.assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())));

        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
        Assert.assertEquals(0L, store.getNumberOfRetainedCheckpoints());

        // Path of the checkpoint node; also the parent of the child node inspected below.
        final String checkpointPath = CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        final List<String> checkpointPathChildren = client.getChildren().forPath(checkpointPath);
        Assert.assertEquals("The checkpoint node should not be marked for deletion.", 1L, checkpointPathChildren.size());

        // The single child is addressed as <checkpointPath>/<childName>.
        final String locksNodeName = Iterables.getOnlyElement(checkpointPathChildren);
        final String locksNodePath = ZooKeeperUtils.generateZookeeperPath(new String[]{checkpointPath, locksNodeName});
        final Stat locksStat = client.checkExists().forPath(locksNodePath);
        Assert.assertEquals("There shouldn't be any lock node available for the checkpoint", 0L, locksStat.getNumChildren());

        // Recover again: a new store must still see the checkpoint.
        sharedStateRegistry.close();
        Assert.assertEquals(checkpoint, createRecoveredCompletedCheckpointStore(1).getLatestCheckpoint());
    }

    /**
     * Adds checkpoints 9, 10 and 11 to a store that retains three and verifies that a newly created
     * store reports the last one added as its latest checkpoint.
     */
    @Test
    public void testLatestCheckpointRecovery() throws Exception {
        final int numCheckpoints = 3;
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpointStore = createRecoveredCompletedCheckpointStore(numCheckpoints);

        List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints);
        checkpoints.add(createCheckpoint(9L, sharedStateRegistry));
        checkpoints.add(createCheckpoint(10L, sharedStateRegistry));
        checkpoints.add(createCheckpoint(11L, sharedStateRegistry));

        for (CompletedCheckpoint checkpoint : checkpoints) {
            checkpointStore.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {
            });
        }

        sharedStateRegistry.close();

        // A freshly created store must pick the most recently added checkpoint as the latest.
        CompletedCheckpoint expectedLatest = checkpoints.get(checkpoints.size() - 1);
        Assert.assertEquals(expectedLatest, createRecoveredCompletedCheckpointStore(numCheckpoints).getLatestCheckpoint());
    }

    /**
     * Exercises two stores that work on the same ZooKeeper path.
     *
     * <p>The first store adds checkpoint 1; the second store, created afterwards, sees it as its
     * latest checkpoint. When the first store then adds checkpoint 2 (replacing checkpoint 1 in its
     * own view), the instance held by the second store must not be discarded. It is only discarded
     * once the second store itself adds another checkpoint.
     */
    @Test
    public void testConcurrentCheckpointOperations() throws Exception {
        final int retained = 1;

        // First store writes checkpoint 1.
        final CompletedCheckpointStore firstStore = createRecoveredCompletedCheckpointStore(retained);
        final SharedStateRegistryImpl firstRegistry = new SharedStateRegistryImpl();
        final CompletedCheckpointStoreTest.TestCompletedCheckpoint firstCheckpoint = createCheckpoint(1L, firstRegistry);
        firstStore.addCheckpointAndSubsumeOldestOne(firstCheckpoint, new CheckpointsCleaner(), () -> {
        });
        firstRegistry.close();

        // Second store is created afterwards and picks that checkpoint up.
        final SharedStateRegistryImpl secondRegistry = new SharedStateRegistryImpl();
        final CompletedCheckpointStore secondStore = createRecoveredCompletedCheckpointStore(retained);
        final CompletedCheckpoint recovered = secondStore.getLatestCheckpoint();
        Assert.assertTrue(recovered instanceof CompletedCheckpointStoreTest.TestCompletedCheckpoint);
        final CompletedCheckpointStoreTest.TestCompletedCheckpoint recoveredCheckpoint =
                (CompletedCheckpointStoreTest.TestCompletedCheckpoint) recovered;
        Assert.assertFalse(recoveredCheckpoint.isDiscarded());

        // First store moves on to checkpoint 2.
        final CompletedCheckpointStoreTest.TestCompletedCheckpoint secondCheckpoint = createCheckpoint(2L, secondRegistry);
        firstStore.addCheckpointAndSubsumeOldestOne(secondCheckpoint, new CheckpointsCleaner(), () -> {
        });
        Assert.assertEquals(Collections.singletonList(secondCheckpoint), firstStore.getAllCheckpoints());

        // The instance recovered by the second store must still be alive.
        Assert.assertFalse("The checkpoint should not have been discarded.", recoveredCheckpoint.awaitDiscard(50L));
        Assert.assertFalse(recoveredCheckpoint.isDiscarded());

        // Once the second store adds a checkpoint of its own, the recovered one gets discarded.
        final CompletedCheckpointStoreTest.TestCompletedCheckpoint thirdCheckpoint = createCheckpoint(3L, secondRegistry);
        secondStore.addCheckpointAndSubsumeOldestOne(thirdCheckpoint, new CheckpointsCleaner(), () -> {
        });
        recoveredCheckpoint.awaitDiscard();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v6, types: [org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor, java.util.concurrent.Executor] */
    @Test
    public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception {
        ManualClock manualClock = new ManualClock();
        manualClock.advanceTime(1L, TimeUnit.DAYS);
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        LongConsumer longConsumer = j -> {
        };
        AtomicInteger atomicInteger = new AtomicInteger(0);
        atomicInteger.getClass();
        IntSupplier intSupplier = atomicInteger::get;
        checkpointsCleaner.getClass();
        CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(1, longConsumer, manualClock, 1L, intSupplier, checkpointsCleaner::getNumberOfCheckpointsToClean);
        ?? manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        CompletedCheckpointStore createRecoveredCompletedCheckpointStore = createRecoveredCompletedCheckpointStore(1, manuallyTriggeredScheduledExecutor);
        for (int i = 1; i <= 3; i++) {
            createRecoveredCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne(new CompletedCheckpointStoreTest.TestCompletedCheckpoint(new JobID(), i, i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)), checkpointsCleaner, () -> {
            });
        }
        int i2 = 3 - 1;
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(checkpointsCleaner.getNumberOfCheckpointsToClean() == i2);
        });
        Assert.assertEquals(i2, checkpointsCleaner.getNumberOfCheckpointsToClean());
        Assert.assertFalse(checkpointRequestDecider.chooseRequestToExecute(CheckpointRequestDeciderTest.regularCheckpoint(), false, 0L).isPresent());
        manuallyTriggeredScheduledExecutor.triggerAll();
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(checkpointsCleaner.getNumberOfCheckpointsToClean() < i2);
        });
        Assert.assertTrue(checkpointsCleaner.getNumberOfCheckpointsToClean() < i2);
        Assert.assertTrue(checkpointRequestDecider.chooseRequestToExecute(CheckpointRequestDeciderTest.regularCheckpoint(), false, 0L).isPresent());
        createRecoveredCompletedCheckpointStore.shutdown(JobStatus.FINISHED, checkpointsCleaner);
    }
}
