package org.apache.flink.streaming.runtime.tasks;

import akka.actor.ActorRef;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest.class */
public class StreamTaskTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$DummyGateway.class */
    /**
     * Inert {@link ActorGateway} implementation.
     *
     * <p>Every message-sending method is empty and every method with a return value
     * yields {@code null}. The test passes instances of this class to the {@link Task}
     * constructor wherever a gateway argument is expected.
     */
    public static class DummyGateway implements ActorGateway {
        private static final long serialVersionUID = 1L;

        private DummyGateway() {
        }

        // ---- identity accessors: there is nothing behind this gateway, so each returns null ----

        public ActorRef actor() {
            return null;
        }

        public String path() {
            return null;
        }

        public UUID leaderSessionID() {
            return null;
        }

        // ---- request/response style calls: no future is ever produced ----

        public Future<Object> ask(Object message, FiniteDuration timeout) {
            return null;
        }

        public Future<Object> retry(Object message, int attempts, FiniteDuration timeout, ExecutionContext context) {
            return null;
        }

        // ---- one-way calls: the message is discarded ----

        public void tell(Object message) {
        }

        public void tell(Object message, ActorGateway sender) {
        }

        public void forward(Object message, ActorGateway sender) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$MockSourceFunction.class */
    /**
     * {@link SourceFunction} stub that emits nothing: both {@code run} and {@code cancel}
     * have empty bodies. It is handed to the {@code StreamSource} super constructor by
     * {@code SlowlyDeserializingOperator}.
     */
    private static class MockSourceFunction implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        /** Does nothing; there is no running work to stop. */
        public void cancel() {
        }

        /** Returns immediately without touching the given context. */
        public void run(SourceFunction.SourceContext<Long> ctx) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskTest$SlowlyDeserializingOperator.class */
    /**
     * Source operator that takes at least 500 ms to deserialize and then idles until canceled.
     *
     * <p>{@link #readObject(ObjectInputStream)} blocks until a 500 ms deadline has passed, and
     * {@code run} sleeps in 500 ms steps until {@link #cancel()} flips the {@code canceled} flag.
     * Both loops drop {@link InterruptedException}, so interrupting the thread does not end them.
     */
    public static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;

        /** Set by {@link #cancel()}; volatile so the thread looping in {@code run} observes the write. */
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super(new MockSourceFunction());
        }

        /**
         * Idles in 500 ms sleeps until the operator has been canceled. Neither argument is used.
         */
        public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception {
            for (;;) {
                if (this.canceled) {
                    return;
                }
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException ignored) {
                    // The interrupt is dropped; the loop ends only once cancel() has set the flag.
                }
            }
        }

        /** Requests termination of the loop in {@code run}. */
        public void cancel() {
            this.canceled = true;
        }

        /**
         * Serialization hook: restores the default fields, then stalls until 500 ms have elapsed.
         *
         * @throws IOException if reading the default fields fails
         * @throws ClassNotFoundException if a class of a serialized field cannot be found
         */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();

            long remaining = 500;
            final long deadline = System.currentTimeMillis() + 500;
            do {
                try {
                    Thread.sleep(remaining);
                } catch (InterruptedException ignored) {
                    // An interrupt only cuts the current sleep short; the deadline check below decides.
                }
                remaining = deadline - System.currentTimeMillis();
            } while (remaining > 0);
        }
    }

    /**
     * Starts a {@link SourceStreamTask} configured with a {@link SlowlyDeserializingOperator},
     * waits for it to report {@code RUNNING}, cancels it, and checks that the executing thread
     * ends within 60 seconds with the task in state {@code CANCELED}.
     *
     * <p>Any checked exception raised along the way is rethrown as an {@link AssertionError}
     * that carries the original exception as its cause, so the test report shows the real
     * stack trace instead of only a (possibly {@code null}) message.
     */
    @Test
    public void testEarlyCanceling() {
        try {
            StreamConfig cfg = new StreamConfig(new Configuration());
            cfg.setStreamOperator(new SlowlyDeserializingOperator());

            Task task = createTask(SourceStreamTask.class, cfg);
            task.startTaskThread();

            // Poll until the task has left both of its initial states.
            while (task.getExecutionState() == ExecutionState.CREATED
                    || task.getExecutionState() == ExecutionState.DEPLOYING) {
                Thread.sleep(5L);
            }

            // Anything other than RUNNING at this point means start-up went wrong.
            if (task.getExecutionState() != ExecutionState.RUNNING) {
                Assert.fail("Task entered state " + task.getExecutionState() + " with error " + ExceptionUtils.stringifyException(task.getFailureCause()));
            }

            task.cancelExecution();

            // Snapshot the state once so the check and its failure message describe the same value.
            ExecutionState afterCancel = task.getExecutionState();
            Assert.assertTrue(
                    "Unexpected state right after cancelExecution(): " + afterCancel,
                    afterCancel == ExecutionState.CANCELING || afterCancel == ExecutionState.CANCELED);

            // Give the task thread up to 60 seconds to wind down.
            task.getExecutingThread().join(60000L);
            Assert.assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        } catch (Exception e) {
            // Keep the cause attached rather than printing it to stderr and failing with
            // e.getMessage(), which may be null and loses the originating stack trace.
            throw new AssertionError("Test failed with unexpected exception: " + e, e);
        }
    }

    /**
     * Assembles a {@link Task} whose collaborators are almost all Mockito mocks.
     *
     * <p>The deployment descriptor is given {@code cls.getName()} and
     * {@code streamConfig.getConfiguration()}; every other descriptor argument is a fresh ID,
     * a fixed literal, an empty {@link Configuration} or an empty list. The library cache mock
     * returns this test's class loader for any {@link JobID}, and the network environment mock
     * returns a mocked partition manager, a mocked consumable notifier and
     * {@code IOMode.SYNC}.
     *
     * @param cls class whose name is placed in the deployment descriptor
     * @param streamConfig stream configuration whose backing {@link Configuration} is placed
     *     in the deployment descriptor
     * @return the newly constructed task (not started)
     * @throws Exception if building any of the constructor arguments fails
     */
    private Task createTask(Class<? extends AbstractInvokable> cls, StreamConfig streamConfig) throws Exception {
        // Library cache: always answers with the class loader that loaded this test.
        LibraryCacheManager libCache = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libCache.getClassLoader(Matchers.any(JobID.class)))
                .thenReturn(getClass().getClassLoader());

        // Network environment: hands out mocked partition services and SYNC IO mode.
        ResultPartitionManager partitions = Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier notifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment network = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(network.getPartitionManager()).thenReturn(partitions);
        Mockito.when(network.getPartitionConsumableNotifier()).thenReturn(notifier);
        Mockito.when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);

        TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(
                new JobID(),
                "Job Name",
                new JobVertexID(),
                new ExecutionAttemptID(),
                new SerializedValue(new ExecutionConfig()),
                "Test Task",
                0,
                1,
                0,
                new Configuration(),
                streamConfig.getConfiguration(),
                cls.getName(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                0);

        // Remaining constructor arguments, created in the order the constructor takes them.
        MemoryManager memory = Mockito.mock(MemoryManager.class);
        IOManager io = Mockito.mock(IOManager.class);
        BroadcastVariableManager broadcastVars = Mockito.mock(BroadcastVariableManager.class);
        DummyGateway firstGateway = new DummyGateway();
        DummyGateway secondGateway = new DummyGateway();
        FiniteDuration sixtySeconds = new FiniteDuration(60L, TimeUnit.SECONDS);
        FileCache fileCache = Mockito.mock(FileCache.class);
        TaskManagerRuntimeInfo runtimeInfo =
                new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
        TaskMetricGroup metrics = Mockito.mock(TaskMetricGroup.class);

        return new Task(
                descriptor,
                memory,
                io,
                network,
                broadcastVars,
                firstGateway,
                secondGateway,
                sixtySeconds,
                libCache,
                fileCache,
                runtimeInfo,
                metrics);
    }
}
