package org.apache.flink.test.checkpointing;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.class */
public class ExternalizedCheckpointITCase extends TestLogger {
    private static final int PARALLELISM = 2;
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int SLOTS_PER_TASK_MANAGER = 2;

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase$NotifyingInfiniteTupleSource.class */
    public static class NotifyingInfiniteTupleSource extends ManualWindowSpeedITCase.InfiniteTupleSource {
        private static final long serialVersionUID = 8120981235081181746L;
        private static CountDownLatch countDownLatch;

        public NotifyingInfiniteTupleSource(int i) {
            super(i);
        }

        @Override // org.apache.flink.test.state.ManualWindowSpeedITCase.InfiniteTupleSource
        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            super.run(sourceContext);
        }
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, new RocksDBStateBackend(newFolder.toURI().toString(), true));
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, new RocksDBStateBackend(newFolder.toURI().toString(), false));
    }

    @Test
    public void testExternalizedFSCheckpointsStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, new FsStateBackend(newFolder.toURI().toString(), true));
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), new RocksDBStateBackend(newFolder.toURI().toString(), true));
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), new RocksDBStateBackend(newFolder.toURI().toString(), false));
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedFSCheckpointsZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), new FsStateBackend(newFolder.toURI().toString(), true));
        } finally {
            testingServer.stop();
        }
    }

    private void testExternalizedCheckpoints(File file, String str, AbstractStateBackend abstractStateBackend) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 2);
        configuration.setInteger("taskmanager.numberOfTaskSlots", 2);
        File newFolder = temporaryFolder.newFolder();
        configuration.setString("state.backend.fs.checkpointdir", file.toURI().toString());
        configuration.setString(CoreOptions.SAVEPOINT_DIRECTORY, newFolder.toURI().toString());
        configuration.setString(CoreOptions.CHECKPOINTS_DIRECTORY, file.toURI().toString());
        if (str != null) {
            File newFolder2 = temporaryFolder.newFolder();
            configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
            configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, str);
            configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, newFolder2.toURI().toString());
        }
        TestingCluster testingCluster = new TestingCluster(configuration);
        testingCluster.start();
        String str2 = null;
        for (int i = 0; i < 3; i++) {
            try {
                StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
                executionEnvironment.setStateBackend(abstractStateBackend);
                executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
                executionEnvironment.setParallelism(2);
                CountDownLatch unused = NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(2);
                executionEnvironment.addSource(new NotifyingInfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).reduce(new ReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.2
                    private static final long serialVersionUID = 1;

                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                        return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
                    }
                }).filter(new FilterFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.1
                    private static final long serialVersionUID = 1;

                    public boolean filter(Tuple2<String, Integer> tuple2) throws Exception {
                        return ((String) tuple2.f0).startsWith("Tuple 0");
                    }
                });
                StreamGraph streamGraph = executionEnvironment.getStreamGraph();
                streamGraph.setJobName("Test");
                JobGraph jobGraph = streamGraph.getJobGraph();
                if (str2 != null) {
                    jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str2));
                }
                configuration.addAll(jobGraph.getJobConfiguration());
                JobSubmissionResult submitJobDetached = testingCluster.submitJobDetached(jobGraph);
                NotifyingInfiniteTupleSource.countDownLatch.await();
                str2 = testingCluster.requestCheckpoint(submitJobDetached.getJobID(), CheckpointOptions.forCheckpoint());
                testingCluster.cancelJob(submitJobDetached.getJobID());
            } finally {
                testingCluster.stop();
                testingCluster.awaitTermination();
            }
        }
    }
}
