package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Failure;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest.class */
public class TaskManagerTest extends TestLogger {
    /** Actor system shared by all tests; created in {@code setup()} and shut down in {@code teardown()}. */
    private static ActorSystem system;
    /** Per-test high-availability services; created in {@code setupTest()} and cleaned up in {@code tearDownTest()}. */
    private TestingHighAvailabilityServices highAvailabilityServices;
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
    /** Timeout passed to the {@code ActorGateway#ask} calls made by the tests. */
    private static final FiniteDuration timeout = new FiniteDuration(1, TimeUnit.MINUTES);
    /** Duration bounding the {@code JavaTestKit.Within} blocks and {@code Await.ready} calls (same length as {@link #timeout}). */
    private static final FiniteDuration d = new FiniteDuration(60, TimeUnit.SECONDS);
    /** 60 second duration expressed as Flink {@link Time}; not referenced in the part of the file visible to this review. */
    private static final Time timeD = Time.seconds(60);
    /** Random leader session ID handed to the test job manager actors and actor gateways. */
    static final UUID leaderSessionID = UUID.randomUUID();

    /**
     * Job manager stub that answers every {@code ScheduleOrUpdateConsumers} request with a
     * {@link Status.Failure}. All other messages are delegated to {@link SimpleJobManager}.
     */
    public static class FailingScheduleOrUpdateConsumersJobManager extends SimpleJobManager {
        public FailingScheduleOrUpdateConsumersJobManager(UUID uuid) {
            super(uuid);
        }

        @Override
        public void handleMessage(Object obj) throws Exception {
            // Guard clause: anything but a schedule/update request takes the default path.
            if (!(obj instanceof JobManagerMessages.ScheduleOrUpdateConsumers)) {
                super.handleMessage(obj);
                return;
            }

            final Status.Failure failure =
                    new Status.Failure(new Exception("Could not schedule or update consumers."));
            getSender().tell(decorateMessage(failure), getSelf());
        }
    }

    /**
     * Minimal job manager stub: acknowledges task manager registrations (with a fresh
     * {@link InstanceID} and the fixed value 12345) and replies {@code true} to every task
     * execution state update. Any other message is ignored.
     */
    public static class SimpleJobManager extends FlinkUntypedActor {
        private final UUID leaderSessionID;

        public SimpleJobManager(UUID uuid) {
            this.leaderSessionID = uuid;
        }

        public void handleMessage(Object obj) throws Exception {
            if (obj instanceof RegistrationMessages.RegisterTaskManager) {
                final RegistrationMessages.AcknowledgeRegistration acknowledgement =
                        new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 12345);
                getSender().tell(decorateMessage(acknowledgement), getSelf());
                return;
            }

