package org.apache.flink.client.program;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/client/program/PerJobMiniClusterFactoryTest.class */
public class PerJobMiniClusterFactoryTest extends TestLogger {
    private MiniCluster miniCluster;

    @After
    public void teardown() throws Exception {
        if (this.miniCluster != null) {
            this.miniCluster.close();
        }
    }

    @Test
    public void testJobExecution() throws Exception {
        JobClient jobClient = (JobClient) initializeMiniCluster().submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
        MatcherAssert.assertThat((JobExecutionResult) jobClient.getJobExecutionResult().get(), CoreMatchers.is(CoreMatchers.notNullValue()));
        MatcherAssert.assertThat((Map) jobClient.getAccumulators().get(), CoreMatchers.is(CoreMatchers.notNullValue()));
        assertThatMiniClusterIsShutdown();
    }

    @Test
    public void testJobClient() throws Exception {
        PerJobMiniClusterFactory initializeMiniCluster = initializeMiniCluster();
        JobGraph cancellableJobGraph = getCancellableJobGraph();
        JobClient jobClient = (JobClient) initializeMiniCluster.submitJob(cancellableJobGraph, ClassLoader.getSystemClassLoader()).get();
        MatcherAssert.assertThat(jobClient.getJobID(), CoreMatchers.is(cancellableJobGraph.getJobID()));
        MatcherAssert.assertThat(jobClient.getJobStatus().get(), CoreMatchers.anyOf(CoreMatchers.is(JobStatus.CREATED), CoreMatchers.is(JobStatus.RUNNING)));
        jobClient.cancel().get();
        CommonTestUtils.assertThrows("Job was cancelled.", ExecutionException.class, () -> {
            return (JobExecutionResult) jobClient.getJobExecutionResult().get();
        });
        assertThatMiniClusterIsShutdown();
    }

    @Test
    public void testJobClientSavepoint() throws Exception {
        JobClient jobClient = (JobClient) initializeMiniCluster().submitJob(getCancellableJobGraph(), ClassLoader.getSystemClassLoader()).get();
        while (jobClient.getJobStatus().get() != JobStatus.RUNNING) {
            Thread.sleep(50L);
        }
        CommonTestUtils.assertThrows("is not a streaming job.", ExecutionException.class, () -> {
            return (String) jobClient.triggerSavepoint((String) null, SavepointFormatType.DEFAULT).get();
        });
        CommonTestUtils.assertThrows("is not a streaming job.", ExecutionException.class, () -> {
            return (String) jobClient.stopWithSavepoint(true, (String) null, SavepointFormatType.DEFAULT).get();
        });
    }

    @Test
    public void testMultipleExecutions() throws Exception {
        PerJobMiniClusterFactory initializeMiniCluster = initializeMiniCluster();
        ((JobClient) initializeMiniCluster.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get()).getJobExecutionResult().get();
        assertThatMiniClusterIsShutdown();
        ((JobClient) initializeMiniCluster.submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get()).getJobExecutionResult().get();
        assertThatMiniClusterIsShutdown();
    }

    @Test
    public void testJobClientInteractionAfterShutdown() throws Exception {
        JobClient jobClient = (JobClient) initializeMiniCluster().submitJob(getNoopJobGraph(), ClassLoader.getSystemClassLoader()).get();
        jobClient.getJobExecutionResult().get();
        assertThatMiniClusterIsShutdown();
        jobClient.getClass();
        CommonTestUtils.assertThrows("MiniCluster is not yet running or has already been shut down.", IllegalStateException.class, jobClient::cancel);
    }

    private PerJobMiniClusterFactory initializeMiniCluster() {
        return PerJobMiniClusterFactory.createWithFactory(new Configuration(), miniClusterConfiguration -> {
            this.miniCluster = new MiniCluster(miniClusterConfiguration);
            return this.miniCluster;
        });
    }

    private void assertThatMiniClusterIsShutdown() {
        MatcherAssert.assertThat(Boolean.valueOf(this.miniCluster.isRunning()), CoreMatchers.is(false));
    }

    private static JobGraph getNoopJobGraph() {
        return JobGraphTestUtils.singleNoOpJobGraph();
    }

    private static JobGraph getCancellableJobGraph() {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(WaitingCancelableInvokable.class);
        jobVertex.setParallelism(1);
        return JobGraphTestUtils.streamingJobGraph(new JobVertex[]{jobVertex});
    }
}
