package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase.class */
public class SavepointITCase extends TestLogger {

    /** JUnit rule providing the temporary folder under which {@link #setUp()} creates the test directories. */
    @Rule
    public final TemporaryFolder folder = new TemporaryFolder();
    /** Directory configured as the checkpoints directory by {@code getFileBasedCheckpointsConfig}. */
    private File checkpointDir;
    /** Directory used as the savepoint target directory by the tests. */
    private File savepointDir;
    private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
    /** Parallelism set on the operators of the iteration test job; also the length of the per-subtask arrays below. */
    private static final int ITER_TEST_PARALLELISM = 1;
    /** Per-subtask latches, triggered by {@code DuplicateFilter} when it processes the element 30. */
    private static OneShotLatch[] iterTestSnapshotWait = new OneShotLatch[ITER_TEST_PARALLELISM];
    /** Per-subtask latches, triggered at the end of {@code IntegerStreamSource#restoreState}. */
    private static OneShotLatch[] iterTestRestoreWait = new OneShotLatch[ITER_TEST_PARALLELISM];
    /** Per-subtask counter value recorded in {@code snapshotState} and compared with the restored value. */
    private static int[] iterTestCheckpointVerify = new int[ITER_TEST_PARALLELISM];

    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$DuplicateFilter.class */
    private static class DuplicateFilter extends RichFlatMapFunction<Integer, Integer> {
        static final ValueStateDescriptor<Boolean> DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class, false);
        private static final long serialVersionUID = 1;
        private ValueState<Boolean> operatorState;

        private DuplicateFilter() {
        }

        public void open(Configuration configuration) {
            this.operatorState = getRuntimeContext().getState(DESCRIPTOR);
        }

