package org.apache.flink.test.checkpointing;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.shaded.com.google.common.collect.HashMultimap;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase.class */
public class SavepointITCase extends TestLogger {

    /** JUnit rule supplying the temporary directory under which tests create their checkpoint/savepoint folders. */
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);

    /** Parallelism set on the operators of the iteration test job; also sizes the per-subtask arrays below. */
    private static final int ITER_TEST_PARALLELISM = 1;

    /** One latch per subtask, triggered by {@link DuplicateFilter} when it processes the element 30. */
    private static final OneShotLatch[] ITER_TEST_SNAPSHOT_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];

    /** One latch per subtask, triggered at the end of {@code IntegerStreamSource#restoreState}. */
    private static final OneShotLatch[] ITER_TEST_RESTORE_WAIT = new OneShotLatch[ITER_TEST_PARALLELISM];

    /** Per-subtask emitted count recorded by {@code IntegerStreamSource#snapshotState} and compared on restore. */
    private static final int[] ITER_TEST_CHECKPOINT_VERIFY = new int[ITER_TEST_PARALLELISM];

    /**
     * Flat map that forwards an element only while its {@code "seen"} value state is still
     * {@code false}, and flips that state to {@code true} after forwarding.
     *
     * <p>Whenever the element {@code 30} is processed, the snapshot latch of the current subtask in
     * {@code ITER_TEST_SNAPSHOT_WAIT} is triggered so the test knows the job has made progress.
     */
    public static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
        private static final long serialVersionUID = 1;

        /** State descriptor: boolean flag named "seen" with default value {@code false}. */
        static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);

        private ValueState<Boolean> operatorState;

        /** Looks up the "seen" state handle from the runtime context. */
        public void open(Configuration configuration) {
            this.operatorState = getRuntimeContext().getState(descriptor);
        }

        /**
         * Emits {@code num} if the "seen" flag is not yet set, then sets the flag.
         *
         * @param num the incoming element
         * @param collector receives the element when it has not been seen before
         */
        public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
            if (!this.operatorState.value()) {
                collector.collect(num);
                this.operatorState.update(true);
            }
            if (num.intValue() == 30) {
                SavepointITCase.ITER_TEST_SNAPSHOT_WAIT[getRuntimeContext().getIndexOfThisSubtask()].trigger();
            }
        }

        // The explicit flatMap(Object, Collector) bridge emitted by the decompiler was removed: javac
        // generates that bridge itself, and declaring it in source clashes (same erasure) with the
        // inherited generic method.
    }

    /**
     * Source that keeps emitting the same integer (the value of {@code ITER_TEST_PARALLELISM}),
     * pausing one millisecond between elements, until {@link #cancel()} is called.
     */
    public static class InfiniteTestSource implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1;

        /** Cleared by {@link #cancel()}; volatile because it is written from another thread than {@link #run}. */
        private volatile boolean running = true;

        private InfiniteTestSource() {
        }

        /** Emits one element per iteration while holding the checkpoint lock, then sleeps 1 ms. */
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (true) {
                if (!this.running) {
                    return;
                }
                final Integer element = Integer.valueOf(SavepointITCase.ITER_TEST_PARALLELISM);
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(element);
                }
                Thread.sleep(1L);
            }
        }

        /** Makes the emit loop in {@link #run} exit before emitting its next element. */
        public void cancel() {
            this.running = false;
        }
    }

    /**
     * Checkpointed source that emits the cycle 0, 1, ..., 100, 0, 1, ... with a 1 ms pause between
     * elements. The current counter is the snapshotted state.
     *
     * <p>{@link #snapshotState} records the counter in {@code ITER_TEST_CHECKPOINT_VERIFY} and
     * {@link #restoreState} asserts that the restored counter matches that record before triggering
     * the restore latch of the subtask.
     */
    private static final class IntegerStreamSource extends RichSourceFunction<Integer> implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1;

        private volatile boolean running = true;

        /** Next value to emit; only modified while holding the checkpoint lock. */
        private int emittedCount = 0;

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(this.emittedCount));
                    // Advance the counter under the same lock as the emission so that a snapshot
                    // never observes a counter that is out of step with what has been emitted
                    // (Flink's SourceFunction contract for checkpointed sources).
                    this.emittedCount = this.emittedCount < 100 ? this.emittedCount + 1 : 0;
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        /**
         * Records the current counter for later verification and returns it as the only state element.
         *
         * @param j first argument of the checkpoint callback (unused here)
         * @param j2 second argument of the checkpoint callback (unused here)
         */
        public List<Integer> snapshotState(long j, long j2) throws Exception {
            SavepointITCase.ITER_TEST_CHECKPOINT_VERIFY[getRuntimeContext().getIndexOfThisSubtask()] = this.emittedCount;
            return Collections.singletonList(Integer.valueOf(this.emittedCount));
        }

        /**
         * Restores the counter from the first state element (if any), checks it against the value
         * recorded at snapshot time and signals the restore latch of this subtask.
         */
        public void restoreState(List<Integer> list) throws Exception {
            if (!list.isEmpty()) {
                this.emittedCount = list.get(0).intValue();
            }
            final int subtask = getRuntimeContext().getIndexOfThisSubtask();
            Assert.assertEquals(SavepointITCase.ITER_TEST_CHECKPOINT_VERIFY[subtask], this.emittedCount);
            SavepointITCase.ITER_TEST_RESTORE_WAIT[subtask].trigger();
        }
    }

    /**
     * Identity map function carrying a 1025-byte array as checkpointed state. Every processed
     * element bumps each byte of the array, and two static latches let the tests wait for progress
     * and for state restoration.
     */
    public static class StatefulCounter extends RichMapFunction<Integer, Integer> implements ListCheckpointed<byte[]> {
        private static final long serialVersionUID = 7317800376639115920L;

        /** Counted down by {@link #map} for every element after a subtask's first 11 elements. */
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);

        /** Counted down once per successful {@link #restoreState} call. */
        private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);

        private int numCollectedElements = 0;

        private byte[] data;

        private StatefulCounter() {
        }

        /** Fills {@code data} with pseudo-random bytes seeded by the subtask index unless it was already restored. */
        public void open(Configuration configuration) throws Exception {
            if (this.data != null) {
                return;
            }
            final Random seeded = new Random(getRuntimeContext().getIndexOfThisSubtask());
            final byte[] fresh = new byte[1025];
            seeded.nextBytes(fresh);
            this.data = fresh;
        }

        /** Increments every state byte, counts the element and returns the input unchanged. */
        public Integer map(Integer num) throws Exception {
            final byte[] bytes = this.data;
            int idx = 0;
            while (idx < bytes.length) {
                bytes[idx] = (byte) (bytes[idx] + SavepointITCase.ITER_TEST_PARALLELISM);
                idx += SavepointITCase.ITER_TEST_PARALLELISM;
            }
            final int collectedBefore = this.numCollectedElements;
            this.numCollectedElements = collectedBefore + SavepointITCase.ITER_TEST_PARALLELISM;
            if (collectedBefore > 10) {
                progressLatch.countDown();
            }
            return num;
        }

        /** Returns the byte array as the single state element. */
        public List<byte[]> snapshotState(long j, long j2) throws Exception {
            final byte[] state = this.data;
            return Collections.singletonList(state);
        }

        /**
         * Takes over the single restored byte array and counts down the restore latch.
         *
         * @throws RuntimeException if the restored list is empty or holds more elements than
         *     {@code ITER_TEST_PARALLELISM}
         */
        public void restoreState(List<byte[]> list) throws Exception {
            final boolean sizeAcceptable = !list.isEmpty() && list.size() <= SavepointITCase.ITER_TEST_PARALLELISM;
            if (!sizeAcceptable) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
            }
            this.data = list.get(0);
            restoreLatch.countDown();
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static CountDownLatch getRestoreLatch() {
            return restoreLatch;
        }

        /** Replaces both latches with fresh ones initialised to {@code i}. */
        static void resetForTest(int i) {
            final CountDownLatch freshProgress = new CountDownLatch(i);
            final CountDownLatch freshRestore = new CountDownLatch(i);
            progressLatch = freshProgress;
            restoreLatch = freshRestore;
        }
    }

    /**
     * Runs a job on a file-system state backend, triggers a savepoint, restarts the cluster and
     * resubmits the job from that savepoint.
     *
     * <p>Verified along the way:
     * <ul>
     *   <li>the savepoint is written to a single self-contained directory holding 5 files,</li>
     *   <li>the job's checkpoint directory (if present) is empty after the first shutdown,</li>
     *   <li>every task deployment descriptor of the resumed job carries the legacy operator state
     *       stored in the savepoint,</li>
     *   <li>state is restored and the job makes progress again,</li>
     *   <li>disposing the savepoint removes its files and leaves the savepoint directory empty.</li>
     * </ul>
     *
     * <p>The cluster and the test actor system are always shut down in {@code finally}, also when
     * the test fails half-way.
     */
    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();
        final File testRoot = this.folder.getRoot();

        final File checkpointDir = new File(testRoot, "checkpoints");
        final File savepointDir = new File(testRoot, "savepoints");
        if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
            Assert.fail("Test setup failed: failed to create temporary directories.");
        }

        final Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 2);
        config.setInteger("taskmanager.numberOfTaskSlots", 2);
        config.setString(CoreOptions.STATE_BACKEND, "filesystem");
        config.setString("state.backend.fs.checkpointdir", checkpointDir.toURI().toString());
        // Threshold 0: all state is written to files rather than kept inline in the handles.
        config.setString("state.backend.fs.memory-threshold", "0");
        config.setString("state.savepoints.dir", savepointDir.toURI().toString());

        final ActorSystem testActorSystem = AkkaUtils.createDefaultActorSystem();
        final TestingCluster cluster = new TestingCluster(config);
        try {
            cluster.start(true);
            final JobGraph jobGraph = createJobGraph(4, 0, 1000L);
            final JobID jobId = jobGraph.getJobID();
            StatefulCounter.resetForTest(4);
            ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), deadline.timeLeft());

            LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
            cluster.submitJobDetached(jobGraph);

            LOG.info("Waiting for some progress.");
            Await.ready(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft()), deadline.timeLeft());
            Assert.assertTrue("Job made no progress before the deadline.",
                StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            LOG.info("Triggering a savepoint.");
            final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(
                jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), deadline.timeLeft()),
                deadline.timeLeft())).savepointPath();
            LOG.info("Retrieved savepoint path: " + savepointPath + ".");

            LOG.info("Requesting the savepoint.");
            final SavepointV2 savepoint = (SavepointV2) ((TestingJobManagerMessages.ResponseSavepoint) Await.result(
                jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), deadline.timeLeft()),
                deadline.timeLeft())).savepoint();
            LOG.info("Retrieved savepoint: " + savepointPath + ".");

            LOG.info("Shutting down Flink cluster.");
            cluster.shutdown();
            cluster.awaitTermination();

            // The savepoint must live in exactly one self-contained sub-directory.
            File[] savepointDirEntries = savepointDir.listFiles();
            Assert.assertNotNull("Savepoint not created in expected directory", savepointDirEntries);
            Assert.assertEquals("Savepoint not created in expected directory", 1L, savepointDirEntries.length);
            Assert.assertTrue("Savepoint did not create self-contained directory", savepointDirEntries[0].isDirectory());
            File[] savepointFiles = savepointDirEntries[0].listFiles();
            Assert.assertNotNull(savepointFiles);
            Assert.assertEquals("Did not write expected number of savepoint/checkpoint files to directory: " + Arrays.toString(savepointFiles), 5L, savepointFiles.length);

            // Regular checkpoint files of the job must be gone after shutdown.
            File jobCheckpointDir = new File(checkpointDir, jobId.toString());
            if (jobCheckpointDir.exists()) {
                File[] leftovers = jobCheckpointDir.listFiles();
                Assert.assertNotNull("Checkpoint directory empty", leftovers);
                Assert.assertEquals("Checkpoints directory not clean: " + Arrays.toString(leftovers), 0L, leftovers.length);
            }

            LOG.info("Restarting Flink cluster.");
            cluster.start();
            LOG.info("Retrieving JobManager.");
            ActorGateway restartedJobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: " + restartedJobManager + ".");
            StatefulCounter.resetForTest(4);

            // Filled by the test-kit block below: first failure (if any) and the received TDDs per job vertex.
            final Throwable[] error = new Throwable[1];
            final HashMultimap<Object, TaskDeploymentDescriptor> tddsByVertex = HashMultimap.create();
            new JavaTestKit(testActorSystem) {
                {
                    new JavaTestKit.Within(deadline.timeLeft()) {
                        protected void run() {
                            try {
                                // Ask every task manager to report the tasks submitted for this job.
                                Iterator it = cluster.getTaskManagersAsJava().iterator();
                                while (it.hasNext()) {
                                    ((ActorRef) it.next()).tell(new TestingTaskManagerMessages.RegisterSubmitTaskListener(jobId), getTestActor());
                                }
                                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
                                SavepointITCase.LOG.info("Resubmitting job " + jobGraph.getJobID() + " with savepoint path " + savepointPath + " in detached mode.");
                                cluster.submitJobDetached(jobGraph);

                                int numTasks = 0;
                                Iterator vertices = jobGraph.getVertices().iterator();
                                while (vertices.hasNext()) {
                                    numTasks += ((JobVertex) vertices.next()).getParallelism();
                                }
                                SavepointITCase.LOG.info("Gathering " + numTasks + " submitted TaskDeploymentDescriptor instances.");
                                for (int received = 0; received < numTasks; received++) {
                                    TaskDeploymentDescriptor tdd = ((TestingTaskManagerMessages.ResponseSubmitTaskListener) expectMsgAnyClassOf(getRemainingTime(), new Class[]{TestingTaskManagerMessages.ResponseSubmitTaskListener.class})).tdd();
                                    SavepointITCase.LOG.info("Received: " + tdd.toString() + ".");
                                    tddsByVertex.put(((TaskInformation) tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader())).getJobVertexId(), tdd);
                                }
                            } catch (Throwable th) {
                                error[0] = th;
                            }
                        }
                    };
                }
            };
            // Surface a failure from the block above before issuing further requests, so the root
            // cause is reported rather than a follow-up error.
            if (error[0] != null) {
                throw new RuntimeException(error[0]);
            }

            ExecutionGraph executionGraph = ((JobManagerMessages.JobFound) Await.result(
                restartedJobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()),
                deadline.timeLeft())).executionGraph();

            // operator id -> (position of the operator inside its job vertex, the job vertex)
            HashMap<Object, Tuple2<Integer, ExecutionJobVertex>> operatorLocations = new HashMap<>();
            for (ExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {
                List<?> operatorIDs = vertex.getOperatorIDs();
                for (int idx = 0; idx < operatorIDs.size(); idx++) {
                    operatorLocations.put(operatorIDs.get(idx), new Tuple2<Integer, ExecutionJobVertex>(Integer.valueOf(idx), vertex));
                }
            }

            for (OperatorState operatorState : savepoint.getOperatorStates()) {
                Tuple2<Integer, ExecutionJobVertex> location = operatorLocations.get(operatorState.getOperatorID());
                Assert.assertNotNull("No job vertex found for operator " + operatorState.getOperatorID() + ".", location);
                Collection<TaskDeploymentDescriptor> tdds = tddsByVertex.get(location.f1.getJobVertexId());
                Assert.assertTrue("Missing task for savepoint state for operator " + operatorState.getOperatorID() + ".", tdds.size() > 0);
                Assert.assertEquals(operatorState.getNumberCollectedStates(), tdds.size());
                for (TaskDeploymentDescriptor tdd : tdds) {
                    OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
                    Assert.assertNotNull(subtaskState);
                    Assert.assertEquals("Initial operator state mismatch.", subtaskState.getLegacyOperatorState(), tdd.getTaskStateHandles().getLegacyOperatorState().get(location.f0.intValue()));
                }
            }

            Assert.assertTrue("State was not restored before the deadline.",
                StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            Assert.assertTrue("Job made no progress after restore before the deadline.",
                StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            LOG.info("Cancelling job " + jobId + ".");
            restartedJobManager.tell(new JobManagerMessages.CancelJob(jobId));

            LOG.info("Disposing savepoint " + savepointPath + ".");
            Object disposeResponse = Await.result(restartedJobManager.ask(new JobManagerMessages.DisposeSavepoint(savepointPath), deadline.timeLeft()), deadline.timeLeft());
            Assert.assertTrue("Failed to dispose savepoint " + savepointPath + ".", disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass());

            // All state files referenced by the savepoint must be gone after disposal.
            List<File> stateFiles = new ArrayList<>();
            for (OperatorState operatorState : savepoint.getOperatorStates()) {
                for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                    FileStateHandle handle = (FileStateHandle) subtaskState.getLegacyOperatorState();
                    if (handle != null) {
                        stateFiles.add(new File(handle.getFilePath().toUri()));
                    }
                }
            }
            for (File stateFile : stateFiles) {
                Assert.assertFalse("Checkpoint file " + stateFile + " not cleaned up properly.", stateFile.exists());
            }
            if (stateFiles.size() > 0) {
                File parentDir = stateFiles.get(0).getParentFile();
                Assert.assertFalse("Checkpoint parent directory " + parentDir + " not cleaned up properly.", parentDir.exists());
            }
            Assert.assertEquals("Savepoints directory not cleaned up properly: " + Arrays.toString(savepointDir.listFiles()) + ".", 0L, savepointDir.listFiles().length);
        } finally {
            try {
                cluster.shutdown();
            } finally {
                JavaTestKit.shutdownActorSystem(testActorSystem);
            }
        }
    }

    /**
     * Submits a job whose savepoint restore path does not exist and expects the submission to fail
     * with a {@link JobExecutionException} caused by a {@link FileNotFoundException}.
     *
     * <p>The test fails explicitly if the submission returns without throwing.
     */
    @Test
    public void testSubmitWithUnknownSavepointPath() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();
        final File savepointDir = new File(this.folder.getRoot(), "savepoints");

        TestingCluster cluster = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", numTaskManagers);
            config.setInteger("taskmanager.numberOfTaskSlots", numSlotsPerTaskManager);
            config.setString("state.savepoints.dir", savepointDir.toURI().toString());
            LOG.info("Flink configuration: " + config + ".");

            cluster = new TestingCluster(config);
            LOG.info("Starting Flink cluster.");
            cluster.start();

            LOG.info("Retrieving JobManager.");
            ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: " + jobManager + ".");

            JobGraph jobGraph = createJobGraph(parallelism, 1000, 3600000L);
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
            Assert.assertEquals("unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());

            LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
            try {
                cluster.submitJobAndWait(jobGraph, false);
                // Assert.fail throws an AssertionError, which the catch (Exception) below does not swallow.
                Assert.fail("Job submission with an unknown savepoint path did not fail.");
            } catch (Exception e) {
                Assert.assertEquals(JobExecutionException.class, e.getClass());
                Assert.assertEquals(FileNotFoundException.class, e.getCause().getClass());
            }
        } finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Checks that a savepoint can be restored into a job whose stateless operators changed.
     *
     * <p>First job: source, map (x4), {@link StatefulCounter} with uid {@code "statefulCounter"},
     * map (x2), sink. A savepoint is taken and the cluster is restarted. Second job: source,
     * {@link StatefulCounter} with the same uid, identity map, sink. The test passes when the
     * counter state is restored and the second job makes progress before the deadline.
     *
     * <p>The cluster is shut down in {@code finally}; the null guard keeps a failure during cluster
     * construction from being masked by a {@link NullPointerException} in the cleanup.
     */
    @Test
    public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();
        final File savepointDir = new File(this.folder.getRoot(), "savepoints");

        TestingCluster cluster = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 2);
            config.setInteger("taskmanager.numberOfTaskSlots", 2);
            config.setString("state.savepoints.dir", savepointDir.toURI().toString());
            LOG.info("Flink configuration: " + config + ".");

            cluster = new TestingCluster(config);
            LOG.info("Starting Flink cluster.");
            cluster.start(true);
            LOG.info("Retrieving JobManager.");
            ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: " + jobManager + ".");

            // ---- first job: take the savepoint ----
            StatefulCounter statefulCounter = new StatefulCounter();
            StatefulCounter.resetForTest(2);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            env.addSource(new InfiniteTestSource())
                .shuffle()
                .map(new MapFunction<Integer, Integer>() {
                    public Integer map(Integer num) throws Exception {
                        return Integer.valueOf(4 * num.intValue());
                    }
                })
                .shuffle()
                .map(statefulCounter).uid("statefulCounter")
                .shuffle()
                .map(new MapFunction<Integer, Integer>() {
                    public Integer map(Integer num) throws Exception {
                        return Integer.valueOf(2 * num.intValue());
                    }
                })
                .addSink(new DiscardingSink<Integer>());

            JobID jobId = cluster.submitJobDetached(env.getStreamGraph().getJobGraph()).getJobID();
            Assert.assertTrue("Job made no progress before the deadline.",
                StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(
                jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), deadline.timeLeft()),
                deadline.timeLeft())).savepointPath();
            // The result is not used; the cast checks that the savepoint can be requested successfully.
            ((TestingJobManagerMessages.ResponseSavepoint) Await.result(
                jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), deadline.timeLeft()),
                deadline.timeLeft())).savepoint();
            LOG.info("Retrieved savepoint: " + savepointPath + ".");

            LOG.info("Shutting down Flink cluster.");
            cluster.shutdown();
            cluster.awaitTermination();

            // ---- second job: modified stateless operators, restored from the savepoint ----
            LOG.info("Restarting Flink cluster.");
            cluster.start(true);
            LOG.info("Retrieving JobManager.");
            ActorGateway restartedJobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: " + restartedJobManager + ".");

            StatefulCounter.resetForTest(2);
            StreamExecutionEnvironment modifiedEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            modifiedEnv.setParallelism(2);
            modifiedEnv.addSource(new InfiniteTestSource())
                .shuffle()
                .map(new StatefulCounter()).uid("statefulCounter")
                .shuffle()
                .map(new MapFunction<Integer, Integer>() {
                    public Integer map(Integer num) throws Exception {
                        return num;
                    }
                })
                .addSink(new DiscardingSink<Integer>());

            JobGraph modifiedJobGraph = modifiedEnv.getStreamGraph().getJobGraph();
            modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with savepoint path " + savepointPath + " in detached mode.");
            cluster.submitJobDetached(modifiedJobGraph);

            // Without these assertions a timeout would let the test pass without any restore.
            Assert.assertTrue("State was not restored before the deadline.",
                StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            Assert.assertTrue("Job made no progress after restore before the deadline.",
                StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        } finally {
            if (cluster != null) {
                cluster.shutdown();
                cluster.awaitTermination();
            }
        }
    }

    /**
     * Builds the job graph shared by the savepoint tests: {@link InfiniteTestSource}, shuffle,
     * {@link StatefulCounter}, discarding sink, with operator chaining and sysout logging disabled.
     *
     * @param i parallelism set on the execution environment
     * @param i2 first argument passed to {@code RestartStrategies.fixedDelayRestart}
     * @param j second argument passed to {@code RestartStrategies.fixedDelayRestart}
     * @return the job graph generated from the stream graph of the environment
     */
    private JobGraph createJobGraph(int i, int i2, long j) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(i);
        env.disableOperatorChaining();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(i2, j));
        env.getConfig().disableSysoutLogging();

        env.addSource(new InfiniteTestSource())
            .shuffle()
            .map(new StatefulCounter())
            .addSink(new DiscardingSink());

        final StreamGraph streamGraph = env.getStreamGraph();
        return streamGraph.getJobGraph();
    }

    /**
     * Takes a savepoint of a job containing an iteration and resubmits the job from it.
     *
     * <p>Topology: {@code IntegerStreamSource}, pass-through flat map, keyBy(identity),
     * {@link DuplicateFilter}, iteration whose feedback edge is an identity map. The test waits
     * until every {@link DuplicateFilter} subtask has signalled its snapshot latch, triggers the
     * savepoint, resubmits the job with that savepoint and waits for every source subtask to signal
     * its restore latch.
     *
     * <p>Working directories are created under the {@code folder} rule instead of a hand-made
     * {@link TemporaryFolder} that nobody deletes. The savepoint is disposed and the cluster
     * stopped in {@code finally}.
     */
    @Test
    public void testSavepointForJobWithIteration() throws Exception {
        for (int subtask = 0; subtask < ITER_TEST_PARALLELISM; subtask++) {
            ITER_TEST_SNAPSHOT_WAIT[subtask] = new OneShotLatch();
            ITER_TEST_RESTORE_WAIT[subtask] = new OneShotLatch();
            ITER_TEST_CHECKPOINT_VERIFY[subtask] = 0;
        }

        final File testRoot = this.folder.newFolder();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final IntegerStreamSource source = new IntegerStreamSource();
        IterativeStream<Integer> iteration = env.addSource(source)
            .flatMap(new RichFlatMapFunction<Integer, Integer>() {
                private static final long serialVersionUID = 1;

                public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
                    collector.collect(num);
                }
            }).setParallelism(ITER_TEST_PARALLELISM)
            .keyBy(new KeySelector<Integer, Object>() {
                private static final long serialVersionUID = 1;

                public Object getKey(Integer num) throws Exception {
                    return num;
                }
            })
            .flatMap(new DuplicateFilter()).setParallelism(ITER_TEST_PARALLELISM)
            .iterate();
        iteration.closeWith(iteration.map(new MapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 1;

            public Integer map(Integer num) throws Exception {
                return num;
            }
        }).setParallelism(ITER_TEST_PARALLELISM));

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobName("Test");
        JobGraph jobGraph = streamGraph.getJobGraph();

        Configuration config = new Configuration();
        config.addAll(jobGraph.getJobConfiguration());
        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        config.setInteger("taskmanager.numberOfTaskSlots", 2 * jobGraph.getMaximumParallelism());

        final File checkpointDir = new File(testRoot, "checkpoints");
        final File savepointDir = new File(testRoot, "savepoints");
        if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
            Assert.fail("Test setup failed: failed to create temporary directories.");
        }
        config.setString(CoreOptions.STATE_BACKEND, "filesystem");
        config.setString("state.backend.fs.checkpointdir", checkpointDir.toURI().toString());
        config.setString("state.backend.fs.memory-threshold", "0");
        config.setString("state.savepoints.dir", savepointDir.toURI().toString());

        TestingCluster cluster = new TestingCluster(config, false);
        String savepointPath = null;
        try {
            cluster.start();
            cluster.submitJobDetached(jobGraph);
            for (OneShotLatch snapshotLatch : ITER_TEST_SNAPSHOT_WAIT) {
                snapshotLatch.await();
            }
            savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());

            // NOTE(review): cancel() acts on this local instance and runs before the second job
            // graph is generated; the statement order is kept exactly as before.
            source.cancel();

            JobGraph restoredJobGraph = streamGraph.getJobGraph();
            restoredJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            cluster.submitJobDetached(restoredJobGraph);
            for (OneShotLatch restoreLatch : ITER_TEST_RESTORE_WAIT) {
                restoreLatch.await();
            }
            source.cancel();
        } finally {
            try {
                if (savepointPath != null) {
                    cluster.disposeSavepoint(savepointPath);
                }
            } finally {
                // Stop the cluster even when disposing the savepoint fails.
                cluster.stop();
                cluster.awaitTermination();
            }
        }
    }
}
