/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

public class TaskAsyncCallTest {
    private static final int NUM_CALLS = 1000;
    private static OneShotLatch awaitLatch;
    private static OneShotLatch triggerLatch;

    @Before
    public void createQueuesAndActors() {
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();
    }

    @Test
    public void testCheckpointCallsInOrder() {
        try {
            Task task = TaskAsyncCallTest.createTask();
            task.startTaskThread();
            awaitLatch.await();
            for (int i = 1; i <= 1000; ++i) {
                task.triggerCheckpointBarrier((long)i, 156865867234L);
            }
            triggerLatch.await();
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            ExecutionState currentState = task.getExecutionState();
            if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
                Assert.fail((String)("Task should be RUNNING or FINISHED, but is " + currentState));
            }
            task.cancelExecution();
            task.getExecutingThread().join();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMixedAsyncCallsInOrder() {
        try {
            Task task = TaskAsyncCallTest.createTask();
            task.startTaskThread();
            awaitLatch.await();
            for (int i = 1; i <= 1000; ++i) {
                task.triggerCheckpointBarrier((long)i, 156865867234L);
                task.notifyCheckpointComplete((long)i);
            }
            triggerLatch.await();
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            ExecutionState currentState = task.getExecutionState();
            if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
                Assert.fail((String)("Task should be RUNNING or FINISHED, but is " + currentState));
            }
            task.cancelExecution();
            task.getExecutingThread().join();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static Task createTask() throws Exception {
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)ClassLoader.getSystemClassLoader());
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment networkEnvironment = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)networkEnvironment.getPartitionManager()).thenReturn((Object)partitionManager);
        Mockito.when((Object)networkEnvironment.getPartitionConsumableNotifier()).thenReturn((Object)consumableNotifier);
        Mockito.when((Object)networkEnvironment.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
        TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue((Object)new ExecutionConfig()), "Test Task", 0, 1, 0, new Configuration(), new Configuration(), CheckpointsInOrderInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
        DummyActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
        return new Task(tdd, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), networkEnvironment, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (ActorGateway)taskManagerGateway, (ActorGateway)DummyActorGateway.INSTANCE, new FiniteDuration(60L, TimeUnit.SECONDS), libCache, (FileCache)Mockito.mock(FileCache.class), new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), (TaskMetricGroup)Mockito.mock(TaskMetricGroup.class));
    }

    public static class CheckpointsInOrderInvokable
    extends AbstractInvokable
    implements StatefulTask<StateHandle<Serializable>> {
        private volatile long lastCheckpointId = 0L;
        private volatile Exception error;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();
            CheckpointsInOrderInvokable checkpointsInOrderInvokable = this;
            synchronized (checkpointsInOrderInvokable) {
                while (this.error == null && this.lastCheckpointId < 1000L) {
                    ((Object)((Object)this)).wait();
                }
            }
            triggerLatch.trigger();
            if (this.error != null) {
                throw this.error;
            }
        }

        public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean triggerCheckpoint(long checkpointId, long timestamp) {
            ++this.lastCheckpointId;
            if (checkpointId == this.lastCheckpointId) {
                if (this.lastCheckpointId == 1000L) {
                    triggerLatch.trigger();
                }
            } else if (this.error == null) {
                this.error = new Exception("calls out of order");
                CheckpointsInOrderInvokable checkpointsInOrderInvokable = this;
                synchronized (checkpointsInOrderInvokable) {
                    ((Object)((Object)this)).notifyAll();
                }
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void notifyCheckpointComplete(long checkpointId) {
            if (checkpointId != this.lastCheckpointId && this.error == null) {
                this.error = new Exception("calls out of order");
                CheckpointsInOrderInvokable checkpointsInOrderInvokable = this;
                synchronized (checkpointsInOrderInvokable) {
                    ((Object)((Object)this)).notifyAll();
                }
            }
        }
    }
}

