/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerBuilder;
import org.apache.flink.runtime.scheduler.adaptive.Created;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraphTest;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.Finished;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTrackingMockExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;

public class AdaptiveSchedulerTest
extends TestLogger {
    /** Parallelism configured for {@link #JOB_VERTEX}. */
    private static final int PARALLELISM = 4;
    /** No-op job vertex named {@code v1}; tests look it up by ID in archived execution graphs. */
    private static final JobVertex JOB_VERTEX = ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    /** Provides the single-threaded scheduled executor that backs {@link #singleThreadMainThreadExecutor}. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
    /** Manually triggered main-thread executor bound to the thread that instantiates the test class. */
    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
    /** Main-thread executor that runs actions on the executor owned by {@link #TEST_EXECUTOR_RESOURCE}. */
    private final ComponentMainThreadExecutor singleThreadMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(TEST_EXECUTOR_RESOURCE.getExecutor());

    /** A freshly built scheduler must report {@link Created} as its state. */
    @Test
    public void testInitialState() throws Exception {
        AdaptiveSchedulerBuilder builder = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor);
        State initialState = builder.build().getState();

        Assert.assertThat(initialState, Matchers.instanceOf(Created.class));
    }

    /**
     * Builds a scheduler for a job graph that carries snapshot settings and checks that the
     * archived execution graph requested for {@link JobStatus#INITIALIZING} contains checkpoint
     * settings.
     */
    @Test
    public void testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws Exception {
        JobGraph checkpointedJobGraph = createJobGraph();
        JobCheckpointingSettings snapshotSettings = new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null);
        checkpointedJobGraph.setSnapshotSettings(snapshotSettings);

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(checkpointedJobGraph, mainThreadExecutor).build();
        ArchivedExecutionGraph archived = scheduler.getArchivedExecutionGraph(JobStatus.INITIALIZING, null);

        ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archived);
    }

    /** {@code isState} must be true for the scheduler's current state and false for an unrelated one. */
    @Test
    public void testIsState() throws Exception {
        AdaptiveSchedulerBuilder builder = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor);
        AdaptiveScheduler scheduler = builder.build();
        State currentState = scheduler.getState();

        Assert.assertThat(scheduler.isState(currentState), Is.is(true));
        Assert.assertThat(scheduler.isState(new DummyState()), Is.is(false));
    }

    /** The action passed to {@code runIfState} must run when the expected state is the current state. */
    @Test
    public void testRunIfState() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        AtomicBoolean actionExecuted = new AtomicBoolean(false);

        State currentState = scheduler.getState();
        scheduler.runIfState(currentState, () -> actionExecuted.set(true));

        Assert.assertThat(actionExecuted.get(), Is.is(true));
    }

    /** The action passed to {@code runIfState} must be skipped when the expected state is not current. */
    @Test
    public void testRunIfStateWithStateMismatch() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        AtomicBoolean actionExecuted = new AtomicBoolean(false);

        scheduler.runIfState(new DummyState(), () -> actionExecuted.set(true));

        Assert.assertThat(actionExecuted.get(), Is.is(false));
    }

    /** Without any offered slots, a one-slot requirement must not count as satisfied. */
    @Test
    public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        scheduler.startScheduling();

        ResourceCounter oneUnknownSlot = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        boolean satisfied = scheduler.hasDesiredResources(oneUnknownSlot);

        Assert.assertThat(satisfied, Is.is(false));
    }

    /** After slots matching the requirement were offered to the pool, the requirement counts as satisfied. */
    @Test
    public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool slotPool = createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setDeclarativeSlotPool(slotPool)
                .build();
        scheduler.startScheduling();

        ResourceCounter oneUnknownSlot = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        SlotPoolTestUtils.offerSlots(slotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(oneUnknownSlot));

        Assert.assertThat(scheduler.hasDesiredResources(oneUnknownSlot), Is.is(true));
    }

    /**
     * Offers the pool a slot with a concrete profile (1 CPU core) and checks that
     * {@code hasDesiredResources} still reports success for a requirement expressed with
     * {@link ResourceProfile#UNKNOWN}.
     */
    @Test
    public void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setDeclarativeSlotPool(declarativeSlotPool)
                .build();
        scheduler.startScheduling();

        // Previously a dead local mistyped as `boolean ... = true`; it is the slot count shared by
        // the required and the provided resources.
        int numRequiredSlots = 1;
        ResourceCounter requiredResources = ResourceCounter.withResource(ResourceProfile.UNKNOWN, numRequiredSlots);
        ResourceCounter providedResources = ResourceCounter.withResource(ResourceProfile.newBuilder().setCpuCores(1.0).build(), numRequiredSlots);

        SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(providedResources));

        Assert.assertThat(scheduler.hasDesiredResources(requiredResources), Is.is(true));
    }

    /**
     * Offers fewer slots than the job vertex's configured parallelism and checks that the
     * archived execution graph reports the vertex with a parallelism equal to the number of
     * offered slots.
     */
    @Test
    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                .setDeclarativeSlotPool(declarativeSlotPool)
                .setJobMasterConfiguration(configuration)
                .build();

        // Previously a dead local mistyped as `boolean ... = true`; it is the single count that the
        // gateway size, the slot offer, the awaited submissions and the asserted parallelism share.
        int numAvailableSlots = 1;
        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(numAvailableSlots);

        // Scheduler interactions are issued from the scheduler's main-thread executor.
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)), taskManagerGateway);
        });

        taskManagerGateway.waitForSubmissions(numAvailableSlots, Duration.ofSeconds(5L));

        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> scheduler.requestJob().getArchivedExecutionGraph(), singleThreadMainThreadExecutor).join();

        Assert.assertThat(executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(), Is.is(numAvailableSlots));
    }

    /**
     * Checks that the initialization timestamp handed to the builder shows up as the
     * {@link JobStatus#INITIALIZING} timestamp of the archived execution graph once tasks
     * have been submitted.
     */
    @Test
    public void testExecutionGraphGenerationSetsInitializationTimestamp() throws Exception {
        // One source of truth for the value that is configured and later asserted (the local was
        // declared but unused before, with the literal repeated).
        long initializationTimestamp = 42L;
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        AdaptiveScheduler adaptiveScheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                .setInitializationTimestamp(initializationTimestamp)
                .setDeclarativeSlotPool(declarativeSlotPool)
                .setJobMasterConfiguration(configuration)
                .build();

        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(PARALLELISM);

        // Scheduler interactions are issued from the scheduler's main-thread executor.
        singleThreadMainThreadExecutor.execute(() -> {
            adaptiveScheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)), taskManagerGateway);
        });

        // A single submission is awaited before the archived graph is requested.
        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));

        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> adaptiveScheduler.requestJob().getArchivedExecutionGraph(), singleThreadMainThreadExecutor).join();

        Assert.assertThat(executionGraph.getStatusTimestamp(JobStatus.INITIALIZING), Is.is(initializationTimestamp));
    }

    /**
     * Checks that the initialization timestamp handed to the builder is reported as the
     * {@link JobStatus#INITIALIZING} timestamp of the job requested directly after construction.
     */
    @Test
    public void testInitializationTimestampForwarding() throws Exception {
        // Used for both configuration and assertion (the local was declared but unused before).
        long expectedInitializationTimestamp = 42L;

        AdaptiveScheduler adaptiveScheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                .setInitializationTimestamp(expectedInitializationTimestamp)
                .build();

        long initializationTimestamp = adaptiveScheduler.requestJob().getArchivedExecutionGraph().getStatusTimestamp(JobStatus.INITIALIZING);

        Assert.assertThat(initializationTimestamp, Is.is(expectedInitializationTimestamp));
    }

    /**
     * An exception thrown by an action run through {@code runIfState} must be the one recorded
     * by the configured fatal error handler.
     */
    @Test
    public void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
        TestingFatalErrorHandler errorHandler = new TestingFatalErrorHandler();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                .setFatalErrorHandler(errorHandler)
                .build();
        RuntimeException expectedFailure = new RuntimeException();

        State currentState = scheduler.getState();
        scheduler.runIfState(currentState, () -> {
            throw expectedFailure;
        });

        Assert.assertThat(errorHandler.getException(), Is.<Throwable>is(expectedFailure));
    }

    /**
     * After scheduling starts, the executor must hold a non-periodic scheduled task whose delay
     * (in minutes) equals the configured resource wait timeout.
     */
    @Test
    public void testResourceTimeout() throws Exception {
        ManuallyTriggeredComponentMainThreadExecutor manualExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
        Duration resourceTimeout = Duration.ofMinutes(1234L);

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, resourceTimeout);

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), manualExecutor)
                .setJobMasterConfiguration(configuration)
                .build();
        scheduler.startScheduling();

        long expectedDelayMinutes = resourceTimeout.toMinutes();
        boolean timeoutScheduled = manualExecutor.getActiveNonPeriodicScheduledTask().stream()
                .anyMatch(task -> task.getDelay(TimeUnit.MINUTES) == expectedDelayMinutes);

        Assert.assertThat(timeoutScheduled, Is.is(true));
    }

    /**
     * Captures the gauge registered under the name {@code numRestarts} and checks that it reads
     * 0 after the first submission and 1 after additional slots were offered and the tasks were
     * submitted again.
     */
    @Test
    public void testNumRestartsMetric() throws Exception {
        CompletableFuture<Gauge<Integer>> numRestartsMetricFuture = new CompletableFuture<>();
        // The register consumer receives untyped metrics; the assertions below compare the
        // gauge's value against Integer constants, hence the narrowing cast.
        @SuppressWarnings("unchecked")
        MetricRegistry metricRegistry = TestingMetricRegistry.builder().setRegisterConsumer((metric, name, group) -> {
            if ("numRestarts".equals(name)) {
                numRestartsMetricFuture.complete((Gauge<Integer>) metric);
            }
        }).build();

        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = new DefaultDeclarativeSlotPool(jobGraph.getJobID(), new DefaultAllocatedSlotPool(), ignored -> {}, Time.minutes(10L), Time.minutes(10L));

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                .setJobMasterConfiguration(configuration)
                .setJobManagerJobMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup(metricRegistry, "localhost").addJob(new JobID(), "jobName"))
                .setDeclarativeSlotPool(declarativeSlotPool)
                .build();

        Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();

        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));

        // First round: a single slot is offered from the scheduler's main-thread executor.
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            declarativeSlotPool.offerSlots(DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), new LocalTaskManagerLocation(), taskManagerGateway, System.currentTimeMillis());
        });

        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Assert.assertThat(numRestartsMetric.getValue(), Is.is(0));

        // Second round: PARALLELISM further slots are offered and PARALLELISM submissions awaited.
        singleThreadMainThreadExecutor.execute(() -> SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)), taskManagerGateway));

        taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5L));
        Assert.assertThat(numRestartsMetric.getValue(), Is.is(1));
    }

    /** Starting the scheduler must move it into the {@link WaitingForResources} state. */
    @Test
    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
        AdaptiveSchedulerBuilder builder = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor);
        AdaptiveScheduler scheduler = builder.build();

        scheduler.startScheduling();

        State stateAfterStart = scheduler.getState();
        Assert.assertThat(stateAfterStart, Matchers.instanceOf(WaitingForResources.class));
    }

    /**
     * With the default configuration, starting the scheduler must declare exactly one resource
     * requirement on the slot pool: {@code PARALLELISM} slots of {@link ResourceProfile#UNKNOWN}.
     */
    @Test
    public void testStartSchedulingSetsResourceRequirementsForDefaultMode() throws Exception {
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool slotPool = createDeclarativeSlotPool(jobGraph.getJobID());
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setDeclarativeSlotPool(slotPool)
                .build();

        scheduler.startScheduling();

        ResourceRequirement expectedRequirement = ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM);
        Assert.assertThat(slotPool.getResourceRequirements(), Matchers.contains(expectedRequirement));
    }

    /**
     * With {@link SchedulerExecutionMode#REACTIVE} configured, starting the scheduler must declare
     * a requirement sized by {@code KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM)}.
     */
    @Test
    public void testStartSchedulingSetsResourceRequirementsForReactiveMode() throws Exception {
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool slotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        Configuration reactiveConfiguration = new Configuration();
        reactiveConfiguration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setDeclarativeSlotPool(slotPool)
                .setJobMasterConfiguration(reactiveConfiguration)
                .build();
        scheduler.startScheduling();

        int expectedParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(PARALLELISM);
        ResourceRequirement expectedRequirement = ResourceRequirement.create(ResourceProfile.UNKNOWN, expectedParallelism);
        Assert.assertThat(slotPool.getResourceRequirements(), Matchers.contains(expectedRequirement));
    }

    /**
     * Checks that the scheduler is in {@link WaitingForResources} right after scheduling starts,
     * and that offering {@code PARALLELISM} slots leads to {@code PARALLELISM} task submissions
     * and an archived job vertex with that parallelism.
     */
    @Test
    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
        JobGraph jobGraph = createJobGraph();
        DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                .setDeclarativeSlotPool(declarativeSlotPool)
                .setJobMasterConfiguration(configuration)
                .build();

        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(PARALLELISM);

        // Typed (was a raw CompletableFuture) so the captured state needs no unchecked handling.
        CompletableFuture<State> startingStateFuture = new CompletableFuture<>();
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            // Capture the state before any slot is offered.
            startingStateFuture.complete(scheduler.getState());
            SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, PARALLELISM)), taskManagerGateway);
        });

        Assert.assertThat(startingStateFuture.get(), Matchers.instanceOf(WaitingForResources.class));

        taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5L));

        ArchivedExecutionGraph executionGraph = CompletableFuture.supplyAsync(() -> scheduler.requestJob().getArchivedExecutionGraph(), singleThreadMainThreadExecutor).get();

        Assert.assertThat(executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(), Is.is(PARALLELISM));
    }

    /** Calling {@code goToFinished} must leave the scheduler in the {@link Finished} state. */
    @Test
    public void testGoToFinished() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        ArchivedExecutionGraph failedGraph = new ArchivedExecutionGraphBuilder()
                .setState(JobStatus.FAILED)
                .build();

        scheduler.goToFinished(failedGraph);

        State stateAfterFinish = scheduler.getState();
        Assert.assertThat(stateAfterFinish, Matchers.instanceOf(Finished.class));
    }

    /**
     * Checks that {@code goToFinished} reports the archived execution graph's job status to the
     * configured job status listener.
     */
    @Test
    public void testGoToFinishedNotifiesJobListener() throws Exception {
        // Typed (was a raw AtomicReference) so the status comparison below is type-checked.
        AtomicReference<JobStatus> jobStatusUpdate = new AtomicReference<>();
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                .setJobStatusListener((jobId, newJobStatus, timestamp, error) -> jobStatusUpdate.set(newJobStatus))
                .build();

        ArchivedExecutionGraph archivedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build();

        scheduler.goToFinished(archivedExecutionGraph);

        Assert.assertThat(jobStatusUpdate.get(), Is.is(archivedExecutionGraph.getState()));
    }

    /**
     * Starts the scheduler, raises a global failure and closes it; both the completed checkpoint
     * store and the checkpoint ID counter must then complete their shutdown futures with
     * {@link JobStatus#FAILED}.
     */
    @Test
    public void testCloseShutsDownCheckpointingComponents() throws Exception {
        CompletableFuture<JobStatus> storeShutdown = new CompletableFuture<>();
        TestingCompletedCheckpointStore checkpointStore = new TestingCompletedCheckpointStore(storeShutdown);

        CompletableFuture<JobStatus> counterShutdown = new CompletableFuture<>();
        TestingCheckpointIDCounter idCounter = new TestingCheckpointIDCounter(counterShutdown);

        JobGraph jobGraph = createJobGraph();
        JobCheckpointingSettings snapshotSettings = new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), null);
        jobGraph.setSnapshotSettings(snapshotSettings);

        TestingCheckpointRecoveryFactory recoveryFactory = new TestingCheckpointRecoveryFactory(checkpointStore, idCounter);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                .setCheckpointRecoveryFactory(recoveryFactory)
                .build();

        // All scheduler calls are issued from its main-thread executor.
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            scheduler.handleGlobalFailure(new FlinkException("Test exception"));
            scheduler.closeAsync();
        });

        Assert.assertThat(storeShutdown.get(), Is.is(JobStatus.FAILED));
        Assert.assertThat(counterShutdown.get(), Is.is(JobStatus.FAILED));
    }

    /**
     * Transitions into a capturing state and then away from it; the capturing state must record
     * that {@code onLeave} was called, with {@code DummyState.class} as the new-state argument.
     */
    @Test
    public void testTransitionToStateCallsOnLeave() throws Exception {
        AdaptiveSchedulerBuilder builder = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor);
        AdaptiveScheduler scheduler = builder.build();
        LifecycleMethodCapturingState capturingState = new LifecycleMethodCapturingState();

        scheduler.transitionToState((StateFactory)new StateInstanceFactory(capturingState));
        // Reset after entering so only what happens during the next transition is recorded.
        capturingState.reset();

        scheduler.transitionToState((StateFactory)new DummyState.Factory());

        Assert.assertThat(capturingState.onLeaveCalled, Is.is(true));
        boolean leftForDummyState = capturingState.onLeaveNewStateArgument.equals(DummyState.class);
        Assert.assertThat(leftForDummyState, Is.is(true));
    }

    /**
     * Runs the job first with a single slot and then with {@code parallelism} slots, and checks
     * that the archived vertex reports the same max parallelism (the default computed for the
     * configured parallelism) in both rounds while its parallelism follows the offered slots.
     */
    @Test
    public void testConsistentMaxParallelism() throws Exception {
        // Single source for the vertex parallelism (the local was declared but unused before,
        // with the literal repeated throughout the method).
        int parallelism = 240;
        int expectedMaxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);
        JobVertex vertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);

        DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph.getJobID());

        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));

        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                .setDeclarativeSlotPool(declarativeSlotPool)
                .setJobMasterConfiguration(configuration)
                .build();

        SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(1 + parallelism);
        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));

        // First round: offer a single slot from the scheduler's main-thread executor.
        singleThreadMainThreadExecutor.execute(() -> {
            scheduler.startScheduling();
            SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), taskManagerGateway);
        });

        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));

        ArchivedExecutionGraph executionGraph = getArchivedExecutionGraphForRunningJob(scheduler).get();
        ArchivedExecutionJobVertex archivedVertex = executionGraph.getJobVertex(vertex.getID());

        Assert.assertThat(archivedVertex.getParallelism(), Is.is(1));
        Assert.assertThat(archivedVertex.getMaxParallelism(), Is.is(expectedMaxParallelism));

        // Second round: offer `parallelism` further slots and wait for that many submissions.
        singleThreadMainThreadExecutor.execute(() -> SlotPoolTestUtils.offerSlots(declarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, parallelism)), taskManagerGateway));

        taskManagerGateway.waitForSubmissions(parallelism, Duration.ofSeconds(5L));

        ArchivedExecutionGraph resubmittedExecutionGraph = getArchivedExecutionGraphForRunningJob(scheduler).get();
        ArchivedExecutionJobVertex resubmittedArchivedVertex = resubmittedExecutionGraph.getJobVertex(vertex.getID());

        Assert.assertThat(resubmittedArchivedVertex.getParallelism(), Is.is(parallelism));
        Assert.assertThat(resubmittedArchivedVertex.getMaxParallelism(), Is.is(expectedMaxParallelism));
    }

    /** With {@link NoRestartBackoffTimeStrategy} configured, a failure must not be restartable. */
    @Test
    public void testHowToHandleFailureRejectedByStrategy() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
                .build();

        Executing.FailureResult failureResult = scheduler.howToHandleFailure(new Exception("test"));

        Assert.assertThat(failureResult.canRestart(), Is.is(false));
    }

    /**
     * With a restart strategy constructed as {@code (true, 1234L)}, a failure must be restartable
     * and the reported backoff must equal the strategy's backoff time.
     */
    @Test
    public void testHowToHandleFailureAllowedByStrategy() throws Exception {
        TestRestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(true, 1234L);
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                .setRestartBackoffTimeStrategy(restartStrategy)
                .build();

        Executing.FailureResult result = scheduler.howToHandleFailure(new Exception("test"));

        Assert.assertThat(result.canRestart(), Is.is(true));
        long reportedBackoffMillis = result.getBackoffTime().toMillis();
        Assert.assertThat(reportedBackoffMillis, Is.is(restartStrategy.getBackoffTime()));
    }

    /** A failure wrapped in {@link SuppressRestartsException} must not be restartable. */
    @Test
    public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
        AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();
        SuppressRestartsException unrecoverable = new SuppressRestartsException(new Exception("test"));

        Executing.FailureResult failureResult = scheduler.howToHandleFailure(unrecoverable);

        Assert.assertThat(failureResult.canRestart(), Is.is(false));
    }

    /**
     * Asserts that a freshly built scheduler is in a {@code Created} state and then requests a
     * transition through a {@code Created.Factory}; the test expects an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testRepeatedTransitionIntoCurrentStateFails() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        // Precondition: the scheduler starts out in the Created state.
        final State initialState = adaptiveScheduler.getState();
        Assert.assertThat(initialState, Matchers.instanceOf(Created.class));

        // Asking for the state we are already in is what should raise the expected exception.
        final Created.Factory sameStateFactory = new Created.Factory(adaptiveScheduler, this.log);
        adaptiveScheduler.transitionToState(sameStateFactory);
    }

    /**
     * Checks that {@code triggerSavepoint} on a freshly built scheduler returns a future that
     * failed with a {@code CheckpointException}.
     */
    @Test
    public void testTriggerSavepointFailsInIllegalState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        Assert.assertThat(
                adaptiveScheduler.triggerSavepoint("some directory", false),
                FlinkMatchers.futureFailedWith(CheckpointException.class));
    }

    /**
     * Checks that {@code stopWithSavepoint} on a freshly built scheduler returns a future that
     * failed with a {@code CheckpointException}.
     */
    @Test
    public void testStopWithSavepointFailsInIllegalState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        Assert.assertThat(
                adaptiveScheduler.stopWithSavepoint("some directory", false),
                FlinkMatchers.futureFailedWith(CheckpointException.class));
    }

    /**
     * Delivers an operator event to a freshly built scheduler; the test expects a
     * {@code TaskNotRunningException}.
     */
    @Test(expected = TaskNotRunningException.class)
    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        final ExecutionAttemptID attemptId = new ExecutionAttemptID();
        final OperatorID operatorId = new OperatorID();
        final OperatorEvent event = new TestOperatorEvent();

        adaptiveScheduler.deliverOperatorEventToCoordinator(attemptId, operatorId, event);
    }

    /**
     * Checks that delivering a coordination request to a freshly built scheduler returns a
     * future that failed with a {@code FlinkException}.
     */
    @Test
    public void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        final OperatorID operatorId = new OperatorID();
        // An empty anonymous request is enough; its content is never inspected by this test.
        final CoordinationRequest request = new CoordinationRequest() {};

        Assert.assertThat(
                adaptiveScheduler.deliverCoordinationRequestToCoordinator(operatorId, request),
                FlinkMatchers.futureFailedWith(FlinkException.class));
    }

    /**
     * Checks that {@code updateTaskExecutionState} returns {@code false} when a {@code FAILED}
     * state transition for a new execution attempt id is reported to a freshly built scheduler.
     */
    @Test
    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
        final JobGraph graph = createJobGraph();
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(graph, mainThreadExecutor).build();

        final TaskExecutionState failedState =
                new TaskExecutionState(new ExecutionAttemptID(), ExecutionState.FAILED);
        final TaskExecutionStateTransition transition =
                new TaskExecutionStateTransition(failedState);

        final boolean accepted = adaptiveScheduler.updateTaskExecutionState(transition);

        Assert.assertThat(accepted, Is.is(false));
    }

    /**
     * Requests the next input split for {@code JOB_VERTEX} from a freshly built scheduler; the
     * test expects an {@link IOException}.
     */
    @Test(expected = IOException.class)
    public void testRequestNextInputSplitFailsInIllegalState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        // The call itself is the assertion: it must throw the expected exception.
        adaptiveScheduler.requestNextInputSplit(JOB_VERTEX.getID(), new ExecutionAttemptID());
    }

    /**
     * Requests the partition state for new ids from a freshly built scheduler; the test expects
     * a {@code PartitionProducerDisposedException}.
     */
    @Test(expected = PartitionProducerDisposedException.class)
    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
        final AdaptiveScheduler adaptiveScheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor).build();

        final IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        final ResultPartitionID partitionId = new ResultPartitionID();

        adaptiveScheduler.requestPartitionState(dataSetId, partitionId);
    }

    /**
     * Checks that {@code tryToAssignSlots} reports an unsuccessful assignment when the slot
     * allocator's try-reserve-resources function always returns {@link Optional#empty()}.
     */
    @Test
    public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() throws Exception {
        final TestingSlotAllocator rejectingAllocator =
                TestingSlotAllocator.newBuilder()
                        .setTryReserveResourcesFunction(ignored -> Optional.empty())
                        .build();
        final AdaptiveScheduler scheduler =
                new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                        .setSlotAllocator(rejectingAllocator)
                        .build();

        final CreatingExecutionGraph.ExecutionGraphWithVertexParallelism graphWithParallelism =
                CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(
                        new StateTrackingMockExecutionGraph(),
                        new CreatingExecutionGraphTest.TestingVertexParallelism());

        final CreatingExecutionGraph.AssignmentResult result =
                scheduler.tryToAssignSlots(graphWithParallelism);

        Assert.assertFalse(result.isSuccess());
    }

    /**
     * Builds a two-vertex streaming job graph, computes the parallelism store with
     * {@code SchedulerExecutionMode.REACTIVE}, and asserts that the stored parallelism and max
     * parallelism of every vertex equal the values reported by the vertex itself.
     */
    @Test
    public void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
        final JobVertex first = ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50);
        final JobVertex second = ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50);
        final JobGraph graph = JobGraphTestUtils.streamingJobGraph(first, second);

        final VertexParallelismStore store =
                AdaptiveScheduler.computeVertexParallelismStoreForExecution(
                        graph,
                        SchedulerExecutionMode.REACTIVE,
                        SchedulerBase::getDefaultMaxParallelism);

        for (JobVertex jobVertex : graph.getVertices()) {
            final VertexParallelismInformation stored = store.getParallelismInfo(jobVertex.getID());

            final int actualParallelism = stored.getParallelism();
            final int expectedParallelism = jobVertex.getParallelism();
            Assert.assertThat(actualParallelism, Is.is(expectedParallelism));

            final int actualMaxParallelism = stored.getMaxParallelism();
            final int expectedMaxParallelism = jobVertex.getMaxParallelism();
            Assert.assertThat(actualMaxParallelism, Is.is(expectedMaxParallelism));
        }
    }

    /**
     * Builds a two-vertex streaming job graph, computes the parallelism store with a
     * {@code null} execution mode, and asserts that the stored parallelism and max parallelism
     * of every vertex equal the values reported by the vertex itself.
     */
    @Test
    public void testComputeVertexParallelismStoreForExecutionInDefaultMode() {
        final JobVertex first = ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50);
        final JobVertex second = ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50);
        final JobGraph graph = JobGraphTestUtils.streamingJobGraph(first, second);

        // No execution mode is supplied here; that is the "default mode" under test.
        final VertexParallelismStore store =
                AdaptiveScheduler.computeVertexParallelismStoreForExecution(
                        graph, null, SchedulerBase::getDefaultMaxParallelism);

        for (JobVertex jobVertex : graph.getVertices()) {
            final VertexParallelismInformation stored = store.getParallelismInfo(jobVertex.getID());

            final int actualParallelism = stored.getParallelism();
            final int expectedParallelism = jobVertex.getParallelism();
            Assert.assertThat(actualParallelism, Is.is(expectedParallelism));

            final int actualMaxParallelism = stored.getMaxParallelism();
            final int expectedMaxParallelism = jobVertex.getMaxParallelism();
            Assert.assertThat(actualMaxParallelism, Is.is(expectedMaxParallelism));
        }
    }

    /**
     * Returns a future, computed on {@code singleThreadMainThreadExecutor}, that repeatedly
     * requests the job from the scheduler until the archived execution graph is non-null and in
     * state {@code JobStatus.RUNNING}.
     *
     * <p>The loop polls without pausing and has no timeout of its own.
     *
     * @param scheduler scheduler to query
     * @return future completing with the first archived graph observed in state RUNNING
     */
    private CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraphForRunningJob(SchedulerNG scheduler) {
        return CompletableFuture.supplyAsync(
                () -> {
                    ArchivedExecutionGraph snapshot;
                    do {
                        snapshot = scheduler.requestJob().getArchivedExecutionGraph();
                    } while (snapshot == null || snapshot.getState() != JobStatus.RUNNING);
                    return snapshot;
                },
                this.singleThreadMainThreadExecutor);
    }

    /**
     * Creates a consumer that, for each execution attempt id it receives, schedules a task on
     * {@code singleThreadMainThreadExecutor} reporting that attempt as
     * {@code ExecutionState.CANCELED} to the given scheduler.
     *
     * @param scheduler scheduler that receives the CANCELED state updates
     * @return consumer of execution attempt ids
     */
    private Consumer<ExecutionAttemptID> createCancelConsumer(SchedulerNG scheduler) {
        return executionAttemptId -> {
            final Runnable reportCanceled = () -> {
                final TaskExecutionState canceledState =
                        new TaskExecutionState(executionAttemptId, ExecutionState.CANCELED);
                scheduler.updateTaskExecutionState(canceledState);
            };
            this.singleThreadMainThreadExecutor.execute(reportCanceled);
        };
    }

    /**
     * Creates a {@code DefaultDeclarativeSlotPool} for the given job, backed by a new
     * {@code DefaultAllocatedSlotPool}, with a no-op callback and two ten-minute time arguments.
     *
     * @param jobId id of the job the pool is created for
     * @return the newly created slot pool
     */
    @Nonnull
    private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId) {
        final AllocatedSlotPool allocatedSlots = new DefaultAllocatedSlotPool();
        return new DefaultDeclarativeSlotPool(
                jobId,
                allocatedSlots,
                ignored -> {},
                Time.minutes(10L),
                Time.minutes(10L));
    }

    /**
     * Creates the streaming job graph used by these tests; it contains only {@code JOB_VERTEX}.
     *
     * @return a streaming job graph built from {@code JOB_VERTEX}
     */
    private static JobGraph createJobGraph() {
        final JobGraph singleVertexGraph = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX);
        return singleVertexGraph;
    }

    /**
     * Minimal {@code State} stub: every callback is a no-op and every getter returns
     * {@code null}.
     */
    static class DummyState implements State {

        DummyState() {}

        /** Always {@code null}; this stub carries no job status. */
        public JobStatus getJobStatus() {
            return null;
        }

        /** Always {@code null}; this stub carries no job. */
        public ArchivedExecutionGraph getJob() {
            return null;
        }

        /** Always {@code null}; this stub has no logger. */
        public Logger getLogger() {
            return null;
        }

        /** Does nothing. */
        public void cancel() {}

        /** Does nothing; the cause is ignored. */
        public void suspend(Throwable cause) {}

        /** Does nothing; the cause is ignored. */
        public void handleGlobalFailure(Throwable cause) {}

        /** Factory that hands out a new {@link DummyState} on every {@link #getState()} call. */
        private static class Factory implements StateFactory<DummyState> {

            private Factory() {}

            public DummyState getState() {
                return new DummyState();
            }

            public Class<DummyState> getStateClass() {
                return DummyState.class;
            }
        }
    }

    /**
     * {@code StateFactory} that always returns the one {@code LifecycleMethodCapturingState}
     * instance it was constructed with.
     */
    private static class StateInstanceFactory implements StateFactory<LifecycleMethodCapturingState> {

        /** The pre-built state returned by every {@link #getState()} call. */
        private final LifecycleMethodCapturingState instance;

        /**
         * @param instance state instance to hand out
         */
        public StateInstanceFactory(LifecycleMethodCapturingState instance) {
            this.instance = instance;
        }

        /** Returns the instance supplied at construction time; no new state is created. */
        public LifecycleMethodCapturingState getState() {
            return instance;
        }

        public Class<LifecycleMethodCapturingState> getStateClass() {
            return LifecycleMethodCapturingState.class;
        }
    }

    /**
     * Task manager gateway that records every submitted {@link TaskDeploymentDescriptor} in a
     * bounded queue so that tests can wait for submissions.
     *
     * <p>Descriptors are recorded with {@link BlockingQueue#offer(Object)}, whose result is
     * discarded; a submission arriving while the queue is full is therefore not recorded.
     */
    private static class SubmissionBufferingTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;

        /**
         * @param capacity maximum number of submissions that can be buffered at once
         */
        public SubmissionBufferingTaskManagerGateway(int capacity) {
            this.submittedTasks = new ArrayBlockingQueue<TaskDeploymentDescriptor>(capacity);
            // Call the superclass method directly: the override below would chain a second consumer.
            super.setSubmitConsumer(this.submittedTasks::offer);
        }

        /**
         * Installs the given consumer so that it runs after the descriptor has been offered to
         * the buffer.
         *
         * @param submitConsumer consumer to invoke after buffering
         */
        @Override
        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> submitConsumer) {
            super.setSubmitConsumer(((Consumer<TaskDeploymentDescriptor>)this.submittedTasks::offer).andThen(submitConsumer));
        }

        /**
         * Waits for the given number of submissions and returns them in arrival order.
         *
         * @param numSubmissions number of submissions to wait for
         * @param perTaskTimeout maximum time to wait for each individual submission
         * @return the received descriptors; never contains {@code null}
         * @throws InterruptedException if interrupted while waiting
         * @throws IllegalStateException if a submission does not arrive within
         *     {@code perTaskTimeout}
         */
        public List<TaskDeploymentDescriptor> waitForSubmissions(int numSubmissions, Duration perTaskTimeout) throws InterruptedException {
            final long timeoutMillis = perTaskTimeout.toMillis();
            final List<TaskDeploymentDescriptor> descriptors = new ArrayList<>(Math.max(0, numSubmissions));
            for (int i = 0; i < numSubmissions; ++i) {
                final TaskDeploymentDescriptor descriptor = this.submittedTasks.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (descriptor == null) {
                    // poll() signals a timeout with null; fail here instead of returning a list with holes.
                    throw new IllegalStateException(
                            "Timed out after " + perTaskTimeout + " waiting for task submission "
                                    + (i + 1) + " of " + numSubmissions + ".");
                }
                descriptors.add(descriptor);
            }
            return descriptors;
        }
    }

    /**
     * {@link DummyState} that records whether {@code onLeave} was invoked and with which target
     * state class.
     */
    private static class LifecycleMethodCapturingState extends DummyState {

        /** {@code true} once {@link #onLeave(Class)} has run since construction or the last reset. */
        boolean onLeaveCalled;

        /** Argument of the most recent {@link #onLeave(Class)} call, or {@code null} if none. */
        @Nullable
        Class<? extends State> onLeaveNewStateArgument;

        private LifecycleMethodCapturingState() {}

        /** Records the call and the state class it was invoked with. */
        public void onLeave(Class<? extends State> newState) {
            this.onLeaveNewStateArgument = newState;
            this.onLeaveCalled = true;
        }

        /** Clears everything recorded so far. */
        void reset() {
            this.onLeaveNewStateArgument = null;
            this.onLeaveCalled = false;
        }
    }
}

