package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.class */
/**
 * Integration test for cancelling a job with a savepoint through a {@link MiniClusterClient}.
 *
 * <p>Every test submits a job consisting of a single {@link NoOpBlockingInvokable} vertex, waits
 * until the job reports {@link JobStatus#RUNNING}, and then calls {@code cancelWithSavepoint} on
 * the client. The savepoint target is a fresh directory created by {@link #temporaryFolder}.
 */
public class JobMasterTriggerSavepointITCase extends AbstractTestBase {

    /** Timeout, in seconds, applied to latch waits and job status lookups. */
    private static final long TIMEOUT_SECONDS = 60L;

    /** Maximum number of status polls performed by {@link #waitForJob()}. */
    private static final int MAX_STATUS_POLLS = 60;

    /** Pause between two status polls in {@link #waitForJob()}, in milliseconds. */
    private static final long STATUS_POLL_INTERVAL_MILLIS = 1000L;

    /** Counted down by {@link NoOpBlockingInvokable#invoke()}; recreated for every test. */
    private static CountDownLatch invokeLatch;

    /**
     * Counted down by {@link NoOpBlockingInvokable#triggerCheckpointAsync}. A test may replace
     * the latch while the invokable is still alive, hence the {@code volatile}.
     */
    private static volatile CountDownLatch triggerCheckpointLatch;

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Directory handed to {@code cancelWithSavepoint} as the savepoint target. */
    private Path savepointDirectory;

    private MiniClusterClient clusterClient;

    private JobGraph jobGraph;

    /**
     * Invokable that signals {@link #invokeLatch} and then sleeps until it is interrupted. Each
     * checkpoint trigger is acknowledged immediately with a default-built
     * {@link OperatorSubtaskState}.
     */
    public static class NoOpBlockingInvokable extends AbstractInvokable {
        public NoOpBlockingInvokable(Environment environment) {
            super(environment);
        }

