package org.apache.flink.runtime.jobmaster;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.factories.UnregisteredJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest.class */
public class JobMasterTest extends TestLogger {
    private static final long fastHeartbeatInterval = 1;
    private static final long fastHeartbeatTimeout = 5;
    private static final long heartbeatInterval = 1000;
    private static final long heartbeatTimeout = 5000;
    private static TestingRpcService rpcService;
    private static HeartbeatServices fastHeartbeatServices;
    private static HeartbeatServices heartbeatServices;
    private BlobServer blobServer;
    private Configuration configuration;
    private ResourceID jmResourceId;
    private JobMasterId jobMasterId;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService rmLeaderRetrievalService;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Time testingTimeout = Time.seconds(10);
    private static final JobGraph jobGraph = new JobGraph(new JobVertex[0]);

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$DummyCheckpointStorageLocation.class */
    private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation {
        private static final long serialVersionUID = 164095949572620688L;

        private DummyCheckpointStorageLocation() {
        }

        public String getExternalPointer() {
            return null;
        }

        public StreamStateHandle getMetadataHandle() {
            return null;
        }

        public void disposeStorageLocation() throws IOException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$NoOpOnCompletionActions.class */
    public static final class NoOpOnCompletionActions implements OnCompletionActions {
        private NoOpOnCompletionActions() {
        }

        public void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        }

        public void jobFinishedByOther() {
        }

        public void jobMasterFailed(Throwable th) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingInputSplit.class */
    public static final class TestingInputSplit implements InputSplit {
        private static final long serialVersionUID = -5404803705463116083L;
        private final int splitNumber;

        TestingInputSplit(int i) {
            this.splitNumber = i;
        }

        public int getSplitNumber() {
            return this.splitNumber;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.splitNumber == ((TestingInputSplit) obj).splitNumber;
        }

        public int hashCode() {
            return Objects.hash(Integer.valueOf(this.splitNumber));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMasterTest$TestingInputSplitSource.class */
    private static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {
        private static final long serialVersionUID = -2344684048759139086L;
        private final List<TestingInputSplit> inputSplits;

        private TestingInputSplitSource(List<TestingInputSplit> list) {
            this.inputSplits = list;
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public TestingInputSplit[] m101createInputSplits(int i) {
            return (TestingInputSplit[]) this.inputSplits.toArray(JobMasterTest.EMPTY_TESTING_INPUT_SPLITS);
        }

        public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] testingInputSplitArr) {
            return new DefaultInputSplitAssigner(testingInputSplitArr);
        }
    }

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
        fastHeartbeatServices = new TestingHeartbeatServices(fastHeartbeatInterval, fastHeartbeatTimeout, rpcService.getScheduledExecutor());
        heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, rpcService.getScheduledExecutor());
    }

    @Before
    public void setup() throws IOException {
        this.configuration = new Configuration();
        this.haServices = new TestingHighAvailabilityServices();
        this.jobMasterId = JobMasterId.generate();
        this.jmResourceId = ResourceID.generate();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        this.rmLeaderRetrievalService = new SettableLeaderRetrievalService((String) null, (UUID) null);
        this.haServices.setResourceManagerLeaderRetriever(this.rmLeaderRetrievalService);
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
        this.blobServer = new BlobServer(this.configuration, new VoidBlobStore());
        this.blobServer.start();
    }

    @After
    public void teardown() throws Exception {
        if (this.testingFatalErrorHandler != null) {
            this.testingFatalErrorHandler.rethrowError();
        }
        if (this.blobServer != null) {
            this.blobServer.close();
        }
        rpcService.clearGateways();
    }

    @AfterClass
    public static void teardownClass() {
        if (rpcService != null) {
            rpcService.stopService();
            rpcService = null;
        }
    }