        public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
            if (!((Boolean) this.operatorState.value()).booleanValue()) {
                collector.collect(num);
                this.operatorState.update(true);
            }
            if (30 == num.intValue()) {
                SavepointITCase.iterTestSnapshotWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Integer) obj, (Collector<Integer>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$InfiniteTestSource.class */
    public static class InfiniteTestSource implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1;
        private volatile boolean running;

        private InfiniteTestSource() {
            this.running = true;
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(SavepointITCase.ITER_TEST_PARALLELISM));
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$IntegerStreamSource.class */
    private static final class IntegerStreamSource extends RichSourceFunction<Integer> implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1;
        private volatile boolean running = true;
        private volatile boolean isRestored = false;
        private int emittedCount = 0;

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            while (this.running) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(Integer.valueOf(this.emittedCount));
                }
                if (this.emittedCount < 100) {
                    this.emittedCount += SavepointITCase.ITER_TEST_PARALLELISM;
                } else {
                    this.emittedCount = 0;
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            SavepointITCase.iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()] = this.emittedCount;
            return Collections.singletonList(Integer.valueOf(this.emittedCount));
        }

        public void restoreState(List<Integer> list) throws Exception {
            if (!list.isEmpty()) {
                this.emittedCount = list.get(0).intValue();
            }
            Assert.assertEquals(SavepointITCase.iterTestCheckpointVerify[getRuntimeContext().getIndexOfThisSubtask()], this.emittedCount);
            SavepointITCase.iterTestRestoreWait[getRuntimeContext().getIndexOfThisSubtask()].trigger();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$MiniClusterResourceFactory.class */
    /**
     * Remembers a mini-cluster layout (task managers, slots per task manager, configuration)
     * and builds a new {@link MiniClusterWithClientResource} from it on every {@link #get()} call.
     */
    public static class MiniClusterResourceFactory {
        private final int numTaskManagers;
        private final int numSlotsPerTaskManager;
        private final Configuration config;

        private MiniClusterResourceFactory(int taskManagers, int slotsPerTaskManager, Configuration clusterConfig) {
            this.numTaskManagers = taskManagers;
            this.numSlotsPerTaskManager = slotsPerTaskManager;
            this.config = clusterConfig;
        }

        /** Returns a new, not yet started cluster resource with the stored settings. */
        MiniClusterWithClientResource get() {
            final MiniClusterResourceConfiguration resourceConfig = new MiniClusterResourceConfiguration.Builder()
                    .setConfiguration(this.config)
                    .setNumberTaskManagers(this.numTaskManagers)
                    .setNumberSlotsPerTaskManager(this.numSlotsPerTaskManager)
                    .build();
            return new MiniClusterWithClientResource(resourceConfig);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/checkpointing/SavepointITCase$StatefulCounter.class */
    public static class StatefulCounter extends RichMapFunction<Integer, Integer> implements ListCheckpointed<byte[]> {
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
        private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
        private int numCollectedElements;
        private static final long serialVersionUID = 7317800376639115920L;
        private byte[] data;

        private StatefulCounter() {
            this.numCollectedElements = 0;
        }

        public void open(Configuration configuration) throws Exception {
            if (this.data == null) {
                Random random = new Random(getRuntimeContext().getIndexOfThisSubtask());
                this.data = new byte[((Integer) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue()).intValue() + SavepointITCase.ITER_TEST_PARALLELISM];
                random.nextBytes(this.data);
            }
        }

        public Integer map(Integer num) throws Exception {
            for (int i = 0; i < this.data.length; i += SavepointITCase.ITER_TEST_PARALLELISM) {
                byte[] bArr = this.data;
                int i2 = i;
                bArr[i2] = (byte) (bArr[i2] + SavepointITCase.ITER_TEST_PARALLELISM);
            }
            int i3 = this.numCollectedElements;
            this.numCollectedElements = i3 + SavepointITCase.ITER_TEST_PARALLELISM;
            if (i3 > 10) {
                progressLatch.countDown();
            }
            return num;
        }

        public List<byte[]> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(this.data);
        }

        public void restoreState(List<byte[]> list) throws Exception {
            if (list.isEmpty() || list.size() > SavepointITCase.ITER_TEST_PARALLELISM) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
            }
            this.data = list.get(0);
            restoreLatch.countDown();
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static CountDownLatch getRestoreLatch() {
            return restoreLatch;
        }

        static void resetForTest(int i) {
            progressLatch = new CountDownLatch(i);
            restoreLatch = new CountDownLatch(i);
        }
    }

    /**
     * Creates the "checkpoints" and "savepoints" directories inside a new temporary folder and
     * fails the test if either of them cannot be created.
     */
    @Before
    public void setUp() throws Exception {
        final File root = this.folder.newFolder();
        this.checkpointDir = new File(root, "checkpoints");
        this.savepointDir = new File(root, "savepoints");
        final boolean created = this.checkpointDir.mkdir() && this.savepointDir.mkdirs();
        if (!created) {
            Assert.fail("Test setup failed: failed to create temporary directories.");
        }
    }

    /**
     * Takes a savepoint of a running job on a 2x2-slot cluster with file-based checkpoints,
     * verifies the savepoint directory and then restores a job from it.
     */
    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
        final int parallelism = 4;
        final Configuration clusterConfig = getFileBasedCheckpointsConfig();
        final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, clusterConfig);

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
        verifySavepoint(parallelism, savepointPath);
        restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
    }

    /**
     * Takes a savepoint with the entropy-injecting configuration, checks the savepoint directory
     * with {@link #hasEntropyInFileStateHandlePaths()} and then restores a job from the savepoint.
     */
    @Test
    public void testShouldAddEntropyToSavepointPath() throws Exception {
        final int parallelism = 4;
        final Configuration entropyConfig = getCheckpointingWithEntropyConfig();
        final MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, entropyConfig);

        final String savepointPath = submitJobAndTakeSavepoint(clusterFactory, parallelism);
        Assert.assertThat(this.savepointDir, hasEntropyInFileStateHandlePaths());
        restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
    }

    /**
     * Builds the file-based checkpoint configuration with the savepoint directory pointing at
     * {@code test-entropy://<savepointDir>/_entropy_} and {@code _entropy_} set as the entropy key.
     */
    private Configuration getCheckpointingWithEntropyConfig() {
        final File entropyPlaceholderDir = new File(this.savepointDir, "_entropy_");
        final String savepointTarget = "test-entropy://" + entropyPlaceholderDir.getPath();
        final Configuration config = getFileBasedCheckpointsConfig(savepointTarget);
        config.setString("s3.entropy.key", "_entropy_");
        return config;
    }

    /**
     * Starts a cluster, submits the stateful test job in detached mode, waits for the progress
     * latch and cancels the job with a savepoint.
     *
     * @param miniClusterResourceFactory factory for the cluster the job runs on
     * @param i parallelism of the job; also the count the {@link StatefulCounter} latches are reset to
     * @return the savepoint path returned by {@code cancelWithSavepoint}
     */
    private String submitJobAndTakeSavepoint(MiniClusterResourceFactory miniClusterResourceFactory, int i) throws Exception {
        final JobGraph jobGraph = createJobGraph(i, 0, 1000L);
        final JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(i);
        final MiniClusterWithClientResource cluster = miniClusterResourceFactory.get();
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();
        try {
            client.setDetached(true);
            client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
            StatefulCounter.getProgressLatch().await();
            return client.cancelWithSavepoint(jobId, (String) null);
        } finally {
            // Single cleanup path: the cluster is shut down exactly once, on success and on failure.
            cluster.after();
            StatefulCounter.resetForTest(i);
        }
    }

    /**
     * Asserts that the savepoint path denotes an existing directory containing {@code i + 1} files.
     *
     * @param i parallelism of the job that wrote the savepoint
     * @param str savepoint path as a URI string
     * @throws URISyntaxException if {@code str} is not a valid URI
     */
    private void verifySavepoint(int i, String str) throws URISyntaxException {
        final File savepointDirectory = new File(new URI(str));
        Assert.assertTrue("Savepoint directory does not exist.", savepointDirectory.exists());
        Assert.assertTrue("Savepoint did not create self-contained directory.", savepointDirectory.isDirectory());
        final File[] files = savepointDirectory.listFiles();
        if (files == null) {
            Assert.fail(String.format("Returned savepoint path (%s) is not valid.", str));
            return;
        }
        // One file per parallel subtask plus one extra file (presumably the savepoint metadata);
        // the extra file is independent of ITER_TEST_PARALLELISM, hence the literal.
        final int expectedFileCount = 1 + i;
        Assert.assertEquals("Did not write expected number of savepoint/checkpoint files to directory: " + Arrays.toString(files), expectedFileCount, files.length);
    }

    /**
     * Starts a cluster, submits the stateful test job restoring from the given savepoint, waits
     * until state was restored and progress was made, cancels the job and disposes the savepoint.
     *
     * @param str savepoint path to restore from
     * @param miniClusterResourceFactory factory for the cluster the job runs on
     * @param i parallelism of the job; also the count the {@link StatefulCounter} latches are reset to
     */
    private void restoreJobAndVerifyState(String str, MiniClusterResourceFactory miniClusterResourceFactory, int i) throws Exception {
        final JobGraph jobGraph = createJobGraph(i, 0, 1000L);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        final JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(i);
        final MiniClusterWithClientResource cluster = miniClusterResourceFactory.get();
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();
        try {
            client.setDetached(true);
            client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
            StatefulCounter.getRestoreLatch().await();
            StatefulCounter.getProgressLatch().await();
            client.cancel(jobId);
            // Actually wait for the CANCELED status; without get() the polling future is
            // discarded and the savepoint may be disposed while the job is still shutting down.
            FutureUtils.retrySuccessfulWithDelay(
                    () -> client.getJobStatus(jobId),
                    Time.milliseconds(50L),
                    Deadline.now().plus(Duration.ofSeconds(30L)),
                    jobStatus -> jobStatus == JobStatus.CANCELED,
                    TestingUtils.defaultScheduledExecutor())
                .get();
            client.disposeSavepoint(str).get();
            // NOTE(review): verifySavepoint treats the same string as a URI; if str carries a
            // scheme, new File(str) probably never exists and this check is vacuous — confirm.
            Assert.assertFalse("Savepoint not properly cleaned up.", new File(str).exists());
        } finally {
            // Single cleanup path: the cluster is shut down exactly once, on success and on failure.
            cluster.after();
            StatefulCounter.resetForTest(i);
        }
    }

    /**
     * Triggers a savepoint for a job id that was never submitted and expects the returned future
     * to fail with a {@link FlinkJobNotFoundException} whose message chain mentions that job id.
     */
    @Test
    public void testTriggerSavepointForNonExistingJob() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;
        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(numTaskManagers)
                        .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                        .build());
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();
        final JobID jobId = new JobID();
        try {
            client.triggerSavepoint(jobId, (String) null).get();
            Assert.fail("Triggering a savepoint for a non-existing job should have failed.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent());
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, jobId.toString()).isPresent());
        } finally {
            // Shut the cluster down exactly once, whatever happened above.
            cluster.after();
        }
    }

    /**
     * Submits a job consisting of a single {@link BlockingNoOpInvokable} vertex and expects
     * triggering a savepoint for it to fail with an {@link IllegalStateException} whose message
     * chain mentions the job id and "is not a streaming job".
     */
    @Test
    public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;
        final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(new Configuration())
                        .setNumberTaskManagers(numTaskManagers)
                        .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                        .build());
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();
        final JobVertex vertex = new JobVertex("Blocking vertex");
        vertex.setInvokableClass(BlockingNoOpInvokable.class);
        vertex.setParallelism(1);
        final JobGraph jobGraph = new JobGraph(new JobVertex[]{vertex});
        try {
            client.setDetached(true);
            client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
            client.triggerSavepoint(jobGraph.getJobID(), (String) null).get();
            Assert.fail("Triggering a savepoint for a job without checkpointing should have failed.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, IllegalStateException.class).isPresent());
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, jobGraph.getJobID().toString()).isPresent());
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "is not a streaming job").isPresent());
        } finally {
            // Shut the cluster down exactly once, whatever happened above.
            cluster.after();
        }
    }

    /**
     * Submits a job (attached) whose restore settings point to a non-existing savepoint path and
     * expects the submission to fail with an exception chain containing both a
     * {@link JobExecutionException} and a {@link FileNotFoundException}.
     */
    @Test
    public void testSubmitWithUnknownSavepointPath() throws Exception {
        final int numTaskManagers = 1;
        final int numSlotsPerTaskManager = 1;
        final int parallelism = numTaskManagers * numSlotsPerTaskManager;
        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(numTaskManagers)
                        .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                        .build());
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();
        try {
            final JobGraph jobGraph = createJobGraph(parallelism, 1000, 3600000L);
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
            Assert.assertEquals("unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());
            // The client is switched to attached mode below so that the failure surfaces here.
            LOG.info("Submitting job {} in attached mode.", jobGraph.getJobID());
            try {
                client.setDetached(false);
                client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
                // AssertionError is not an Exception, so the catch below does not swallow it.
                Assert.fail("Job submission with an unknown savepoint path should have failed.");
            } catch (Exception e) {
                final boolean hasJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class).isPresent();
                final boolean hasFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class).isPresent();
                if (!hasJobExecutionException || !hasFileNotFoundException) {
                    throw e;
                }
            }
        } finally {
            cluster.after();
        }
    }

    /**
     * Takes a savepoint of a job that surrounds the stateful counter (uid "statefulCounter") with
     * two stateless maps, then restores a modified job from it in which the stateless operators
     * differ but the stateful counter keeps its uid.
     *
     * <p>Both phases run on their own 2x2-slot cluster, and all latch waits share one five-minute
     * deadline.
     */
    @Test
    public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;
        final int parallelism = 2;
        final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5L));
        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        LOG.info("Flink configuration: {}.", config);

        final String savepointPath;

        // Phase 1: run the original job and take a savepoint.
        final MiniClusterWithClientResource firstCluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(numTaskManagers)
                        .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                        .build());
        LOG.info("Starting Flink cluster.");
        firstCluster.before();
        final ClusterClient<?> firstClient = firstCluster.getClusterClient();
        try {
            final StatefulCounter statefulCounter = new StatefulCounter();
            StatefulCounter.resetForTest(parallelism);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource(new InfiniteTestSource())
                    .shuffle()
                    .map(value -> 4 * value)
                    .shuffle()
                    .map(statefulCounter).uid("statefulCounter")
                    .shuffle()
                    .map(value -> 2 * value)
                    .addSink(new DiscardingSink<>());
            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            firstClient.setDetached(true);
            final JobID jobId = firstClient.submitJob(jobGraph, SavepointITCase.class.getClassLoader()).getJobID();
            // A timed await returns false on timeout; fail instead of silently continuing.
            Assert.assertTrue(
                    "Job did not make progress before the deadline.",
                    StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            savepointPath = firstClient.triggerSavepoint(jobId, (String) null).get();
            LOG.info("Retrieved savepoint: {}.", savepointPath);
        } finally {
            // Shutting the cluster down also ends the still running job.
            LOG.info("Shutting down Flink cluster.");
            firstCluster.after();
        }

        // Phase 2: restore a job with modified stateless operators from the savepoint.
        final MiniClusterWithClientResource secondCluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(numTaskManagers)
                        .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                        .build());
        LOG.info("Restarting Flink cluster.");
        secondCluster.before();
        final ClusterClient<?> secondClient = secondCluster.getClusterClient();
        try {
            StatefulCounter.resetForTest(parallelism);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource(new InfiniteTestSource())
                    .shuffle()
                    .map(new StatefulCounter()).uid("statefulCounter")
                    .shuffle()
                    .map(value -> value)
                    .addSink(new DiscardingSink<>());
            final JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
            modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            LOG.info("Resubmitting job {} with savepoint path {} in detached mode.", modifiedJobGraph.getJobID(), savepointPath);
            secondClient.setDetached(true);
            secondClient.submitJob(modifiedJobGraph, SavepointITCase.class.getClassLoader());
            Assert.assertTrue(
                    "State was not restored before the deadline.",
                    StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            Assert.assertTrue(
                    "Restored job did not make progress before the deadline.",
                    StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        } finally {
            LOG.info("Shutting down Flink cluster.");
            secondCluster.after();
        }
    }

    /**
     * Builds the job graph of the standard test pipeline: infinite source, shuffle, stateful
     * counter, discarding sink, with operator chaining disabled.
     *
     * @param i parallelism of the job
     * @param i2 number of restart attempts of the fixed-delay restart strategy
     * @param j delay between restart attempts, passed through to the restart strategy
     * @return the job graph generated from the stream graph
     */
    private JobGraph createJobGraph(int i, int i2, long j) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(i);
        env.disableOperatorChaining();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(i2, j));
        env.getConfig().disableSysoutLogging();

        env.addSource(new InfiniteTestSource())
                .shuffle()
                .map(new StatefulCounter())
                .addSink(new DiscardingSink());

        final StreamGraph streamGraph = env.getStreamGraph();
        return streamGraph.getJobGraph();
    }

    /**
     * Runs a job containing an iteration, takes a savepoint once every subtask of the duplicate
     * filter has seen the trigger element, cancels the job, resubmits it from the savepoint and
     * waits until every source subtask has restored its state.
     */
    @Test
    public void testSavepointForJobWithIteration() throws Exception {
        // Step by one: every subtask index needs its own latch and verification slot.
        for (int subtask = 0; subtask < ITER_TEST_PARALLELISM; subtask++) {
            iterTestSnapshotWait[subtask] = new OneShotLatch();
            iterTestRestoreWait[subtask] = new OneShotLatch();
            iterTestCheckpointVerify[subtask] = 0;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final IterativeStream<Integer> iteration = env
                .addSource(new IntegerStreamSource())
                .flatMap(new RichFlatMapFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1;

                    @Override
                    public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
                        collector.collect(num);
                    }
                })
                .setParallelism(ITER_TEST_PARALLELISM)
                .keyBy(new KeySelector<Integer, Object>() {
                    private static final long serialVersionUID = 1;

                    @Override
                    public Object getKey(Integer num) throws Exception {
                        return num;
                    }
                })
                .flatMap(new DuplicateFilter())
                .setParallelism(ITER_TEST_PARALLELISM)
                .iterate();
        iteration.closeWith(iteration
                .map(new MapFunction<Integer, Integer>() {
                    private static final long serialVersionUID = 1;

                    @Override
                    public Integer map(Integer num) throws Exception {
                        return num;
                    }
                })
                .setParallelism(ITER_TEST_PARALLELISM));

        final StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobName("Test");
        final JobGraph jobGraph = streamGraph.getJobGraph();

        final Configuration config = getFileBasedCheckpointsConfig();
        config.addAll(jobGraph.getJobConfiguration());
        config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
        final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(config)
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism())
                        .build());
        cluster.before();
        final ClusterClient<?> client = cluster.getClusterClient();

        String savepointPath = null;
        try {
            client.setDetached(true);
            client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
            for (OneShotLatch snapshotLatch : iterTestSnapshotWait) {
                snapshotLatch.await();
            }
            savepointPath = client.triggerSavepoint(jobGraph.getJobID(), (String) null).get();

            client.cancel(jobGraph.getJobID());
            while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
                Thread.sleep(100L);
            }

            final JobGraph restoredJobGraph = streamGraph.getJobGraph();
            restoredJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
            client.setDetached(true);
            client.submitJob(restoredJobGraph, SavepointITCase.class.getClassLoader());
            for (OneShotLatch restoreLatch : iterTestRestoreWait) {
                restoreLatch.await();
            }

            client.cancel(restoredJobGraph.getJobID());
            while (!client.getJobStatus(restoredJobGraph.getJobID()).get().isGloballyTerminalState()) {
                Thread.sleep(100L);
            }
        } finally {
            // Single cleanup path: dispose the savepoint (if one was taken) and stop the cluster once.
            if (savepointPath != null) {
                client.disposeSavepoint(savepointPath);
            }
            cluster.after();
        }
    }

    /**
     * Creates a configuration selecting the "filesystem" state backend, with the checkpoint
     * directory of this test, a small-file threshold of zero and the given savepoint directory.
     *
     * @param str value to set as the savepoint directory
     * @return the new configuration
     */
    private Configuration getFileBasedCheckpointsConfig(String str) {
        final String checkpointUri = this.checkpointDir.toURI().toString();

        final Configuration config = new Configuration();
        config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointUri);
        config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, str);
        return config;
    }

    /** Creates the file-based checkpoint configuration with the savepoint directory of this test. */
    private Configuration getFileBasedCheckpointsConfig() {
        final String savepointUri = this.savepointDir.toURI().toString();
        return getFileBasedCheckpointsConfig(savepointUri);
    }

    /**
     * Returns a matcher for a savepoint directory that accepts it when no regular file exists
     * below {@code _entropy_} and at least one regular file exists below {@code _resolved_}.
     */
    private static Matcher<File> hasEntropyInFileStateHandlePaths() {
        return new TypeSafeDiagnosingMatcher<File>() {
            public boolean matchesSafely(File file, Description description) {
                if (file == null) {
                    description.appendText("savepoint dir must not be null");
                    return false;
                }

                final Path unresolvedDir = file.toPath().resolve("_entropy_");
                final List<Path> unresolvedFiles = SavepointITCase.listRecursively(unresolvedDir);
                final Path resolvedDir = file.toPath().resolve("_resolved_");
                final List<Path> resolvedFiles = SavepointITCase.listRecursively(resolvedDir);

                // Any file left under the placeholder directory means entropy was not injected.
                if (unresolvedFiles.size() > 0) {
                    description.appendText("there are savepoint files with unresolved entropy placeholders");
                    return false;
                }

                final boolean hasResolvedFiles = Files.exists(resolvedDir, new LinkOption[0]) && !resolvedFiles.isEmpty();
                if (!hasResolvedFiles) {
                    description.appendText("there are no savepoint files with added entropy");
                }
                return hasResolvedFiles;
            }

            public void describeTo(Description description) {
                description.appendText("all savepoint files should have added entropy");
            }
        };
    }

    /**
     * Lists every regular file found below the given path, following symbolic links.
     *
     * <p>The directory stream opened by {@link Files#walk} is closed on every exit path,
     * including when traversal or collection fails part-way through.
     *
     * @param path the root to walk; may point to a location that does not exist
     * @return the regular files found under {@code path}, or an empty list if {@code path}
     *     does not exist
     * @throws RuntimeException wrapping the {@link IOException} if the walk cannot be started
     */
    public static List<Path> listRecursively(Path path) {
        if (!Files.exists(path)) {
            return Collections.emptyList();
        }
        // try-with-resources: Files.walk holds open directory handles until the stream is closed.
        try (Stream<Path> walk = Files.walk(path, FileVisitOption.FOLLOW_LINKS)) {
            return walk.filter(Files::isRegularFile).collect(Collectors.toList());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Synthetic hook that recreates a serialized lambda from its {@link SerializedLambda} form.
     *
     * <p>The implementation method name is first mapped to a selector, then the remaining
     * metadata (method kind, functional interface, signatures, implementing class) is checked
     * before the matching lambda is returned. The three recognised lambdas all belong to
     * {@code testCanRestoreWithModifiedStatelessOperators} and map an {@code Integer} to
     * {@code 4 * x}, {@code 2 * x} and {@code x} respectively.
     *
     * <p>NOTE(review): this is decompiler output and does not type-check as written
     * ({@code boolean z = -1}, {@code case false}, {@code z = ITER_TEST_PARALLELISM}). The
     * selector looks like an int taking the values 0, 1 and 2 in the original bytecode;
     * confirm before relying on it.
     *
     * @param serializedLambda the serialized description of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the name or metadata matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector chosen by the first switch; -1 means "no known lambda matched".
        boolean z = -1;
        // Switch on the name's hash, then confirm with equals() to rule out hash collisions.
        switch (implMethodName.hashCode()) {
            case -1019551637:
                if (implMethodName.equals("lambda$testCanRestoreWithModifiedStatelessOperators$e0defa2f$1")) {
                    z = false;
                    break;
                }
                break;
            case -1019551636:
                if (implMethodName.equals("lambda$testCanRestoreWithModifiedStatelessOperators$e0defa2f$2")) {
                    z = ITER_TEST_PARALLELISM;
                    break;
                }
                break;
            case -1019551635:
                if (implMethodName.equals("lambda$testCanRestoreWithModifiedStatelessOperators$e0defa2f$3")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Each case re-validates the serialized metadata before handing back a lambda.
        // Method kind 6 corresponds to MethodHandleInfo.REF_invokeStatic in the JDK.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/SavepointITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return Integer.valueOf(4 * num.intValue());
                    };
                }
                break;
            case ITER_TEST_PARALLELISM /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/SavepointITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return Integer.valueOf(2 * num2.intValue());
                    };
                }
                break;
            // NOTE(review): presumably the selector value 2 set for the "$3" lambda above; verify.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/SavepointITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num3 -> {
                        return num3;
                    };
                }
                break;
        }
        // Unknown name or mismatching metadata: refuse to deserialize.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
