package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.class */
public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {

    @ClassRule
    public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest$CheckpointStateHandle.class */
    public static class CheckpointStateHandle implements RetrievableStateHandle<CompletedCheckpoint> {
        private static final long serialVersionUID = 1;
        private final Function<Long, CompletedCheckpoint> checkpointSupplier;
        private final long id;

        CheckpointStateHandle(Function<Long, CompletedCheckpoint> function, long j) {
            this.checkpointSupplier = function;
            this.id = j;
        }

        /* renamed from: retrieveState, reason: merged with bridge method [inline-methods] */
        public CompletedCheckpoint m57retrieveState() {
            return this.checkpointSupplier.apply(Long.valueOf(this.id));
        }

        public void discardState() {
        }

        public long getStateSize() {
            return 0L;
        }
    }

    @Test
    public void testPathConversion() {
        Assert.assertEquals(42L, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(ZooKeeperCompletedCheckpointStore.checkpointIdToPath(42L)));
    }

    @Test(expected = ExpectedTestException.class)
    public void testRecoverFailsIfDownloadFails() throws Exception {
        testDownloadInternal((zooKeeperCompletedCheckpointStore, list, sharedStateRegistry) -> {
            try {
                list.add(createHandle(1L, l -> {
                    throw new ExpectedTestException();
                }));
                zooKeeperCompletedCheckpointStore.recover();
            } catch (Exception e) {
                ExceptionUtils.findThrowable(e, ExpectedTestException.class).ifPresent((v0) -> {
                    ExceptionUtils.rethrow(v0);
                });
                ExceptionUtils.rethrow(e);
            }
        });
    }

    @Test
    public void testNoDownloadIfCheckpointsNotChanged() throws Exception {
        testDownloadInternal((zooKeeperCompletedCheckpointStore, list, sharedStateRegistry) -> {
            try {
                list.add(createHandle(1L, l -> {
                    throw new AssertionError("retrieveState was attempted for checkpoint " + l);
                }));
                zooKeeperCompletedCheckpointStore.addCheckpoint(CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry));
                zooKeeperCompletedCheckpointStore.recover();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void testDownloadIfCheckpointsChanged() throws Exception {
        testDownloadInternal((zooKeeperCompletedCheckpointStore, list, sharedStateRegistry) -> {
            try {
                IntStream.range(0, 10 + 1).forEach(i -> {
                    list.add(createHandle(i, l -> {
                        return CompletedCheckpointStoreTest.createCheckpoint(l.longValue(), sharedStateRegistry);
                    }));
                });
                zooKeeperCompletedCheckpointStore.addCheckpoint(CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry));
                zooKeeperCompletedCheckpointStore.addCheckpoint(CompletedCheckpointStoreTest.createCheckpoint(5L, sharedStateRegistry));
                zooKeeperCompletedCheckpointStore.recover();
                Assert.assertEquals(10, zooKeeperCompletedCheckpointStore.getLatestCheckpoint(false).getCheckpointID());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private void testDownloadInternal(TriConsumer<ZooKeeperCompletedCheckpointStore, List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, SharedStateRegistry> triConsumer) throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        final ArrayList arrayList = new ArrayList();
        ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(10, new ZooKeeperStateHandleStore<CompletedCheckpoint>(ZooKeeperUtils.startCuratorFramework(configuration), new TestingRetrievableStateStorageHelper()) { // from class: org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStoreTest.1
            public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> getAllAndLock() {
                return arrayList;
            }
        }, Executors.directExecutor());
        try {
            triConsumer.accept(zooKeeperCompletedCheckpointStore, arrayList, sharedStateRegistry);
            zooKeeperCompletedCheckpointStore.shutdown(JobStatus.FINISHED);
            sharedStateRegistry.close();
        } catch (Throwable th) {
            zooKeeperCompletedCheckpointStore.shutdown(JobStatus.FINISHED);
            sharedStateRegistry.close();
            throw th;
        }
    }

    private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle(long j, Function<Long, CompletedCheckpoint> function) {
        return Tuple2.of(new CheckpointStateHandle(function, j), ZooKeeperCompletedCheckpointStore.checkpointIdToPath(j));
    }

    @Test
    public void testDiscardingSubsumedCheckpoints() throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        CuratorFramework startCuratorFramework = ZooKeeperUtils.startCuratorFramework(configuration);
        ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore = createZooKeeperCheckpointStore(startCuratorFramework);
        try {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint createCheckpoint = CompletedCheckpointStoreTest.createCheckpoint(0L, sharedStateRegistry);
            createZooKeeperCheckpointStore.addCheckpoint(createCheckpoint);
            Assert.assertThat(createZooKeeperCheckpointStore.getAllCheckpoints(), Matchers.contains(new CompletedCheckpoint[]{createCheckpoint}));
            CompletedCheckpointStoreTest.TestCompletedCheckpoint createCheckpoint2 = CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry);
            createZooKeeperCheckpointStore.addCheckpoint(createCheckpoint2);
            List allCheckpoints = createZooKeeperCheckpointStore.getAllCheckpoints();
            Assert.assertThat(allCheckpoints, Matchers.contains(new CompletedCheckpoint[]{createCheckpoint2}));
            Assert.assertThat(allCheckpoints, Matchers.not(Matchers.contains(new CompletedCheckpoint[]{createCheckpoint})));
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(createCheckpoint);
            startCuratorFramework.close();
        } catch (Throwable th) {
            startCuratorFramework.close();
            throw th;
        }
    }

    @Test
    public void testDiscardingCheckpointsAtShutDown() throws Exception {
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        CuratorFramework startCuratorFramework = ZooKeeperUtils.startCuratorFramework(configuration);
        ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore = createZooKeeperCheckpointStore(startCuratorFramework);
        try {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint createCheckpoint = CompletedCheckpointStoreTest.createCheckpoint(0L, sharedStateRegistry);
            createZooKeeperCheckpointStore.addCheckpoint(createCheckpoint);
            Assert.assertThat(createZooKeeperCheckpointStore.getAllCheckpoints(), Matchers.contains(new CompletedCheckpoint[]{createCheckpoint}));
            createZooKeeperCheckpointStore.shutdown(JobStatus.FINISHED);
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(createCheckpoint);
            startCuratorFramework.close();
        } catch (Throwable th) {
            startCuratorFramework.close();
            throw th;
        }
    }

    @Nonnull
    private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework curatorFramework) throws Exception {
        return new ZooKeeperCompletedCheckpointStore(1, ZooKeeperUtils.createZooKeeperStateHandleStore(curatorFramework, "/checkpoints", new TestingRetrievableStateStorageHelper()), Executors.directExecutor());
    }
}
