package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.class */
public class JobManagerCleanupITCase extends TestLogger {

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private static ActorSystem system;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase$TestCase.class */
    public enum TestCase {
        JOB_FINISHES_SUCESSFULLY,
        JOB_IS_CANCELLED,
        JOB_FAILS,
        JOB_SUBMISSION_FAILS
    }

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem(new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    @Test
    public void testBlobServerCleanupFinishedJob() throws IOException {
        testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
    }

    @Test
    public void testBlobServerCleanupCancelledJob() throws IOException {
        testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
    }

    @Test
    public void testBlobServerCleanupFailedJob() throws IOException {
        testBlobServerCleanup(TestCase.JOB_FAILS);
    }

    @Test
    public void testBlobServerCleanupFailedSubmission() throws IOException {
        testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
    }

    private void testBlobServerCleanup(final TestCase testCase) throws IOException {
        final File newFolder = this.tmpFolder.newFolder();
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.1
            {
                new JavaTestKit.Within(duration("30 seconds")) { // from class: org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.1.1
                    protected void run() {
                        TestingCluster testingCluster = null;
                        File file = null;
                        try {
                            try {
                                Configuration configuration = new Configuration();
                                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
                                configuration.setInteger("local.number-taskmanager", 1);
                                configuration.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                                configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, newFolder.getAbsolutePath());
                                configuration.setString("restart-strategy", "fixeddelay");
                                configuration.setInteger("restart-strategy.fixed-delay.attempts", 1);
                                configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1 s");
                                configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
                                testingCluster = new TestingCluster(configuration);
                                testingCluster.start();
                                ActorGateway leaderGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                                AkkaActorGateway akkaActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                                JobVertex jobVertex = new JobVertex("Source");
                                if (testCase == TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
                                    jobVertex.setInvokableClass(FailingBlockingInvokable.class);
                                } else {
                                    jobVertex.setInvokableClass(NoOpInvokable.class);
                                }
                                jobVertex.setParallelism(2);
                                JobGraph jobGraph = new JobGraph("BlobCleanupTest", new JobVertex[]{jobVertex});
                                JobID jobID = jobGraph.getJobID();
                                int intValue = ((Integer) Await.result(leaderGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), remaining()), remaining())).intValue();
                                file = File.createTempFile("Required", ".jar");
                                List uploadFiles = BlobClient.uploadFiles(new InetSocketAddress("localhost", intValue), configuration, jobID, Collections.singletonList(new Path(file.getAbsolutePath())));
                                Assert.assertEquals(1L, uploadFiles.size());
                                jobGraph.addUserJarBlobKey((PermanentBlobKey) uploadFiles.get(0));
                                if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
                                    jobGraph.addUserJarBlobKey(new PermanentBlobKey());
                                }
                                leaderGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, testCase == TestCase.JOB_IS_CANCELLED ? ListeningBehaviour.DETACHED : ListeningBehaviour.EXECUTION_RESULT), akkaActorGateway);
                                if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
                                    expectMsgClass(JobManagerMessages.JobResultFailure.class);
                                } else {
                                    expectMsgEquals(new JobManagerMessages.JobSubmitSuccess(jobID));
                                    if (testCase == TestCase.JOB_FAILS) {
                                        FailingBlockingInvokable.unblock();
                                        expectMsgClass(JobManagerMessages.JobResultFailure.class);
                                    } else if (testCase == TestCase.JOB_IS_CANCELLED) {
                                        leaderGateway.tell(new JobManagerMessages.CancelJob(jobID), akkaActorGateway);
                                        expectMsgEquals(new JobManagerMessages.CancellationSuccess(jobID, (String) null));
                                    } else {
                                        expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                                    }
                                }
                                File[] listFiles = newFolder.listFiles(new FilenameFilter() { // from class: org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.1.1.1
                                    @Override // java.io.FilenameFilter
                                    public boolean accept(File file2, String str) {
                                        return str.startsWith("blobStore-");
                                    }
                                });
                                Assert.assertNotNull(listFiles);
                                for (File file2 : listFiles) {
                                    JobManagerCleanupITCase.waitForEmptyBlobDir(file2, remaining());
                                }
                                if (testingCluster != null) {
                                    testingCluster.stop();
                                }
                                if (file != null) {
                                    Assert.assertTrue(file.delete());
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                                if (testingCluster != null) {
                                    testingCluster.stop();
                                }
                                if (file != null) {
                                    Assert.assertTrue(file.delete());
                                }
                            }
                        } catch (Throwable th) {
                            if (testingCluster != null) {
                                testingCluster.stop();
                            }
                            if (file != null) {
                                Assert.assertTrue(file.delete());
                            }
                            throw th;
                        }
                    }
                };
            }
        };
        Assert.assertArrayEquals(new File[0], newFolder.listFiles());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void waitForEmptyBlobDir(File file, FiniteDuration finiteDuration) throws InterruptedException {
        String[] list;
        long currentTimeMillis = System.currentTimeMillis() + finiteDuration.toMillis();
        do {
            list = file.list(new FilenameFilter() { // from class: org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.2
                @Override // java.io.FilenameFilter
                public boolean accept(File file2, String str) {
                    return str.startsWith("job_");
                }
            });
            if (list == null || list.length == 0) {
                return;
            } else {
                Thread.sleep(100L);
            }
        } while (System.currentTimeMillis() < currentTimeMillis);
        Assert.fail("Timeout while waiting for " + file.getAbsolutePath() + " to become empty. Current contents: " + Arrays.toString(list));
    }
}
