package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

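/**
 * Integration tests for stopping a job with a savepoint (suspend and terminate variants)
 * against the mini cluster provided by {@link AbstractTestBase}.
 *
 * <p>Decompiled from: input_file:org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.class
 */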
public class JobMasterStopWithSavepointIT extends AbstractTestBase {

    /** Supplies the fresh directory that each test uses as its savepoint target. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Value handed to the {@code CheckpointCoordinatorConfiguration} in {@code setUpJobGraph};
     * presumably the checkpoint interval / minimum pause in milliseconds — confirm against that
     * constructor.
     */
    private static final long CHECKPOINT_INTERVAL = 10;

    /** Parallelism of the single test vertex; also the initial count of the per-subtask latches. */
    private static final int PARALLELISM = 2;

    // The latches below are static because the task classes run inside the cluster and can only
    // reach the test through static state. They are re-created by setUpJobGraph for every test.

    /** Triggered by the test to let {@code NoOpBlockingStreamTask.finishTask()} proceed. */
    private static OneShotLatch finishingLatch;

    /** Counted down by each subtask once it has entered {@code processInput}. */
    private static CountDownLatch invokeLatch;

    /** Counted down by subtask 0 of {@code ExceptionOnCallbackStreamTask} each time it starts processing. */
    private static CountDownLatch numberOfRestarts;

    /**
     * Id of the first {@code SAVEPOINT_SUSPEND} checkpoint observed by a task; {@code -1} until one
     * is seen. The reference is never reassigned (only set / compareAndSet), hence {@code final}.
     */
    private static final AtomicLong syncSavepointId = new AtomicLong();

    /** Counted down by subtask 0 for every triggered checkpoint; replaced while tasks are running. */
    private static volatile CountDownLatch checkpointsToWaitFor;

    /** Directory passed to stop-with-savepoint as the savepoint target. */
    private Path savepointDirectory;

    /** Client used to submit, query and cancel the job under test. */
    private MiniClusterClient clusterClient;

    /** The job under test, rebuilt by {@code setUpJobGraph}. */
    private JobGraph jobGraph;

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT$CheckpointCountingTask.class */
    public static class CheckpointCountingTask extends StreamTaskTest.NoOpStreamTask {
        private final transient OneShotLatch finishLatch;

        public CheckpointCountingTask(Environment environment) throws Exception {
            super(environment);
            this.finishLatch = new OneShotLatch();
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            JobMasterStopWithSavepointIT.invokeLatch.countDown();
            this.finishLatch.await();
            controller.allActionsCompleted();
        }

        protected void cancelTask() throws Exception {
            super.cancelTask();
            this.finishLatch.trigger();
        }

        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            if (getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
                JobMasterStopWithSavepointIT.checkpointsToWaitFor.countDown();
            }
            return CompletableFuture.completedFuture(true);
        }
    }

    /**
     * Test task that remembers the id of a {@code SAVEPOINT_SUSPEND} checkpoint it was asked to
     * trigger and, on subtask 0, throws from the completion callback of that checkpoint.
     */
    public static class ExceptionOnCallbackStreamTask extends CheckpointCountingTask {
        /** Id of the most recent SAVEPOINT_SUSPEND trigger seen by this instance; Long.MIN_VALUE until then. */
        private long synchronousSavepointId;

        public ExceptionOnCallbackStreamTask(Environment environment) throws Exception {
            super(environment);
            this.synchronousSavepointId = Long.MIN_VALUE;
        }

        /** Counts one (re)start on subtask 0 before delegating to the blocking parent implementation. */
        @Override
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
                JobMasterStopWithSavepointIT.numberOfRestarts.countDown();
            }
            super.processInput(controller);
        }

        /**
         * Remembers the checkpoint id when the trigger is a {@code SAVEPOINT_SUSPEND} and publishes
         * it to the test if no id has been published yet, then delegates to the parent.
         */
        @Override
        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            long checkpointId = checkpointMetaData.getCheckpointId();
            // NOTE(review): only SAVEPOINT_SUSPEND is matched here — confirm whether the
            // terminate variant of stop-with-savepoint uses a different CheckpointType.
            if (checkpointOptions.getCheckpointType() == CheckpointType.SAVEPOINT_SUSPEND) {
                this.synchronousSavepointId = checkpointId;
                // Only the first observed id is kept: the shared value is reset to -1 per test.
                JobMasterStopWithSavepointIT.syncSavepointId.compareAndSet(-1L, this.synchronousSavepointId);
            }
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        }

        /**
         * Fails deliberately on subtask 0 when the completed checkpoint is the remembered
         * savepoint; otherwise delegates to the inherited callback.
         *
         * @throws RuntimeException with message "Expected Exception" in the failing case
         */
        @Override
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
            if (checkpointId == this.synchronousSavepointId && subtaskIndex == 0) {
                throw new RuntimeException("Expected Exception");
            }
            return super.notifyCheckpointCompleteAsync(checkpointId);
        }

        /** Ignores abort notifications and returns an already completed future. */
        @Override
        public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT$NoOpBlockingStreamTask.class */
    public static class NoOpBlockingStreamTask extends StreamTaskTest.NoOpStreamTask {
        private final transient OneShotLatch finishLatch;

        public NoOpBlockingStreamTask(Environment environment) throws Exception {
            super(environment);
            this.finishLatch = new OneShotLatch();
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            JobMasterStopWithSavepointIT.invokeLatch.countDown();
            this.finishLatch.await();
            controller.allActionsCompleted();
        }

        public void finishTask() throws Exception {
            JobMasterStopWithSavepointIT.finishingLatch.await();
            this.finishLatch.trigger();
        }
    }

    /** Happy path for the suspend variant: runs the shared scenario with the flag set to {@code false}. */
    @Test(timeout = 5000)
    public void suspendWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished() throws Exception {
        final boolean terminate = false;
        stopWithSavepointNormalExecutionHelper(terminate);
    }

    /** Happy path for the terminate variant: runs the shared scenario with the flag set to {@code true}. */
    @Test(timeout = 5000)
    public void terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished() throws Exception {
        final boolean terminate = true;
        stopWithSavepointNormalExecutionHelper(terminate);
    }

    /**
     * Runs the happy-path scenario: the job is stopped with a savepoint, stays RUNNING until the
     * tasks are allowed to finish, ends up FINISHED, and the reported savepoint location shows up
     * as an entry of the savepoint directory.
     *
     * @param terminate flag forwarded to {@code stopWithSavepoint}; {@code true} in the terminate
     *     tests, {@code false} in the suspend tests
     * @throws Exception if set-up, the stop-with-savepoint call or the directory listing fails
     */
    private void stopWithSavepointNormalExecutionHelper(boolean terminate) throws Exception {
        setUpJobGraph(NoOpBlockingStreamTask.class, RestartStrategies.noRestart());

        CompletableFuture<String> savepointLocationFuture = stopWithSavepoint(terminate);
        // The tasks block in finishTask() until finishingLatch fires, so the job must still run.
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        finishingLatch.trigger();

        String savepointLocation = savepointLocationFuture.get();
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.FINISHED));

        // Files.list holds an open directory handle; close it before asserting on the result.
        List<Path> savepointNames;
        try (Stream<Path> entries = Files.list(this.savepointDirectory)) {
            savepointNames = entries.map(Path::getFileName).collect(Collectors.toList());
        }
        MatcherAssert.assertThat(savepointNames, Matchers.hasItem(Paths.get(savepointLocation).getFileName()));
    }

    /** Suspend variant of the "callback throws, no restarts" scenario (flag {@code false}). */
    @Test(timeout = 5000)
    public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheSuspend() throws Exception {
        final boolean terminate = false;
        throwingExceptionOnCallbackWithoutRestartsHelper(terminate);
    }

    /** Terminate variant of the "callback throws, no restarts" scenario (flag {@code true}). */
    @Test(timeout = 5000)
    public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate() throws Exception {
        final boolean terminate = true;
        throwingExceptionOnCallbackWithoutRestartsHelper(terminate);
    }

    /**
     * Scenario with {@code ExceptionOnCallbackStreamTask} and no restart strategy: the
     * stop-with-savepoint future must fail, a savepoint id must have been recorded, and the job
     * must end up FAILED.
     *
     * @param terminate flag forwarded to {@code stopWithSavepoint}
     */
    private void throwingExceptionOnCallbackWithoutRestartsHelper(boolean terminate) throws Exception {
        setUpJobGraph(ExceptionOnCallbackStreamTask.class, RestartStrategies.noRestart());

        final JobStatus statusBeforeStop = getJobStatus();
        MatcherAssert.assertThat(statusBeforeStop, Matchers.equalTo(JobStatus.RUNNING));

        try {
            stopWithSavepoint(terminate).get();
            // Reaching this line means the future completed normally; fail() throws an
            // AssertionError, which the catch clause below does not intercept.
            Assert.fail();
        } catch (Exception expected) {
            // A failed future is the outcome this scenario is looking for.
        }

        final long recordedSavepointId = syncSavepointId.get();
        Assert.assertTrue(recordedSavepointId > 0);

        final JobStatus statusAfterStop = getJobStatus();
        MatcherAssert.assertThat(statusAfterStop, Matchers.equalTo(JobStatus.FAILED));
    }

    /** Suspend variant of the "callback throws, restarts enabled" scenario (flag {@code false}). */
    @Test(timeout = 5000)
    public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend() throws Exception {
        final boolean terminate = false;
        throwingExceptionOnCallbackWithRestartsHelper(terminate);
    }

    /** Terminate variant of the "callback throws, restarts enabled" scenario (flag {@code true}). */
    @Test(timeout = 5000)
    public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInTerminate() throws Exception {
        final boolean terminate = true;
        throwingExceptionOnCallbackWithRestartsHelper(terminate);
    }

    /**
     * Scenario with {@code ExceptionOnCallbackStreamTask} and a fixed-delay restart strategy: the
     * stop-with-savepoint call must not complete normally within 50 ms, the job must restart and
     * keep checkpointing, remain RUNNING, and finally be cancellable.
     *
     * @param terminate flag forwarded to {@code stopWithSavepoint}
     */
    private void throwingExceptionOnCallbackWithRestartsHelper(boolean terminate) throws Exception {
        // NOTE(review): CHECKPOINT_INTERVAL (10) doubles here as a seconds budget, a restart delay
        // in milliseconds and an upper bound for the savepoint id — possibly a constant-folding
        // artifact of decompilation; confirm the intended values.
        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(CHECKPOINT_INTERVAL));

        numberOfRestarts = new CountDownLatch(PARALLELISM);
        checkpointsToWaitFor = new CountDownLatch(10);

        final RestartStrategies.RestartStrategyConfiguration restartStrategy =
                RestartStrategies.fixedDelayRestart(15, Time.milliseconds(CHECKPOINT_INTERVAL));
        setUpJobGraph(ExceptionOnCallbackStreamTask.class, restartStrategy);
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        try {
            stopWithSavepoint(terminate).get(50L, TimeUnit.MILLISECONDS);
            // Normal completion is not acceptable; the AssertionError bypasses the catch below.
            Assert.fail();
        } catch (Exception expected) {
            // Either a failure or a timeout of the future is acceptable here.
        }

        final boolean restartsObserved =
                numberOfRestarts.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        Assert.assertTrue(restartsObserved);

        final boolean checkpointsObserved =
                checkpointsToWaitFor.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        Assert.assertTrue(checkpointsObserved);

        Assert.assertTrue(syncSavepointId.get() > 0);
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        // The recorded savepoint id has to fall strictly between 0 and the bound.
        final long recordedSavepointId = syncSavepointId.get();
        Assert.assertTrue(recordedSavepointId > 0 && recordedSavepointId < CHECKPOINT_INTERVAL);

        this.clusterClient.cancel(this.jobGraph.getJobID()).get();
        MatcherAssert.assertThat(
                getJobStatus(),
                Matchers.either(Matchers.equalTo(JobStatus.CANCELLING)).or(Matchers.equalTo(JobStatus.CANCELED)));
    }

    /**
     * Makes the savepoint directory inaccessible so that stop-with-savepoint fails, then checks
     * that the job keeps running and that at least one further checkpoint is triggered.
     *
     * <p>The test is skipped (JUnit assumption) when the permissions of the savepoint directory
     * cannot be changed, including on file systems without POSIX permission support.
     */
    @Test
    public void testRestartCheckpointCoordinatorIfStopWithSavepointFails() throws Exception {
        setUpJobGraph(CheckpointCountingTask.class, RestartStrategies.noRestart());

        try {
            Files.setPosixFilePermissions(this.savepointDirectory, Collections.emptySet());
        } catch (IOException | UnsupportedOperationException e) {
            // UnsupportedOperationException is what non-POSIX file systems throw; treat both
            // cases as "precondition not met" instead of a test error.
            Assume.assumeNoException(e);
        }

        try {
            stopWithSavepoint(true).get();
            Assert.fail();
        } catch (Exception e) {
            Optional<CheckpointException> checkpointException = ExceptionUtils.findThrowable(e, CheckpointException.class);
            if (!checkpointException.isPresent()) {
                // Not the failure this test provokes; surface it unchanged.
                throw e;
            }
            String message = checkpointException.get().getMessage();
            Assert.assertTrue("Stop with savepoint failed because of another cause " + message, message.contains("Failed to trigger savepoint") && message.contains(CheckpointFailureReason.EXCEPTION.message()));
        }

        JobStatus jobStatus = this.clusterClient.getJobStatus(this.jobGraph.getJobID()).get(60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(jobStatus, Matchers.equalTo(JobStatus.RUNNING));

        // Install a fresh latch: it only counts checkpoints triggered after the failed savepoint.
        checkpointsToWaitFor = new CountDownLatch(1);
        Assert.assertTrue(checkpointsToWaitFor.await(60L, TimeUnit.SECONDS));
    }

    /**
     * Asks the mini cluster to stop the job under test with a savepoint written to the absolute
     * path of {@code savepointDirectory}.
     *
     * @param z flag passed through as the last argument of the mini cluster call; the tests pass
     *     {@code true} for "terminate" and {@code false} for "suspend"
     * @return the future returned by the mini cluster; the tests treat its value as the savepoint
     *     location
     */
    private CompletableFuture<String> stopWithSavepoint(boolean z) {
        final String targetDirectory = this.savepointDirectory.toAbsolutePath().toString();
        return miniClusterResource
                .getMiniCluster()
                .stopWithSavepoint(this.jobGraph.getJobID(), targetDirectory, z);
    }

    /**
     * Queries the cluster client for the current status of the job under test and blocks until
     * the answer is available.
     *
     * @return the status reported by the cluster client
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException if the status query completes exceptionally
     */
    private JobStatus getJobStatus() throws InterruptedException, ExecutionException {
        final CompletableFuture<JobStatus> statusFuture =
                this.clusterClient.getJobStatus(this.jobGraph.getJobID());
        return statusFuture.get();
    }

    /**
     * Resets the shared static test state, builds a single-vertex job that runs the given
     * invokable with {@code PARALLELISM} subtasks and checkpointing enabled, submits it, and
     * waits until every subtask has started and the job reports RUNNING.
     *
     * @param cls invokable class executed by the test vertex
     * @param restartStrategyConfiguration restart strategy installed in the job's execution config
     * @throws Exception if the temporary folder, the submission or the wait for start-up fails
     */
    private void setUpJobGraph(Class<? extends AbstractInvokable> cls, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) throws Exception {
        // Fresh latches and savepoint id for every test; the task classes read them statically.
        finishingLatch = new OneShotLatch();
        invokeLatch = new CountDownLatch(PARALLELISM);
        numberOfRestarts = new CountDownLatch(PARALLELISM);
        checkpointsToWaitFor = new CountDownLatch(10);
        syncSavepointId.set(-1L);

        this.savepointDirectory = this.temporaryFolder.newFolder().toPath();

        Assume.assumeTrue("ClusterClient is not an instance of MiniClusterClient", miniClusterResource.getClusterClient() instanceof MiniClusterClient);
        // Explicit downcast: guarded by the assumption above, and required when the resource
        // exposes the client through a broader declared type.
        this.clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();

        this.jobGraph = new JobGraph(new JobVertex[0]);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(restartStrategyConfiguration);
        this.jobGraph.setExecutionConfig(executionConfig);

        JobVertex jobVertex = new JobVertex("testVertex");
        jobVertex.setInvokableClass(cls);
        jobVertex.setParallelism(PARALLELISM);
        this.jobGraph.addVertex(jobVertex);

        // The single vertex is passed for all three vertex lists of the checkpointing settings.
        this.jobGraph.setSnapshotSettings(new JobCheckpointingSettings(Collections.singletonList(jobVertex.getID()), Collections.singletonList(jobVertex.getID()), Collections.singletonList(jobVertex.getID()), new CheckpointCoordinatorConfiguration(CHECKPOINT_INTERVAL, 60000L, CHECKPOINT_INTERVAL, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, false, 0), (SerializedValue) null));

        ClientUtils.submitJob(this.clusterClient, this.jobGraph);
        // Every subtask counts this latch down once it has entered processInput.
        Assert.assertTrue(invokeLatch.await(60L, TimeUnit.SECONDS));
        waitForJob();
    }

    /**
     * Polls the job status up to 60 times, one second apart, until the job reports RUNNING.
     *
     * <p>A status query that fails with an {@link ExecutionException} is treated as "not ready
     * yet" and retried; a globally terminal status fails the test immediately.
     *
     * @throws AssertionError if the job is not RUNNING after the last attempt
     * @throws Exception if a status query times out or the thread is interrupted
     */
    private void waitForJob() throws Exception {
        for (int attempt = 0; attempt < 60; attempt++) {
            try {
                JobStatus jobStatus = this.clusterClient.getJobStatus(this.jobGraph.getJobID()).get(60L, TimeUnit.SECONDS);
                MatcherAssert.assertThat(jobStatus.isGloballyTerminalState(), Matchers.equalTo(false));
                // Checked inside the try so the status is only read when the query succeeded.
                if (jobStatus == JobStatus.RUNNING) {
                    return;
                }
            } catch (ExecutionException ignored) {
                // Presumably the job is not yet queryable right after submission; retry.
            }
            Thread.sleep(1000L);
        }
        throw new AssertionError("Job did not become running within timeout.");
    }
}
