/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import akka.actor.ActorRef;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class StreamTaskTest {
    /**
     * Starts a {@link SourceStreamTask} whose operator is a
     * {@link SlowlyDeserializingOperator}, cancels it as soon as it reports
     * {@code RUNNING}, and checks that it ends up in state {@code CANCELED}.
     *
     * <p>NOTE(review): the operator's {@code readObject} sleeps for about 500 ms,
     * presumably so that the cancel call arrives before the operator has been
     * deserialized inside the task -- confirm against the task implementation.
     *
     * @throws Exception if task creation fails or the test thread is interrupted;
     *         propagated unchanged so the test report keeps the full stack trace
     */
    @Test
    public void testEarlyCanceling() throws Exception {
        // Upper bound for leaving CREATED/DEPLOYING; matches the 60 s join timeout below.
        final long startupTimeoutNanos = TimeUnit.SECONDS.toNanos(60L);

        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStreamOperator(new SlowlyDeserializingOperator());

        Task task = this.createTask(SourceStreamTask.class, cfg);
        task.startTaskThread();

        // Poll until the task has left its start-up states, but never wait forever:
        // a task stuck in CREATED/DEPLOYING must fail the test instead of hanging it.
        final long pollStart = System.nanoTime();
        while (task.getExecutionState() == ExecutionState.CREATED
                || task.getExecutionState() == ExecutionState.DEPLOYING) {
            if (System.nanoTime() - pollStart > startupTimeoutNanos) {
                Assert.fail("Task did not leave state " + task.getExecutionState() + " in time");
            }
            Thread.sleep(5L);
        }

        // Any state other than RUNNING at this point means start-up went wrong.
        if (task.getExecutionState() != ExecutionState.RUNNING) {
            Assert.fail("Task entered state " + task.getExecutionState() + " with error "
                    + ExceptionUtils.stringifyException(task.getFailureCause()));
        }

        task.cancelExecution();

        // Right after the cancel call the task must be canceling or already canceled.
        ExecutionState stateAfterCancel = task.getExecutionState();
        Assert.assertTrue(
                "Unexpected state after cancel: " + stateAfterCancel,
                stateAfterCancel == ExecutionState.CANCELING || stateAfterCancel == ExecutionState.CANCELED);

        // Give the task thread up to 60 s to terminate.
        task.getExecutingThread().join(60000L);

        Assert.assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
    }

    /**
     * Builds a {@link Task} for the given invokable class, wiring it almost
     * entirely from Mockito mocks and {@link DummyGateway} instances.
     *
     * @param invokable  class whose name is put into the deployment descriptor
     * @param taskConfig stream configuration; its underlying {@link Configuration}
     *                   is passed to the deployment descriptor
     * @return a freshly constructed, not yet started task
     * @throws Exception if constructing the descriptor or the task fails
     */
    private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception {
        // Library cache mock: answers every job id with this test's own class loader.
        LibraryCacheManager libCache = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libCache.getClassLoader(Matchers.any(JobID.class)))
                .thenReturn(this.getClass().getClassLoader());

        // Network environment mock exposing mocked partition components and SYNC I/O mode.
        ResultPartitionManager partitionManager = Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier consumableNotifier = Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment network = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(network.getPartitionManager()).thenReturn(partitionManager);
        Mockito.when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
        Mockito.when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);

        TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                new JobID(),
                "Job Name",
                new JobVertexID(),
                new ExecutionAttemptID(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                "Test Task",
                0,
                1,
                0,
                new Configuration(),
                taskConfig.getConfiguration(),
                invokable.getName(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                0);

        // Remaining collaborators, created in the order the Task constructor takes them.
        MemoryManager memoryManager = Mockito.mock(MemoryManager.class);
        IOManager ioManager = Mockito.mock(IOManager.class);
        BroadcastVariableManager broadcastVariables = Mockito.mock(BroadcastVariableManager.class);
        ActorGateway firstGateway = new DummyGateway();
        ActorGateway secondGateway = new DummyGateway();
        FiniteDuration timeout = new FiniteDuration(60L, TimeUnit.SECONDS);
        FileCache fileCache = Mockito.mock(FileCache.class);
        TaskManagerRuntimeInfo runtimeInfo =
                new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
        TaskMetricGroup metricGroup = Mockito.mock(TaskMetricGroup.class);

        return new Task(
                tdd,
                memoryManager,
                ioManager,
                network,
                broadcastVariables,
                firstGateway,
                secondGateway,
                timeout,
                libCache,
                fileCache,
                runtimeInfo,
                metricGroup);
    }

    /**
     * Inert {@link ActorGateway} used as a placeholder when constructing a task:
     * message-sending methods do nothing and every value-returning method
     * returns {@code null}.
     */
    private static class DummyGateway implements ActorGateway {
        private static final long serialVersionUID = 1L;

        // ---- fire-and-forget messaging: all messages are silently dropped ----

        public void tell(Object message) {
            // intentionally empty
        }

        public void tell(Object message, ActorGateway sender) {
            // intentionally empty
        }

        public void forward(Object message, ActorGateway sender) {
            // intentionally empty
        }

        // ---- request/response messaging: no future is ever produced ----

        public Future<Object> ask(Object message, FiniteDuration timeout) {
            return null;
        }

        public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
            return null;
        }

        // ---- gateway identity: nothing is backing this gateway ----

        public String path() {
            return null;
        }

        public ActorRef actor() {
            return null;
        }

        public UUID leaderSessionID() {
            return null;
        }
    }

    /**
     * {@link SourceFunction} that does nothing: {@code run} returns immediately
     * without using the context, and {@code cancel} has no effect.
     */
    private static class MockSourceFunction implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        public void cancel() {
            // nothing to cancel
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
            // emits nothing
        }
    }

    /**
     * Source operator that is slow to deserialize and only stops when canceled.
     *
     * <p>Java deserialization of an instance blocks in {@code readObject} until
     * roughly 500 ms have passed. {@code run} keeps sleeping in 500 ms steps until
     * {@code cancel} has been called. Both ignore thread interrupts.
     */
    public static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;

        /** Set by {@link #cancel()}; volatile because it is polled by {@link #run}. */
        private volatile boolean canceled;

        public SlowlyDeserializingOperator() {
            super(new MockSourceFunction());
        }

        /**
         * Blocks until {@link #cancel()} has been called, checking the flag
         * between 500 ms sleeps. Neither argument is used.
         */
        public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception {
            for (;;) {
                if (this.canceled) {
                    return;
                }
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ignored) {
                    // interrupts are ignored; only the canceled flag ends the loop
                }
            }
        }

        /** Makes {@link #run} return at its next check of the flag. */
        public void cancel() {
            this.canceled = true;
        }

        /**
         * Restores the default fields, then holds the deserializing thread until
         * 500 ms of wall-clock time have elapsed. An interrupted sleep is resumed
         * for the time that is still left.
         */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();

            long remaining = 500L;
            final long deadline = System.currentTimeMillis() + remaining;
            while (true) {
                try {
                    Thread.sleep(remaining);
                }
                catch (InterruptedException ignored) {
                    // keep waiting; the remaining time is recomputed below
                }
                remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0L) {
                    break;
                }
            }
        }
    }
}

