package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.TaskExecutorTest;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.class */
public class TaskExecutorSubmissionTest extends TestLogger {
    /** RPC timeout (10 seconds) handed to the task executor gateway calls made by the tests. */
    private static final Time timeout = Time.milliseconds(10000);

    /** JUnit rule exposing the name of the currently running test method. */
    @Rule
    public final TestName testName = new TestName();

    /** Job id used to build the test environment and to allocate slots in the tests. */
    private JobID jobId = new JobID();

    /**
     * Submits a single task and waits for the future that the test environment completes
     * when the task is reported as {@link ExecutionState#RUNNING}.
     *
     * @throws Exception if the environment cannot be built, the submission fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testTaskSubmission() throws Exception {
        final ExecutionAttemptID eid = new ExecutionAttemptID();
        final TaskDeploymentDescriptor tdd =
            createTestTaskDeploymentDescriptor("test task", eid, TaskExecutorTest.TestInvokable.class);
        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();

        // try-with-resources closes the environment on every exit path and records a failing
        // close() as a suppressed exception instead of hand-rolled bookkeeping.
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setSlotSize(1)
                .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();

            env.getTaskSlotTable().allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();

            taskRunningFuture.get();
        }
    }

    /**
     * Submits a task deployment descriptor that is expected to be rejected and verifies that
     * the resulting exception is caused by an {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): the trailing {@code 0} passed to
     * {@code createTestTaskDeploymentDescriptor} is presumably what makes the submission
     * invalid; confirm against that helper.
     *
     * <p>NOTE(review): the test passes silently when no exception is thrown at all; consider
     * adding an explicit {@code Assert.fail(...)} after the submission once that is confirmed
     * to be the intended contract.
     *
     * @throws Exception declared for parity with the other tests; exceptions raised by the
     *     submission are caught and inspected
     */
    @Test(timeout = 10000)
    public void testSubmitTaskFailure() throws Exception {
        final TaskDeploymentDescriptor tdd =
            createTestTaskDeploymentDescriptor("test task", new ExecutionAttemptID(), BlockingNoOpInvokable.class, 0);

        // The environment is now closed even when the submission throws, which is the expected
        // path of this test; previously close() only ran after a successful submission.
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId).build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();

