package org.apache.flink.test.checkpointing;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.UnrecoverableException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.shaded.com.google.common.collect.HashMultimap;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase.class */
public class SavepointITCase extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$InfiniteTestSource.class */
    /**
     * Source that emits the constant {@code 1} until it is cancelled and counts down a shared
     * latch every time it is notified of a completed checkpoint.
     */
    public static class InfiniteTestSource implements SourceFunction<Integer>, CheckpointListener {
        private static final long serialVersionUID = 1;

        // Test control: the tests in this file replace the latch before submitting a job and
        // wait on it, while the source instances count it down from their own threads.
        // Volatile so a freshly installed latch is visible to those threads, in line with
        // the latch used by RestoreStateCountingAndFailingSource.
        private static volatile CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);

        // Cleared by cancel() to end the emit loop in run().
        private volatile boolean running = true;

        private InfiniteTestSource() {
        }

        /**
         * Emits {@code 1} in a tight loop until {@link #cancel()} is called.
         *
         * @param sourceContext context used to emit the records
         */
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                sourceContext.collect(1);
            }
        }

        /** Stops the emit loop of {@link #run}. */
        public void cancel() {
            running = false;
        }

        /**
         * Counts the shared latch down by one.
         *
         * @param checkpointId ID of the completed checkpoint (not used)
         */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            CheckpointCompleteLatch.countDown();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$RestoreStateCountingAndFailingSource.class */
    /**
     * Source that emits the constant {@code 1} until cancelled, counts how often its state is
     * restored, and can be told to fail on every restore call.
     *
     * <p>All control state is static so that the test can reset and inspect it from outside
     * the running job.
     */
    private static class RestoreStateCountingAndFailingSource
            implements SourceFunction<Integer>, Checkpointed<Serializable>, CheckpointListener {
        private static final long serialVersionUID = 1;

        // Number of restoreState() invocations since the test last reset the counter.
        // NOTE(review): volatile gives visibility only; the increment below is not atomic.
        // testRestoreFailure runs this source with parallelism 1, so there is a single writer.
        private static volatile int numRestoreStateCalls = 0;

        // When true, restoreState() throws after incrementing the counter.
        private static volatile boolean failOnRestoreStateCall = false;

        // Counted down on every checkpoint-complete notification.
        private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);

        // Number of records emitted so far; same single-writer caveat as above.
        private static volatile int emitted = 0;

        // Cleared by cancel() to end the emit loop in run().
        private volatile boolean running = true;

        private RestoreStateCountingAndFailingSource() {
        }

        /**
         * Emits {@code 1} and bumps the emitted counter until {@link #cancel()} is called.
         *
         * @param sourceContext context used to emit the records
         */
        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                sourceContext.collect(1);
                emitted++;
            }
        }

        /** Stops the emit loop of {@link #run}. */
        public void cancel() {
            running = false;
        }

        /**
         * Returns a constant dummy state.
         *
         * @param checkpointId ID of the checkpoint (not used)
         * @param checkpointTimestamp timestamp of the checkpoint (not used)
         * @return the boxed integer {@code 1}
         */
        public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            return 1;
        }

        /**
         * Records the restore call and fails if the test requested it.
         *
         * @param state the restored state (ignored)
         * @throws RuntimeException if {@code failOnRestoreStateCall} is set
         */
        public void restoreState(Serializable state) throws Exception {
            numRestoreStateCalls++;
            if (failOnRestoreStateCall) {
                throw new RuntimeException("Restore test failure");
            }
        }

        /**
         * Counts the shared latch down by one.
         *
         * @param checkpointId ID of the completed checkpoint (not used)
         */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            checkpointCompleteLatch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$StatefulCounter.class */
    /**
     * Identity map function that carries a byte array as checkpointed state and increments
     * every byte of it for each record it sees.
     */
    public static class StatefulCounter extends RichMapFunction<Integer, Integer> implements Checkpointed<byte[]> {
        private static final long serialVersionUID = 7317800376639115920L;

        // Operator state; stays null until either restoreState() or open() assigns it.
        private byte[] data;

        private StatefulCounter() {
        }

        /**
         * Creates the initial state unless state has already been restored.
         *
         * <p>The random generator is seeded with the subtask index, so the initial bytes are
         * reproducible per subtask.
         *
         * @param configuration function configuration (not used)
         */
        public void open(Configuration configuration) throws Exception {
            if (data == null) {
                Random random = new Random(getRuntimeContext().getIndexOfThisSubtask());
                data = new byte[1025];
                random.nextBytes(data);
            }
        }

        /**
         * Increments every state byte (wrapping on overflow) and forwards the record unchanged.
         *
         * @param value the incoming record
         * @return the same record
         */
        public Integer map(Integer value) throws Exception {
            for (int i = 0; i < data.length; i++) {
                data[i]++;
            }
            return value;
        }

        /**
         * Logs and returns the current state array.
         *
         * <p>The method carries the name required by {@link Checkpointed}; the decompiler had
         * renamed it, which left the interface method unimplemented.
         *
         * @param checkpointId ID of the checkpoint, used for logging only
         * @param checkpointTimestamp timestamp of the checkpoint (not used)
         * @return the live state array (no copy is made)
         */
        public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            SavepointITCase.LOG.info("snapshotState ({}): {}", checkpointId, Arrays.toString(data));
            return data;
        }

        /**
         * Logs the restored state and installs it as the current state.
         *
         * @param state the state to restore
         */
        public void restoreState(byte[] state) throws Exception {
            SavepointITCase.LOG.info("restoreState: {}", Arrays.toString(state));
            data = state;
        }
    }

    /**
     * Triggers a savepoint for a running job, restarts the cluster, resubmits the job from the
     * savepoint and verifies that every savepoint state is handed to a matching task.
     *
     * <p>Additionally checks that the cluster shutdown leaves exactly one entry in both the
     * checkpoint and the savepoint directory, and that disposing the savepoint removes the
     * files it referenced.
     *
     * @throws Exception if cluster setup, a JobManager request or the cleanup fails
     */
    @Test
    public void testTriggerSavepointAndResume() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
        final int numberOfCompletedCheckpoints = 10;

        // Shared time budget for every blocking call in this test.
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();

        final File tmpDir = CommonTestUtils.createTempDirectory();
        LOG.info("Created temporary directory: {}.", tmpDir);

        ForkableFlinkMiniCluster flink = null;
        ActorSystem testActorSystem = null;

        try {
            // Hosts the test actor that receives the task submission notifications below.
            testActorSystem = AkkaUtils.createDefaultActorSystem();

            final Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", numTaskManagers);
            config.setInteger("taskmanager.numberOfTaskSlots", numSlotsPerTaskManager);

            final File checkpointDir = new File(tmpDir, "checkpoints");
            final File savepointDir = new File(tmpDir, "savepoints");
            if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
                Assert.fail("Test setup failed: failed to create temporary directories.");
            }
            LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);
            LOG.info("Created temporary savepoint directory: {}.", savepointDir);

            config.setString("state.backend", "filesystem");
            config.setString("state.backend.fs.checkpointdir", checkpointDir.toURI().toString());
            config.setString("savepoints.state.backend", "filesystem");
            config.setString("savepoints.state.backend.fs.dir", savepointDir.toURI().toString());
            LOG.info("Flink configuration: {}.", config);

            flink = new ForkableFlinkMiniCluster(config);
            LOG.info("Starting Flink cluster.");
            flink.start();

            LOG.info("Retrieving JobManager.");
            ActorGateway jobManager = (ActorGateway) Await.result(
                    flink.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: {}.", jobManager);

            final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000L, 1000);
            final JobID jobId = jobGraph.getJobID();

            // Install a fresh latch before the job starts counting it down.
            InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(numberOfCompletedCheckpoints);

            LOG.info("Submitting job {} in detached mode.", jobId);
            flink.submitJobDetached(jobGraph);

            LOG.info("Waiting for {} checkpoint complete notifications.", numberOfCompletedCheckpoints);
            // Bounded by the deadline so that a stuck job fails the test instead of hanging it.
            Assert.assertTrue(
                    "Timed out waiting for checkpoint complete notifications.",
                    InfiniteTestSource.CheckpointCompleteLatch.await(
                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            LOG.info("Received all {} checkpoint complete notifications.", numberOfCompletedCheckpoints);

            LOG.info("Triggering a savepoint.");
            final JobManagerMessages.TriggerSavepointSuccess triggerResponse =
                    (JobManagerMessages.TriggerSavepointSuccess) Await.result(
                            jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), deadline.timeLeft()),
                            deadline.timeLeft());
            final String savepointPath = triggerResponse.savepointPath();
            LOG.info("Retrieved savepoint path: {}.", savepointPath);

            LOG.info("Requesting the savepoint.");
            final TestingJobManagerMessages.ResponseSavepoint savepointResponse =
                    (TestingJobManagerMessages.ResponseSavepoint) Await.result(
                            jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), deadline.timeLeft()),
                            deadline.timeLeft());
            final CompletedCheckpoint savepoint = savepointResponse.savepoint();
            LOG.info("Retrieved savepoint: {}.", savepoint);

            LOG.info("Shutting down Flink cluster.");
            flink.shutdown();

            // Exactly one entry is expected to survive the shutdown in each directory.
            final File[] remainingCheckpoints = checkpointDir.listFiles();
            if (remainingCheckpoints == null) {
                Assert.fail("Checkpoints directory not cleaned up properly.");
            }
            Assert.assertEquals("Checkpoints directory not cleaned up properly.", 1L, remainingCheckpoints.length);

            final File[] remainingSavepoints = savepointDir.listFiles();
            if (remainingSavepoints == null) {
                Assert.fail("Savepoints directory cleaned up.");
            }
            Assert.assertEquals("Savepoints directory cleaned up.", 1L, remainingSavepoints.length);

            LOG.info("Restarting Flink cluster.");
            flink.start();

            LOG.info("Retrieving JobManager.");
            jobManager = (ActorGateway) Await.result(flink.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: {}.", jobManager);

            // Single-element array so the anonymous class below can hand an error back.
            final Throwable[] error = new Throwable[1];
            final ForkableFlinkMiniCluster finalFlink = flink;
            final HashMultimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();

            new JavaTestKit(testActorSystem) {
                {
                    new JavaTestKit.Within(deadline.timeLeft()) {
                        protected void run() {
                            try {
                                // Register the test actor as listener for submitted tasks.
                                for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
                                    taskManager.tell(
                                            new TestingTaskManagerMessages.RegisterSubmitTaskListener(jobId),
                                            getTestActor());
                                }

                                jobGraph.setSavepointPath(savepointPath);
                                SavepointITCase.LOG.info(
                                        "Resubmitting job {} with savepoint path {} in detached mode.",
                                        jobGraph.getJobID(), savepointPath);
                                finalFlink.submitJobDetached(jobGraph);

                                // One deployment descriptor is expected per parallel subtask.
                                int numTasks = 0;
                                for (JobVertex jobVertex : jobGraph.getVertices()) {
                                    numTasks += jobVertex.getParallelism();
                                }

                                SavepointITCase.LOG.info(
                                        "Gathering {} submitted TaskDeploymentDescriptor instances.", numTasks);
                                for (int i = 0; i < numTasks; i++) {
                                    TestingTaskManagerMessages.ResponseSubmitTaskListener response =
                                            (TestingTaskManagerMessages.ResponseSubmitTaskListener) expectMsgAnyClassOf(
                                                    getRemainingTime(),
                                                    TestingTaskManagerMessages.ResponseSubmitTaskListener.class);
                                    TaskDeploymentDescriptor tdd = response.tdd();
                                    SavepointITCase.LOG.info("Received: {}.", tdd.toString());
                                    tdds.put(tdd.getVertexID(), tdd);
                                }
                            } catch (Throwable t) {
                                error[0] = t;
                            }
                        }
                    };
                }
            };

            Assert.assertNull("Error during gathering of TaskDeploymentDescriptors", error[0]);

            // Every state in the savepoint must show up as initial state of the matching subtask.
            for (StateForTask stateForTask : savepoint.getStates()) {
                Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(stateForTask.getOperatorId());
                Assert.assertTrue(
                        "Missing task for savepoint state for operator " + stateForTask.getOperatorId() + ".",
                        taskTdds.size() > 0);

                boolean foundMatchingSubtask = false;
                for (TaskDeploymentDescriptor tdd : taskTdds) {
                    if (tdd.getIndexInSubtaskGroup() == stateForTask.getSubtask()) {
                        foundMatchingSubtask = true;
                        Assert.assertEquals(
                                "Initial operator state mismatch.",
                                stateForTask.getState(), tdd.getOperatorState());
                    }
                }
                Assert.assertTrue("No matching task deployment descriptor found.", foundMatchingSubtask);
            }

            LOG.info("Cancelling job {}.", jobId);
            jobManager.tell(new JobManagerMessages.CancelJob(jobId));

            LOG.info("Disposing savepoint {}.", savepointPath);
            final Object disposeResponse = Await.result(
                    jobManager.ask(new JobManagerMessages.DisposeSavepoint(savepointPath), deadline.timeLeft()),
                    deadline.timeLeft());
            Assert.assertTrue(
                    "Failed to dispose savepoint " + savepointPath + ".",
                    disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass());

            // Collect the files referenced by the savepoint; they must be gone after disposal.
            final List<File> checkpointFiles = new ArrayList<File>();
            for (StateForTask stateForTask : savepoint.getStates()) {
                StreamTaskStateList taskStateList = (StreamTaskStateList) stateForTask.getState()
                        .deserializeValue(ClassLoader.getSystemClassLoader());
                for (StreamTaskState taskState : taskStateList.getState(ClassLoader.getSystemClassLoader())) {
                    checkpointFiles.add(new File(taskState.getFunctionState().getFilePath().toUri()));
                }
            }

            for (File checkpointFile : checkpointFiles) {
                Assert.assertFalse(
                        "Checkpoint file " + checkpointFile + " not cleaned up properly.",
                        checkpointFile.exists());
            }

            if (checkpointFiles.size() > 0) {
                File parent = checkpointFiles.get(0).getParentFile();
                Assert.assertFalse(
                        "Checkpoint parent directory " + parent + " not cleaned up properly.",
                        parent.exists());
            }

            Assert.assertNull(
                    "Savepoints directory not cleaned up properly: "
                            + Arrays.toString(savepointDir.listFiles()) + ".",
                    savepointDir.listFiles());
        } finally {
            if (flink != null) {
                flink.shutdown();
            }
            if (testActorSystem != null) {
                // The test actor system is separate from the cluster and must be stopped too.
                JavaTestKit.shutdownActorSystem(testActorSystem);
            }
            if (tmpDir != null) {
                FileUtils.deleteDirectory(tmpDir);
            }
        }
    }

    /**
     * Verifies that resubmitting a job from a savepoint fails once the checkpoint directory
     * backing that savepoint has been deleted.
     *
     * @throws Exception if cluster setup, a JobManager request or the cleanup fails
     */
    @Test
    public void testCheckpointHasBeenRemoved() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
        final int numberOfCompletedCheckpoints = 10;

        // Shared time budget for every blocking call in this test.
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();

        final File tmpDir = CommonTestUtils.createTempDirectory();
        LOG.info("Created temporary directory: {}.", tmpDir);

        ForkableFlinkMiniCluster flink = null;

        try {
            final Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", numTaskManagers);
            config.setInteger("taskmanager.numberOfTaskSlots", numSlotsPerTaskManager);

            final File checkpointDir = new File(tmpDir, "checkpoints");
            final File savepointDir = new File(tmpDir, "savepoints");
            if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
                Assert.fail("Test setup failed: failed to create temporary directories.");
            }
            LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);
            LOG.info("Created temporary savepoint directory: {}.", savepointDir);

            config.setString("state.backend", "filesystem");
            config.setString("savepoints.state.backend", "filesystem");
            config.setString("state.backend.fs.checkpointdir", checkpointDir.toURI().toString());
            config.setString("savepoints.state.backend.fs.dir", savepointDir.toURI().toString());
            LOG.info("Flink configuration: {}.", config);

            flink = new ForkableFlinkMiniCluster(config);
            LOG.info("Starting Flink cluster.");
            flink.start();

            LOG.info("Retrieving JobManager.");
            final ActorGateway jobManager = (ActorGateway) Await.result(
                    flink.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: {}.", jobManager);

            final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000L, 1000);
            final JobID jobId = jobGraph.getJobID();

            // Install a fresh latch before the job starts counting it down.
            InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(numberOfCompletedCheckpoints);

            LOG.info("Submitting job {} in detached mode.", jobId);
            flink.submitJobDetached(jobGraph);

            LOG.info("Waiting for {} checkpoint complete notifications.", numberOfCompletedCheckpoints);
            // Bounded by the deadline so that a stuck job fails the test instead of hanging it.
            Assert.assertTrue(
                    "Timed out waiting for checkpoint complete notifications.",
                    InfiniteTestSource.CheckpointCompleteLatch.await(
                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            LOG.info("Received all {} checkpoint complete notifications.", numberOfCompletedCheckpoints);

            LOG.info("Triggering a savepoint.");
            final JobManagerMessages.TriggerSavepointSuccess triggerResponse =
                    (JobManagerMessages.TriggerSavepointSuccess) Await.result(
                            jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), deadline.timeLeft()),
                            deadline.timeLeft());
            final String savepointPath = triggerResponse.savepointPath();
            LOG.info("Retrieved savepoint path: {}.", savepointPath);

            LOG.info("Requesting the savepoint.");
            final TestingJobManagerMessages.ResponseSavepoint savepointResponse =
                    (TestingJobManagerMessages.ResponseSavepoint) Await.result(
                            jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), deadline.timeLeft()),
                            deadline.timeLeft());
            LOG.info("Retrieved savepoint: {}.", savepointResponse.savepoint());

            LOG.info("Shutting down Flink cluster.");
            flink.shutdown();

            // Remove the checkpoint files the savepoint points to.
            FileUtils.deleteDirectory(checkpointDir);

            LOG.info("Restarting Flink cluster.");
            flink.start();

            jobGraph.setSavepointPath(savepointPath);
            LOG.info("Resubmitting job {} with savepoint path {} in detached mode.",
                    jobGraph.getJobID(), savepointPath);

            try {
                flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());
                // fail() raises an AssertionError, which the catch below does not intercept.
                Assert.fail("Did not throw expected Exception because of missing checkpoint files");
            } catch (Exception ignored) {
                // Expected: the submission must fail; the exception details are not inspected.
            }
        } finally {
            if (flink != null) {
                flink.shutdown();
            }
            if (tmpDir != null) {
                FileUtils.deleteDirectory(tmpDir);
            }
        }
    }

    /**
     * Verifies that, with the savepoint backend set to {@code jobmanager}, the checkpoint files
     * referenced by a savepoint no longer exist after the cluster has been shut down.
     *
     * <p>The file checks run only when the test body completed; on a failure inside the body
     * the original exception propagates unmasked. The temporary directory is removed either way.
     *
     * @throws Exception if cluster setup, a JobManager request or the cleanup fails
     */
    @Test
    public void testCheckpointsRemovedWithJobManagerBackendOnShutdown() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
        final int numberOfCompletedCheckpoints = 10;

        // Shared time budget for every blocking call in this test.
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();

        final File tmpDir = CommonTestUtils.createTempDirectory();
        LOG.info("Created temporary directory: {}.", tmpDir);

        final Collection<File> checkpointFiles = new ArrayList<File>();

        try {
            ForkableFlinkMiniCluster flink = null;
            try {
                final Configuration config = new Configuration();
                config.setInteger("local.number-taskmanager", numTaskManagers);
                config.setInteger("taskmanager.numberOfTaskSlots", numSlotsPerTaskManager);

                final File checkpointDir = new File(tmpDir, "checkpoints");
                if (!checkpointDir.mkdir()) {
                    Assert.fail("Test setup failed: failed to create temporary directories.");
                }
                LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);

                config.setString("savepoints.state.backend", "jobmanager");
                config.setString("state.backend", "filesystem");
                config.setString("state.backend.fs.checkpointdir", checkpointDir.toURI().toString());
                LOG.info("Flink configuration: {}.", config);

                flink = new ForkableFlinkMiniCluster(config);
                LOG.info("Starting Flink cluster.");
                flink.start();

                LOG.info("Retrieving JobManager.");
                final ActorGateway jobManager = (ActorGateway) Await.result(
                        flink.leaderGateway().future(), deadline.timeLeft());
                LOG.info("JobManager: {}.", jobManager);

                final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000L, 1000);
                final JobID jobId = jobGraph.getJobID();

                // Install a fresh latch before the job starts counting it down.
                InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(numberOfCompletedCheckpoints);

                LOG.info("Submitting job {} in detached mode.", jobId);
                flink.submitJobDetached(jobGraph);

                LOG.info("Waiting for {} checkpoint complete notifications.", numberOfCompletedCheckpoints);
                // Bounded by the deadline so that a stuck job fails the test instead of hanging it.
                Assert.assertTrue(
                        "Timed out waiting for checkpoint complete notifications.",
                        InfiniteTestSource.CheckpointCompleteLatch.await(
                                deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
                LOG.info("Received all {} checkpoint complete notifications.", numberOfCompletedCheckpoints);

                LOG.info("Triggering a savepoint.");
                final JobManagerMessages.TriggerSavepointSuccess triggerResponse =
                        (JobManagerMessages.TriggerSavepointSuccess) Await.result(
                                jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), deadline.timeLeft()),
                                deadline.timeLeft());
                final String savepointPath = triggerResponse.savepointPath();
                LOG.info("Retrieved savepoint path: {}.", savepointPath);

                LOG.info("Requesting the savepoint.");
                final TestingJobManagerMessages.ResponseSavepoint savepointResponse =
                        (TestingJobManagerMessages.ResponseSavepoint) Await.result(
                                jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), deadline.timeLeft()),
                                deadline.timeLeft());
                final CompletedCheckpoint savepoint = savepointResponse.savepoint();
                LOG.info("Retrieved savepoint: {}.", savepoint);

                LOG.info("Cancelling job {}.", jobId);
                final Object cancelResponse = Await.result(
                        jobManager.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()),
                        deadline.timeLeft());
                Assert.assertTrue(cancelResponse instanceof JobManagerMessages.CancellationSuccess);

                LOG.info("Waiting for job {} to be removed.", jobId);
                final Object removedResponse = Await.result(
                        jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobId), deadline.timeLeft()),
                        deadline.timeLeft());
                Assert.assertTrue(((Boolean) removedResponse).booleanValue());

                // Remember the files referenced by the savepoint for the check after shutdown.
                for (StateForTask stateForTask : savepoint.getStates()) {
                    StreamTaskStateList taskStateList = (StreamTaskStateList) stateForTask.getState()
                            .deserializeValue(ClassLoader.getSystemClassLoader());
                    for (StreamTaskState taskState : taskStateList.getState(ClassLoader.getSystemClassLoader())) {
                        checkpointFiles.add(new File(taskState.getFunctionState().getFilePath().toUri()));
                    }
                }
            } finally {
                if (flink != null) {
                    flink.shutdown();
                }
            }

            // Reached only if the body above succeeded: at least one file must have been
            // referenced, and none of them may survive the shutdown.
            Assert.assertTrue(checkpointFiles.size() > 0);
            for (File checkpointFile : checkpointFiles) {
                Assert.assertFalse(
                        "Checkpoint file " + checkpointFile + " not cleaned up properly.",
                        checkpointFile.exists());
            }
        } finally {
            if (tmpDir != null) {
                FileUtils.deleteDirectory(tmpDir);
            }
        }
    }

    /**
     * Verifies that submitting a job with a savepoint path that cannot be resolved fails with
     * an {@link UnrecoverableException} caused by an {@link IllegalArgumentException}.
     *
     * @throws Exception if cluster setup or the cleanup fails
     */
    @Test
    public void testSubmitWithUnknownSavepointPath() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;

        // Time budget for retrieving the JobManager.
        final Deadline deadline = new FiniteDuration(5L, TimeUnit.MINUTES).fromNow();

        ForkableFlinkMiniCluster flink = null;

        try {
            final Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", numTaskManagers);
            config.setInteger("taskmanager.numberOfTaskSlots", numSlotsPerTaskManager);
            LOG.info("Flink configuration: {}.", config);

            flink = new ForkableFlinkMiniCluster(config);
            LOG.info("Starting Flink cluster.");
            flink.start();

            LOG.info("Retrieving JobManager.");
            final ActorGateway jobManager = (ActorGateway) Await.result(
                    flink.leaderGateway().future(), deadline.timeLeft());
            LOG.info("JobManager: {}.", jobManager);

            // Many retries with a long delay between them: the expected failure must surface
            // although the job itself is configured to be restarted.
            final JobGraph jobGraph = createJobGraph(parallelism, 1000, 3600000L, 1000);

            jobGraph.setSavepointPath("unknown path");
            Assert.assertEquals("unknown path", jobGraph.getSnapshotSettings().getSavepointPath());

            LOG.info("Submitting job {} in detached mode.", jobGraph.getJobID());

            try {
                flink.submitJobAndWait(jobGraph, false);
                // Without this the test would pass silently if the submission returned
                // normally. fail() raises an AssertionError, which the catch does not intercept.
                Assert.fail("Did not throw expected Exception because of unknown savepoint path");
            } catch (Exception e) {
                Assert.assertEquals(UnrecoverableException.class, e.getCause().getClass());
                Assert.assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass());
            }
        } finally {
            if (flink != null) {
                flink.shutdown();
            }
        }
    }

    /**
     * Verifies that a job whose state restore keeps failing is retried the configured number of
     * times and then fails.
     *
     * <p>The job is first run until a savepoint could be taken, then cancelled and resubmitted
     * from that savepoint with restore failures switched on. The restore method is expected to
     * be called once for the submission and once per configured retry.
     *
     * @throws Exception if cluster setup, a JobManager request or the cleanup fails
     */
    @Test
    public void testRestoreFailure() throws Exception {
        final int numberOfRetries = 2;

        // Shared time budget for every blocking call in this test.
        final Deadline deadline = new FiniteDuration(3L, TimeUnit.MINUTES).fromNow();

        ForkableFlinkMiniCluster flink = null;

        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Interval large enough that only the explicitly triggered savepoint is expected
            // to produce a checkpoint during the test.
            env.enableCheckpointing(100000000);
            env.setNumberOfExecutionRetries(numberOfRetries);
            env.getConfig().setExecutionRetryDelay(500);

            final DataStreamSource<Integer> source = env.addSource(new RestoreStateCountingAndFailingSource());

            // Reset the static test control state of the source.
            RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false;
            RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0;
            RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new CountDownLatch(1);
            RestoreStateCountingAndFailingSource.emitted = 0;

            source.addSink(new DiscardingSink<Integer>());

            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

            final Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 1);
            config.setInteger("taskmanager.numberOfTaskSlots", 1);
            LOG.info("Flink configuration: {}.", config);

            flink = new ForkableFlinkMiniCluster(config);
            LOG.info("Starting Flink cluster.");
            flink.start();

            LOG.info("Retrieving JobManager.");
            final ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
            LOG.info("JobManager: {}.", jobManager);

            flink.submitJobDetached(jobGraph);

            // Poll until the source has made some progress or the deadline has passed.
            while (deadline.hasTimeLeft() && RestoreStateCountingAndFailingSource.emitted < 100) {
                Thread.sleep(100L);
            }
            Assert.assertTrue("No progress", RestoreStateCountingAndFailingSource.emitted >= 100);

            final JobManagerMessages.TriggerSavepointSuccess triggerResponse =
                    (JobManagerMessages.TriggerSavepointSuccess) Await.result(
                            jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft()),
                            deadline.timeLeft());
            final String savepointPath = triggerResponse.savepointPath();
            LOG.info("Retrieved savepoint path: {}.", savepointPath);

            // Bounded by the deadline so that a missing notification fails the test instead
            // of hanging it.
            Assert.assertTrue(
                    "Timed out waiting for the checkpoint complete notification.",
                    RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await(
                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

            // Cancel the job and wait until the JobManager has removed it.
            Await.ready(
                    jobManager.ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), deadline.timeLeft()),
                    deadline.timeLeft());
            Await.ready(
                    jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()),
                    deadline.timeLeft());

            // From here on every restore attempt throws.
            RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
            jobGraph.setSavepointPath(savepointPath);

            try {
                flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());
                // fail() raises an AssertionError, which the catch below does not intercept.
                Assert.fail("Did not throw expected Exception");
            } catch (Exception ignored) {
                // Expected: the job fails after exhausting its retries.
            } finally {
                // One restore for the submission from the savepoint plus one per retry.
                Assert.assertEquals(
                        1 + numberOfRetries,
                        RestoreStateCountingAndFailingSource.numRestoreStateCalls);
            }
        } finally {
            if (flink != null) {
                flink.shutdown();
            }
        }
    }

    /**
     * Builds the job graph shared by the savepoint tests: an {@link InfiniteTestSource} whose
     * records are shuffled into a {@link StatefulCounter} and then discarded.
     *
     * <p>Operator chaining is disabled, so source, map and sink are kept as separate operators.
     *
     * @param parallelism parallelism applied to the whole job
     * @param numberOfRetries number of restart attempts of the fixed-delay restart strategy
     * @param restartDelay delay between restart attempts, passed on to the restart strategy
     * @param checkpointingInterval checkpointing interval passed to the environment
     * @return the job graph generated from the stream graph
     */
    private JobGraph createJobGraph(int parallelism, int numberOfRetries, long restartDelay, int checkpointingInterval) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(parallelism);
        env.enableCheckpointing(checkpointingInterval);
        env.disableOperatorChaining();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(numberOfRetries, restartDelay));
        env.getConfig().disableSysoutLogging();

        env.addSource(new InfiniteTestSource())
                .shuffle()
                .map(new StatefulCounter())
                .addSink(new DiscardingSink<Integer>());

        return env.getStreamGraph().getJobGraph();
    }
}
