package org.apache.flink.client.deployment.application;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.testjar.MultiExecuteJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.class */
/**
 * Tests for {@link ApplicationDispatcherBootstrap}.
 *
 * <p>Every test wires the bootstrap to a {@link TestingDispatcherGateway} whose submit, job-status,
 * job-result and cluster-shutdown functions are stubbed, runs a {@link MultiExecuteJob} program
 * with a given number of jobs, and then inspects the futures exposed by the bootstrap. All blocking
 * waits are bounded by {@link #TIMEOUT_SECONDS} so that a broken bootstrap fails the test instead
 * of hanging it.
 */
public class ApplicationDispatcherBootstrapTest extends TestLogger {

    /** Upper bound, in seconds, for every blocking wait on a future in this class. */
    private static final int TIMEOUT_SECONDS = 10;

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);

    private final ScheduledExecutor scheduledExecutor =
            new ScheduledExecutorServiceAdapter(executor);

    /** Shuts down the executor that backs {@link #scheduledExecutor} after each test. */
    @After
    public void cleanup() {
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, executor);
    }

    // ------------------------------------------------------------------------
    //  Number of jobs / job id handling
    // ------------------------------------------------------------------------

    @Test
    public void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable {
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder().setSubmitFunction(jobGraph -> acknowledge());

        assertException(runApplication(dispatcherBuilder, 0), ApplicationExecutionException.class);
    }

    @Test
    public void testOnlyOneJobIsAllowedWithHa() throws Throwable {
        final Configuration configuration = getConfiguration();
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        assertException(runApplication(configuration, 2), FlinkRuntimeException.class);
    }

    @Test
    public void testOnlyOneJobAllowedWithStaticJobId() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());

        assertException(runApplication(configuration, 2), FlinkRuntimeException.class);
    }

    @Test
    public void testOnlyOneJobAllowedWithStaticJobIdAndHa() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        assertException(runApplication(configuration, 2), FlinkRuntimeException.class);
    }

    @Test
    public void testJobIdDefaultsToZeroWithHa() throws Throwable {
        final Configuration configuration = getConfiguration();
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        assertSubmittedJobId(configuration, new JobID(0L, 0L));
    }

    @Test
    public void testStaticJobId() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());

        assertSubmittedJobId(configuration, new JobID(0L, 2L));
    }

    @Test
    public void testStaticJobIdWithHa() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        assertSubmittedJobId(configuration, new JobID(0L, 2L));
    }

    // ------------------------------------------------------------------------
    //  Application outcome
    // ------------------------------------------------------------------------

    @Test
    public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
        final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    submittedJobIds.add(jobGraph.getJobID());
                                    return acknowledge();
                                })
                        .setRequestJobStatusFunction(
                                jobId -> {
                                    // only the first submitted job fails, all others keep running
                                    if (jobId.equals(submittedJobIds.peek())) {
                                        return CompletableFuture.completedFuture(JobStatus.FAILED);
                                    }
                                    return CompletableFuture.completedFuture(JobStatus.RUNNING);
                                })
                        .setRequestJobResultFunction(
                                jobId -> {
                                    // only the first submitted job ever produces a result
                                    if (jobId.equals(submittedJobIds.peek())) {
                                        return CompletableFuture.completedFuture(
                                                createFailedJobResult(jobId));
                                    }
                                    // the result of the remaining jobs is never delivered
                                    return new CompletableFuture<>();
                                });

        final UnsuccessfulExecutionException exception =
                assertException(
                        runApplication(dispatcherBuilder, 2), UnsuccessfulExecutionException.class);

        Assert.assertEquals(ApplicationStatus.FAILED, exception.getStatus());
    }

    @Test
    public void testApplicationSucceedsWhenAllJobsSucceed() throws Exception {
        runApplication(finishedJobsGatewayBuilder(), 3).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @Test
    public void testDispatcherIsCancelledWhenOneJobIsCancelled() throws Exception {
        MatcherAssert.assertThat(
                awaitClusterShutdownStatus(canceledJobsGatewayBuilder()),
                CoreMatchers.is(ApplicationStatus.CANCELED));
    }

    @Test
    public void testApplicationTaskFinishesWhenApplicationFinishes() throws Exception {
        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3, finishedJobsGatewayBuilder().build(), scheduledExecutor);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        final ScheduledFuture<?> applicationExecutionFuture =
                bootstrap.getApplicationExecutionFuture();

        // both the shutdown and the application task itself have to complete
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        applicationExecutionFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @Test
    public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
        // flipped by the gateway if the bootstrap asks it to shut the cluster down
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(jobGraph -> acknowledge())
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING))
                        .setClusterShutdownFunction(
                                status -> {
                                    shutdownCalled.set(true);
                                    return acknowledge();
                                });

        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherBuilder.build(),
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();
        final ScheduledFuture<?> applicationExecutionFuture =
                bootstrap.getApplicationExecutionFuture();

        bootstrap.stop();

        // stopping must not be reported as a fatal error
        Assert.assertFalse(errorHandlerFuture.isDone());

        // the shutdown future completes, but the gateway was never asked to shut down
        shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertFalse(shutdownCalled.get());

        // the application task was cancelled
        Assert.assertTrue(applicationExecutionFuture.isCancelled());
        Assert.assertTrue(applicationExecutionFuture.isDone());
    }

    // ------------------------------------------------------------------------
    //  Error handler
    // ------------------------------------------------------------------------

    @Test
    public void testErrorHandlerIsCalledWhenSubmissionThrowsAnException() throws Exception {
        // flipped by the gateway if the bootstrap asks it to shut the cluster down
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    throw new FlinkRuntimeException("Nope!");
                                })
                        .setClusterShutdownFunction(
                                status -> {
                                    shutdownCalled.set(true);
                                    return acknowledge();
                                });

        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherBuilder.build(),
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        // the shutdown future fails ...
        assertException(bootstrap.getClusterShutdownFuture(), FlinkRuntimeException.class);
        // ... the failure reaches the error handler ...
        assertException(errorHandlerFuture, FlinkRuntimeException.class);
        // ... and the gateway was never asked to shut the cluster down
        Assert.assertFalse(shutdownCalled.get());
    }

    @Test
    public void testErrorHandlerIsCalledWhenShutdownCompletesExceptionally() throws Exception {
        testErrorHandlerIsCalled(
                () ->
                        FutureUtils.completedExceptionally(
                                new FlinkRuntimeException("Test exception.")));
    }

    @Test
    public void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws Exception {
        testErrorHandlerIsCalled(
                () -> {
                    throw new FlinkRuntimeException("Test exception.");
                });
    }

    /**
     * Runs three successfully finishing jobs against a gateway whose cluster-shutdown call is
     * answered by {@code shutdownFunction}, and asserts that both the error handler and the
     * bootstrap's shutdown future observe a {@link FlinkRuntimeException}.
     *
     * @param shutdownFunction produces (or throws instead of producing) the result of the gateway's
     *     cluster-shutdown call
     */
    private void testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> shutdownFunction)
            throws Exception {
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                finishedJobsGatewayBuilder()
                        .setClusterShutdownFunction(status -> shutdownFunction.get());

        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherBuilder.build(),
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        final CompletableFuture<?> shutdownFuture = bootstrap.getClusterShutdownFuture();

        assertException(errorHandlerFuture, FlinkRuntimeException.class);
        assertException(shutdownFuture, FlinkRuntimeException.class);
    }

    @Test
    public void testErrorHandlerIsCalledWhenApplicationStatusIsUnknown() throws Exception {
        // flipped by the gateway if the bootstrap asks it to shut the cluster down
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);

        final TestingDispatcherGateway dispatcherGateway =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(jobGraph -> acknowledge())
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createUnknownJobResult(jobId)))
                        .setClusterShutdownFunction(
                                status -> {
                                    shutdownCalled.set(true);
                                    return acknowledge();
                                })
                        .build();

        final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(
                        3,
                        dispatcherGateway,
                        scheduledExecutor,
                        errorHandlerFuture::completeExceptionally);

        // the shutdown future fails ...
        assertException(
                bootstrap.getClusterShutdownFuture(), UnsuccessfulExecutionException.class);
        // ... the failure reaches the error handler (previously the shutdown future was
        // asserted a second time and the error handler was never checked) ...
        assertException(errorHandlerFuture, UnsuccessfulExecutionException.class);
        // ... and the gateway was never asked to shut the cluster down
        Assert.assertFalse(shutdownCalled.get());
    }

    // ------------------------------------------------------------------------
    //  Cluster shutdown
    // ------------------------------------------------------------------------

    @Test
    public void testClusterIsShutdownInAttachedModeWhenJobCancelled() throws Exception {
        final CompletableFuture<ApplicationStatus> reportedStatus = new CompletableFuture<>();

        final TestingDispatcherGateway dispatcherGateway =
                canceledJobsGatewayBuilder()
                        .setClusterShutdownFunction(
                                status -> {
                                    reportedStatus.complete(status);
                                    return acknowledge();
                                })
                        .build();

        final PackagedProgram program = getProgram(2);
        final Configuration configuration = getConfiguration();
        configuration.set(DeploymentOptions.ATTACHED, true);

        final ApplicationDispatcherBootstrap bootstrap =
                new ApplicationDispatcherBootstrap(
                        program,
                        Collections.emptyList(),
                        configuration,
                        dispatcherGateway,
                        scheduledExecutor,
                        error -> {
                            // fatal errors are not of interest for this test
                        });

        assertException(
                bootstrap.getApplicationCompletionFuture(), UnsuccessfulExecutionException.class);
        Assert.assertEquals(
                ApplicationStatus.CANCELED, reportedStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    @Test
    public void testClusterShutdownWhenApplicationSucceeds() throws Exception {
        MatcherAssert.assertThat(
                awaitClusterShutdownStatus(finishedJobsGatewayBuilder()),
                CoreMatchers.is(ApplicationStatus.SUCCEEDED));
    }

    @Test
    public void testClusterShutdownWhenApplicationFails() throws Exception {
        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(jobGraph -> acknowledge())
                        .setRequestJobStatusFunction(
                                jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
                        .setRequestJobResultFunction(
                                jobId ->
                                        CompletableFuture.completedFuture(
                                                createFailedJobResult(jobId)));

        MatcherAssert.assertThat(
                awaitClusterShutdownStatus(dispatcherBuilder),
                CoreMatchers.is(ApplicationStatus.FAILED));
    }

    @Test
    public void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
        MatcherAssert.assertThat(
                awaitClusterShutdownStatus(canceledJobsGatewayBuilder()),
                CoreMatchers.is(ApplicationStatus.CANCELED));
    }

    // ------------------------------------------------------------------------
    //  Duplicate job submission
    // ------------------------------------------------------------------------

    @Test
    public void testDuplicateJobSubmissionWithTerminatedJobId() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                finishedJobsGatewayBuilder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException
                                                        .ofGloballyTerminated(testJobId)));

        runApplication(dispatcherBuilder, configuration, 1)
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @Test
    public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        runApplication(terminatedJobWithUnknownResultGatewayBuilder(testJobId), configuration, 1)
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @Test
    public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResultAttached()
            throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
        // NOTE(review): despite its name this test never sets DeploymentOptions.ATTACHED, so it is
        // identical to the non-attached variant above. Left unchanged because the expected
        // attached-mode behaviour cannot be determined from this file; confirm and adjust.

        runApplication(terminatedJobWithUnknownResultGatewayBuilder(testJobId), configuration, 1)
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @Test
    public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
        final JobID testJobId = new JobID(0L, 2L);
        final Configuration configuration = getConfiguration();
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, testJobId.toHexString());
        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                new TestingDispatcherGateway.Builder()
                        .setSubmitFunction(
                                jobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException.of(testJobId)));

        final CompletableFuture<Void> applicationFuture =
                runApplication(dispatcherBuilder, configuration, 1);

        // waiting for the application has to fail; an empty runnable here would make the
        // assertion fail unconditionally and leave the future unobserved
        final ExecutionException executionException =
                Assert.assertThrows(
                        ExecutionException.class,
                        () -> applicationFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));

        final Optional<DuplicateJobSubmissionException> maybeDuplicate =
                ExceptionUtils.findThrowable(
                        executionException, DuplicateJobSubmissionException.class);
        Assert.assertTrue(maybeDuplicate.isPresent());
        Assert.assertFalse(maybeDuplicate.get().isGloballyTerminated());
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /** Runs {@code i} jobs against the given gateway builder using the default configuration. */
    private CompletableFuture<Void> runApplication(TestingDispatcherGateway.Builder builder, int i)
            throws FlinkException {
        return runApplication(builder, getConfiguration(), i);
    }

    /**
     * Runs {@code i} jobs with the given configuration against a gateway on which every job
     * finishes successfully.
     */
    private CompletableFuture<Void> runApplication(Configuration configuration, int i)
            throws Throwable {
        return runApplication(finishedJobsGatewayBuilder(), configuration, i);
    }

    /**
     * Builds the gateway, starts an {@link ApplicationDispatcherBootstrap} for a program with
     * {@code i} jobs and returns the bootstrap's application completion future. Fatal errors
     * reported by the bootstrap are ignored.
     */
    private CompletableFuture<Void> runApplication(
            TestingDispatcherGateway.Builder builder, Configuration configuration, int i)
            throws FlinkException {
        final ApplicationDispatcherBootstrap bootstrap =
                new ApplicationDispatcherBootstrap(
                        getProgram(i),
                        Collections.emptyList(),
                        configuration,
                        builder.build(),
                        scheduledExecutor,
                        error -> {
                            // fatal errors are not of interest to the callers of this helper
                        });

        return bootstrap.getApplicationCompletionFuture();
    }

    /** Creates a bootstrap with the default configuration that ignores fatal errors. */
    private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
            int i, DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor)
            throws FlinkException {
        return createApplicationDispatcherBootstrap(
                i,
                dispatcherGateway,
                scheduledExecutor,
                error -> {
                    // fatal errors are not of interest to the callers of this helper
                });
    }

    /**
     * Creates a bootstrap with the default configuration for a program with {@code i} jobs that
     * reports fatal errors to {@code fatalErrorHandler}.
     */
    private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
            int i,
            DispatcherGateway dispatcherGateway,
            ScheduledExecutor scheduledExecutor,
            FatalErrorHandler fatalErrorHandler)
            throws FlinkException {
        return new ApplicationDispatcherBootstrap(
                getProgram(i),
                Collections.emptyList(),
                getConfiguration(),
                dispatcherGateway,
                scheduledExecutor,
                fatalErrorHandler);
    }

    /** Returns the {@link MultiExecuteJob} program configured for {@code i} jobs. */
    private PackagedProgram getProgram(int i) throws FlinkException {
        return MultiExecuteJob.getProgram(i, true);
    }

    /**
     * Runs a single job with the given configuration and asserts the id of the job graph that
     * reaches the gateway's submit function.
     */
    private void assertSubmittedJobId(Configuration configuration, JobID expectedJobId)
            throws Exception {
        final CompletableFuture<JobID> submittedJobId = new CompletableFuture<>();

        final TestingDispatcherGateway.Builder dispatcherBuilder =
                finishedJobsGatewayBuilder()
                        .setSubmitFunction(
                                jobGraph -> {
                                    submittedJobId.complete(jobGraph.getJobID());
                                    return acknowledge();
                                });

        runApplication(dispatcherBuilder, configuration, 1)
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        MatcherAssert.assertThat(
                submittedJobId.get(TIMEOUT_SECONDS, TimeUnit.SECONDS),
                CoreMatchers.is(expectedJobId));
    }

    /**
     * Installs a cluster-shutdown function on {@code dispatcherBuilder} that records the status it
     * is called with, runs three jobs with the default configuration, waits for the bootstrap's
     * shutdown future and returns the recorded status.
     */
    private ApplicationStatus awaitClusterShutdownStatus(
            TestingDispatcherGateway.Builder dispatcherBuilder) throws Exception {
        final CompletableFuture<ApplicationStatus> reportedStatus = new CompletableFuture<>();

        final TestingDispatcherGateway dispatcherGateway =
                dispatcherBuilder
                        .setClusterShutdownFunction(
                                status -> {
                                    reportedStatus.complete(status);
                                    return acknowledge();
                                })
                        .build();

        final ApplicationDispatcherBootstrap bootstrap =
                createApplicationDispatcherBootstrap(3, dispatcherGateway, scheduledExecutor);

        bootstrap.getClusterShutdownFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        return reportedStatus.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** Returns an already completed acknowledgement future. */
    private static CompletableFuture<Acknowledge> acknowledge() {
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Returns a builder for a gateway that acknowledges every submission and reports every job as
     * {@link JobStatus#FINISHED} with a successful result.
     */
    private static TestingDispatcherGateway.Builder finishedJobsGatewayBuilder() {
        return new TestingDispatcherGateway.Builder()
                .setSubmitFunction(jobGraph -> acknowledge())
                .setRequestJobStatusFunction(
                        jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
                .setRequestJobResultFunction(
                        jobId ->
                                CompletableFuture.completedFuture(
                                        createSuccessfulJobResult(jobId)));
    }

    /**
     * Returns a builder for a gateway that acknowledges every submission and reports every job as
     * {@link JobStatus#CANCELED} with a cancelled result.
     */
    private static TestingDispatcherGateway.Builder canceledJobsGatewayBuilder() {
        return new TestingDispatcherGateway.Builder()
                .setSubmitFunction(jobGraph -> acknowledge())
                .setRequestJobStatusFunction(
                        jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
                .setRequestJobResultFunction(
                        jobId ->
                                CompletableFuture.completedFuture(createCancelledJobResult(jobId)));
    }

    /**
     * Returns a builder for a gateway that rejects every submission as a duplicate of the globally
     * terminated job {@code terminatedJobId} and answers status and result requests with a {@link
     * FlinkJobNotFoundException}.
     */
    private static TestingDispatcherGateway.Builder terminatedJobWithUnknownResultGatewayBuilder(
            JobID terminatedJobId) {
        return new TestingDispatcherGateway.Builder()
                .setSubmitFunction(
                        jobGraph ->
                                FutureUtils.completedExceptionally(
                                        DuplicateJobSubmissionException.ofGloballyTerminated(
                                                terminatedJobId)))
                .setRequestJobStatusFunction(
                        jobId ->
                                FutureUtils.completedExceptionally(
                                        new FlinkJobNotFoundException(jobId)))
                .setRequestJobResultFunction(
                        jobId ->
                                FutureUtils.completedExceptionally(
                                        new FlinkJobNotFoundException(jobId)));
    }

    private static JobResult createUnknownJobResult(JobID jobID) {
        return new JobResult.Builder()
                .jobId(jobID)
                .netRuntime(2L)
                .applicationStatus(ApplicationStatus.UNKNOWN)
                .serializedThrowable(
                        new SerializedThrowable(
                                new JobExecutionException(jobID, "unknown bla bla bla")))
                .build();
    }

    private static JobResult createFailedJobResult(JobID jobID) {
        return new JobResult.Builder()
                .jobId(jobID)
                .netRuntime(2L)
                .applicationStatus(ApplicationStatus.FAILED)
                .serializedThrowable(
                        new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla")))
                .build();
    }

    private static JobResult createSuccessfulJobResult(JobID jobID) {
        return new JobResult.Builder()
                .jobId(jobID)
                .netRuntime(2L)
                .applicationStatus(ApplicationStatus.SUCCEEDED)
                .build();
    }

    private static JobResult createCancelledJobResult(JobID jobID) {
        return new JobResult.Builder()
                .jobId(jobID)
                .netRuntime(2L)
                .serializedThrowable(
                        new SerializedThrowable(
                                new JobCancellationException(jobID, "Hello", (Throwable) null)))
                .applicationStatus(ApplicationStatus.CANCELED)
                .build();
    }

    /**
     * Waits up to {@link #TIMEOUT_SECONDS} for {@code completableFuture} and expects the wait to
     * fail with a throwable whose cause chain contains an instance of {@code cls}.
     *
     * @param completableFuture future that is expected to fail
     * @param cls type of the throwable to look for
     * @return the matching throwable found in the cause chain
     * @throws Exception the original failure if it does not contain {@code cls}, or a plain {@link
     *     Exception} if the future completed normally
     */
    private static <T, E extends Throwable> E assertException(
            CompletableFuture<T> completableFuture, Class<E> cls) throws Exception {
        try {
            completableFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (Throwable failure) {
            final Optional<E> maybeExpected = ExceptionUtils.findThrowable(failure, cls);
            if (!maybeExpected.isPresent()) {
                throw failure;
            }
            return maybeExpected.get();
        }

        // thrown outside of the try block so that it is never mistaken for the awaited failure
        throw new Exception(
                "Future should have completed exceptionally with "
                        + cls.getCanonicalName()
                        + ".");
    }

    /** Creates a fresh configuration whose deployment target is {@code "embedded"}. */
    private Configuration getConfiguration() {
        final Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, "embedded");
        return configuration;
    }
}