            env.getTaskSlotTable().allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
        } catch (Exception e) {
            Assert.assertThat(e.getCause(), Matchers.instanceOf(IllegalArgumentException.class));
        }
    }

    /**
     * Submits two blocking tasks into two slots, cancels the first one and verifies that only
     * the first task ends up {@link ExecutionState#CANCELED} while the second keeps running.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testTaskSubmissionAndCancelling() throws Exception {
        final ExecutionAttemptID eid1 = new ExecutionAttemptID();
        final ExecutionAttemptID eid2 = new ExecutionAttemptID();

        final TaskDeploymentDescriptor tdd1 =
            createTestTaskDeploymentDescriptor("test task", eid1, BlockingNoOpInvokable.class);
        final TaskDeploymentDescriptor tdd2 =
            createTestTaskDeploymentDescriptor("test task", eid2, BlockingNoOpInvokable.class);

        final CompletableFuture<Void> task1RunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> task2RunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> task1CanceledFuture = new CompletableFuture<>();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setSlotSize(2)
                .addTaskManagerActionListener(eid1, ExecutionState.RUNNING, task1RunningFuture)
                .addTaskManagerActionListener(eid2, ExecutionState.RUNNING, task2RunningFuture)
                .addTaskManagerActionListener(eid1, ExecutionState.CANCELED, task1CanceledFuture)
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, tdd1.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(tdd1, env.getJobMasterId(), timeout).get();
            task1RunningFuture.get();

            taskSlotTable.allocateSlot(1, this.jobId, tdd2.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(tdd2, env.getJobMasterId(), timeout).get();
            task2RunningFuture.get();

            // JUnit expects (expected, actual); this order keeps failure messages truthful.
            Assert.assertSame(ExecutionState.RUNNING, taskSlotTable.getTask(eid1).getExecutionState());
            Assert.assertSame(ExecutionState.RUNNING, taskSlotTable.getTask(eid2).getExecutionState());

            tmGateway.cancelTask(eid1, timeout);
            task1CanceledFuture.get();

            Assert.assertSame(ExecutionState.CANCELED, taskSlotTable.getTask(eid1).getExecutionState());
            Assert.assertSame(ExecutionState.RUNNING, taskSlotTable.getTask(eid2).getExecutionState());
        }
    }

    /**
     * Submits a {@code Sender} and a {@code Receiver} task created through
     * {@code createTestTaskDeploymentDescriptor} and verifies that both tasks first reach
     * {@link ExecutionState#RUNNING} and then end up {@link ExecutionState#FAILED}.
     *
     * <p>NOTE(review): presumably the tasks fail because these descriptors do not wire the
     * sender's output to the receiver's input; confirm against the descriptor helper.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testGateChannelEdgeMismatch() throws Exception {
        final ExecutionAttemptID senderId = new ExecutionAttemptID();
        final ExecutionAttemptID receiverId = new ExecutionAttemptID();

        final TaskDeploymentDescriptor senderTdd =
            createTestTaskDeploymentDescriptor("Sender", senderId, TestingAbstractInvokables.Sender.class);
        final TaskDeploymentDescriptor receiverTdd =
            createTestTaskDeploymentDescriptor("Receiver", receiverId, TestingAbstractInvokables.Receiver.class);

        final CompletableFuture<Void> senderRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> receiverRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> senderFailedFuture = new CompletableFuture<>();
        final CompletableFuture<Void> receiverFailedFuture = new CompletableFuture<>();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .addTaskManagerActionListener(senderId, ExecutionState.RUNNING, senderRunningFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.RUNNING, receiverRunningFuture)
                .addTaskManagerActionListener(senderId, ExecutionState.FAILED, senderFailedFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.FAILED, receiverFailedFuture)
                .setSlotSize(2)
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, senderTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(senderTdd, env.getJobMasterId(), timeout).get();
            senderRunningFuture.get();

            taskSlotTable.allocateSlot(1, this.jobId, receiverTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(receiverTdd, env.getJobMasterId(), timeout).get();
            receiverRunningFuture.get();

            senderFailedFuture.get();
            receiverFailedFuture.get();

            // JUnit expects (expected, actual); this order keeps failure messages truthful.
            Assert.assertSame(ExecutionState.FAILED, taskSlotTable.getTask(senderId).getExecutionState());
            Assert.assertSame(ExecutionState.FAILED, taskSlotTable.getTask(receiverId).getExecutionState());
        }
    }

    /**
     * Runs a sender and a receiver task that share one shuffle descriptor located on the test
     * task executor's own resource id, using the real (non-mock) shuffle environment, and
     * verifies that both tasks reach {@link ExecutionState#FINISHED}.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testRunJobWithForwardChannel() throws Exception {
        final ResourceID producerLocation = ResourceID.generate();
        final NettyShuffleDescriptor shuffleDescriptor =
            NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);

        final TaskDeploymentDescriptor senderTdd = createSender(shuffleDescriptor);
        final TaskDeploymentDescriptor receiverTdd = createReceiver(shuffleDescriptor);
        final ExecutionAttemptID senderId = senderTdd.getExecutionAttemptId();
        final ExecutionAttemptID receiverId = receiverTdd.getExecutionAttemptId();

        final CompletableFuture<Void> senderRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> receiverRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> senderFinishedFuture = new CompletableFuture<>();
        final CompletableFuture<Void> receiverFinishedFuture = new CompletableFuture<>();

        final JobMasterId jobMasterId = JobMasterId.generate();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setResourceID(producerLocation)
                .setSlotSize(2)
                .addTaskManagerActionListener(senderId, ExecutionState.RUNNING, senderRunningFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.RUNNING, receiverRunningFuture)
                .addTaskManagerActionListener(senderId, ExecutionState.FINISHED, senderFinishedFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.FINISHED, receiverFinishedFuture)
                .setJobMasterId(jobMasterId)
                .setJobMasterGateway(new TestingJobMasterGatewayBuilder()
                    .setFencingTokenSupplier(() -> jobMasterId)
                    // Every schedule-or-update-consumers request is acknowledged immediately.
                    .setScheduleOrUpdateConsumersFunction(
                        resultPartitionID -> CompletableFuture.completedFuture(Acknowledge.get()))
                    .build())
                .useRealNonMockShuffleEnvironment()
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, senderTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(senderTdd, jobMasterId, timeout).get();
            senderRunningFuture.get();

            taskSlotTable.allocateSlot(1, this.jobId, receiverTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(receiverTdd, jobMasterId, timeout).get();
            receiverRunningFuture.get();

            senderFinishedFuture.get();
            receiverFinishedFuture.get();

            // JUnit expects (expected, actual); this order keeps failure messages truthful.
            Assert.assertSame(ExecutionState.FINISHED, taskSlotTable.getTask(senderId).getExecutionState());
            Assert.assertSame(ExecutionState.FINISHED, taskSlotTable.getTask(receiverId).getExecutionState());
        }
    }

    /**
     * Runs a sender and a receiver task while the job master gateway rejects every execution
     * state update of the sender. Verifies that the sender ends up
     * {@link ExecutionState#FAILED} and that the receiver can afterwards be cancelled and
     * reaches {@link ExecutionState#CANCELED}.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testCancellingDependentAndStateUpdateFails() throws Exception {
        final ResourceID producerLocation = ResourceID.generate();
        final NettyShuffleDescriptor shuffleDescriptor =
            NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation);

        final TaskDeploymentDescriptor senderTdd = createSender(shuffleDescriptor);
        final TaskDeploymentDescriptor receiverTdd = createReceiver(shuffleDescriptor);
        final ExecutionAttemptID senderId = senderTdd.getExecutionAttemptId();
        final ExecutionAttemptID receiverId = receiverTdd.getExecutionAttemptId();

        final CompletableFuture<Void> senderRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> receiverRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> senderFailedFuture = new CompletableFuture<>();
        final CompletableFuture<Void> receiverCanceledFuture = new CompletableFuture<>();

        final JobMasterId jobMasterId = JobMasterId.generate();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setResourceID(producerLocation)
                .setSlotSize(2)
                .addTaskManagerActionListener(senderId, ExecutionState.RUNNING, senderRunningFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.RUNNING, receiverRunningFuture)
                .addTaskManagerActionListener(senderId, ExecutionState.FAILED, senderFailedFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.CANCELED, receiverCanceledFuture)
                .setJobMasterId(jobMasterId)
                .setJobMasterGateway(new TestingJobMasterGatewayBuilder()
                    .setFencingTokenSupplier(() -> jobMasterId)
                    .setUpdateTaskExecutionStateFunction(taskExecutionState -> {
                        // Only state updates of the sender are rejected; all others are acknowledged.
                        if (taskExecutionState != null && taskExecutionState.getID().equals(senderId)) {
                            // NOTE(review): the message names the receiver's attempt id although the
                            // rejected update belongs to the sender; kept as-is, confirm the intent.
                            return FutureUtils.completedExceptionally(
                                new ExecutionGraphException("The execution attempt " + receiverId + " was not found."));
                        }
                        return CompletableFuture.completedFuture(Acknowledge.get());
                    })
                    .build())
                .useRealNonMockShuffleEnvironment()
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, senderTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(senderTdd, jobMasterId, timeout).get();
            senderRunningFuture.get();

            taskSlotTable.allocateSlot(1, this.jobId, receiverTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(receiverTdd, jobMasterId, timeout).get();
            receiverRunningFuture.get();

            senderFailedFuture.get();
            // JUnit expects (expected, actual); this order keeps failure messages truthful.
            Assert.assertSame(ExecutionState.FAILED, taskSlotTable.getTask(senderId).getExecutionState());

            tmGateway.cancelTask(receiverId, timeout);
            receiverCanceledFuture.get();
            Assert.assertSame(ExecutionState.CANCELED, taskSlotTable.getTask(receiverId).getExecutionState());
        }
    }

    /**
     * Submits a receiver task whose input is a remote shuffle descriptor pointing at a free
     * local data port, with local communication disabled and a short partition request
     * backoff (100 initial, 200 max). Verifies that the task fails with a
     * {@link PartitionNotFoundException} as its failure cause.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testRemotePartitionNotFound() throws Exception {
        final int dataPort = NetUtils.getAvailablePort();

        final Configuration config = new Configuration();
        config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

        final TaskDeploymentDescriptor receiverTdd =
            createReceiver(NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote());
        final ExecutionAttemptID receiverId = receiverTdd.getExecutionAttemptId();

        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setSlotSize(2)
                .addTaskManagerActionListener(receiverId, ExecutionState.RUNNING, taskRunningFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.FAILED, taskFailedFuture)
                .setConfiguration(config)
                .setLocalCommunication(false)
                .useRealNonMockShuffleEnvironment()
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, receiverTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(receiverTdd, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();

            taskFailedFuture.get();
            Assert.assertThat(
                taskSlotTable.getTask(receiverId).getFailureCause(),
                Matchers.instanceOf(PartitionNotFoundException.class));
        }
    }

    /**
     * Verifies that a task is failed with the original {@link IOException} when the (mocked)
     * shuffle environment throws while the task's partition info is being updated.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test
    public void testUpdateTaskInputPartitionsFailure() throws Exception {
        final ExecutionAttemptID eid = new ExecutionAttemptID();
        final TaskDeploymentDescriptor tdd =
            createTestTaskDeploymentDescriptor("test task", eid, BlockingNoOpInvokable.class);

        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
        final ShuffleEnvironment<?, ?> shuffleEnvironment =
            Mockito.mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS);

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setShuffleEnvironment(shuffleEnvironment)
                .setSlotSize(1)
                .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture)
                .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, tdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();

            final PartitionInfo partitionUpdate = new PartitionInfo(
                new IntermediateDataSetID(),
                NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(
                    new IntermediateResultPartitionID(), env.getTaskExecutor().getResourceID()));

            // Make the shuffle environment reject exactly this partition update.
            Mockito.doThrow(new IOException()).when(shuffleEnvironment).updatePartitionInfo(eid, partitionUpdate);

            tmGateway.updatePartitions(eid, Collections.singletonList(partitionUpdate), timeout).get();
            taskFailedFuture.get();

            final Task task = taskSlotTable.getTask(tdd.getExecutionAttemptId());
            Assert.assertThat(task.getExecutionState(), CoreMatchers.is(ExecutionState.FAILED));
            Assert.assertThat(task.getFailureCause(), Matchers.instanceOf(IOException.class));
        }
    }

    /**
     * Submits a receiver task whose input shuffle descriptor is located on the test task
     * executor's own resource id, with a short partition request backoff (100 initial,
     * 200 max). Verifies that the task ends up {@link ExecutionState#FAILED} with a
     * {@link PartitionNotFoundException} as its failure cause.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testLocalPartitionNotFound() throws Exception {
        final ResourceID producerLocation = ResourceID.generate();
        final TaskDeploymentDescriptor receiverTdd = createReceiver(
            NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation));
        final ExecutionAttemptID receiverId = receiverTdd.getExecutionAttemptId();

        final Configuration config = new Configuration();
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);

        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setResourceID(producerLocation)
                .setSlotSize(1)
                .addTaskManagerActionListener(receiverId, ExecutionState.RUNNING, taskRunningFuture)
                .addTaskManagerActionListener(receiverId, ExecutionState.FAILED, taskFailedFuture)
                .setConfiguration(config)
                .useRealNonMockShuffleEnvironment()
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            taskSlotTable.allocateSlot(0, this.jobId, receiverTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(receiverTdd, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();

            taskFailedFuture.get();

            // JUnit expects (expected, actual); this order keeps failure messages truthful.
            Assert.assertSame(ExecutionState.FAILED, taskSlotTable.getTask(receiverId).getExecutionState());
            Assert.assertThat(
                taskSlotTable.getTask(receiverId).getFailureCause(),
                Matchers.instanceOf(PartitionNotFoundException.class));
        }
    }

    /**
     * Runs a sender task against a job master gateway whose update-task-execution-state
     * function always completes exceptionally. Verifies that the invokable records a
     * cancellation and that the task's failure cause contains the injected exception message.
     *
     * @throws Exception if the environment cannot be built, a gateway call fails, or the
     *     environment cannot be closed
     */
    @Test(timeout = 10000)
    public void testFailingScheduleOrUpdateConsumers() throws Exception {
        final Configuration config = new Configuration();
        config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");

        final TaskDeploymentDescriptor senderTdd = createSender(
            NettyShuffleDescriptorBuilder.createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), ResourceID.generate()),
            TestingAbstractInvokables.TestInvokableRecordCancel.class);
        final ExecutionAttemptID senderId = senderTdd.getExecutionAttemptId();

        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final Exception injectedFailure = new Exception("Failed schedule or update consumers");
        final JobMasterId jobMasterId = JobMasterId.generate();

        // try-with-resources keeps a failing close() from replacing the real test failure:
        // the close() exception is attached as suppressed instead of being thrown in its place.
        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setSlotSize(1)
                .setConfiguration(config)
                .addTaskManagerActionListener(senderId, ExecutionState.RUNNING, taskRunningFuture)
                .setJobMasterId(jobMasterId)
                .setJobMasterGateway(new TestingJobMasterGatewayBuilder()
                    .setFencingTokenSupplier(() -> jobMasterId)
                    .setUpdateTaskExecutionStateFunction(
                        taskExecutionState -> FutureUtils.completedExceptionally(injectedFailure))
                    .build())
                .useRealNonMockShuffleEnvironment()
                .build()) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            final TaskSlotTable taskSlotTable = env.getTaskSlotTable();

            TestingAbstractInvokables.TestInvokableRecordCancel.resetGotCanceledFuture();

            taskSlotTable.allocateSlot(0, this.jobId, senderTdd.getAllocationId(), Time.seconds(60L));
            tmGateway.submitTask(senderTdd, jobMasterId, timeout).get();
            taskRunningFuture.get();

            Assert.assertTrue(TestingAbstractInvokables.TestInvokableRecordCancel.gotCanceled().get());
            Assert.assertTrue(
                ExceptionUtils.findThrowableWithMessage(
                    taskSlotTable.getTask(senderId).getFailureCause(), injectedFailure.getMessage()).isPresent());
        }
    }

    /**
     * Exercises {@code requestStackTraceSample} on the task executor gateway for a running
     * {@code BlockingNoOpInvokable} task.
     *
     * <p>The test covers four situations in order:
     * <ol>
     *   <li>sampling an execution attempt that was never submitted; if the call fails, the cause
     *       must be an {@link IllegalStateException} whose message starts with
     *       {@code "Cannot sample task"};</li>
     *   <li>sampling the running task with a max depth argument of 0 and checking that each
     *       sample contains one of the expected frames;</li>
     *   <li>sampling with a max depth argument of 2 and checking each sample has exactly two
     *       frames;</li>
     *   <li>cancelling the task while a sample request is pending and checking the response still
     *       carries the requested attempt ID and sample ID.</li>
     * </ol>
     *
     * <p>The test environment is managed with try-with-resources so it is closed exactly once and
     * close failures never replace the original test failure.
     *
     * @throws Exception if the environment cannot be built or closed, or if any awaited future fails
     */
    @Test(timeout = 10000)
    public void testRequestStackTraceSample() throws Exception {
        final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        final TaskDeploymentDescriptor taskDescriptor =
                createTestTaskDeploymentDescriptor("test task", executionAttemptID, BlockingNoOpInvokable.class);
        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final CompletableFuture<Void> taskCanceledFuture = new CompletableFuture<>();

        try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(this.jobId)
                .setSlotSize(1)
                .addTaskManagerActionListener(executionAttemptID, ExecutionState.RUNNING, taskRunningFuture)
                .addTaskManagerActionListener(executionAttemptID, ExecutionState.CANCELED, taskCanceledFuture)
                .build()) {
            final TaskExecutorGateway taskExecutorGateway = env.getTaskExecutorGateway();
            env.getTaskSlotTable().allocateSlot(0, this.jobId, taskDescriptor.getAllocationId(), Time.seconds(60L));
            taskExecutorGateway.submitTask(taskDescriptor, env.getJobMasterId(), timeout).get();
            taskRunningFuture.get();

            // 1) Sampling an attempt ID that was never submitted.
            // NOTE(review): the test does not fail when this call succeeds; only the shape of the
            // exception is verified if one is thrown. Confirm whether a fail() is wanted here.
            try {
                taskExecutorGateway.requestStackTraceSample(new ExecutionAttemptID(), 112223, 100, Time.seconds(60L), 0, timeout).get();
            } catch (Exception e) {
                Assert.assertThat(e.getCause(), Matchers.instanceOf(IllegalStateException.class));
                Assert.assertThat(e.getCause().getMessage(), CoreMatchers.startsWith("Cannot sample task"));
            }

            // 2) Sampling the running task with a max depth argument of 0.
            final StackTraceSampleResponse fullDepthResponse =
                    taskExecutorGateway.requestStackTraceSample(executionAttemptID, 19230, 5, Time.milliseconds(100L), 0, timeout).get();
            Assert.assertEquals(19230L, fullDepthResponse.getSampleId());
            Assert.assertEquals(executionAttemptID, fullDepthResponse.getExecutionAttemptID());

            final List<StackTraceElement[]> fullDepthSamples = fullDepthResponse.getSamples();
            Assert.assertEquals("Number of samples", 5, fullDepthSamples.size());
            for (StackTraceElement[] stackTrace : fullDepthSamples) {
                boolean foundExpectedFrame = false;
                // A bounded loop: a trace without any expected frame must fail the assertion
                // below rather than spin forever.
                for (StackTraceElement frame : stackTrace) {
                    final String className = frame.getClassName();
                    if (className.equals(BlockingNoOpInvokable.class.getName())) {
                        Assert.assertEquals("invoke", frame.getMethodName());
                        foundExpectedFrame = true;
                        break;
                    } else if (className.equals(TestTaskManagerActions.class.getName())) {
                        Assert.assertEquals("updateTaskExecutionState", frame.getMethodName());
                        foundExpectedFrame = true;
                        break;
                    } else if (className.equals(Thread.class.getName()) && frame.getMethodName().equals("setContextClassLoader")) {
                        // Accepted as well, but keep scanning the remaining frames.
                        foundExpectedFrame = true;
                    }
                }
                Assert.assertTrue("Unexpected stack trace: " + Arrays.toString(stackTrace), foundExpectedFrame);
            }

            // 3) Sampling with a max depth argument of 2: every sample must have exactly two frames.
            final StackTraceSampleResponse depthLimitedResponse =
                    taskExecutorGateway.requestStackTraceSample(executionAttemptID, 1337, 5, Time.milliseconds(100L), 2, timeout).get();
            Assert.assertEquals(1337L, depthLimitedResponse.getSampleId());
            Assert.assertEquals(executionAttemptID, depthLimitedResponse.getExecutionAttemptID());

            final List<StackTraceElement[]> depthLimitedSamples = depthLimitedResponse.getSamples();
            Assert.assertEquals("Number of samples", 5, depthLimitedSamples.size());
            for (StackTraceElement[] stackTrace : depthLimitedSamples) {
                Assert.assertEquals("Max depth", 2, stackTrace.length);
            }

            // 4) Cancel the task while a sample request is still pending.
            final CompletableFuture<StackTraceSampleResponse> pendingSample =
                    taskExecutorGateway.requestStackTraceSample(executionAttemptID, 44, 100, Time.milliseconds(10L), 2, timeout);
            Thread.sleep(100);
            taskExecutorGateway.cancelTask(executionAttemptID, timeout);
            taskCanceledFuture.get();

            final StackTraceSampleResponse canceledResponse = pendingSample.get();
            Assert.assertEquals(executionAttemptID, canceledResponse.getExecutionAttemptID());
            Assert.assertEquals(44L, canceledResponse.getSampleId());
        }
    }

    /**
     * Builds a sender task deployment descriptor that runs
     * {@code TestingAbstractInvokables.Sender}.
     *
     * @param nettyShuffleDescriptor shuffle descriptor handed to the two-argument overload
     * @return the deployment descriptor produced by the two-argument overload
     * @throws IOException propagated from the two-argument overload
     */
    private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor nettyShuffleDescriptor) throws IOException {
        final Class<? extends AbstractInvokable> defaultInvokable = TestingAbstractInvokables.Sender.class;
        return this.createSender(nettyShuffleDescriptor, defaultInvokable);
    }

    /**
     * Builds a deployment descriptor named {@code "Sender"} that produces a single
     * {@code PIPELINED} result partition backed by the given shuffle descriptor.
     *
     * <p>The execution attempt ID of the task is the producer ID of the shuffle descriptor's
     * result partition ID, and the partition ID of the produced partition is taken from the same
     * result partition ID. The task has no input gates.
     *
     * @param nettyShuffleDescriptor shuffle descriptor supplying the result partition ID
     * @param cls invokable class whose name is stored in the descriptor
     * @return the deployment descriptor for the sender task
     * @throws IOException propagated from {@code createTestTaskDeploymentDescriptor}
     */
    private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor nettyShuffleDescriptor, Class<? extends AbstractInvokable> cls) throws IOException {
        final ExecutionAttemptID producerAttemptId = nettyShuffleDescriptor.getResultPartitionID().getProducerId();

        final PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
                new IntermediateDataSetID(),
                nettyShuffleDescriptor.getResultPartitionID().getPartitionId(),
                ResultPartitionType.PIPELINED,
                1,
                0);
        final ResultPartitionDeploymentDescriptor producedPartition =
                new ResultPartitionDeploymentDescriptor(partitionDescriptor, nettyShuffleDescriptor, 1, true);

        final List<ResultPartitionDeploymentDescriptor> producedPartitions = Collections.singletonList(producedPartition);
        final List<InputGateDeploymentDescriptor> noInputGates = Collections.emptyList();

        return createTestTaskDeploymentDescriptor("Sender", producerAttemptId, cls, 1, producedPartitions, noInputGates);
    }

    /**
     * Builds a deployment descriptor named {@code "Receiver"} that runs
     * {@code TestingAbstractInvokables.Receiver} with a fresh execution attempt ID.
     *
     * <p>The task produces no result partitions and has exactly one {@code PIPELINED} input gate
     * whose shuffle descriptor array contains only the given descriptor.
     *
     * @param nettyShuffleDescriptor the single shuffle descriptor of the input gate
     * @return the deployment descriptor for the receiver task
     * @throws IOException propagated from {@code createTestTaskDeploymentDescriptor}
     */
    private TaskDeploymentDescriptor createReceiver(NettyShuffleDescriptor nettyShuffleDescriptor) throws IOException {
        final ExecutionAttemptID receiverAttemptId = new ExecutionAttemptID();
        final List<ResultPartitionDeploymentDescriptor> noProducedPartitions = Collections.emptyList();

        final IntermediateDataSetID consumedDataSetId = new IntermediateDataSetID();
        final ShuffleDescriptor[] shuffleDescriptors = new ShuffleDescriptor[]{nettyShuffleDescriptor};
        final InputGateDeploymentDescriptor inputGate =
                new InputGateDeploymentDescriptor(consumedDataSetId, ResultPartitionType.PIPELINED, 0, shuffleDescriptors);
        final List<InputGateDeploymentDescriptor> inputGates = Collections.singletonList(inputGate);

        return createTestTaskDeploymentDescriptor(
                "Receiver",
                receiverAttemptId,
                TestingAbstractInvokables.Receiver.class,
                1,
                noProducedPartitions,
                inputGates);
    }

    /**
     * Convenience overload that delegates to the four-argument variant with {@code 1} as its
     * trailing int argument.
     *
     * @param str task name forwarded unchanged
     * @param executionAttemptID execution attempt ID forwarded unchanged
     * @param cls invokable class forwarded unchanged
     * @return the deployment descriptor produced by the four-argument overload
     * @throws IOException propagated from the four-argument overload
     */
    private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(String str, ExecutionAttemptID executionAttemptID, Class<? extends AbstractInvokable> cls) throws IOException {
        // Default for the int parameter of the four-argument overload.
        final int defaultCount = 1;
        return this.createTestTaskDeploymentDescriptor(str, executionAttemptID, cls, defaultCount);
    }

    /**
     * Convenience overload that delegates to the six-argument variant with empty collections of
     * produced result partitions and input gates.
     *
     * @param str task name forwarded unchanged
     * @param executionAttemptID execution attempt ID forwarded unchanged
     * @param cls invokable class forwarded unchanged
     * @param i int argument forwarded unchanged
     * @return the deployment descriptor produced by the six-argument overload
     * @throws IOException propagated from the six-argument overload
     */
    private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(String str, ExecutionAttemptID executionAttemptID, Class<? extends AbstractInvokable> cls, int i) throws IOException {
        final List<ResultPartitionDeploymentDescriptor> noProducedPartitions = Collections.emptyList();
        final List<InputGateDeploymentDescriptor> noInputGates = Collections.emptyList();
        return this.createTestTaskDeploymentDescriptor(str, executionAttemptID, cls, i, noProducedPartitions, noInputGates);
    }

    /**
     * Builds a deployment descriptor for this test's job via
     * {@code createTaskDeploymentDescriptor}.
     *
     * <p>Besides the caller-supplied values, the descriptor is built from this test's job ID, the
     * current test method name, a serialized default {@link ExecutionConfig}, two fresh empty
     * {@link Configuration} instances, the name of {@code cls}, and empty blob-key and URL
     * collections. The remaining int arguments are the fixed values {@code 0, 1, 0} and a trailing
     * {@code 0}.
     *
     * @param str task name forwarded unchanged
     * @param executionAttemptID execution attempt ID forwarded unchanged
     * @param cls invokable class; only its name is forwarded
     * @param i int argument forwarded unchanged
     * @param collection produced result partitions; must not be null
     * @param collection2 input gates; must not be null
     * @return the assembled deployment descriptor
     * @throws IOException propagated from serializing values or from
     *     {@code createTaskDeploymentDescriptor}
     */
    private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor(String str, ExecutionAttemptID executionAttemptID, Class<? extends AbstractInvokable> cls, int i, Collection<ResultPartitionDeploymentDescriptor> collection, Collection<InputGateDeploymentDescriptor> collection2) throws IOException {
        Preconditions.checkNotNull(collection);
        Preconditions.checkNotNull(collection2);

        final String testMethodName = this.testName.getMethodName();
        final SerializedValue<ExecutionConfig> serializedExecutionConfig = new SerializedValue<>(new ExecutionConfig());
        // First configuration ends up in JobInformation, second in TaskInformation.
        final Configuration jobConfiguration = new Configuration();
        final Configuration taskConfiguration = new Configuration();
        final String invokableClassName = cls.getName();
        final List<PermanentBlobKey> noBlobKeys = Collections.emptyList();
        final List<URL> noUrls = Collections.emptyList();

        return createTaskDeploymentDescriptor(
                this.jobId,
                testMethodName,
                executionAttemptID,
                serializedExecutionConfig,
                str,
                i,
                0,
                1,
                0,
                jobConfiguration,
                taskConfiguration,
                invokableClassName,
                collection,
                collection2,
                noBlobKeys,
                noUrls,
                0);
    }

    /**
     * Assembles a {@link TaskDeploymentDescriptor} from explicit job and task settings.
     *
     * <p>A {@link JobInformation} is built from {@code jobID}, {@code str},
     * {@code serializedValue}, {@code configuration}, {@code collection3} and {@code collection4}.
     * A {@link TaskInformation} is built from a fresh {@link JobVertexID}, {@code str2},
     * {@code i3}, {@code i}, {@code str3} and {@code configuration2}. Both are serialized and
     * wrapped as non-offloaded values. The descriptor gets a fresh {@link AllocationID} and no
     * task restore.
     *
     * <p>Decompiler note: the original access modifier was package-private.
     *
     * @param jobID job ID used for both the job information and the descriptor
     * @param str second constructor argument of {@link JobInformation}
     * @param executionAttemptID execution attempt ID of the descriptor
     * @param serializedValue serialized execution config stored in the job information
     * @param str2 second constructor argument of {@link TaskInformation}
     * @param i fourth constructor argument of {@link TaskInformation}
     * @param i2 first int argument of the descriptor constructor
     * @param i3 third constructor argument of {@link TaskInformation}
     * @param i4 second int argument of the descriptor constructor
     * @param configuration configuration stored in the job information
     * @param configuration2 configuration stored in the task information
     * @param str3 fifth constructor argument of {@link TaskInformation}
     * @param collection produced result partitions of the descriptor
     * @param collection2 input gates of the descriptor
     * @param collection3 blob keys stored in the job information
     * @param collection4 URLs stored in the job information
     * @param i5 third int argument of the descriptor constructor
     * @return the assembled deployment descriptor
     * @throws IOException if serializing the job or task information fails
     */
    public static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobID, String str, ExecutionAttemptID executionAttemptID, SerializedValue<ExecutionConfig> serializedValue, String str2, int i, int i2, int i3, int i4, Configuration configuration, Configuration configuration2, String str3, Collection<ResultPartitionDeploymentDescriptor> collection, Collection<InputGateDeploymentDescriptor> collection2, Collection<PermanentBlobKey> collection3, Collection<URL> collection4, int i5) throws IOException {
        final JobInformation jobInformation =
                new JobInformation(jobID, str, serializedValue, configuration, collection3, collection4);
        final TaskInformation taskInformation =
                new TaskInformation(new JobVertexID(), str2, i3, i, str3, configuration2);

        final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
        final SerializedValue<TaskInformation> serializedTaskInformation = new SerializedValue<>(taskInformation);

        // A typed null selects the same constructor overload as the original cast.
        final JobManagerTaskRestore noTaskRestore = null;

        return new TaskDeploymentDescriptor(
                jobID,
                new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
                new TaskDeploymentDescriptor.NonOffloaded<>(serializedTaskInformation),
                executionAttemptID,
                new AllocationID(),
                i2,
                i4,
                i5,
                noTaskRestore,
                collection,
                collection2);
    }
}
