package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.camel.management.DefaultManagementAgent;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest.class */
public class TaskManagerTest {
    /** Logger used to announce the start of each test case. */
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerTest.class);
    /** One-minute timeout handed to the {@code Patterns.ask} calls in the tests. */
    private static final Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
    /** Twenty-second duration used for {@code Within} blocks, {@code receiveOne} and {@code Await.ready}. */
    private static final FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
    /** Actor system shared by all tests; created in {@link #setup()} and shut down in {@link #teardown()}. */
    private static ActorSystem system;

    /**
     * Minimal job manager stand-in for the tests.
     *
     * <p>It answers a task manager registration with an acknowledgement and confirms every
     * task execution state update with {@code true}. Any other message is silently dropped.
     */
    public static class SimpleJobManager extends UntypedActor {
        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof RegistrationMessages.RegisterTaskManager) {
                final InstanceID assignedId = new InstanceID();
                final ActorRef me = getSelf();
                // NOTE(review): the third argument borrows a constant from an unrelated Hadoop test
                // class - presumably an inlined literal mis-resolved by the decompiler; confirm.
                final Object acknowledgement = new RegistrationMessages.AcknowledgeRegistration(
                        me, assignedId, TestBlockReaderLocal.BlockReaderLocalTest.TEST_LENGTH);
                getSender().tell(acknowledgement, me);
                return;
            }

            if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                // Every state update is accepted.
                getSender().tell(true, getSelf());
            }
        }
    }

    /**
     * Job manager stand-in that only accepts task state updates for a fixed set of execution
     * attempts.
     *
     * <p>An {@code UpdateTaskExecutionState} message is answered with {@code true} when the
     * attempt ID it carries is in the valid set and with {@code false} otherwise. Every other
     * message is delegated to {@link SimpleLookupJobManager}.
     */
    public static class SimpleLookupFailingUpdateJobManager extends SimpleLookupJobManager {
        /** Execution attempts whose state updates are acknowledged positively. */
        private final Set<ExecutionAttemptID> validIDs;

        /**
         * @param set execution attempts whose updates should succeed; copied defensively
         */
        public SimpleLookupFailingUpdateJobManager(Set<ExecutionAttemptID> set) {
            // Explicit type argument instead of a raw HashSet avoids the unchecked conversion.
            this.validIDs = new HashSet<ExecutionAttemptID>(set);
        }

        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                TaskMessages.UpdateTaskExecutionState update = (TaskMessages.UpdateTaskExecutionState) message;
                boolean accepted = this.validIDs.contains(update.taskExecutionState().getID());
                getSender().tell(accepted, getSelf());
            } else {
                super.onReceive(message);
            }
        }
    }

    /**
     * Akka {@link Creator} that builds {@link SimpleLookupFailingUpdateJobManager} actors
     * accepting state updates only for the execution attempts given at construction time.
     */
    public static class SimpleLookupFailingUpdateJobManagerCreator implements Creator<SimpleLookupFailingUpdateJobManager> {
        /** Execution attempts passed on to every actor this creator produces. */
        private final Set<ExecutionAttemptID> validIDs = new HashSet<ExecutionAttemptID>();

        /**
         * @param executionAttemptIDArr execution attempts whose state updates should be accepted
         */
        public SimpleLookupFailingUpdateJobManagerCreator(ExecutionAttemptID... executionAttemptIDArr) {
            Collections.addAll(this.validIDs, executionAttemptIDArr);
        }

        /**
         * Creates a new actor instance; the actor takes its own copy of the valid ID set.
         */
        @Override
        public SimpleLookupFailingUpdateJobManager create() throws Exception {
            return new SimpleLookupFailingUpdateJobManager(this.validIDs);
        }
    }

    /**
     * Job manager stand-in that additionally answers {@code ScheduleOrUpdateConsumers} requests
     * with a successful {@code ConsumerNotificationResult}; everything else is handled by
     * {@link SimpleJobManager}.
     */
    public static class SimpleLookupJobManager extends SimpleJobManager {
        @Override
        public void onReceive(Object message) throws Exception {
            if (!(message instanceof JobManagerMessages.ScheduleOrUpdateConsumers)) {
                super.onReceive(message);
                return;
            }

            // Report success; the optional second argument is built from a null value.
            final Object result = new JobManagerMessages.ConsumerNotificationResult(true, Option.apply(null));
            getSender().tell(result, getSelf());
        }
    }

    /**
     * Akka {@link Creator} that instantiates {@link SimpleLookupJobManager} actors.
     */
    public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager> {
        /**
         * Returns a fresh {@link SimpleLookupJobManager}; no state is shared between instances.
         */
        @Override // akka.japi.Creator
        public SimpleLookupJobManager create() throws Exception {
            return new SimpleLookupJobManager();
        }
    }

    /**
     * Job manager stand-in for partition state lookups.
     *
     * <p>Every {@code RequestPartitionState} is answered with a {@code PartitionState} reporting
     * {@link ExecutionState#RUNNING}. Task state updates are not answered; instead, updates whose
     * state is terminal are forwarded to the test actor. All other messages go to
     * {@link SimpleJobManager}.
     */
    public static class SimplePartitionStateLookupJobManager extends SimpleJobManager {
        /** Receiver of terminal task execution states. */
        private final ActorRef testActor;

        /**
         * @param actorRef actor that is told about terminal task execution states
         */
        public SimplePartitionStateLookupJobManager(ActorRef actorRef) {
            this.testActor = actorRef;
        }

        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof JobManagerMessages.RequestPartitionState) {
                final JobManagerMessages.RequestPartitionState request =
                        (JobManagerMessages.RequestPartitionState) message;
                final Object reply = new TaskMessages.PartitionState(
                        request.taskExecutionId(),
                        request.taskResultId(),
                        request.partitionId().getPartitionId(),
                        ExecutionState.RUNNING);
                getSender().tell(reply, getSelf());
            } else if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                final TaskExecutionState reported =
                        ((TaskMessages.UpdateTaskExecutionState) message).taskExecutionState();
                // Only terminal states are of interest to the test actor.
                if (reported.getExecutionState().isTerminal()) {
                    this.testActor.tell(reported, self());
                }
            } else {
                super.onReceive(message);
            }
        }
    }

    /**
     * Akka {@link Creator} that instantiates {@link SimplePartitionStateLookupJobManager} actors
     * wired to a given test actor.
     */
    public static class SimplePartitionStateLookupJobManagerCreator implements Creator<SimplePartitionStateLookupJobManager> {
        /** Actor handed to every created job manager. */
        private final ActorRef testActor;

        /**
         * @param intermediateDataSetID accepted but not used by this creator
         * @param actorRef actor passed to each created {@link SimplePartitionStateLookupJobManager}
         */
        public SimplePartitionStateLookupJobManagerCreator(IntermediateDataSetID intermediateDataSetID, ActorRef actorRef) {
            this.testActor = actorRef;
        }

        /**
         * Returns a new job manager actor bound to the configured test actor.
         */
        @Override // akka.japi.Creator
        public SimplePartitionStateLookupJobManager create() throws Exception {
            return new SimplePartitionStateLookupJobManager(this.testActor);
        }
    }

    /**
     * Invokable whose {@link #invoke()} blocks on a private monitor that is never notified, so
     * the task stays alive until {@code wait()} throws (for example when the waiting thread is
     * interrupted).
     */
    public static final class TestInvokableBlockingCancelable extends AbstractInvokable {
        /** No inputs or outputs to register. */
        @Override
        public void registerInputOutput() {
        }

        /**
         * Blocks indefinitely.
         *
         * @throws Exception the exception raised by {@link Object#wait()}, e.g. an
         *         {@link InterruptedException} when the thread is interrupted
         */
        @Override
        public void invoke() throws Exception {
            final Object lock = new Object();
            synchronized (lock) {
                // Object.wait() is allowed to return spuriously. Without the loop such a wakeup
                // would let the task finish on its own and break tests that expect it to keep
                // running until it is cancelled.
                while (true) {
                    lock.wait();
                }
            }
        }
    }

    /**
     * Invokable that does nothing: both {@link #registerInputOutput()} and {@link #invoke()}
     * return immediately.
     */
    public static final class TestInvokableCorrect extends AbstractInvokable {
        /** No inputs or outputs to register. */
        @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
        public void registerInputOutput() {
        }

        /** Returns immediately without doing any work. */
        @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
        public void invoke() {
        }
    }

    /**
     * Creates the local actor system shared by all tests, using a default configuration.
     */
    @BeforeClass
    public static void setup() {
        final Configuration configuration = new Configuration();
        system = AkkaUtils.createLocalActorSystem(configuration);
    }

    /**
     * Shuts down the actor system created in {@link #setup()} once all tests have run.
     */
    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    /**
     * Submits a single {@link TestInvokableCorrect} task to a task manager whose job manager is
     * the test actor itself.
     *
     * <p>The test first acknowledges the task manager's registration, then submits the task and
     * expects, in order: an acknowledgement of the submission, a RUNNING state update and a
     * FINISHED state update. Heartbeats arriving in between are ignored; any other message, a
     * receive timeout or an expired deadline fails the test. The task manager is killed when the
     * test ends, whether it passed or failed.
     */
    @Test
    public void testSubmitAndExecuteTask() {
        LOG.info("--------------------------------------------------------------------\n     Starting testSubmitAndExecuteTask() \n--------------------------------------------------------------------");
        new JavaTestKit(system) {
            {
                ActorRef taskManager = null;
                try {
                    taskManager = TaskManagerTest.createTaskManager(getTestActor(), false);
                    // Effectively constant handle for use inside the anonymous Within blocks.
                    final ActorRef tm = taskManager;

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            InstanceID instanceID = new InstanceID();
                            Assert.assertEquals(tm, getLastSender());
                            tm.tell(new RegistrationMessages.AcknowledgeRegistration(getTestActor(), instanceID, TestBlockReaderLocal.BlockReaderLocalTest.TEST_LENGTH), getTestActor());
                        }
                    };

                    final JobID jobID = new JobID();
                    JobVertexID jobVertexID = new JobVertexID();
                    final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
                    final TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(jobID, jobVertexID, executionAttemptID, "TestTask", 2, 7, new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            tm.tell(new TaskMessages.SubmitTask(descriptor), getRef());

                            // Skip whatever arrives until the submission is acknowledged.
                            long ackDeadline = System.currentTimeMillis() + 10000;
                            boolean acknowledged = false;
                            do {
                                if (receiveOne(TaskManagerTest.d) == Messages.getAcknowledge()) {
                                    acknowledged = true;
                                }
                            } while (!acknowledged && System.currentTimeMillis() < ackDeadline);
                            Assert.assertTrue("Task submission was not acknowledged before the deadline", acknowledged);

                            expectStateUpdate(new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(jobID, executionAttemptID, ExecutionState.RUNNING)));
                            expectStateUpdate(new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(jobID, executionAttemptID, ExecutionState.FINISHED)));
                        }

                        /**
                         * Receives messages until one equals {@code expected}, tolerating
                         * heartbeats. Fails on any other message, on a receive timeout, or
                         * when ten seconds pass without the expected message.
                         */
                        private void expectStateUpdate(Object expected) {
                            long deadline = System.currentTimeMillis() + 10000;
                            do {
                                Object received = receiveOne(TaskManagerTest.d);
                                if (received == null) {
                                    // receiveOne yields null when nothing arrives in time.
                                    Assert.fail("Timed out waiting for: " + expected);
                                }
                                if (received.equals(expected)) {
                                    return;
                                }
                                if (!(received instanceof TaskManagerMessages.Heartbeat)) {
                                    Assert.fail("Unexpected message: " + received);
                                }
                            } while (System.currentTimeMillis() < deadline);
                            Assert.fail("Did not receive before the deadline: " + expected);
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, failure and unexpected errors.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Submits two blocking tasks and cancels them one after the other.
     *
     * <p>After both tasks are reported running, the first is cancelled and must end up CANCELED
     * with one task remaining; cancelling it a second time must be rejected because the task is
     * gone. The second task is then cancelled, after which no running task may remain. Both the
     * task manager and the job manager actors are killed when the test ends.
     */
    @Test
    public void testJobSubmissionAndCanceling() {
        LOG.info("--------------------------------------------------------------------\n     Starting testJobSubmissionAndCanceling() \n--------------------------------------------------------------------");
        new JavaTestKit(system) {
            {
                ActorRef jobManager = null;
                ActorRef taskManager = null;
                try {
                    jobManager = TaskManagerTest.system.actorOf(Props.create(SimpleJobManager.class));
                    taskManager = TaskManagerTest.createTaskManager(jobManager, true);
                    // Effectively constant handle for use inside the anonymous Within block.
                    final ActorRef tm = taskManager;

                    JobID jobID1 = new JobID();
                    JobID jobID2 = new JobID();
                    JobVertexID vertexID1 = new JobVertexID();
                    JobVertexID vertexID2 = new JobVertexID();
                    final ExecutionAttemptID attempt1 = new ExecutionAttemptID();
                    final ExecutionAttemptID attempt2 = new ExecutionAttemptID();
                    final TaskDeploymentDescriptor descriptor1 = new TaskDeploymentDescriptor(jobID1, vertexID1, attempt1, "TestTask1", 1, 5, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);
                    final TaskDeploymentDescriptor descriptor2 = new TaskDeploymentDescriptor(jobID2, vertexID2, attempt2, "TestTask2", 2, 7, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register for the "running" notifications before submitting.
                                Future<Object> running1 = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(attempt1), TaskManagerTest.timeout);
                                Future<Object> running2 = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(attempt2), TaskManagerTest.timeout);
                                tm.tell(new TaskMessages.SubmitTask(descriptor1), getRef());
                                tm.tell(new TaskMessages.SubmitTask(descriptor2), getRef());
                                expectMsgEquals(Messages.getAcknowledge());
                                expectMsgEquals(Messages.getAcknowledge());
                                Await.ready(running1, TaskManagerTest.d);
                                Await.ready(running2, TaskManagerTest.d);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Map<ExecutionAttemptID, Task> runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Assert.assertEquals(2L, runningTasks.size());
                                Task task1 = runningTasks.get(attempt1);
                                Task task2 = runningTasks.get(attempt2);
                                Assert.assertNotNull(task1);
                                Assert.assertNotNull(task2);
                                Assert.assertEquals(ExecutionState.RUNNING, task1.getExecutionState());
                                Assert.assertEquals(ExecutionState.RUNNING, task2.getExecutionState());

                                // Cancel the first task and wait for its removal.
                                tm.tell(new TaskMessages.CancelTask(attempt1), getRef());
                                expectMsgEquals(new TaskMessages.TaskOperationResult(attempt1, true));
                                Await.ready(Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(attempt1), TaskManagerTest.timeout), TaskManagerTest.d);
                                Assert.assertEquals(ExecutionState.CANCELED, task1.getExecutionState());
                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Assert.assertEquals(1L, ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava().size());

                                // A second cancel for the same attempt must be rejected.
                                tm.tell(new TaskMessages.CancelTask(attempt1), getRef());
                                expectMsgEquals(new TaskMessages.TaskOperationResult(attempt1, false, "No task with that execution ID was found."));

                                // Cancel the second task and wait for its removal.
                                tm.tell(new TaskMessages.CancelTask(attempt2), getRef());
                                expectMsgEquals(new TaskMessages.TaskOperationResult(attempt2, true));
                                Await.ready(Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(attempt2), TaskManagerTest.timeout), TaskManagerTest.d);
                                Assert.assertEquals(ExecutionState.CANCELED, task2.getExecutionState());
                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Assert.assertEquals(0L, ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava().size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path: task manager first, then job manager.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                    if (jobManager != null) {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Submits a sender and a receiver task that are deployed without any result partitions or
     * input gates.
     *
     * <p>Both submissions must be acknowledged, both tasks must eventually be removed from the
     * task manager, and no running task may remain afterwards. Both actors are killed when the
     * test ends.
     */
    @Test
    public void testGateChannelEdgeMismatch() {
        LOG.info("--------------------------------------------------------------------\n     Starting testGateChannelEdgeMismatch() \n--------------------------------------------------------------------");
        new JavaTestKit(system) {
            {
                ActorRef jobManager = null;
                ActorRef taskManager = null;
                try {
                    jobManager = TaskManagerTest.system.actorOf(Props.create(SimpleJobManager.class));
                    taskManager = TaskManagerTest.createTaskManager(jobManager, true);
                    // Effectively constant handle for use inside the anonymous Within block.
                    final ActorRef tm = taskManager;

                    JobID jobID = new JobID();
                    JobVertexID senderVertex = new JobVertexID();
                    JobVertexID receiverVertex = new JobVertexID();
                    final ExecutionAttemptID senderAttempt = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttempt = new ExecutionAttemptID();
                    final TaskDeploymentDescriptor senderDescriptor = new TaskDeploymentDescriptor(jobID, senderVertex, senderAttempt, "Sender", 0, 1, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);
                    final TaskDeploymentDescriptor receiverDescriptor = new TaskDeploymentDescriptor(jobID, receiverVertex, receiverAttempt, "Receiver", 2, 7, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                tm.tell(new TaskMessages.SubmitTask(senderDescriptor), getRef());
                                tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), getRef());
                                expectMsgEquals(Messages.getAcknowledge());
                                expectMsgEquals(Messages.getAcknowledge());

                                // Each removal notification is answered with `true`.
                                tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttempt), getRef());
                                tm.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttempt), getRef());
                                expectMsgEquals(true);
                                expectMsgEquals(true);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Assert.assertEquals(0L, ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava().size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path: task manager first, then job manager.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                    if (jobManager != null) {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Runs a sender and a receiver task connected through one local, pipelined result partition.
     *
     * <p>The sender is submitted first and must be running before the receiver is submitted.
     * Once both are running, the test waits for any task still listed as running to be removed,
     * checks that the receiver (if it was still listed) ended in FINISHED, and finally asserts
     * that no running task remains. Both actors are killed when the test ends.
     */
    @Test
    public void testRunJobWithForwardChannel() {
        LOG.info("--------------------------------------------------------------------\n     Starting testRunJobWithForwardChannel() \n--------------------------------------------------------------------");
        new JavaTestKit(system) {
            {
                ActorRef jobManager = null;
                ActorRef taskManager = null;
                try {
                    JobID jobID = new JobID();
                    JobVertexID senderVertex = new JobVertexID();
                    JobVertexID receiverVertex = new JobVertexID();
                    final ExecutionAttemptID senderAttempt = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttempt = new ExecutionAttemptID();

                    jobManager = TaskManagerTest.system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
                    taskManager = TaskManagerTest.createTaskManager(jobManager, true);
                    // Effectively constant handle for use inside the anonymous Within block.
                    final ActorRef tm = taskManager;

                    // The sender produces one pipelined partition which the receiver reads locally.
                    IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                    ArrayList<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>();
                    producedPartitions.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, senderAttempt), ResultPartitionLocation.createLocal())});

                    final TaskDeploymentDescriptor senderDescriptor = new TaskDeploymentDescriptor(jobID, senderVertex, senderAttempt, "Sender", 0, 1, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), producedPartitions, Collections.emptyList(), new ArrayList(), 0);
                    final TaskDeploymentDescriptor receiverDescriptor = new TaskDeploymentDescriptor(jobID, receiverVertex, receiverAttempt, "Receiver", 2, 7, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.singletonList(inputGate), new ArrayList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register for the "running" notifications before submitting.
                                Future<Object> senderRunning = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(senderAttempt), TaskManagerTest.timeout);
                                Future<Object> receiverRunning = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(receiverAttempt), TaskManagerTest.timeout);

                                tm.tell(new TaskMessages.SubmitTask(senderDescriptor), getRef());
                                expectMsgEquals(Messages.getAcknowledge());
                                Await.ready(senderRunning, TaskManagerTest.d);

                                tm.tell(new TaskMessages.SubmitTask(receiverDescriptor), getRef());
                                expectMsgEquals(Messages.getAcknowledge());
                                Await.ready(receiverRunning, TaskManagerTest.d);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Map<ExecutionAttemptID, Task> runningTasks = ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava();
                                Task senderTask = runningTasks.get(senderAttempt);
                                Task receiverTask = runningTasks.get(receiverAttempt);

                                // A task may already be gone from the map; only wait for those still listed.
                                if (senderTask != null) {
                                    Await.ready(Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttempt), TaskManagerTest.timeout), TaskManagerTest.d);
                                }
                                if (receiverTask != null) {
                                    Await.ready(Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttempt), TaskManagerTest.timeout), TaskManagerTest.d);
                                    Assert.assertEquals(ExecutionState.FINISHED, receiverTask.getExecutionState());
                                }

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Assert.assertEquals(0L, ((TestingTaskManagerMessages.ResponseRunningTasks) expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class)).asJava().size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path: task manager first, then job manager.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                    if (jobManager != null) {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Deploys a "Sender" task and a "Receiver" task that reads the sender's pipelined result
     * partition through a local input channel, cancels the receiver (and the sender, if it is
     * still running), and checks that the TaskManager reports no running tasks afterwards.
     *
     * <p>The JobManager stand-in is the actor produced by
     * {@code SimpleLookupFailingUpdateJobManagerCreator}, constructed with the receiver's
     * attempt ID. Both actors are killed in a {@code finally} block regardless of outcome.
     */
    @Test
    public void testCancellingDependentAndStateUpdateFails() {
        LOG.info("--------------------------------------------------------------------\n     Starting testCancellingDependentAndStateUpdateFails() \n--------------------------------------------------------------------");
        new JavaTestKit(system) {
            {
                // Non-final on purpose: assigned inside the try, inspected in the finally.
                ActorRef jobManager = null;
                ActorRef taskManager = null;
                try {
                    JobID jobId = new JobID();
                    JobVertexID senderVertexId = new JobVertexID();
                    JobVertexID receiverVertexId = new JobVertexID();
                    final ExecutionAttemptID senderAttemptId = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttemptId = new ExecutionAttemptID();

                    jobManager = TaskManagerTest.system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator(receiverAttemptId)));
                    taskManager = TaskManagerTest.createTaskManager(jobManager, true);
                    // Final alias so the anonymous Within class below can capture the actor.
                    final ActorRef tm = taskManager;

                    // The sender produces one pipelined partition ...
                    IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                    ArrayList<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>();
                    producedPartitions.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));

                    // ... which the receiver consumes through a single local input channel.
                    InputChannelDeploymentDescriptor senderChannel = new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, senderAttemptId), ResultPartitionLocation.createLocal());
                    InputGateDeploymentDescriptor receiverGate = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, new InputChannelDeploymentDescriptor[]{senderChannel});

                    final TaskDeploymentDescriptor senderTdd = new TaskDeploymentDescriptor(jobId, senderVertexId, senderAttemptId, "Sender", 0, 1, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), producedPartitions, Collections.emptyList(), new ArrayList(), 0);
                    final TaskDeploymentDescriptor receiverTdd = new TaskDeploymentDescriptor(jobId, receiverVertexId, receiverAttemptId, "Receiver", 2, 7, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(receiverGate), new ArrayList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            try {
                                // Register the "is running" listeners before submitting anything.
                                Future<Object> senderRunning = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(senderAttemptId), TaskManagerTest.timeout);
                                Future<Object> receiverRunning = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(receiverAttemptId), TaskManagerTest.timeout);

                                // The receiver is submitted first, then the sender.
                                tm.tell(new TaskMessages.SubmitTask(receiverTdd), getRef());
                                tm.tell(new TaskMessages.SubmitTask(senderTdd), getRef());
                                expectMsgEquals(Messages.getAcknowledge());
                                expectMsgEquals(Messages.getAcknowledge());
                                Await.ready(senderRunning, TaskManagerTest.d);
                                Await.ready(receiverRunning, TaskManagerTest.d);

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Map<ExecutionAttemptID, Task> runningTasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class).asJava();
                                Task senderTask = runningTasks.get(senderAttemptId);
                                Task receiverTask = runningTasks.get(receiverAttemptId);

                                tm.tell(new TaskMessages.CancelTask(receiverAttemptId), getRef());
                                expectMsgEquals(new TaskMessages.TaskOperationResult(receiverAttemptId, true));
                                if (receiverTask != null) {
                                    Await.ready(Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttemptId), TaskManagerTest.timeout), TaskManagerTest.d);
                                }

                                if (senderTask != null) {
                                    // Only cancel the sender explicitly if it has not left RUNNING by itself.
                                    if (senderTask.getExecutionState() == ExecutionState.RUNNING) {
                                        tm.tell(new TaskMessages.CancelTask(senderAttemptId), getRef());
                                        expectMsgEquals(new TaskMessages.TaskOperationResult(senderAttemptId, true));
                                    }
                                    Await.ready(Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttemptId), TaskManagerTest.timeout), TaskManagerTest.d);
                                }

                                tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                Assert.assertEquals(0L, expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class).asJava().size());
                            } catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail(e.getMessage());
                            }
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path: runs after success, after a caught exception
                    // (Assert.fail throws), and after any assertion error from the Within block.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                    if (jobManager != null) {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Submits a "Receiver" task whose only input channel references a freshly created
     * {@code ResultPartitionID} at a remote location, and expects the test actor to receive
     * a {@code TaskExecutionState} in state {@code FAILED} whose error is a
     * {@code PartitionNotFoundException}.
     *
     * <p>The remote location uses the same port that is configured as the TaskManager's data
     * port. No producer task for the partition is ever submitted by this test.
     *
     * @throws Exception declared for compatibility; failures in the body surface as
     *         assertion errors
     */
    @Test
    public void testRemotePartitionNotFound() throws Exception {
        new JavaTestKit(system) {
            {
                // Non-final on purpose: assigned inside the try, inspected in the finally.
                ActorRef jobManager = null;
                ActorRef taskManager = null;
                try {
                    IntermediateDataSetID resultId = new IntermediateDataSetID();
                    jobManager = TaskManagerTest.system.actorOf(Props.create(new SimplePartitionStateLookupJobManagerCreator(resultId, getTestActor())));

                    int dataPort = NetUtils.getAvailablePort();
                    taskManager = TaskManagerTest.createTaskManager(jobManager, true, false, dataPort);
                    // Final alias so the anonymous Within class below can capture the actor.
                    final ActorRef tm = taskManager;

                    // Remote channel addressed at the TaskManager's own data port.
                    ResultPartitionLocation remoteLocation = ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress(DefaultManagementAgent.DEFAULT_HOST, dataPort), 0));
                    InputChannelDeploymentDescriptor channel = new InputChannelDeploymentDescriptor(new ResultPartitionID(), remoteLocation);
                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(resultId, 0, new InputChannelDeploymentDescriptor[]{channel});

                    final TaskDeploymentDescriptor receiverTdd = new TaskDeploymentDescriptor(new JobID(), new JobVertexID(), new ExecutionAttemptID(), "Receiver", 0, 1, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(inputGate), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            tm.tell(new TaskMessages.SubmitTask(receiverTdd), getTestActor());
                            expectMsgClass(Messages.getAcknowledge().getClass());

                            TaskExecutionState reported = expectMsgClass(TaskExecutionState.class);
                            // JUnit convention: expected value first, actual value second.
                            Assert.assertEquals(ExecutionState.FAILED, reported.getExecutionState());
                            Assert.assertEquals(PartitionNotFoundException.class, reported.getError(ClassLoader.getSystemClassLoader()).getClass());
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, caught exceptions and assertion errors.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                    if (jobManager != null) {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Submits a "Receiver" task whose only input channel references a freshly created
     * {@code ResultPartitionID} at a local location, and expects the test actor to receive
     * a {@code TaskExecutionState} in state {@code FAILED} whose error is a
     * {@code PartitionNotFoundException}.
     *
     * <p>No producer task for the partition is ever submitted by this test.
     *
     * @throws Exception declared for compatibility; failures in the body surface as
     *         assertion errors
     */
    @Test
    public void testLocalPartitionNotFound() throws Exception {
        new JavaTestKit(system) {
            {
                // Non-final on purpose: assigned inside the try, inspected in the finally.
                ActorRef jobManager = null;
                ActorRef taskManager = null;
                try {
                    IntermediateDataSetID resultId = new IntermediateDataSetID();
                    jobManager = TaskManagerTest.system.actorOf(Props.create(new SimplePartitionStateLookupJobManagerCreator(resultId, getTestActor())));
                    taskManager = TaskManagerTest.createTaskManager(jobManager, true, true, NetUtils.getAvailablePort());
                    // Final alias so the anonymous Within class below can capture the actor.
                    final ActorRef tm = taskManager;

                    InputChannelDeploymentDescriptor channel = new InputChannelDeploymentDescriptor(new ResultPartitionID(), ResultPartitionLocation.createLocal());
                    InputGateDeploymentDescriptor inputGate = new InputGateDeploymentDescriptor(resultId, 0, new InputChannelDeploymentDescriptor[]{channel});

                    final TaskDeploymentDescriptor receiverTdd = new TaskDeploymentDescriptor(new JobID(), new JobVertexID(), new ExecutionAttemptID(), "Receiver", 0, 1, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(inputGate), Collections.emptyList(), 0);

                    new JavaTestKit.Within(TaskManagerTest.d) {
                        @Override
                        protected void run() {
                            tm.tell(new TaskMessages.SubmitTask(receiverTdd), getTestActor());
                            expectMsgClass(Messages.getAcknowledge().getClass());

                            TaskExecutionState reported = expectMsgClass(TaskExecutionState.class);
                            // JUnit convention: expected value first, actual value second.
                            Assert.assertEquals(ExecutionState.FAILED, reported.getExecutionState());
                            Assert.assertEquals(PartitionNotFoundException.class, reported.getError(ClassLoader.getSystemClassLoader()).getClass());
                        }
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    // Single cleanup path for success, caught exceptions and assertion errors.
                    if (taskManager != null) {
                        taskManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                    if (jobManager != null) {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
                    }
                }
            }
        };
    }

    /**
     * Convenience overload of {@link #createTaskManager(ActorRef, boolean, boolean, int)}
     * that passes {@code true} as the third argument and a port obtained from
     * {@code NetUtils.getAvailablePort()} as the fourth.
     *
     * @param actorRef forwarded unchanged as the first argument
     * @param z forwarded unchanged as the second argument
     * @return whatever the four-argument overload returns
     */
    public static ActorRef createTaskManager(ActorRef actorRef, boolean z) {
        final int freeDataPort = NetUtils.getAvailablePort();
        return createTaskManager(actorRef, z, true, freeDataPort);
    }

    /**
     * Starts a {@code TestingTaskManager} actor in the shared test actor system and
     * optionally waits until it reports being registered.
     *
     * <p>The TaskManager configuration sets the memory-size key to {@code 10} and the
     * data-port key to {@code i}. Any exception during start-up or while waiting is printed
     * and turned into a test failure via {@code Assert.fail}.
     *
     * @param actorRef actor whose path string is handed to the TaskManager start-up call
     *        (the tests in this class pass their JobManager stand-in here)
     * @param z when {@code true}, blocks for up to 100 seconds until the TaskManager answers
     *        the "notify when registered at JobManager" request
     * @param z2 forwarded as-is to {@code TaskManager.startTaskManagerComponentsAndActor};
     *        NOTE(review): presumably a "local communication only" switch - confirm against
     *        that method's signature
     * @param i port stored under the TaskManager data-port configuration key
     * @return the started TaskManager actor
     */
    public static ActorRef createTaskManager(ActorRef actorRef, boolean z, boolean z2, int i) {
        ActorRef taskManager = null;
        try {
            Configuration tmConfig = new Configuration();
            tmConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
            tmConfig.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, i);

            taskManager = TaskManager.startTaskManagerComponentsAndActor(
                    tmConfig,
                    system,
                    DefaultManagementAgent.DEFAULT_HOST,
                    Option.empty(),
                    Option.apply(actorRef.path().toString()),
                    z2,
                    StreamingMode.BATCH_ONLY,
                    TestingTaskManager.class);
        } catch (Exception startupFailure) {
            startupFailure.printStackTrace();
            Assert.fail("Could not create test TaskManager: " + startupFailure.getMessage());
        }

        // Caller did not ask to wait for registration: hand the actor back right away.
        if (!z) {
            return taskManager;
        }

        try {
            Future<Object> registered = Patterns.ask(taskManager, TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
            FiniteDuration maxWait = new FiniteDuration(100L, TimeUnit.SECONDS);
            Await.ready(registered, maxWait);
        } catch (Exception registrationFailure) {
            registrationFailure.printStackTrace();
            Assert.fail("Exception while waiting for the task manager registration: " + registrationFailure.getMessage());
        }
        return taskManager;
    }
}