            if (obj instanceof TaskMessages.UpdateTaskExecutionState) {
                // State updates are always accepted.
                getSender().tell(true, getSelf());
            }
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionID;
        }
    }

    /**
     * Job manager stub that only accepts task execution state updates for a fixed set of
     * execution attempts. Updates for a known attempt are answered with {@code true}, updates
     * for any other attempt with {@code false}; all other messages are delegated to
     * {@link SimpleLookupJobManager}.
     */
    public static class SimpleLookupFailingUpdateJobManager extends SimpleLookupJobManager {
        private final Set<ExecutionAttemptID> validIDs;

        /**
         * @param uuid leader session ID reported by this actor
         * @param set execution attempts whose state updates are accepted; copied defensively
         */
        public SimpleLookupFailingUpdateJobManager(UUID uuid, Set<ExecutionAttemptID> set) {
            super(uuid);
            // Typed copy instead of the raw HashSet, so the element type is checked by the compiler.
            this.validIDs = new HashSet<>(set);
        }

        @Override
        public void handleMessage(Object obj) throws Exception {
            if (!(obj instanceof TaskMessages.UpdateTaskExecutionState)) {
                super.handleMessage(obj);
                return;
            }

            final TaskMessages.UpdateTaskExecutionState update = (TaskMessages.UpdateTaskExecutionState) obj;
            final boolean accepted = this.validIDs.contains(update.taskExecutionState().getID());
            getSender().tell(accepted, getSelf());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleLookupFailingUpdateJobManagerCreator.class */
    public static class SimpleLookupFailingUpdateJobManagerCreator implements Creator<SimpleLookupFailingUpdateJobManager> {
        private final UUID leaderSessionID;
        private final Set<ExecutionAttemptID> validIDs = new HashSet();

        public SimpleLookupFailingUpdateJobManagerCreator(UUID uuid, ExecutionAttemptID... executionAttemptIDArr) {
            this.leaderSessionID = uuid;
            for (ExecutionAttemptID executionAttemptID : executionAttemptIDArr) {
                this.validIDs.add(executionAttemptID);
            }
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public SimpleLookupFailingUpdateJobManager m350create() throws Exception {
            return new SimpleLookupFailingUpdateJobManager(this.leaderSessionID, this.validIDs);
        }
    }

    /**
     * Job manager stub that acknowledges every {@code ScheduleOrUpdateConsumers} request and
     * leaves all other messages to {@link SimpleJobManager}.
     */
    public static class SimpleLookupJobManager extends SimpleJobManager {
        public SimpleLookupJobManager(UUID uuid) {
            super(uuid);
        }

        @Override
        public void handleMessage(Object obj) throws Exception {
            final boolean isConsumerUpdate = obj instanceof JobManagerMessages.ScheduleOrUpdateConsumers;
            if (!isConsumerUpdate) {
                super.handleMessage(obj);
                return;
            }
            getSender().tell(decorateMessage(Acknowledge.get()), getSelf());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleLookupJobManagerCreator.class */
    public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager> {
        private final UUID leaderSessionID;

        public SimpleLookupJobManagerCreator(UUID uuid) {
            this.leaderSessionID = uuid;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public SimpleLookupJobManager m351create() throws Exception {
            return new SimpleLookupJobManager(this.leaderSessionID);
        }
    }

    /**
     * Job manager stub for partition producer state lookups. It answers every
     * {@code RequestPartitionProducerState} with {@link ExecutionState#RUNNING} and forwards
     * task execution states that are terminal to the given test actor (non-terminal updates
     * are dropped without a reply). Everything else is handled by {@link SimpleJobManager}.
     */
    public static class SimplePartitionStateLookupJobManager extends SimpleJobManager {
        private final ActorRef testActor;

        public SimplePartitionStateLookupJobManager(UUID uuid, ActorRef actorRef) {
            super(uuid);
            this.testActor = actorRef;
        }

        @Override
        public void handleMessage(Object obj) throws Exception {
            if (obj instanceof JobManagerMessages.RequestPartitionProducerState) {
                getSender().tell(decorateMessage(ExecutionState.RUNNING), getSelf());
            } else if (obj instanceof TaskMessages.UpdateTaskExecutionState) {
                final TaskExecutionState reportedState =
                        ((TaskMessages.UpdateTaskExecutionState) obj).taskExecutionState();
                // Only terminal states are of interest to the test actor.
                if (reportedState.getExecutionState().isTerminal()) {
                    this.testActor.tell(reportedState, self());
                }
            } else {
                super.handleMessage(obj);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimplePartitionStateLookupJobManagerCreator.class */
    public static class SimplePartitionStateLookupJobManagerCreator implements Creator<SimplePartitionStateLookupJobManager> {
        private final UUID leaderSessionID;
        private final ActorRef testActor;

        public SimplePartitionStateLookupJobManagerCreator(UUID uuid, ActorRef actorRef) {
            this.leaderSessionID = uuid;
            this.testActor = actorRef;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public SimplePartitionStateLookupJobManager m352create() throws Exception {
            return new SimplePartitionStateLookupJobManager(this.leaderSessionID, this.testActor);
        }
    }

    /**
     * Invokable whose {@code invoke()} never returns normally: it waits forever on a private
     * monitor and can only leave the method through an exception thrown by {@code wait()}.
     */
    public static class TestInvokableBlockingCancelable extends AbstractInvokable {
        public void invoke() throws Exception {
            final Object monitor = new Object();
            synchronized (monitor) {
                // Re-enter wait() after every wake-up; nothing ever notifies this monitor.
                for (;;) {
                    monitor.wait();
                }
            }
        }
    }

    /** Invokable that does nothing, so {@code invoke()} returns immediately. */
    public static final class TestInvokableCorrect extends AbstractInvokable {
        public void invoke() {
            // Intentionally empty: returning right away is all this invokable is meant to do.
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$TestInvokableRecordCancel.class */
    public static final class TestInvokableRecordCancel extends AbstractInvokable {
        private static final Object lock = new Object();
        private static CompletableFuture<Boolean> gotCanceledFuture = new CompletableFuture<>();

        public void invoke() throws Exception {
            Object obj = new Object();
            RecordWriter recordWriter = new RecordWriter(getEnvironment().getWriter(0));
            for (int i = 0; i < 1024; i++) {
                recordWriter.emit(new IntValue(42));
            }
            synchronized (obj) {
                while (true) {
                    obj.wait();
                }
            }
        }

        public void cancel() {
            synchronized (lock) {
                gotCanceledFuture.complete(true);
            }
        }

        public static void resetGotCanceledFuture() {
            synchronized (lock) {
                gotCanceledFuture = new CompletableFuture<>();
            }
        }

        public static CompletableFuture<Boolean> gotCanceled() {
            CompletableFuture<Boolean> completableFuture;
            synchronized (lock) {
                completableFuture = gotCanceledFuture;
            }
            return completableFuture;
        }
    }

    /** Creates the local actor system shared by all tests, using a default configuration. */
    @BeforeClass
    public static void setup() {
        final Configuration defaultConfiguration = new Configuration();
        system = AkkaUtils.createLocalActorSystem(defaultConfiguration);
    }

    /**
     * Shuts down the shared actor system. JUnit runs this method even when {@code setup()}
     * threw, in which case {@code system} is still {@code null}; the guard keeps the original
     * setup failure from being masked by a follow-up error here.
     */
    @AfterClass
    public static void teardown() {
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            system = null;
        }
    }

    /** Gives every test its own {@link TestingHighAvailabilityServices} instance. */
    @Before
    public void setupTest() {
        final TestingHighAvailabilityServices freshServices = new TestingHighAvailabilityServices();
        this.highAvailabilityServices = freshServices;
    }

    /**
     * Closes the per-test high-availability services, cleaning up all their data, and clears
     * the field. Does nothing if no services are set.
     */
    @After
    public void tearDownTest() throws Exception {
        final TestingHighAvailabilityServices services = this.highAvailabilityServices;
        if (services == null) {
            return;
        }
        services.closeAndCleanupAllData();
        this.highAvailabilityServices = null;
    }

    /**
     * Submits a single {@link TestInvokableCorrect} task and checks the messages the task
     * manager sends to the (forwarded) job manager: first the registration request, then the
     * acknowledgement of the submission, then the RUNNING state update and finally the FINISHED
     * state update. Between the state updates only heartbeat messages are tolerated.
     *
     * <p>The test fails if any of the expected messages is missing once its 10 second deadline
     * has passed, instead of silently carrying on.
     */
    @Test
    public void testSubmitAndExecuteTask() throws IOException {
        new JavaTestKit(system) {
            {
                // Assigned inside the try block, hence not final; "tm" below is the final alias
                // that the anonymous Within classes capture.
                ActorGateway taskManager = null;
                final ActorGateway jobManager = TestingUtils.createForwardingActor(
                        TaskManagerTest.system,
                        getTestActor(),
                        HighAvailabilityServices.DEFAULT_LEADER_ID,
                        Option.empty());

                TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                        HighAvailabilityServices.DEFAULT_JOB_ID,
                        new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                try {
                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            new Configuration(),
                            true,
                            false);
                    final ActorGateway tm = taskManager;

                    // Step 1: the task manager registers and the registration is acknowledged.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            final InstanceID instanceID = new InstanceID();
                            Assert.assertEquals(tm.actor(), getLastSender());
                            tm.tell(new RegistrationMessages.AcknowledgeRegistration(instanceID, 12345), jobManager);
                        }
                    };

                    final JobID jobID = new JobID();
                    final JobVertexID jobVertexID = new JobVertexID();
                    final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
                    final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobID, "TestJob", jobVertexID, executionAttemptID,
                            new SerializedValue<>(new ExecutionConfig()),
                            "TestTask", 7, 2, 7, 0,
                            new Configuration(), new Configuration(),
                            TestInvokableCorrect.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    // Step 2: submit the task and follow it through RUNNING to FINISHED.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            tm.tell(new TaskMessages.SubmitTask(tdd), jobManager);

                            // Anything that arrives before the acknowledgement is ignored.
                            expectEventually(Acknowledge.get(), false);

                            expectEventually(
                                    new TaskMessages.UpdateTaskExecutionState(
                                            new TaskExecutionState(jobID, executionAttemptID, ExecutionState.RUNNING)),
                                    true);
                            expectEventually(
                                    new TaskMessages.UpdateTaskExecutionState(
                                            new TaskExecutionState(jobID, executionAttemptID, ExecutionState.FINISHED)),
                                    true);
                        }

                        /**
                         * Receives messages until one equal to {@code expected} arrives.
                         *
                         * @param expected the message to wait for
                         * @param onlyHeartbeatsAllowed if {@code true}, any other message that is
                         *     not a heartbeat (including a receive timeout, which yields
                         *     {@code null}) fails the test; if {@code false}, other messages are
                         *     skipped
                         */
                        private void expectEventually(Object expected, boolean onlyHeartbeatsAllowed) {
                            final long deadline = System.currentTimeMillis() + 10000;
                            do {
                                final Object message = receiveOne(TaskManagerTest.d);
                                // Null-safe: receiveOne yields null when nothing arrives in time.
                                if (expected.equals(message)) {
                                    return;
                                }
                                if (onlyHeartbeatsAllowed && !(message instanceof TaskManagerMessages.Heartbeat)) {
                                    Assert.fail("Unexpected message: " + message);
                                }
                            } while (System.currentTimeMillis() < deadline);
                            Assert.fail("Did not receive the expected message before the deadline: " + expected);
                        }
                    };
                } finally {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Submits two blocking tasks and cancels them one after the other.
     *
     * <p>Checked sequence: both submissions are acknowledged and both tasks reach RUNNING;
     * cancelling the first task is acknowledged, the task is removed and ends up CANCELED while
     * one task remains registered; cancelling the already removed task again is still
     * acknowledged; cancelling the second task leaves it CANCELED and no task registered.
     */
    @Test
    public void testJobSubmissionAndCanceling() {
        new JavaTestKit(system) {
            {
                // Both are assigned inside the try block so that the finally block can stop
                // whatever was created; "tm" below is the final alias captured by Within.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final ActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    final ActorRef jobManagerActor = TaskManagerTest.system.actorOf(
                            Props.create(SimpleJobManager.class, TaskManagerTest.leaderSessionID));
                    jobManager = new AkkaActorGateway(jobManagerActor, TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            new Configuration(),
                            true,
                            true);
                    final ActorGateway tm = taskManager;

                    final JobID jobID1 = new JobID();
                    final JobID jobID2 = new JobID();
                    final JobVertexID vertexID1 = new JobVertexID();
                    final JobVertexID vertexID2 = new JobVertexID();
                    final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
                    final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();

                    final TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobID1, "TestJob1", vertexID1, attemptID1,
                            new SerializedValue<>(new ExecutionConfig()),
                            "TestTask1", 5, 1, 5, 0,
                            new Configuration(), new Configuration(),
                            TestInvokableBlockingCancelable.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);
                    final TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobID2, "TestJob2", vertexID2, attemptID2,
                            new SerializedValue<>(new ExecutionConfig()),
                            "TestTask2", 7, 2, 7, 0,
                            new Configuration(), new Configuration(),
                            TestInvokableBlockingCancelable.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register the "running" listeners before submitting the tasks.
                                final Future<Object> task1Running = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(attemptID1),
                                        TaskManagerTest.timeout);
                                final Future<Object> task2Running = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(attemptID2),
                                        TaskManagerTest.timeout);

                                tm.tell(new TaskMessages.SubmitTask(tdd1), testActorGateway);
                                tm.tell(new TaskMessages.SubmitTask(tdd2), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                expectMsgEquals(Acknowledge.get());

                                Await.ready(task1Running, TaskManagerTest.d);
                                Await.ready(task2Running, TaskManagerTest.d);

                                final Map<ExecutionAttemptID, Task> runningTasks = requestRunningTasks();
                                Assert.assertEquals(2, runningTasks.size());
                                final Task task1 = runningTasks.get(attemptID1);
                                final Task task2 = runningTasks.get(attemptID2);
                                Assert.assertNotNull(task1);
                                Assert.assertNotNull(task2);
                                Assert.assertEquals(ExecutionState.RUNNING, task1.getExecutionState());
                                Assert.assertEquals(ExecutionState.RUNNING, task2.getExecutionState());

                                // Cancel the first task and wait until it has been removed.
                                tm.tell(new TaskMessages.CancelTask(attemptID1), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                Await.ready(
                                        tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(attemptID1),
                                                TaskManagerTest.timeout),
                                        TaskManagerTest.d);
                                Assert.assertEquals(ExecutionState.CANCELED, task1.getExecutionState());
                                Assert.assertEquals(1, requestRunningTasks().size());

                                // Cancelling the already removed task is still acknowledged.
                                tm.tell(new TaskMessages.CancelTask(attemptID1), testActorGateway);
                                expectMsgEquals(Acknowledge.get());

                                // Cancel the second task and wait until it has been removed.
                                tm.tell(new TaskMessages.CancelTask(attemptID2), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                Await.ready(
                                        tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(attemptID2),
                                                TaskManagerTest.timeout),
                                        TaskManagerTest.d);
                                Assert.assertEquals(ExecutionState.CANCELED, task2.getExecutionState());
                                Assert.assertEquals(0, requestRunningTasks().size());
                            } catch (Exception e) {
                                // Keep the cause attached instead of printing it and failing
                                // with the bare message.
                                throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
                            }
                        }

                        /** Asks the task manager for its registered tasks and returns the reply. */
                        private Map<ExecutionAttemptID, Task> requestRunningTasks() {
                            tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                            return expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class).asJava();
                        }
                    };
                } catch (Exception e) {
                    throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
                } finally {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Submits one {@link StoppableInvokable} task and one {@link TestInvokableBlockingCancelable}
     * task and sends stop requests to both.
     *
     * <p>Checked sequence: both submissions are acknowledged and both tasks reach RUNNING;
     * stopping the first task is acknowledged, the task is removed and ends up FINISHED while
     * one task remains registered; stopping the already removed task again is still
     * acknowledged; stopping the second task is answered with a {@link Status.Failure}, and
     * that task stays RUNNING and registered.
     */
    @Test
    public void testJobSubmissionAndStop() throws Exception {
        new JavaTestKit(system) {
            {
                // Both are assigned inside the try block so that the finally block can stop
                // whatever was created; "tm" below is the final alias captured by Within.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final ActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    final ActorRef jobManagerActor = TaskManagerTest.system.actorOf(
                            Props.create(SimpleJobManager.class, TaskManagerTest.leaderSessionID));
                    jobManager = new AkkaActorGateway(jobManagerActor, TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            new Configuration(),
                            true,
                            true);
                    final ActorGateway tm = taskManager;

                    final JobID jobID1 = new JobID();
                    final JobID jobID2 = new JobID();
                    final JobVertexID vertexID1 = new JobVertexID();
                    final JobVertexID vertexID2 = new JobVertexID();
                    final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
                    final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
                    final SerializedValue<ExecutionConfig> executionConfig =
                            new SerializedValue<>(new ExecutionConfig());

                    final TaskDeploymentDescriptor tdd1 = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobID1, "TestJob", vertexID1, attemptID1,
                            executionConfig,
                            "TestTask1", 5, 1, 5, 0,
                            new Configuration(), new Configuration(),
                            StoppableInvokable.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);
                    final TaskDeploymentDescriptor tdd2 = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobID2, "TestJob", vertexID2, attemptID2,
                            executionConfig,
                            "TestTask2", 7, 2, 7, 0,
                            new Configuration(), new Configuration(),
                            TestInvokableBlockingCancelable.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register the "running" listeners before submitting the tasks.
                                final Future<Object> task1Running = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(attemptID1),
                                        TaskManagerTest.timeout);
                                final Future<Object> task2Running = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(attemptID2),
                                        TaskManagerTest.timeout);

                                tm.tell(new TaskMessages.SubmitTask(tdd1), testActorGateway);
                                tm.tell(new TaskMessages.SubmitTask(tdd2), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                expectMsgEquals(Acknowledge.get());

                                Await.ready(task1Running, TaskManagerTest.d);
                                Await.ready(task2Running, TaskManagerTest.d);

                                final Map<ExecutionAttemptID, Task> runningTasks = requestRunningTasks();
                                Assert.assertEquals(2, runningTasks.size());
                                final Task task1 = runningTasks.get(attemptID1);
                                final Task task2 = runningTasks.get(attemptID2);
                                Assert.assertNotNull(task1);
                                Assert.assertNotNull(task2);
                                Assert.assertEquals(ExecutionState.RUNNING, task1.getExecutionState());
                                Assert.assertEquals(ExecutionState.RUNNING, task2.getExecutionState());

                                // Stop the stoppable task and wait until it has been removed.
                                tm.tell(new TaskMessages.StopTask(attemptID1), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                Await.ready(
                                        tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(attemptID1),
                                                TaskManagerTest.timeout),
                                        TaskManagerTest.d);
                                Assert.assertEquals(ExecutionState.FINISHED, task1.getExecutionState());
                                Assert.assertEquals(1, requestRunningTasks().size());

                                // Stopping the already removed task is still acknowledged.
                                tm.tell(new TaskMessages.StopTask(attemptID1), testActorGateway);
                                expectMsgEquals(Acknowledge.get());

                                // The stop request for the second task is answered with a failure
                                // and the task keeps running.
                                tm.tell(new TaskMessages.StopTask(attemptID2), testActorGateway);
                                expectMsgClass(Status.Failure.class);
                                Assert.assertEquals(ExecutionState.RUNNING, task2.getExecutionState());
                                Assert.assertEquals(1, requestRunningTasks().size());
                            } catch (Exception e) {
                                // Keep the cause attached instead of printing it and failing
                                // with the bare message.
                                throw new AssertionError("Unexpected exception: " + e.getMessage(), e);
                            }
                        }

                        /** Asks the task manager for its registered tasks and returns the reply. */
                        private Map<ExecutionAttemptID, Task> requestRunningTasks() {
                            tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                            return expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class).asJava();
                        }
                    };
                } finally {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Submits a "Sender" and a "Receiver" task whose partition and gate arguments are all empty
     * lists, then expects the TaskManager to drop both tasks.
     *
     * <p>Sequence checked: both submissions are acknowledged, each
     * {@code NotifyWhenTaskRemoved} request is answered with {@code true}, and the final
     * running-tasks response contains no entries.
     */
    @Test
    public void testGateChannelEdgeMismatch() {
        new JavaTestKit(system) {
            {
                // Deliberately not final: both are assigned inside the try block and must still be
                // visible (possibly null) to the cleanup in the finally block.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final AkkaActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    jobManager = new AkkaActorGateway(
                            TaskManagerTest.system.actorOf(
                                    Props.create(SimpleJobManager.class, TaskManagerTest.leaderSessionID)),
                            TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            new Configuration(),
                            true,
                            true);

                    // Final alias so the anonymous Within block below can capture the gateway.
                    final ActorGateway tm = taskManager;

                    final JobID jobId = new JobID();
                    final ExecutionAttemptID senderAttemptId = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttemptId = new ExecutionAttemptID();

                    final TaskDeploymentDescriptor senderDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobId, "TestJob", new JobVertexID(), senderAttemptId,
                            new SerializedValue<>(new ExecutionConfig()),
                            "Sender", 1, 0, 1, 0,
                            new Configuration(), new Configuration(),
                            Tasks.Sender.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    final TaskDeploymentDescriptor receiverDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobId, "TestJob", new JobVertexID(), receiverAttemptId,
                            new SerializedValue<>(new ExecutionConfig()),
                            "Receiver", 7, 2, 7, 0,
                            new Configuration(), new Configuration(),
                            Tasks.Receiver.class.getName(),
                            Collections.emptyList(), Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                tm.tell(new TaskMessages.SubmitTask(senderDescriptor), testActorGateway);
                                tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), testActorGateway);

                                expectMsgEquals(Acknowledge.get());
                                expectMsgEquals(Acknowledge.get());

                                tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttemptId), testActorGateway);
                                tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttemptId), testActorGateway);

                                expectMsgEquals(true);
                                expectMsgEquals(true);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                                Map<ExecutionAttemptID, Task> remainingTasks =
                                        ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(
                                                TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals(0L, remainingTasks.size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, failure and assertion errors alike.
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Runs a "Sender" task and a "Receiver" task that are connected through one local pipelined
     * result partition and verifies that both disappear from the TaskManager afterwards.
     *
     * <p>The sender is submitted and awaited as running before the receiver is submitted. If the
     * receiver is still listed in the first running-tasks response, it must be in state
     * {@code FINISHED} once its removal has been signalled. The final running-tasks response must
     * be empty.
     */
    @Test
    public void testRunJobWithForwardChannel() {
        new JavaTestKit(system) {
            {
                // Deliberately not final: both are assigned inside the try block and must still be
                // visible (possibly null) to the cleanup in the finally block.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final AkkaActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    final JobID jobId = new JobID();
                    final ExecutionAttemptID senderAttemptId = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttemptId = new ExecutionAttemptID();

                    jobManager = new AkkaActorGateway(
                            TaskManagerTest.system.actorOf(
                                    Props.create(new SimpleLookupJobManagerCreator(TaskManagerTest.leaderSessionID))),
                            TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            new Configuration(),
                            true,
                            true);

                    // Final alias so the anonymous Within block below can capture the gateway.
                    final ActorGateway tm = taskManager;

                    // The sender produces this partition; the receiver's single input channel
                    // references it (together with the sender's attempt id) as a local partition.
                    final IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();

                    List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>();
                    producedPartitions.add(new ResultPartitionDeploymentDescriptor(
                            new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));

                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(
                            new IntermediateDataSetID(),
                            ResultPartitionType.PIPELINED,
                            0,
                            new InputChannelDeploymentDescriptor[] {
                                new InputChannelDeploymentDescriptor(
                                        new ResultPartitionID(partitionId, senderAttemptId),
                                        ResultPartitionLocation.createLocal())
                            });

                    final TaskDeploymentDescriptor senderDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobId, "TestJob", new JobVertexID(), senderAttemptId,
                            new SerializedValue<>(new ExecutionConfig()),
                            "Sender", 1, 0, 1, 0,
                            new Configuration(), new Configuration(),
                            Tasks.Sender.class.getName(),
                            producedPartitions, Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    final TaskDeploymentDescriptor receiverDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobId, "TestJob", new JobVertexID(), receiverAttemptId,
                            new SerializedValue<>(new ExecutionConfig()),
                            "Receiver", 7, 2, 7, 0,
                            new Configuration(), new Configuration(),
                            Tasks.Receiver.class.getName(),
                            Collections.emptyList(), Collections.singletonList(inputGate),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register the "is running" listeners before submitting anything.
                                Future<Object> senderRunning = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(senderAttemptId),
                                        TaskManagerTest.timeout);
                                Future<Object> receiverRunning = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(receiverAttemptId),
                                        TaskManagerTest.timeout);

                                tm.tell(new TaskMessages.SubmitTask(senderDescriptor), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                Await.ready(senderRunning, TaskManagerTest.d);

                                tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), testActorGateway);
                                expectMsgEquals(Acknowledge.get());
                                Await.ready(receiverRunning, TaskManagerTest.d);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                                Map<ExecutionAttemptID, Task> runningTasks =
                                        ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(
                                                TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();

                                Task senderTask = runningTasks.get(senderAttemptId);
                                Task receiverTask = runningTasks.get(receiverAttemptId);

                                // A task missing from the response is already gone; only wait for
                                // removal of the ones that were still listed.
                                if (senderTask != null) {
                                    Await.ready(
                                            tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttemptId),
                                                    TaskManagerTest.timeout),
                                            TaskManagerTest.d);
                                }

                                if (receiverTask != null) {
                                    Await.ready(
                                            tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttemptId),
                                                    TaskManagerTest.timeout),
                                            TaskManagerTest.d);
                                    Assert.assertEquals(ExecutionState.FINISHED, receiverTask.getExecutionState());
                                }

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                                Map<ExecutionAttemptID, Task> remainingTasks =
                                        ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(
                                                TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals(0L, remainingTasks.size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, failure and assertion errors alike.
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Cancels the receiving side of a sender/receiver pair and verifies that both tasks end up
     * removed from the TaskManager.
     *
     * <p>The JobManager stand-in is built by {@code SimpleLookupFailingUpdateJobManagerCreator},
     * which is handed the receiver's attempt id (presumably so that state updates for that attempt
     * are rejected; confirm against the creator's implementation). The receiver, a
     * {@code Tasks.BlockingReceiver}, is submitted before the sender. Once both are running the
     * receiver is cancelled; the sender is cancelled explicitly only if it is still
     * {@code RUNNING} at that point. The final running-tasks response must be empty.
     */
    @Test
    public void testCancellingDependentAndStateUpdateFails() {
        new JavaTestKit(system) {
            {
                // Deliberately not final: both are assigned inside the try block and must still be
                // visible (possibly null) to the cleanup in the finally block.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final AkkaActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    final JobID jobId = new JobID();
                    final ExecutionAttemptID senderAttemptId = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttemptId = new ExecutionAttemptID();

                    jobManager = new AkkaActorGateway(
                            TaskManagerTest.system.actorOf(Props.create(
                                    new SimpleLookupFailingUpdateJobManagerCreator(
                                            TaskManagerTest.leaderSessionID, receiverAttemptId))),
                            TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            new Configuration(),
                            true,
                            true);

                    // Final alias so the anonymous Within block below can capture the gateway.
                    final ActorGateway tm = taskManager;

                    // The sender produces this partition; the receiver's single input channel
                    // references it (together with the sender's attempt id) as a local partition.
                    final IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();

                    List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>();
                    producedPartitions.add(new ResultPartitionDeploymentDescriptor(
                            new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));

                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(
                            new IntermediateDataSetID(),
                            ResultPartitionType.PIPELINED,
                            0,
                            new InputChannelDeploymentDescriptor[] {
                                new InputChannelDeploymentDescriptor(
                                        new ResultPartitionID(partitionId, senderAttemptId),
                                        ResultPartitionLocation.createLocal())
                            });

                    final TaskDeploymentDescriptor senderDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobId, "TestJob", new JobVertexID(), senderAttemptId,
                            new SerializedValue<>(new ExecutionConfig()),
                            "Sender", 1, 0, 1, 0,
                            new Configuration(), new Configuration(),
                            Tasks.Sender.class.getName(),
                            producedPartitions, Collections.emptyList(),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    final TaskDeploymentDescriptor receiverDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobId, "TestJob", new JobVertexID(), receiverAttemptId,
                            new SerializedValue<>(new ExecutionConfig()),
                            "Receiver", 7, 2, 7, 0,
                            new Configuration(), new Configuration(),
                            Tasks.BlockingReceiver.class.getName(),
                            Collections.emptyList(), Collections.singletonList(inputGate),
                            new ArrayList<>(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register the "is running" listeners before submitting anything.
                                Future<Object> senderRunning = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(senderAttemptId),
                                        TaskManagerTest.timeout);
                                Future<Object> receiverRunning = tm.ask(
                                        new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(receiverAttemptId),
                                        TaskManagerTest.timeout);

                                // Note the order: the receiver goes in first, then the sender.
                                tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), testActorGateway);
                                tm.tell(new TaskMessages.SubmitTask(senderDescriptor), testActorGateway);

                                expectMsgEquals(Acknowledge.get());
                                expectMsgEquals(Acknowledge.get());

                                Await.ready(senderRunning, TaskManagerTest.d);
                                Await.ready(receiverRunning, TaskManagerTest.d);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                                Map<ExecutionAttemptID, Task> runningTasks =
                                        ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(
                                                TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();

                                Task senderTask = runningTasks.get(senderAttemptId);
                                Task receiverTask = runningTasks.get(receiverAttemptId);

                                tm.tell(new TaskMessages.CancelTask(receiverAttemptId), testActorGateway);
                                expectMsgEquals(Acknowledge.get());

                                if (receiverTask != null) {
                                    Await.ready(
                                            tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttemptId),
                                                    TaskManagerTest.timeout),
                                            TaskManagerTest.d);
                                }

                                if (senderTask != null) {
                                    // Only cancel the sender if it has not left RUNNING on its own.
                                    if (senderTask.getExecutionState() == ExecutionState.RUNNING) {
                                        tm.tell(new TaskMessages.CancelTask(senderAttemptId), testActorGateway);
                                        expectMsgEquals(Acknowledge.get());
                                    }
                                    Await.ready(
                                            tm.ask(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttemptId),
                                                    TaskManagerTest.timeout),
                                            TaskManagerTest.d);
                                }

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway);
                                Map<ExecutionAttemptID, Task> remainingTasks =
                                        ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(
                                                TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals(0L, remainingTasks.size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, failure and assertion errors alike.
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Deploys a single "Receiver" task whose only input channel points at a remote location on
     * {@code localhost} and expects the task to fail with a {@code PartitionNotFoundException}.
     *
     * <p>The remote address uses the same port that is configured as the TaskManager's
     * {@code taskmanager.data.port}, and the requested {@code ResultPartitionID} is freshly
     * created. After the submission is acknowledged, the test actor must receive a
     * {@code TaskExecutionState} in state {@code FAILED} carrying that exception type.
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testRemotePartitionNotFound() throws Exception {
        new JavaTestKit(system) {
            {
                // Deliberately not final: both are assigned inside the try block and must still be
                // visible (possibly null) to the cleanup in the finally block.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final AkkaActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    final IntermediateDataSetID resultId = new IntermediateDataSetID();

                    jobManager = new AkkaActorGateway(
                            TaskManagerTest.system.actorOf(Props.create(
                                    new SimplePartitionStateLookupJobManagerCreator(
                                            TaskManagerTest.leaderSessionID, getTestActor()))),
                            TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    final int dataPort = NetUtils.getAvailablePort();

                    Configuration config = new Configuration();
                    config.setInteger("taskmanager.data.port", dataPort);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            config,
                            false,
                            true);

                    // Final alias so the anonymous Within block below can capture the gateway.
                    final ActorGateway tm = taskManager;

                    // Remote channel that targets the TaskManager's own data port on localhost.
                    InputChannelDeploymentDescriptor remoteChannel = new InputChannelDeploymentDescriptor(
                            new ResultPartitionID(),
                            ResultPartitionLocation.createRemote(
                                    new ConnectionID(new InetSocketAddress("localhost", dataPort), 0)));

                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(
                            resultId,
                            ResultPartitionType.PIPELINED,
                            0,
                            new InputChannelDeploymentDescriptor[] {remoteChannel});

                    final TaskDeploymentDescriptor receiverDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            new JobID(), "TestJob", new JobVertexID(), new ExecutionAttemptID(),
                            new SerializedValue<>(new ExecutionConfig()),
                            "Receiver", 1, 0, 1, 0,
                            new Configuration(), new Configuration(),
                            Tasks.AgnosticReceiver.class.getName(),
                            Collections.emptyList(), Collections.singletonList(inputGate),
                            Collections.emptyList(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), testActorGateway);
                            expectMsgClass(Acknowledge.get().getClass());

                            TaskExecutionState failureState = expectMsgClass(TaskExecutionState.class);
                            Assert.assertEquals(ExecutionState.FAILED, failureState.getExecutionState());

                            Throwable cause = failureState.getError(ClassLoader.getSystemClassLoader());
                            Assert.assertEquals(
                                    "Thrown exception was not a PartitionNotFoundException: " + cause.getMessage(),
                                    PartitionNotFoundException.class,
                                    cause.getClass());
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, failure and assertion errors alike.
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Checks that network settings placed in a {@code Configuration} are reflected by the network
     * config of the {@code TaskManagerServicesConfiguration} built from it.
     *
     * <p>Covered options: initial and maximum partition request backoff, network buffers per
     * channel, and extra (floating) network buffers per gate.
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testTaskManagerServicesConfiguration() throws Exception {
        Configuration config = new Configuration();
        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
        config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
        config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);

        TaskManagerServicesConfiguration servicesConfig =
                TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
        // report read "expected:<100> but was:<...>" instead of the reverse.
        Assert.assertEquals(100L, servicesConfig.getNetworkConfig().partitionRequestInitialBackoff());
        Assert.assertEquals(200L, servicesConfig.getNetworkConfig().partitionRequestMaxBackoff());
        Assert.assertEquals(10L, servicesConfig.getNetworkConfig().networkBuffersPerChannel());
        Assert.assertEquals(100L, servicesConfig.getNetworkConfig().floatingNetworkBuffersPerGate());
    }

    /**
     * Deploys a single "Receiver" task whose only input channel references a freshly created
     * {@code ResultPartitionID} at a local location and expects the task to fail with a
     * {@code PartitionNotFoundException}.
     *
     * <p>After the submission is acknowledged, the test actor must receive a
     * {@code TaskExecutionState} in state {@code FAILED} whose error is exactly of that exception
     * class. The whole exchange is bounded by a 120 second window.
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testLocalPartitionNotFound() throws Exception {
        new JavaTestKit(system) {
            {
                // Deliberately not final: both are assigned inside the try block and must still be
                // visible (possibly null) to the cleanup in the finally block.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                final AkkaActorGateway testActorGateway =
                        new AkkaActorGateway(getTestActor(), TaskManagerTest.leaderSessionID);

                try {
                    final IntermediateDataSetID resultId = new IntermediateDataSetID();

                    jobManager = new AkkaActorGateway(
                            TaskManagerTest.system.actorOf(Props.create(
                                    new SimplePartitionStateLookupJobManagerCreator(
                                            TaskManagerTest.leaderSessionID, getTestActor()))),
                            TaskManagerTest.leaderSessionID);

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    Configuration config = new Configuration();
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            config,
                            true,
                            true);

                    // Final alias so the anonymous Within block below can capture the gateway.
                    final ActorGateway tm = taskManager;

                    InputChannelDeploymentDescriptor localChannel = new InputChannelDeploymentDescriptor(
                            new ResultPartitionID(), ResultPartitionLocation.createLocal());

                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(
                            resultId,
                            ResultPartitionType.PIPELINED,
                            0,
                            new InputChannelDeploymentDescriptor[] {localChannel});

                    final TaskDeploymentDescriptor receiverDescriptor = TaskManagerTest.createTaskDeploymentDescriptor(
                            new JobID(), "TestJob", new JobVertexID(), new ExecutionAttemptID(),
                            new SerializedValue<>(new ExecutionConfig()),
                            "Receiver", 1, 0, 1, 0,
                            new Configuration(), new Configuration(),
                            Tasks.AgnosticReceiver.class.getName(),
                            Collections.emptyList(), Collections.singletonList(inputGate),
                            Collections.emptyList(), Collections.emptyList(), 0);

                    new JavaTestKit.Within(new FiniteDuration(120L, TimeUnit.SECONDS)) {
                        @Override
                        protected void run() {
                            tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), testActorGateway);
                            expectMsgClass(Acknowledge.get().getClass());

                            TaskExecutionState failureState = expectMsgClass(TaskExecutionState.class);
                            // Expected value first, matching JUnit's assertEquals(expected, actual).
                            Assert.assertEquals(ExecutionState.FAILED, failureState.getExecutionState());

                            Throwable cause = failureState.getError(getClass().getClassLoader());
                            if (cause.getClass() != PartitionNotFoundException.class) {
                                cause.printStackTrace();
                                Assert.fail("Wrong exception: " + cause.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, failure and assertion errors alike.
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Requests the TaskManager log from a TaskManager whose {@code taskmanager.log.path} is set to
     * a non-existent location and expects the request to fail with a descriptive error.
     *
     * <p>The awaited ask must throw an exception whose message starts with
     * "TaskManager log files are unavailable. Log file could not be found at". A successful
     * result fails the test.
     *
     * @throws Exception if setting up the JobManager stand-in or the TaskManager fails
     */
    @Test
    public void testLogNotFoundHandling() throws Exception {
        new JavaTestKit(system) {
            {
                // Deliberately not final: both are assigned inside the try block and must still be
                // visible (possibly null) to the cleanup in the finally block.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;

                try {
                    jobManager = new AkkaActorGateway(
                            TaskManagerTest.system.actorOf(Props.create(
                                    new SimplePartitionStateLookupJobManagerCreator(
                                            TaskManagerTest.leaderSessionID, getTestActor()))),
                            TaskManagerTest.leaderSessionID);

                    final int dataPort = NetUtils.getAvailablePort();

                    Configuration config = new Configuration();
                    config.setInteger("taskmanager.data.port", dataPort);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                    config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
                    config.setString("taskmanager.log.path", "/i/dont/exist");

                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system,
                            TaskManagerTest.this.highAvailabilityServices,
                            config,
                            false,
                            true);

                    // Final alias so the anonymous Within block below can capture the gateway.
                    final ActorGateway tm = taskManager;

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                Await.result(
                                        tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), TaskManagerTest.timeout),
                                        TaskManagerTest.timeout);
                                // Reaching this line means the log request succeeded unexpectedly.
                                // fail() raises an AssertionError, which the catch below does not
                                // intercept because it only handles Exception.
                                Assert.fail();
                            } catch (Exception e) {
                                Assert.assertTrue(e.getMessage().startsWith(
                                        "TaskManager log files are unavailable. Log file could not be found at"));
                            }
                        }
                    };
                } finally {
                    // Single cleanup path; any failure from the try block keeps propagating.
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    /**
     * Exercises the TaskManager's handling of {@code TriggerStackTraceSample} messages.
     *
     * <p>The test runs five phases, each bounded by {@code TaskManagerTest.d}:
     * <ol>
     *   <li>Wait until the TaskManager is registered at the JobManager, submit a task running
     *       {@code BlockingNoOpInvokable} and wait until it is reported as running.</li>
     *   <li>Trigger a sample for a fresh (never submitted) {@code ExecutionAttemptID} and expect a
     *       {@code Status.Failure} whose cause is an {@code IllegalStateException}.</li>
     *   <li>Trigger sample 19230 for the running task and expect 5 samples; retry up to 100 times
     *       until a sampled stack trace contains a {@code BlockingNoOpInvokable} frame, which must
     *       be in method {@code invoke}.</li>
     *   <li>Trigger sample 1337 with a last argument of 2 and expect every sampled trace to have
     *       length 2 (the assertion labels this "Max depth").</li>
     *   <li>Trigger sample 44 with {@code Integer.MAX_VALUE} samples and cancel the task while it
     *       is being sampled. Either a response for sample 44 arrives (done), or a failure
     *       arrives, in which case the task is resubmitted and the attempt repeated (at most 10
     *       attempts, doubling the sleep before cancellation each time).</li>
     * </ol>
     *
     * <p>Both the JobManager and the TaskManager actors are stopped when the test finishes,
     * regardless of its outcome.
     *
     * @throws Exception if setting up the actors or the task deployment descriptor fails
     */
    @Test
    public void testTriggerStackTraceSampleMessage() throws Exception {
        new JavaTestKit(system) {
            {
                final AkkaActorGateway jobManager = new AkkaActorGateway(
                        TaskManagerTest.system.actorOf(Props.create(new SimpleLookupJobManagerCreator(HighAvailabilityServices.DEFAULT_LEADER_ID))),
                        HighAvailabilityServices.DEFAULT_LEADER_ID);
                // Replies to the sample requests are addressed to the test actor of this kit.
                final AkkaActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

                // Tracked outside the try block so that the finally clause can stop the actor.
                ActorGateway startedTaskManager = null;
                try {
                    TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                    startedTaskManager = TestingUtils.createTaskManager(
                            TaskManagerTest.system, TaskManagerTest.this.highAvailabilityServices, new Configuration(), true, false);
                    // Final alias, required for use inside the anonymous Within blocks below.
                    final ActorGateway taskManager = startedTaskManager;

                    final JobID jobID = new JobID();
                    final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(
                            jobID, "Job", new JobVertexID(), new ExecutionAttemptID(),
                            new SerializedValue<>(new ExecutionConfig()), "Task", 1, 0, 1, 0,
                            new Configuration(), new Configuration(), BlockingNoOpInvokable.class.getName(),
                            Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);

                    // Phase 1: register, submit the blocking task and wait until it is running.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                Await.ready(taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), remaining()), remaining());
                                Future taskRunning = taskManager.ask(new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(tdd.getExecutionAttemptId()), TaskManagerTest.timeout);
                                taskManager.tell(new TaskMessages.SubmitTask(tdd));
                                Await.ready(taskRunning, TaskManagerTest.d);
                            } catch (Exception e) {
                                // Fail the test while keeping the original exception as the cause.
                                throw new AssertionError(e.getMessage(), e);
                            }
                        }
                    };

                    // Phase 2: sampling an unknown execution attempt must be answered with a failure.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                taskManager.tell(new StackTraceSampleMessages.TriggerStackTraceSample(112223, new ExecutionAttemptID(), 100, TaskManagerTest.timeD, 0), testActorGateway);
                                // Skip any unrelated messages that reach the test actor first.
                                Object[] received = receiveN(1);
                                while (!(received[0] instanceof Status.Failure)) {
                                    received = receiveN(1);
                                }
                                Assert.assertEquals(IllegalStateException.class, ((Status.Failure) received[0]).cause().getClass());
                            } catch (Exception e) {
                                throw new AssertionError(e.getMessage(), e);
                            }
                        }
                    };

                    // Phase 3: sample the running task until the invokable shows up in a stack trace.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            boolean foundInvokable = false;
                            Throwable lastError = null;
                            for (int attempt = 0; attempt < 100 && !foundInvokable; attempt++) {
                                try {
                                    taskManager.tell(new StackTraceSampleMessages.TriggerStackTraceSample(19230, tdd.getExecutionAttemptId(), 5, Time.milliseconds(100L), 0), testActorGateway);
                                    Object[] received = receiveN(1);
                                    while (!(received[0] instanceof StackTraceSampleResponse)) {
                                        received = receiveN(1);
                                    }
                                    StackTraceSampleResponse response = (StackTraceSampleResponse) received[0];
                                    Assert.assertEquals(19230L, response.getSampleId());
                                    Assert.assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
                                    List<StackTraceElement[]> samples = response.getSamples();
                                    Assert.assertEquals("Number of samples", 5, samples.size());
                                    for (StackTraceElement[] trace : samples) {
                                        // Bounded scan: ends at the first matching frame or at the end
                                        // of the trace, so a trace without the invokable cannot hang.
                                        for (StackTraceElement frame : trace) {
                                            if (frame.getClassName().equals(BlockingNoOpInvokable.class.getName())) {
                                                Assert.assertEquals("invoke", frame.getMethodName());
                                                foundInvokable = true;
                                                break;
                                            }
                                        }
                                        Assert.assertTrue("Unexpected stack trace: " + Arrays.toString(trace), foundInvokable);
                                    }
                                } catch (Throwable t) {
                                    // Assertion failures are remembered and the sample is retried.
                                    lastError = t;
                                    TaskManagerTest.LOG.warn("Failed to find invokable.", t);
                                }
                                try {
                                    Thread.sleep(100L);
                                } catch (InterruptedException e) {
                                    TaskManagerTest.LOG.error("Interrupted while sleeping before retry.", e);
                                }
                            }
                            if (foundInvokable) {
                                return;
                            }
                            if (lastError == null) {
                                Assert.fail("Failed to find invokable");
                            } else {
                                throw new AssertionError(lastError.getMessage(), lastError);
                            }
                        }
                    };

                    // Phase 4: the last sample argument (2) limits the length of every sampled trace.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                taskManager.tell(new StackTraceSampleMessages.TriggerStackTraceSample(1337, tdd.getExecutionAttemptId(), 5, Time.milliseconds(100L), 2), testActorGateway);
                                Object[] received = receiveN(1);
                                while (!(received[0] instanceof StackTraceSampleResponse)) {
                                    received = receiveN(1);
                                }
                                StackTraceSampleResponse response = (StackTraceSampleResponse) received[0];
                                Assert.assertEquals(1337L, response.getSampleId());
                                Assert.assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
                                List<StackTraceElement[]> samples = response.getSamples();
                                Assert.assertEquals("Number of samples", 5, samples.size());
                                for (StackTraceElement[] trace : samples) {
                                    Assert.assertEquals("Max depth", 2, trace.length);
                                }
                            } catch (Exception e) {
                                throw new AssertionError(e.getMessage(), e);
                            }
                        }
                    };

                    // Phase 5: cancel the task while it is being sampled.
                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                int sleepMillis = 100;
                                for (int attempt = 0; attempt < 10; attempt++, sleepMillis *= 2) {
                                    taskManager.tell(new StackTraceSampleMessages.TriggerStackTraceSample(44, tdd.getExecutionAttemptId(), Integer.MAX_VALUE, Time.milliseconds(10L), 0), testActorGateway);
                                    Thread.sleep(sleepMillis);
                                    Future jobRemoved = taskManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), remaining());
                                    taskManager.tell(new TaskMessages.CancelTask(tdd.getExecutionAttemptId()));

                                    // Read messages until either a sample response or a failure arrives.
                                    Object[] received;
                                    do {
                                        received = receiveN(1);
                                        if (received[0] instanceof StackTraceSampleResponse) {
                                            StackTraceSampleResponse response = (StackTraceSampleResponse) received[0];
                                            Assert.assertEquals(tdd.getExecutionAttemptId(), response.getExecutionAttemptID());
                                            Assert.assertEquals(44L, response.getSampleId());
                                            return;
                                        }
                                    } while (!(received[0] instanceof Failure));

                                    // A failure was received: wait for the removal notification,
                                    // then resubmit the task before the next attempt.
                                    Await.ready(jobRemoved, remaining());
                                    Future taskRunning = taskManager.ask(new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(tdd.getExecutionAttemptId()), TaskManagerTest.timeout);
                                    taskManager.tell(new TaskMessages.SubmitTask(tdd));
                                    Await.ready(taskRunning, remaining());
                                }
                            } catch (Exception e) {
                                throw new AssertionError(e.getMessage(), e);
                            }
                        }
                    };
                } finally {
                    // Stop the TaskManager that was actually started (null if creation failed).
                    TestingUtils.stopActor(startedTaskManager);
                    TestingUtils.stopActor((ActorGateway) jobManager);
                }
            }
        };
    }

    /**
     * Sends a {@code FatalError} message to a newly created TaskManager and expects the watched
     * TaskManager actor to terminate within {@code TaskManagerTest.d}.
     *
     * <p>A {@code Kill} message is sent to the TaskManager afterwards in every case.
     */
    @Test
    public void testTerminationOnFatalError() {
        this.highAvailabilityServices.setJobMasterLeaderRetriever(
                HighAvailabilityServices.DEFAULT_JOB_ID,
                new TestingLeaderRetrievalService());

        new JavaTestKit(system) {
            {
                final ActorGateway taskManager = TestingUtils.createTaskManager(
                        TaskManagerTest.system,
                        TaskManagerTest.this.highAvailabilityServices,
                        new Configuration(),
                        true,
                        false);
                try {
                    // Watch first so that the termination is reported to this test kit.
                    watch(taskManager.actor());
                    taskManager.tell(
                            new TaskManagerMessages.FatalError("test fatal error", new Exception("something super bad")));
                    expectTerminated(TaskManagerTest.d, taskManager.actor());
                } finally {
                    // Sent whether or not the termination was observed.
                    taskManager.tell(Kill.getInstance());
                }
            }
        };
    }

    /**
     * Submits a {@code TestInvokableRecordCancel} task with one pipelined result partition to a
     * TaskManager whose JobManager is a {@code FailingScheduleOrUpdateConsumersJobManager}, and
     * asserts that the invokable reports having been canceled.
     *
     * @throws Exception if the setup, the submission or waiting for the cancellation fails
     */
    @Test(timeout = 10000)
    public void testFailingScheduleOrUpdateConsumersMessage() throws Exception {
        new JavaTestKit(system) {
            {
                final Configuration taskManagerConfig = new Configuration();
                taskManagerConfig.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);

                final ResultPartitionDeploymentDescriptor producedPartition = new ResultPartitionDeploymentDescriptor(
                        new IntermediateDataSetID(),
                        new IntermediateResultPartitionID(),
                        ResultPartitionType.PIPELINED,
                        1,
                        1,
                        true);

                final TaskDeploymentDescriptor tdd = TaskManagerTest.createTaskDeploymentDescriptor(
                        new JobID(),
                        "TestJob",
                        new JobVertexID(),
                        new ExecutionAttemptID(),
                        new SerializedValue<>(new ExecutionConfig()),
                        "TestTask",
                        1,
                        0,
                        1,
                        0,
                        new Configuration(),
                        new Configuration(),
                        TestInvokableRecordCancel.class.getName(),
                        Collections.singletonList(producedPartition),
                        Collections.emptyList(),
                        new ArrayList<PermanentBlobKey>(),
                        Collections.emptyList(),
                        0);

                final ActorRef jobManagerActor = TaskManagerTest.system.actorOf(
                        Props.create(FailingScheduleOrUpdateConsumersJobManager.class, TaskManagerTest.leaderSessionID),
                        "jobmanager");
                final AkkaActorGateway jobManager = new AkkaActorGateway(jobManagerActor, TaskManagerTest.leaderSessionID);

                TaskManagerTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                        HighAvailabilityServices.DEFAULT_JOB_ID,
                        new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

                final ActorGateway taskManager = TestingUtils.createTaskManager(
                        TaskManagerTest.system, TaskManagerTest.this.highAvailabilityServices, taskManagerConfig, true, true);

                try {
                    // Reset the shared future before submitting the task.
                    TestInvokableRecordCancel.resetGotCanceledFuture();

                    Await.result(taskManager.ask(new TaskMessages.SubmitTask(tdd), TaskManagerTest.timeout), TaskManagerTest.timeout);

                    Assert.assertEquals(true, TestInvokableRecordCancel.gotCanceled().get());
                } finally {
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor((ActorGateway) jobManager);
                }
            }
        };
    }

    /**
     * Submits a task whose invokable class name is {@code "Foobar"} and expects awaiting the
     * submission result to throw an {@code IllegalArgumentException}.
     *
     * @throws Exception if the actor setup fails or an unexpected exception is thrown
     */
    @Test
    public void testSubmitTaskFailure() throws Exception {
        ActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            final ActorRef jobManagerActor = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
            jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

            this.highAvailabilityServices.setJobMasterLeaderRetriever(
                    HighAvailabilityServices.DEFAULT_JOB_ID,
                    new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);

            try {
                final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
                        new JobID(),
                        "test job",
                        new JobVertexID(),
                        new ExecutionAttemptID(),
                        new SerializedValue<>(new ExecutionConfig()),
                        "test task",
                        0,
                        0,
                        1,
                        0,
                        new Configuration(),
                        new Configuration(),
                        "Foobar",
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        0);

                Await.result(taskManager.ask(new TaskMessages.SubmitTask(tdd), timeout), timeout);

                Assert.fail("The submit task message should have failed.");
            } catch (IllegalArgumentException expected) {
                // This is the outcome the test is looking for.
            }
        } finally {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
        }
    }

    /**
     * Submits a {@code BlockingNoOpInvokable} task and then sends a {@code StopTask} message for
     * it, expecting the awaited result to throw an {@code UnsupportedOperationException}.
     *
     * @throws Exception if the actor setup or the task submission fails
     */
    @Test
    public void testStopTaskFailure() throws Exception {
        ActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            final ExecutionAttemptID attemptId = new ExecutionAttemptID();

            final ActorRef jobManagerActor = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
            jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

            this.highAvailabilityServices.setJobMasterLeaderRetriever(
                    HighAvailabilityServices.DEFAULT_JOB_ID,
                    new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);

            final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
                    new JobID(),
                    "test job",
                    new JobVertexID(),
                    attemptId,
                    new SerializedValue<>(new ExecutionConfig()),
                    "test task",
                    1,
                    0,
                    1,
                    0,
                    new Configuration(),
                    new Configuration(),
                    BlockingNoOpInvokable.class.getName(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    0);

            // The submission itself is expected to succeed.
            Await.result(taskManager.ask(new TaskMessages.SubmitTask(tdd), timeout), timeout);

            try {
                Await.result(taskManager.ask(new TaskMessages.StopTask(attemptId), timeout), timeout);
                Assert.fail("The stop task message should have failed.");
            } catch (UnsupportedOperationException expected) {
                // This is the outcome the test is looking for.
            }
        } finally {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
        }
    }

    /**
     * Triggers a stack trace sample for a new {@code ExecutionAttemptID} without submitting any
     * task first, and expects the awaited result to throw an {@code IllegalStateException}.
     *
     * @throws Exception if the actor setup fails or an unexpected exception is thrown
     */
    @Test
    public void testStackTraceSampleFailure() throws Exception {
        ActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            final ActorRef jobManagerActor = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
            jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

            this.highAvailabilityServices.setJobMasterLeaderRetriever(
                    HighAvailabilityServices.DEFAULT_JOB_ID,
                    new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);

            try {
                Await.result(
                        taskManager.ask(
                                new StackTraceSampleMessages.TriggerStackTraceSample(0, new ExecutionAttemptID(), 0, Time.milliseconds(1L), 0),
                                timeout),
                        timeout);
                Assert.fail("The trigger stack trace message should have failed.");
            } catch (IllegalStateException expected) {
                // This is the outcome the test is looking for.
            }
        } finally {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
        }
    }

    /**
     * Submits a {@code BlockingNoOpInvokable} task without input gates and then sends an
     * {@code UpdateTaskSinglePartitionInfo} message for a new {@code IntermediateDataSetID},
     * expecting the awaited result to throw an exception.
     *
     * @throws Exception if the actor setup or the task submission fails
     */
    @Test
    public void testUpdateTaskInputPartitionsFailure() throws Exception {
        ActorGateway jobManager = null;
        ActorGateway taskManager = null;
        try {
            final ExecutionAttemptID attemptId = new ExecutionAttemptID();

            final ActorRef jobManagerActor = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
            jobManager = new AkkaActorGateway(jobManagerActor, leaderSessionID);

            this.highAvailabilityServices.setJobMasterLeaderRetriever(
                    HighAvailabilityServices.DEFAULT_JOB_ID,
                    new StandaloneLeaderRetrievalService(jobManager.path(), jobManager.leaderSessionID()));

            taskManager = TestingUtils.createTaskManager(system, this.highAvailabilityServices, new Configuration(), true, true);

            final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
                    new JobID(),
                    "test job",
                    new JobVertexID(),
                    attemptId,
                    new SerializedValue<>(new ExecutionConfig()),
                    "test task",
                    1,
                    0,
                    1,
                    0,
                    new Configuration(),
                    new Configuration(),
                    BlockingNoOpInvokable.class.getName(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    0);

            // The submission itself is expected to succeed.
            Await.result(taskManager.ask(new TaskMessages.SubmitTask(tdd), timeout), timeout);

            try {
                Await.result(
                        taskManager.ask(
                                new TaskMessages.UpdateTaskSinglePartitionInfo(
                                        attemptId,
                                        new IntermediateDataSetID(),
                                        new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal())),
                                timeout),
                        timeout);
                Assert.fail("The update task input partitions message should have failed.");
            } catch (Exception expected) {
                // Any exception counts as the expected failure; the AssertionError thrown by
                // Assert.fail is not an Exception and therefore still fails the test.
            }
        } finally {
            TestingUtils.stopActor(jobManager);
            TestingUtils.stopActor(taskManager);
        }
    }

    /**
     * Builds a {@link TaskDeploymentDescriptor} whose job and task information are serialized
     * and embedded as non-offloaded values, with a new {@link AllocationID} and a {@code null}
     * {@link TaskStateSnapshot}.
     *
     * <p>The parameter names are not descriptive; the roles below are read off how each value is
     * forwarded. Callers in this class pass the job name as {@code str}, the task name as
     * {@code str2} and the invokable class name as {@code str3}.
     *
     * @param jobID passed to both the {@link JobInformation} and the descriptor
     * @param str second {@link JobInformation} argument
     * @param jobVertexID first {@link TaskInformation} argument
     * @param executionAttemptID execution attempt the descriptor is created for
     * @param serializedValue serialized {@link ExecutionConfig}, passed to the {@link JobInformation}
     * @param str2 second {@link TaskInformation} argument
     * @param i fourth {@link TaskInformation} argument (numeric; meaning not visible here)
     * @param i2 first of three ints passed to the descriptor after the allocation id
     * @param i3 third {@link TaskInformation} argument (numeric; meaning not visible here)
     * @param i4 second of three ints passed to the descriptor after the allocation id
     * @param configuration configuration passed to the {@link JobInformation}
     * @param configuration2 configuration passed to the {@link TaskInformation}
     * @param str3 fifth {@link TaskInformation} argument
     * @param collection result partition descriptors passed to the descriptor
     * @param collection2 input gate descriptors passed to the descriptor
     * @param collection3 blob keys passed to the {@link JobInformation}
     * @param collection4 URLs passed to the {@link JobInformation}
     * @param i5 third of three ints passed to the descriptor after the allocation id
     * @return the assembled task deployment descriptor
     * @throws IOException declared for the {@link SerializedValue} construction of the job/task information
     */
    public static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobID, String str, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID, SerializedValue<ExecutionConfig> serializedValue, String str2, int i, int i2, int i3, int i4, Configuration configuration, Configuration configuration2, String str3, Collection<ResultPartitionDeploymentDescriptor> collection, Collection<InputGateDeploymentDescriptor> collection2, Collection<PermanentBlobKey> collection3, Collection<URL> collection4, int i5) throws IOException {
        final JobInformation jobInfo =
                new JobInformation(jobID, str, serializedValue, configuration, collection3, collection4);
        // Note the argument order: i3 is passed before i.
        final TaskInformation taskInfo =
                new TaskInformation(jobVertexID, str2, i3, i, str3, configuration2);

        final SerializedValue<JobInformation> serializedJobInfo = new SerializedValue<>(jobInfo);
        final SerializedValue<TaskInformation> serializedTaskInfo = new SerializedValue<>(taskInfo);

        return new TaskDeploymentDescriptor(
                jobID,
                new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInfo),
                new TaskDeploymentDescriptor.NonOffloaded<>(serializedTaskInfo),
                executionAttemptID,
                new AllocationID(),
                i2,
                i4,
                i5,
                (TaskStateSnapshot) null,
                collection,
                collection2);
    }
}