    @Test
    public void testDeclineCheckpointInvocationWithUserException() throws Exception {
        RpcService rpcService2 = null;
        RpcService rpcService3 = null;
        try {
            ActorSystem createDefaultActorSystem = AkkaUtils.createDefaultActorSystem();
            ActorSystem createDefaultActorSystem2 = AkkaUtils.createDefaultActorSystem();
            rpcService2 = new AkkaRpcService(createDefaultActorSystem, testingTimeout);
            rpcService3 = new AkkaRpcService(createDefaultActorSystem2, testingTimeout);
            final CompletableFuture completableFuture = new CompletableFuture();
            JobMaster jobMaster = new JobMaster(rpcService2, JobMasterConfiguration.fromConfiguration(this.configuration), this.jmResourceId, jobGraph, this.haServices, DefaultSlotPoolFactory.fromConfiguration(this.configuration, rpcService2), new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices, this.blobServer, UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, new NoOpOnCompletionActions(), this.testingFatalErrorHandler, JobMasterTest.class.getClassLoader()) { // from class: org.apache.flink.runtime.jobmaster.JobMasterTest.1
                public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
                    completableFuture.complete(declineCheckpoint.getReason());
                }
            };
            jobMaster.start(this.jobMasterId, testingTimeout).get();
            Throwable th = (Throwable) Class.forName("UserException", false, ClassLoaderUtils.compileAndLoadJava(temporaryFolder.newFolder(), "UserException.java", String.format("public class %s extends RuntimeException { public %s() {super(\"UserMessage\");} }", "UserException", "UserException"))).newInstance();
            new RpcCheckpointResponder((JobMasterGateway) rpcService3.connect(jobMaster.getAddress(), jobMaster.getFencingToken(), JobMasterGateway.class).get()).declineCheckpoint(jobGraph.getJobID(), new ExecutionAttemptID(fastHeartbeatInterval, fastHeartbeatInterval), fastHeartbeatInterval, th);
            Throwable th2 = (Throwable) completableFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(th2, Matchers.instanceOf(SerializedThrowable.class));
            MatcherAssert.assertThat(th2.getMessage(), Matchers.equalTo(th.getMessage()));
            RpcUtils.terminateRpcServices(testingTimeout, new RpcService[]{rpcService2, rpcService3});
        } catch (Throwable th3) {
            RpcUtils.terminateRpcServices(testingTimeout, new RpcService[]{rpcService2, rpcService3});
            throw th3;
        }
    }

    @Test
    public void testHeartbeatTimeoutWithTaskManager() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        completableFuture.getClass();
        RpcGateway createTestingTaskExecutorGateway = testingTaskExecutorGatewayBuilder.setHeartbeatJobManagerConsumer((v1) -> {
            r1.complete(v1);
        }).setDisconnectJobManagerConsumer((jobID, th) -> {
            completableFuture2.complete(jobID);
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        JobManagerSharedServices build = new TestingJobManagerSharedServicesBuilder().build();
        JobMaster createJobMaster = createJobMaster(this.configuration, jobGraph, this.haServices, build);
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            createJobMaster.getSelfGateway(JobMasterGateway.class).registerTaskManager(createTestingTaskExecutorGateway.getAddress(), localTaskManagerLocation, testingTimeout).get();
            MatcherAssert.assertThat((ResourceID) completableFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(this.jmResourceId));
            MatcherAssert.assertThat((JobID) completableFuture2.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(jobGraph.getJobID()));
            build.shutdown();
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th2) {
            build.shutdown();
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th2;
        }
    }

    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        ResourceManagerId generate = ResourceManagerId.generate();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(generate, new ResourceID("rm"), "rm", "localhost");
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        testingResourceManagerGateway.setRegisterJobManagerConsumer(tuple4 -> {
            completableFuture.complete(Tuple3.of(tuple4.f0, tuple4.f1, tuple4.f3));
            countDownLatch.countDown();
        });
        testingResourceManagerGateway.setDisconnectJobManagerConsumer(tuple2 -> {
            completableFuture2.complete(tuple2.f0);
        });
        rpcService.registerGateway("rm", testingResourceManagerGateway);
        JobManagerSharedServices build = new TestingJobManagerSharedServicesBuilder().build();
        JobMaster createJobMaster = createJobMaster(this.configuration, jobGraph, this.haServices, build);
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            this.rmLeaderRetrievalService.notifyListener("rm", generate.toUUID());
            Tuple3 tuple3 = (Tuple3) completableFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(tuple3.f0, Matchers.equalTo(this.jobMasterId));
            MatcherAssert.assertThat(tuple3.f1, Matchers.equalTo(this.jmResourceId));
            MatcherAssert.assertThat(tuple3.f2, Matchers.equalTo(jobGraph.getJobID()));
            MatcherAssert.assertThat((JobID) completableFuture2.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(jobGraph.getJobID()));
            countDownLatch.await();
            build.shutdown();
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            build.shutdown();
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testRestoringFromSavepoint() throws Exception {
        JobGraph createJobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath(createSavepoint(42L).getAbsolutePath(), true));
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        this.haServices.setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(standaloneCompletedCheckpointStore, new StandaloneCheckpointIDCounter()));
        JobMaster createJobMaster = createJobMaster(this.configuration, createJobGraphWithCheckpointing, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            CompletedCheckpoint latestCheckpoint = standaloneCompletedCheckpointStore.getLatestCheckpoint();
            MatcherAssert.assertThat(latestCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(Long.valueOf(latestCheckpoint.getCheckpointID()), Matchers.is(42L));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testRestoringModifiedJobFromSavepoint() throws Exception {
        File createSavepointWithOperatorState = createSavepointWithOperatorState(42L, new OperatorID());
        SavepointRestoreSettings forPath = SavepointRestoreSettings.forPath(createSavepointWithOperatorState.getAbsolutePath(), false);
        JobVertex jobVertex = new JobVertex("New operator");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobGraph createJobGraphFromJobVerticesWithCheckpointing = createJobGraphFromJobVerticesWithCheckpointing(forPath, jobVertex);
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        this.haServices.setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(standaloneCompletedCheckpointStore, new StandaloneCheckpointIDCounter()));
        try {
            createJobMaster(this.configuration, createJobGraphFromJobVerticesWithCheckpointing, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
            Assert.fail("Should fail because we cannot resume the changed JobGraph from the savepoint.");
        } catch (IllegalStateException e) {
        }
        createJobGraphFromJobVerticesWithCheckpointing.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(createSavepointWithOperatorState.getAbsolutePath(), true));
        JobMaster createJobMaster = createJobMaster(this.configuration, createJobGraphFromJobVerticesWithCheckpointing, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            CompletedCheckpoint latestCheckpoint = standaloneCompletedCheckpointStore.getLatestCheckpoint();
            MatcherAssert.assertThat(latestCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(Long.valueOf(latestCheckpoint.getCheckpointID()), Matchers.is(42L));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testAutomaticRestartingWhenCheckpointing() throws Exception {
        JobGraph createJobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath(createSavepoint(42L).getAbsolutePath(), true));
        this.haServices.setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(new StandaloneCompletedCheckpointStore(1), new StandaloneCheckpointIDCounter()));
        RestartStrategy restartStrategy = createJobMaster(new Configuration(), createJobGraphWithCheckpointing, this.haServices, new TestingJobManagerSharedServicesBuilder().setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(this.configuration)).build()).getRestartStrategy();
        Assert.assertNotNull(restartStrategy);
        Assert.assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
    }

    @Test
    public void testCheckpointPrecedesSavepointRecovery() throws Exception {
        JobGraph createJobGraphWithCheckpointing = createJobGraphWithCheckpointing(SavepointRestoreSettings.forPath("" + createSavepoint(42L).getAbsolutePath(), true));
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(createJobGraphWithCheckpointing.getJobID(), fastHeartbeatInterval, fastHeartbeatInterval, fastHeartbeatInterval, Collections.emptyMap(), (Collection) null, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new DummyCheckpointStorageLocation());
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        standaloneCompletedCheckpointStore.addCheckpoint(completedCheckpoint);
        this.haServices.setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(standaloneCompletedCheckpointStore, new StandaloneCheckpointIDCounter()));
        JobMaster createJobMaster = createJobMaster(this.configuration, createJobGraphWithCheckpointing, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            CompletedCheckpoint latestCheckpoint = standaloneCompletedCheckpointStore.getLatestCheckpoint();
            MatcherAssert.assertThat(latestCheckpoint, Matchers.notNullValue());
            MatcherAssert.assertThat(Long.valueOf(latestCheckpoint.getCheckpointID()), Matchers.is(Long.valueOf(fastHeartbeatInterval)));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
        JobGraph createSingleVertexJobWithRestartStrategy = createSingleVertexJobWithRestartStrategy();
        this.configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 10L);
        JobMaster createJobMaster = createJobMaster(this.configuration, createSingleVertexJobWithRestartStrategy, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            long nanoTime = System.nanoTime();
            createJobMaster.start(JobMasterId.generate(), testingTimeout).get();
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
            arrayBlockingQueue.getClass();
            testingResourceManagerGateway.setRequestSlotConsumer((v1) -> {
                r1.offer(v1);
            });
            rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m205getFencingToken().toUUID());
            arrayBlockingQueue.take();
            CompletableFuture completableFuture = new CompletableFuture();
            LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
            RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                completableFuture.complete(taskDeploymentDescriptor);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway();
            rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
            selfGateway.registerTaskManager(createTestingTaskExecutorGateway.getAddress(), localTaskManagerLocation, testingTimeout).get();
            SlotRequest slotRequest = (SlotRequest) arrayBlockingQueue.take();
            MatcherAssert.assertThat(Long.valueOf((System.nanoTime() - nanoTime) / 1000000), Matchers.greaterThanOrEqualTo(10L));
            MatcherAssert.assertThat(Boolean.valueOf(completableFuture.isDone()), Matchers.is(false));
            Collection collection = (Collection) selfGateway.offerSlots(localTaskManagerLocation.getResourceID(), Collections.singleton(new SlotOffer(slotRequest.getAllocationId(), 0, ResourceProfile.UNKNOWN)), testingTimeout).get();
            MatcherAssert.assertThat(collection, Matchers.hasSize(1));
            MatcherAssert.assertThat(((SlotOffer) collection.iterator().next()).getAllocationId(), Matchers.equalTo(slotRequest.getAllocationId()));
            MatcherAssert.assertThat(((TaskDeploymentDescriptor) completableFuture.get()).getAllocationId(), Matchers.equalTo(slotRequest.getAllocationId()));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testCloseUnestablishedResourceManagerConnection() throws Exception {
        JobMaster createJobMaster = createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            createJobMaster.start(JobMasterId.generate(), testingTimeout).get();
            ResourceManagerId generate = ResourceManagerId.generate();
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            RpcGateway testingResourceManagerGateway2 = new TestingResourceManagerGateway();
            rpcService.registerGateway("address1", testingResourceManagerGateway);
            rpcService.registerGateway("address2", testingResourceManagerGateway2);
            OneShotLatch oneShotLatch = new OneShotLatch();
            OneShotLatch oneShotLatch2 = new OneShotLatch();
            testingResourceManagerGateway.setRegisterJobManagerConsumer(tuple4 -> {
                oneShotLatch.trigger();
            });
            testingResourceManagerGateway2.setRegisterJobManagerConsumer(tuple42 -> {
                oneShotLatch2.trigger();
            });
            this.rmLeaderRetrievalService.notifyListener("address1", generate.toUUID());
            oneShotLatch.await();
            this.rmLeaderRetrievalService.notifyListener("address2", generate.toUUID());
            oneShotLatch2.await();
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testReconnectionAfterDisconnect() throws Exception {
        JobMaster createJobMaster = createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterJobManagerConsumer(tuple4 -> {
                arrayBlockingQueue.offer(tuple4.f0);
            });
            rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            ResourceManagerId m205getFencingToken = testingResourceManagerGateway.m205getFencingToken();
            this.rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), m205getFencingToken.toUUID());
            MatcherAssert.assertThat((JobMasterId) arrayBlockingQueue.take(), Matchers.equalTo(this.jobMasterId));
            MatcherAssert.assertThat(Boolean.valueOf(arrayBlockingQueue.isEmpty()), Matchers.is(true));
            selfGateway.disconnectResourceManager(m205getFencingToken, new FlinkException("Test exception"));
            MatcherAssert.assertThat(arrayBlockingQueue.take(), Matchers.equalTo(this.jobMasterId));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
        JobMaster createJobMaster = createJobMaster(this.configuration, jobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build());
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterJobManagerConsumer(tuple4 -> {
                arrayBlockingQueue.offer(tuple4.f0);
            });
            String address = testingResourceManagerGateway.getAddress();
            rpcService.registerGateway(address, testingResourceManagerGateway);
            this.rmLeaderRetrievalService.notifyListener(address, testingResourceManagerGateway.m205getFencingToken().toUUID());
            MatcherAssert.assertThat((JobMasterId) arrayBlockingQueue.take(), Matchers.equalTo(this.jobMasterId));
            createJobMaster.suspend(new FlinkException("Test exception."), testingTimeout).get();
            JobMasterId generate = JobMasterId.generate();
            createJobMaster.start(generate, testingTimeout).get();
            MatcherAssert.assertThat((JobMasterId) arrayBlockingQueue.take(), Matchers.equalTo(generate));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testRequestNextInputSplit() throws Exception {
        List asList = Arrays.asList(new TestingInputSplit(1), new TestingInputSplit(42), new TestingInputSplit(1337));
        TestingInputSplitSource testingInputSplitSource = new TestingInputSplitSource(asList);
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setParallelism(1);
        jobVertex.setInputSplitSource(testingInputSplitSource);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        JobGraph jobGraph2 = new JobGraph(new JobVertex[]{jobVertex});
        jobGraph2.setAllowQueuedScheduling(true);
        this.configuration.setLong("restart-strategy.fixed-delay.attempts", fastHeartbeatInterval);
        this.configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
        JobMaster createJobMaster = createJobMaster(this.configuration, jobGraph2, this.haServices, new TestingJobManagerSharedServicesBuilder().setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(this.configuration)).build());
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            ExecutionGraph executionGraph = createJobMaster.getExecutionGraph();
            ExecutionVertex executionVertex = (ExecutionVertex) executionGraph.getAllExecutionVertices().iterator().next();
            SupplierWithException supplierWithException = () -> {
                return (SerializedInputSplit) selfGateway.requestNextInputSplit(jobVertex.getID(), executionVertex.getCurrentExecutionAttempt().getAttemptId()).get();
            };
            List<InputSplit> inputSplits = getInputSplits(asList.size(), supplierWithException);
            Matcher containsInAnyOrder = Matchers.containsInAnyOrder(asList.toArray(EMPTY_TESTING_INPUT_SPLITS));
            MatcherAssert.assertThat(inputSplits, containsInAnyOrder);
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex, ExecutionState.SCHEDULED, 2000L);
            executionGraph.failGlobal(new Exception("Testing exception"));
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(executionVertex, ExecutionState.SCHEDULED, 2000L);
            MatcherAssert.assertThat(getInputSplits(asList.size(), supplierWithException), containsInAnyOrder);
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th;
        }
    }

    @Nonnull
    private static List<InputSplit> getInputSplits(int i, SupplierWithException<SerializedInputSplit, Exception> supplierWithException) throws Exception {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            SerializedInputSplit serializedInputSplit = (SerializedInputSplit) supplierWithException.get();
            MatcherAssert.assertThat(Boolean.valueOf(serializedInputSplit.isEmpty()), Matchers.is(false));
            arrayList.add(InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), ClassLoader.getSystemClassLoader()));
        }
        SerializedInputSplit serializedInputSplit2 = (SerializedInputSplit) supplierWithException.get();
        if (!serializedInputSplit2.isEmpty()) {
            MatcherAssert.assertThat((InputSplit) InstantiationUtil.deserializeObject(serializedInputSplit2.getInputSplitData(), ClassLoader.getSystemClassLoader()), Matchers.is(Matchers.nullValue()));
        }
        return arrayList;
    }

    @Test
    public void testRequestKvStateWithoutRegistration() throws Exception {
        JobGraph createKvJobGraph = createKvJobGraph();
        JobMaster createJobMaster = createJobMaster(this.configuration, createKvJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture start = createJobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            start.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                selfGateway.requestKvStateLocation(createKvJobGraph.getJobID(), "unknown").get();
                Assert.fail("Expected to fail with UnknownKvStateLocation");
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        }
    }

    @Test
    public void testRequestKvStateOfWrongJob() throws Exception {
        JobMaster createJobMaster = createJobMaster(this.configuration, createKvJobGraph(), this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture start = createJobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            start.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                selfGateway.requestKvStateLocation(new JobID(), "unknown").get();
                Assert.fail("Expected to fail with FlinkJobNotFoundException");
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        }
    }

    @Nonnull
    public JobGraph createKvJobGraph() {
        JobVertex jobVertex = new JobVertex("v1");
        jobVertex.setParallelism(4);
        jobVertex.setMaxParallelism(16);
        jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("v2");
        jobVertex2.setParallelism(4);
        jobVertex2.setMaxParallelism(16);
        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
        return new JobGraph(new JobVertex[]{jobVertex, jobVertex2});
    }

    @Test
    public void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        JobMaster createJobMaster = createJobMaster(this.configuration, createKvJobGraph(), this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture start = createJobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            start.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            try {
                selfGateway.notifyKvStateRegistered(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any-name", new KvStateID(), new InetSocketAddress(InetAddress.getLocalHost(), 1233)).get();
                Assert.fail("Expected to fail with FlinkJobNotFoundException.");
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        }
    }

    @Test
    public void testRegisterAndUnregisterKvState() throws Exception {
        JobGraph createKvJobGraph = createKvJobGraph();
        JobVertex jobVertex = (JobVertex) createKvJobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobMaster createJobMaster = createJobMaster(this.configuration, createKvJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture start = createJobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            start.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 1029);
            selfGateway.notifyKvStateRegistered(createKvJobGraph.getJobID(), jobVertex.getID(), keyGroupRange, "register-me", kvStateID, inetSocketAddress).get();
            KvStateLocation kvStateLocation = (KvStateLocation) selfGateway.requestKvStateLocation(createKvJobGraph.getJobID(), "register-me").get();
            Assert.assertEquals(createKvJobGraph.getJobID(), kvStateLocation.getJobId());
            Assert.assertEquals(jobVertex.getID(), kvStateLocation.getJobVertexId());
            Assert.assertEquals(jobVertex.getMaxParallelism(), kvStateLocation.getNumKeyGroups());
            Assert.assertEquals(fastHeartbeatInterval, kvStateLocation.getNumRegisteredKeyGroups());
            Assert.assertEquals(fastHeartbeatInterval, keyGroupRange.getNumberOfKeyGroups());
            Assert.assertEquals(kvStateID, kvStateLocation.getKvStateID(keyGroupRange.getStartKeyGroup()));
            Assert.assertEquals(inetSocketAddress, kvStateLocation.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
            selfGateway.notifyKvStateUnregistered(createKvJobGraph.getJobID(), jobVertex.getID(), keyGroupRange, "register-me").get();
            try {
                selfGateway.requestKvStateLocation(createKvJobGraph.getJobID(), "register-me").get();
                Assert.fail("Expected to fail with an UnknownKvStateLocation.");
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        }
    }

    @Test
    public void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        JobGraph createKvJobGraph = createKvJobGraph();
        List verticesSortedTopologicallyFromSources = createKvJobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(1);
        JobMaster createJobMaster = createJobMaster(this.configuration, createKvJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        CompletableFuture start = createJobMaster.start(this.jobMasterId, testingTimeout);
        JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
        try {
            start.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 4396);
            selfGateway.notifyKvStateRegistered(createKvJobGraph.getJobID(), jobVertex.getID(), keyGroupRange, "duplicate-me", kvStateID, inetSocketAddress).get();
            try {
                selfGateway.notifyKvStateRegistered(createKvJobGraph.getJobID(), jobVertex2.getID(), keyGroupRange, "duplicate-me", kvStateID, inetSocketAddress).get();
                Assert.fail("Expected to fail because of clashing registration message.");
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Registration name clash").isPresent());
                Assert.assertEquals(JobStatus.FAILED, createJobMaster.getExecutionGraph().getState());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        }
    }

    @Test
    public void testRequestPartitionState() throws Exception {
        JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
        JobMaster createJobMaster = createJobMaster(this.configuration, producerConsumerJobGraph, this.haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices);
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
                completableFuture.complete(slotRequest.getAllocationId());
            });
            rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            CompletableFuture completableFuture2 = new CompletableFuture();
            RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                completableFuture2.complete(taskDeploymentDescriptor);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway();
            rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            this.rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m205getFencingToken().toUUID());
            AllocationID allocationID = (AllocationID) completableFuture.get();
            LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
            selfGateway.registerTaskManager(createTestingTaskExecutorGateway.getAddress(), localTaskManagerLocation, testingTimeout).get();
            SlotOffer slotOffer = new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN);
            Collection collection = (Collection) selfGateway.offerSlots(localTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get();
            MatcherAssert.assertThat(collection, Matchers.hasSize(1));
            MatcherAssert.assertThat(collection, Matchers.contains(new SlotOffer[]{slotOffer}));
            TaskDeploymentDescriptor taskDeploymentDescriptor2 = (TaskDeploymentDescriptor) completableFuture2.get();
            MatcherAssert.assertThat(taskDeploymentDescriptor2.getProducedPartitions(), Matchers.hasSize(1));
            ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = (ResultPartitionDeploymentDescriptor) taskDeploymentDescriptor2.getProducedPartitions().iterator().next();
            ExecutionAttemptID executionAttemptId = taskDeploymentDescriptor2.getExecutionAttemptId();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart());
            selfGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();
            ResultPartitionID resultPartitionID = new ResultPartitionID(resultPartitionDeploymentDescriptor.getPartitionId(), executionAttemptID);
            MatcherAssert.assertThat(selfGateway.requestPartitionState(resultPartitionDeploymentDescriptor.getResultId(), resultPartitionID).get(), Matchers.equalTo(ExecutionState.FINISHED));
            try {
                selfGateway.requestPartitionState(resultPartitionDeploymentDescriptor.getResultId(), new ResultPartitionID()).get();
                Assert.fail("Expected failure.");
            } catch (ExecutionException e) {
                MatcherAssert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(e, IllegalArgumentException.class).isPresent()), Matchers.is(true));
            }
            try {
                selfGateway.requestPartitionState(new IntermediateDataSetID(), resultPartitionID).get();
                Assert.fail("Expected failure.");
            } catch (ExecutionException e2) {
                MatcherAssert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(e2, IllegalArgumentException.class).isPresent()), Matchers.is(true));
            }
            try {
                selfGateway.requestPartitionState(resultPartitionDeploymentDescriptor.getResultId(), new ResultPartitionID(resultPartitionDeploymentDescriptor.getPartitionId(), new ExecutionAttemptID())).get();
                Assert.fail("Expected failure.");
            } catch (ExecutionException e3) {
                MatcherAssert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(e3, PartitionProducerDisposedException.class).isPresent()), Matchers.is(true));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        }
    }

    @Test
    public void testTriggerSavepointTimeout() throws Exception {
        JobMaster jobMaster = new JobMaster(rpcService, JobMasterConfiguration.fromConfiguration(this.configuration), this.jmResourceId, jobGraph, this.haServices, DefaultSlotPoolFactory.fromConfiguration(this.configuration, rpcService), new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices, this.blobServer, UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, new NoOpOnCompletionActions(), this.testingFatalErrorHandler, JobMasterTest.class.getClassLoader()) { // from class: org.apache.flink.runtime.jobmaster.JobMasterTest.2
            public CompletableFuture<String> triggerSavepoint(@Nullable String str, boolean z, Time time) {
                return new CompletableFuture<>();
            }
        };
        try {
            jobMaster.start(this.jobMasterId, testingTimeout).get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            JobMasterGateway selfGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture triggerSavepoint = selfGateway.triggerSavepoint("/tmp", false, Time.milliseconds(fastHeartbeatInterval));
            CompletableFuture triggerSavepoint2 = selfGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
            try {
                triggerSavepoint.get(testingTimeout.getSize(), testingTimeout.getUnit());
                Assert.fail();
            } catch (ExecutionException e) {
                MatcherAssert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TimeoutException.class));
            }
            MatcherAssert.assertThat(Boolean.valueOf(triggerSavepoint2.isDone()), Matchers.is(Matchers.equalTo(false)));
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
            throw th;
        }
    }

    @Test
    public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
        JobManagerSharedServices build = new TestingJobManagerSharedServicesBuilder().build();
        JobGraph createSingleVertexJobWithRestartStrategy = createSingleVertexJobWithRestartStrategy();
        JobMaster createJobMaster = createJobMaster(this.configuration, createSingleVertexJobWithRestartStrategy, this.haServices, build, heartbeatServices);
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m205getFencingToken().toUUID());
        CompletableFuture completableFuture = new CompletableFuture();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
            completableFuture.complete(slotRequest.getAllocationId());
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture completableFuture3 = new CompletableFuture();
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, th) -> {
            completableFuture3.complete(allocationID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setDisconnectJobManagerConsumer((jobID, th2) -> {
            completableFuture2.complete(jobID);
        }).createTestingTaskExecutorGateway();
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        try {
            createJobMaster.start(this.jobMasterId, testingTimeout).get();
            JobMasterGateway selfGateway = createJobMaster.getSelfGateway(JobMasterGateway.class);
            AllocationID allocationID2 = (AllocationID) completableFuture.get();
            selfGateway.registerTaskManager(createTestingTaskExecutorGateway.getAddress(), localTaskManagerLocation, testingTimeout).get();
            MatcherAssert.assertThat((Collection) selfGateway.offerSlots(localTaskManagerLocation.getResourceID(), Collections.singleton(new SlotOffer(allocationID2, 0, ResourceProfile.UNKNOWN)), testingTimeout).get(), Matchers.hasSize(1));
            selfGateway.notifyAllocationFailure(allocationID2, new FlinkException("Fail alloction test exception"));
            MatcherAssert.assertThat(completableFuture3.get(), Matchers.equalTo(allocationID2));
            MatcherAssert.assertThat(completableFuture2.get(), Matchers.equalTo(createSingleVertexJobWithRestartStrategy.getJobID()));
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
        } catch (Throwable th3) {
            RpcUtils.terminateRpcEndpoint(createJobMaster, testingTimeout);
            throw th3;
        }
    }

    private JobGraph producerConsumerJobGraph() {
        JobVertex jobVertex = new JobVertex("Producer");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("Consumer");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobGraph jobGraph2 = new JobGraph(new JobVertex[]{jobVertex, jobVertex2});
        jobGraph2.setAllowQueuedScheduling(true);
        return jobGraph2;
    }

    private File createSavepoint(long j) throws IOException {
        return createSavepointWithOperatorState(j, new OperatorID[0]);
    }

    private File createSavepointWithOperatorState(long j, OperatorID... operatorIDArr) throws IOException {
        File newFile = temporaryFolder.newFile();
        SavepointV2 savepointV2 = new SavepointV2(j, createOperatorState(operatorIDArr), Collections.emptyList());
        FileOutputStream fileOutputStream = new FileOutputStream(newFile);
        Throwable th = null;
        try {
            Checkpoints.storeCheckpointMetadata(savepointV2, fileOutputStream);
            if (fileOutputStream != null) {
                if (0 != 0) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            return newFile;
        } catch (Throwable th3) {
            if (fileOutputStream != null) {
                if (0 != 0) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            throw th3;
        }
    }

    private Collection<OperatorState> createOperatorState(OperatorID... operatorIDArr) {
        ArrayList arrayList = new ArrayList(operatorIDArr.length);
        for (OperatorID operatorID : operatorIDArr) {
            OperatorState operatorState = new OperatorState(operatorID, 1, 42);
            operatorState.putState(0, new OperatorSubtaskState(new OperatorStreamStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("foobar", new byte[0])), (OperatorStateHandle) null, (KeyedStateHandle) null, (KeyedStateHandle) null));
            arrayList.add(operatorState);
        }
        return arrayList;
    }

    @Nonnull
    private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
        return createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, new JobVertex[0]);
    }

    @Nonnull
    private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertexArr) {
        JobGraph jobGraph2 = new JobGraph(jobVertexArr);
        jobGraph2.setSnapshotSettings(new JobCheckpointingSettings(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), new CheckpointCoordinatorConfiguration(heartbeatInterval, heartbeatInterval, heartbeatInterval, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true), (SerializedValue) null));
        jobGraph2.setSavepointRestoreSettings(savepointRestoreSettings);
        return jobGraph2;
    }

    @Nonnull
    private JobMaster createJobMaster(Configuration configuration, JobGraph jobGraph2, HighAvailabilityServices highAvailabilityServices, JobManagerSharedServices jobManagerSharedServices) throws Exception {
        return createJobMaster(configuration, jobGraph2, highAvailabilityServices, jobManagerSharedServices, fastHeartbeatServices);
    }

    @Nonnull
    private JobMaster createJobMaster(Configuration configuration, JobGraph jobGraph2, HighAvailabilityServices highAvailabilityServices, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices2) throws Exception {
        return new JobMaster(rpcService, JobMasterConfiguration.fromConfiguration(configuration), this.jmResourceId, jobGraph2, highAvailabilityServices, DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService), jobManagerSharedServices, heartbeatServices2, this.blobServer, UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, new NoOpOnCompletionActions(), this.testingFatalErrorHandler, JobMasterTest.class.getClassLoader());
    }

    private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
        JobVertex jobVertex = new JobVertex("Test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        JobGraph jobGraph2 = new JobGraph(new JobVertex[]{jobVertex});
        jobGraph2.setAllowQueuedScheduling(true);
        jobGraph2.setExecutionConfig(executionConfig);
        return jobGraph2;
    }
}
