/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import akka.actor.ActorRef;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class StreamTaskTest {
    /**
     * Latch shared between a test method and the task class it launches.
     *
     * <p>The cancellation tests assign a fresh latch before starting the task and block on it;
     * the task's {@code run()} triggers it after its {@link LockHolder} thread has signalled
     * its own latch, i.e. at the point where the test may cancel the task.
     */
    private static OneShotLatch SYNC_LATCH;

    /**
     * Cancels a source task as soon as it has reached {@code RUNNING} and checks that it ends
     * up {@code CANCELED}.
     *
     * <p>The task runs a {@link SlowlyDeserializingOperator}, whose {@code readObject} sleeps
     * for about half a second, so the operator is slow to deserialize on purpose.
     *
     * @throws Exception if the task cannot be created or the test thread is interrupted; the
     *         exception is propagated so the test runner reports it with its full stack trace
     */
    @Test
    public void testEarlyCanceling() throws Exception {
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStreamOperator(new SlowlyDeserializingOperator());

        Task task = createTask(SourceStreamTask.class, cfg, new Configuration());
        task.startTaskThread();

        // Poll until the task has left the states that precede RUNNING.
        while (task.getExecutionState() == ExecutionState.CREATED
                || task.getExecutionState() == ExecutionState.DEPLOYING) {
            Thread.sleep(5L);
        }

        if (task.getExecutionState() != ExecutionState.RUNNING) {
            Assert.fail("Task entered state " + task.getExecutionState() + " with error "
                    + ExceptionUtils.stringifyException(task.getFailureCause()));
        }

        task.cancelExecution();

        // Right after the cancel call the task is either still canceling or already done.
        ExecutionState stateAfterCancel = task.getExecutionState();
        Assert.assertTrue(
                stateAfterCancel == ExecutionState.CANCELING
                        || stateAfterCancel == ExecutionState.CANCELED);

        // Bounded wait so a stuck task fails the test instead of hanging it.
        task.getExecutingThread().join(60000L);

        Assert.assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
    }

    /**
     * Runs a source task whose task-manager configuration names {@link MockStateBackend}
     * under the {@code "state.backend"} key and expects the task to reach {@code FINISHED}.
     *
     * @throws Exception if the task cannot be created or the test thread is interrupted
     */
    @Test
    public void testStateBackendLoading() throws Exception {
        // Operator configuration: a source that returns immediately, on processing time.
        final StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStreamOperator(
                new StreamSource<Long, SourceFunction<Long>>(new MockSourceFunction()));
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Task-manager configuration pointing at the mock backend factory.
        final Configuration tmConfig = new Configuration();
        tmConfig.setString("state.backend", MockStateBackend.class.getName());

        final Task sourceTask = createTask(SourceStreamTask.class, streamConfig, tmConfig);
        sourceTask.startTaskThread();

        // Wait for the task thread to run to completion.
        final Thread taskThread = sourceTask.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.FINISHED, sourceTask.getExecutionState());
    }

    /**
     * Cancels a {@link CancelLockingTask} while its helper thread holds the task's checkpoint
     * lock and expects the task to end up {@code CANCELED}.
     *
     * @throws Exception if the task cannot be created or the test thread is interrupted
     */
    @Test
    public void testCancellationNotBlockedOnLock() throws Exception {
        // Fresh latch for this run; the task triggers it once the lock is held.
        SYNC_LATCH = new OneShotLatch();

        final StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        final Task lockingTask =
                createTask(CancelLockingTask.class, streamConfig, new Configuration());
        lockingTask.startTaskThread();

        // Do not cancel before the task signals that it is ready.
        SYNC_LATCH.await();
        lockingTask.cancelExecution();

        final Thread taskThread = lockingTask.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.CANCELED, lockingTask.getExecutionState());
    }

    /**
     * Cancels a {@link CancelFailingTask}, whose {@code cancelTask()} always throws and whose
     * {@code run()} is blocked on a monitor held by a helper thread, and expects the task to
     * end up {@code CANCELED} nonetheless.
     *
     * @throws Exception if the task cannot be created or the test thread is interrupted
     */
    @Test
    public void testCancellationFailsWithBlockingLock() throws Exception {
        // Fresh latch for this run; the task triggers it right before it blocks.
        SYNC_LATCH = new OneShotLatch();

        final StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        final Task failingTask =
                createTask(CancelFailingTask.class, streamConfig, new Configuration());
        failingTask.startTaskThread();

        // Do not cancel before the task signals that it is ready.
        SYNC_LATCH.await();
        failingTask.cancelExecution();

        final Thread taskThread = failingTask.getExecutingThread();
        taskThread.join();

        Assert.assertEquals(ExecutionState.CANCELED, failingTask.getExecutionState());
    }

    /**
     * Checks that state already snapshotted by one operator is discarded when a later
     * operator in the chain fails during the same checkpoint.
     *
     * <p>The operator chain is mocked with two operators: the first returns a mocked
     * {@link StreamTaskState}, the second throws from {@code snapshotOperatorState}. The test
     * then expects {@code triggerCheckpoint} to throw and {@code discardState()} to have been
     * called on the first operator's state.
     *
     * @throws Exception if setting up the mocks or the task fails
     */
    @Test
    public void testStateCleanupWhenFailingCheckpoint() throws Exception {
        final long checkpointId = 1L;
        final long timestamp = 42L;

        TestingStreamTask streamTask = new TestingStreamTask();
        streamTask.setEnvironment(new DummyEnvironment("test task", 1, 0));

        OperatorChain operatorChain = Mockito.mock(OperatorChain.class);

        StreamOperator<?> firstOperator = Mockito.mock(StreamOperator.class);
        StreamTaskState firstStreamTaskState = Mockito.mock(StreamTaskState.class);
        StreamOperator<?> secondOperator = Mockito.mock(StreamOperator.class);

        Mockito.doReturn(firstStreamTaskState)
                .when(firstOperator).snapshotOperatorState(Matchers.anyLong(), Matchers.anyLong());
        Mockito.doThrow(new Exception("Test Exception"))
                .when(secondOperator).snapshotOperatorState(Matchers.anyLong(), Matchers.anyLong());
        Mockito.doReturn(new StreamOperator<?>[]{firstOperator, secondOperator})
                .when(operatorChain).getAllOperators();

        // Inject the mocked chain and mark the task as running via reflection.
        Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
        Whitebox.setInternalState(streamTask, "isRunning", true);

        try {
            streamTask.triggerCheckpoint(checkpointId, timestamp);
            // fail() throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Expected exception here.");
        }
        catch (Exception expected) {
            // The second operator's snapshot failure is supposed to surface here.
        }

        Mockito.verify(firstStreamTaskState).discardState();
    }

    /**
     * Checks that the asynchronous checkpoint thread discards every operator state when
     * materializing one of them fails.
     *
     * <p>The first of two mocked states exposes a function-state handle whose
     * {@code materialize()} throws. After running the thread body synchronously, the test
     * expects {@code discardState()} on both states.
     *
     * @throws Exception if setting up the mocks fails
     */
    @Test
    public void testAsyncCheckpointThreadStateCleanup() throws Exception {
        final long checkpointId = 1L;

        StreamTaskState firstState = Mockito.mock(StreamTaskState.class);
        StreamTaskState secondState = Mockito.mock(StreamTaskState.class);

        AsynchronousStateHandle functionStateHandle = Mockito.mock(AsynchronousStateHandle.class);
        Mockito.doReturn(functionStateHandle).when(firstState).getFunctionState();
        Mockito.doThrow(new Exception("Test exception")).when(functionStateHandle).materialize();

        StreamTask<?, ?> owner = Mockito.mock(StreamTask.class);
        StreamTaskState[] states = new StreamTaskState[]{firstState, secondState};

        StreamTask.AsyncCheckpointThread asyncCheckpointThread = new StreamTask.AsyncCheckpointThread(
                "AsyncCheckpointThread",
                owner,
                new HashSet<Closeable>(),
                states,
                checkpointId);

        // run() rather than start(): execute the thread body on the test thread.
        asyncCheckpointThread.run();

        for (StreamTaskState streamTaskState : states) {
            Mockito.verify(streamTaskState).discardState();
        }
    }

    /**
     * Builds a {@link Task} for the given invokable class, wired up with mocked runtime
     * services and no-op actor gateways.
     *
     * @param invokable the invokable class the task should run
     * @param taskConfig stream configuration whose underlying configuration becomes the task
     *        configuration
     * @param taskManagerConfig configuration handed to the task manager runtime info
     * @return a task that has been constructed but not yet started
     * @throws Exception if any of the task's components cannot be created
     */
    private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception {
        // Library cache that hands out this test's class loader for any job id.
        final LibraryCacheManager libraryCache = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCache.getClassLoader(Matchers.any(JobID.class)))
                .thenReturn(getClass().getClassLoader());

        // Network environment backed entirely by mocks.
        final ResultPartitionManager partitions = Mockito.mock(ResultPartitionManager.class);
        final ResultPartitionConsumableNotifier notifier =
                Mockito.mock(ResultPartitionConsumableNotifier.class);
        final NetworkEnvironment network = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(network.getPartitionManager()).thenReturn(partitions);
        Mockito.when(network.getPartitionConsumableNotifier()).thenReturn(notifier);
        Mockito.when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);

        final JobInformation jobInfo = new JobInformation(
                new JobID(),
                "Job Name",
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());

        final TaskInformation taskInfo = new TaskInformation(
                new JobVertexID(),
                "Test Task",
                1,
                invokable.getName(),
                taskConfig.getConfiguration());

        // Remaining collaborators, created in the order the Task constructor takes them.
        final ExecutionAttemptID attemptId = new ExecutionAttemptID();
        final MemoryManager memoryManager = Mockito.mock(MemoryManager.class);
        final IOManager ioManager = Mockito.mock(IOManager.class);
        final BroadcastVariableManager broadcastVariables =
                Mockito.mock(BroadcastVariableManager.class);
        final ActorGateway firstGateway = new DummyGateway();
        final ActorGateway secondGateway = new DummyGateway();
        final FiniteDuration timeout = new FiniteDuration(60L, TimeUnit.SECONDS);
        final FileCache fileCache = Mockito.mock(FileCache.class);
        final TaskManagerRuntimeInfo runtimeInfo = new TaskManagerRuntimeInfo(
                "localhost", taskManagerConfig, System.getProperty("java.io.tmpdir"));
        final TaskMetricGroup metrics = new UnregisteredTaskMetricsGroup();

        return new Task(
                jobInfo,
                taskInfo,
                attemptId,
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                0,
                null,
                memoryManager,
                ioManager,
                network,
                broadcastVariables,
                firstGateway,
                secondGateway,
                timeout,
                libraryCache,
                fileCache,
                runtimeInfo,
                metrics);
    }

    private static class TestingStreamTask
    extends StreamTask<Integer, StreamOperator<Integer>> {
        private TestingStreamTask() {
        }

        protected void init() throws Exception {
        }

        protected void run() throws Exception {
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }
    }

    /**
     * Helper thread that acquires the monitor of a given object and keeps it until it is
     * canceled.
     *
     * <p>While holding the monitor it triggers the supplied latch and then sleeps; an
     * interrupt only wakes it up to re-check the cancel flag. {@link #cancel()} merely sets
     * the flag, whereas {@link #close()} also interrupts the thread so it notices promptly.
     */
    private static final class LockHolder extends Thread implements Closeable {

        private final OneShotLatch trigger;
        private final Object lock;
        private volatile boolean canceled;

        private LockHolder(Object lock, OneShotLatch trigger) {
            this.lock = lock;
            this.trigger = trigger;
        }

        /** Holds the monitor of {@code lock} until the cancel flag is observed. */
        @Override
        public void run() {
            synchronized (lock) {
                while (true) {
                    if (canceled) {
                        // Leaving the synchronized block releases the monitor.
                        return;
                    }
                    // Tell whoever waits on the latch that the monitor is now held.
                    trigger.trigger();
                    try {
                        Thread.sleep(1000000000L);
                    }
                    catch (InterruptedException ignored) {
                        // Woken up on purpose; loop around and re-check the flag.
                    }
                }
            }
        }

        /** Sets the cancel flag without waking the thread. */
        public void cancel() {
            canceled = true;
        }

        /** Sets the cancel flag and interrupts the thread. */
        @Override
        public void close() {
            canceled = true;
            interrupt();
        }
    }

    public static class CancelFailingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        protected void init() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void run() throws Exception {
            OneShotLatch latch = new OneShotLatch();
            Object lock = new Object();
            holder.start();
            try (LockHolder holder = new LockHolder(lock, latch);){
                Set canceleables = this.getCancelables();
                Object object = canceleables;
                synchronized (object) {
                    canceleables.add(holder);
                }
                latch.await();
                SYNC_LATCH.trigger();
                object = lock;
                synchronized (object) {
                }
            }
        }

        protected void cleanup() {
        }

        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }
    }

    public static class CancelLockingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final OneShotLatch latch = new OneShotLatch();
        private LockHolder holder;

        protected void init() {
        }

        protected void run() throws Exception {
            this.holder = new LockHolder(this.getCheckpointLock(), this.latch);
            this.holder.start();
            this.latch.await();
            SYNC_LATCH.trigger();
            try {
                Thread.sleep(100000000L);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }

        protected void cleanup() {
            this.holder.close();
        }

        protected void cancelTask() {
            this.holder.cancel();
        }
    }

    public static final class MockStateBackend
    implements StateBackendFactory<AbstractStateBackend> {
        public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
            return (AbstractStateBackend)Mockito.mock(AbstractStateBackend.class);
        }
    }

    private static class DummyGateway
    implements ActorGateway {
        private static final long serialVersionUID = 1L;

        private DummyGateway() {
        }

        public Future<Object> ask(Object message, FiniteDuration timeout) {
            return null;
        }

        public void tell(Object message) {
        }

        public void tell(Object message, ActorGateway sender) {
        }

        public void forward(Object message, ActorGateway sender) {
        }

        public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
            return null;
        }

        public String path() {
            return null;
        }

        public ActorRef actor() {
            return null;
        }

        public UUID leaderSessionID() {
            return null;
        }
    }

    private static class MockSourceFunction
    implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
        }

        public void cancel() {
        }
    }

    public static class SlowlyDeserializingOperator
    extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super((SourceFunction)new MockSourceFunction());
        }

        public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception {
            while (!this.canceled) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            long delay = 500L;
            long deadline = System.currentTimeMillis() + delay;
            do {
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((delay = deadline - System.currentTimeMillis()) > 0L);
        }
    }
}