        /** Signals that the task has started, then blocks until the thread is interrupted. */
        public void invoke() {
            JobMasterTriggerSavepointITCase.invokeLatch.countDown();
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Acknowledges the checkpoint right away and counts down
         * {@link #triggerCheckpointLatch}.
         *
         * @return an already completed future holding {@code true}
         */
        public CompletableFuture<Boolean> triggerCheckpointAsync(
                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            final TaskStateSnapshot snapshot = new TaskStateSnapshot();
            snapshot.putSubtaskStateByOperatorID(
                    OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
                    OperatorSubtaskState.builder().build());
            getEnvironment()
                    .acknowledgeCheckpoint(
                            checkpointMetaData.getCheckpointId(), new CheckpointMetrics(), snapshot);
            JobMasterTriggerSavepointITCase.triggerCheckpointLatch.countDown();
            return CompletableFuture.completedFuture(true);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        /** No-op; both arguments are ignored and an already completed future is returned. */
        public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long unused) {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Resets the latches, creates the savepoint directory, submits the test job and blocks until
     * the job is running.
     *
     * <p>The test is skipped (via {@link Assume}) when the cluster client is not a
     * {@link MiniClusterClient}.
     *
     * @param checkpointInterval value passed as the first argument of
     *     {@link CheckpointCoordinatorConfiguration}
     * @throws Exception if submitting the job or waiting for it fails
     */
    private void setUpWithCheckpointInterval(long checkpointInterval) throws Exception {
        invokeLatch = new CountDownLatch(1);
        triggerCheckpointLatch = new CountDownLatch(1);
        this.savepointDirectory = this.temporaryFolder.newFolder().toPath();

        Assume.assumeTrue(
                "ClusterClient is not an instance of MiniClusterClient",
                miniClusterResource.getClusterClient() instanceof MiniClusterClient);
        // Explicit narrowing; guarded by the assumption above.
        this.clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();

        final JobVertex vertex = new JobVertex("testVertex");
        vertex.setInvokableClass(NoOpBlockingInvokable.class);
        vertex.setParallelism(1);

        // NOTE(review): apart from the interval, the positional arguments are fixed by this test;
        // their meaning is defined by CheckpointCoordinatorConfiguration's constructor.
        final CheckpointCoordinatorConfiguration coordinatorConfiguration =
                new CheckpointCoordinatorConfiguration(
                        checkpointInterval,
                        60000L,
                        10L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
                        false,
                        0,
                        0L);
        final JobCheckpointingSettings checkpointingSettings =
                new JobCheckpointingSettings(coordinatorConfiguration, (SerializedValue) null);

        this.jobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder()
                        .addJobVertex(vertex)
                        .setJobCheckpointingSettings(checkpointingSettings)
                        .build();

        this.clusterClient.submitJob(this.jobGraph).get();
        Assert.assertTrue(invokeLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
        waitForJob();
    }

    /** Cancels with a savepoint while checkpoints are triggered with a 10 ms interval setting. */
    @Test
    public void testStopJobAfterSavepoint() throws Exception {
        setUpWithCheckpointInterval(10L);

        final String savepointLocation = cancelWithSavepoint();

        assertJobCancelledOrCancelling();
        assertSavepointDirectoryContains(savepointLocation);
    }

    /** Same as {@link #testStopJobAfterSavepoint()} but with the interval set to the maximum. */
    @Test
    public void testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
        setUpWithCheckpointInterval(Long.MAX_VALUE);

        final String savepointLocation = cancelWithSavepoint();

        assertJobCancelledOrCancelling();
        assertSavepointDirectoryContains(savepointLocation);
    }

    /**
     * Removes all permissions from the savepoint directory so that the savepoint cannot be
     * written, then verifies that the job keeps running and checkpoints are still triggered.
     */
    @Test
    public void testDoNotCancelJobIfSavepointFails() throws Exception {
        setUpWithCheckpointInterval(10L);

        try {
            Files.setPosixFilePermissions(this.savepointDirectory, Collections.emptySet());
        } catch (IOException | UnsupportedOperationException e) {
            // Skip instead of failing when permissions cannot be changed, including on file
            // systems without POSIX permission support.
            Assume.assumeNoException(e);
        }

        try {
            cancelWithSavepoint();
        } catch (Exception e) {
            MatcherAssert.assertThat(
                    ExceptionUtils.findThrowable(e, CheckpointException.class).isPresent(),
                    Matchers.equalTo(true));
        }

        MatcherAssert.assertThat(currentJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        // A fresh latch only completes if another checkpoint is triggered from now on.
        triggerCheckpointLatch = new CountDownLatch(1);
        MatcherAssert.assertThat(
                triggerCheckpointLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                Matchers.equalTo(true));
    }

    /**
     * Calls cancel-with-savepoint with a {@code null} target directory. Any exception raised must
     * mention "savepoint directory" somewhere in its cause chain; otherwise it is rethrown.
     */
    @Test
    public void testCancelWithSavepointWithoutConfiguredSavepointDirectory() throws Exception {
        setUpWithCheckpointInterval(10L);

        try {
            this.clusterClient.cancelWithSavepoint(this.jobGraph.getJobID(), (String) null).get();
        } catch (Exception e) {
            if (!ExceptionUtils.findThrowableWithMessage(e, "savepoint directory").isPresent()) {
                throw e;
            }
        }
    }

    /**
     * Polls the job status until it is {@link JobStatus#RUNNING}.
     *
     * <p>An {@link ExecutionException} from the lookup is treated as "not ready yet" and simply
     * leads to another poll after the pause.
     *
     * @throws AssertionError if the job reaches a globally terminal state or does not become
     *     running within {@link #MAX_STATUS_POLLS} polls
     */
    private void waitForJob() throws Exception {
        for (int attempt = 0; attempt < MAX_STATUS_POLLS; attempt++) {
            try {
                final JobStatus status = currentJobStatus();
                MatcherAssert.assertThat(status.isGloballyTerminalState(), Matchers.equalTo(false));
                // The status is only inspected when the lookup actually produced one.
                if (status == JobStatus.RUNNING) {
                    return;
                }
            } catch (ExecutionException ignored) {
                // Lookup failed; retry after the pause below.
            }
            Thread.sleep(STATUS_POLL_INTERVAL_MILLIS);
        }
        throw new AssertionError("Job did not become running within timeout.");
    }

    /**
     * Cancels the test job with a savepoint written to {@link #savepointDirectory}.
     *
     * @return the value the client's future completes with, used by the tests as the savepoint
     *     location
     */
    private String cancelWithSavepoint() throws Exception {
        final String targetDirectory = this.savepointDirectory.toAbsolutePath().toString();
        return (String)
                this.clusterClient.cancelWithSavepoint(this.jobGraph.getJobID(), targetDirectory).get();
    }

    /** Fetches the current status of the test job, waiting at most {@link #TIMEOUT_SECONDS}. */
    private JobStatus currentJobStatus() throws Exception {
        return (JobStatus)
                this.clusterClient
                        .getJobStatus(this.jobGraph.getJobID())
                        .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** Asserts that the test job is either cancelled or in the process of being cancelled. */
    private void assertJobCancelledOrCancelling() throws Exception {
        MatcherAssert.assertThat(
                currentJobStatus(), Matchers.isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));
    }

    /**
     * Asserts that {@link #savepointDirectory} has a direct child whose file name equals the last
     * path element of {@code savepointLocation}.
     *
     * @param savepointLocation location string returned by {@link #cancelWithSavepoint()}
     * @throws IOException if the directory cannot be listed
     */
    private void assertSavepointDirectoryContains(String savepointLocation) throws IOException {
        final List<Path> entryNames;
        // Files.list holds an open directory handle, so the stream must be closed.
        try (Stream<Path> entries = Files.list(this.savepointDirectory)) {
            entryNames = entries.map(Path::getFileName).collect(Collectors.toList());
        }
        MatcherAssert.assertThat(
                entryNames, Matchers.hasItem(Paths.get(savepointLocation).getFileName()));
    }
}
