package org.apache.flink.runtime.taskmanager;

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.class */
public class TaskAsyncCallTest {
    private static final int NUM_CALLS = 1000;
    private static OneShotLatch awaitLatch;
    private static OneShotLatch triggerLatch;

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskAsyncCallTest$CheckpointsInOrderInvokable.class */
    public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
        private volatile long lastCheckpointId = 0;
        private volatile Exception error;

        public void invoke() throws Exception {
            TaskAsyncCallTest.awaitLatch.trigger();
            synchronized (this) {
                while (this.error == null && this.lastCheckpointId < 1000) {
                    wait();
                }
            }
            TaskAsyncCallTest.triggerLatch.trigger();
            if (this.error != null) {
                throw this.error;
            }
        }

        public void setInitialState(StateHandle<Serializable> stateHandle, long j) throws Exception {
        }

        public boolean triggerCheckpoint(long j, long j2) {
            this.lastCheckpointId++;
            if (j == this.lastCheckpointId) {
                if (this.lastCheckpointId != 1000) {
                    return true;
                }
                TaskAsyncCallTest.triggerLatch.trigger();
                return true;
            }
            if (this.error != null) {
                return true;
            }
            this.error = new Exception("calls out of order");
            synchronized (this) {
                notifyAll();
            }
            return true;
        }

        public void notifyCheckpointComplete(long j) {
            if (j == this.lastCheckpointId || this.error != null) {
                return;
            }
            this.error = new Exception("calls out of order");
            synchronized (this) {
                notifyAll();
            }
        }
    }

    @Before
    public void createQueuesAndActors() {
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();
    }

    @Test
    public void testCheckpointCallsInOrder() {
        try {
            Task createTask = createTask();
            createTask.startTaskThread();
            awaitLatch.await();
            for (int i = 1; i <= NUM_CALLS; i++) {
                createTask.triggerCheckpointBarrier(i, 156865867234L);
            }
            triggerLatch.await();
            Assert.assertFalse(createTask.isCanceledOrFailed());
            ExecutionState executionState = createTask.getExecutionState();
            if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHED) {
                Assert.fail("Task should be RUNNING or FINISHED, but is " + executionState);
            }
            createTask.cancelExecution();
            createTask.getExecutingThread().join();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMixedAsyncCallsInOrder() {
        try {
            Task createTask = createTask();
            createTask.startTaskThread();
            awaitLatch.await();
            for (int i = 1; i <= NUM_CALLS; i++) {
                createTask.triggerCheckpointBarrier(i, 156865867234L);
                createTask.notifyCheckpointComplete(i);
            }
            triggerLatch.await();
            Assert.assertFalse(createTask.isCanceledOrFailed());
            ExecutionState executionState = createTask.getExecutionState();
            if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHED) {
                Assert.fail("Task should be RUNNING or FINISHED, but is " + executionState);
            }
            createTask.cancelExecution();
            createTask.getExecutingThread().join();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private static Task createTask() {
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager) Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCacheManager.getClassLoader((JobID) Matchers.any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
        ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = (ResultPartitionConsumableNotifier) Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment networkEnvironment = (NetworkEnvironment) Mockito.mock(NetworkEnvironment.class);
        Mockito.when(networkEnvironment.getPartitionManager()).thenReturn(resultPartitionManager);
        Mockito.when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(resultPartitionConsumableNotifier);
        Mockito.when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
        return new Task(new TaskDeploymentDescriptor(new JobID(), new JobVertexID(), new ExecutionAttemptID(), "Test Task", 0, 1, 0, new Configuration(), new Configuration(), CheckpointsInOrderInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0), (MemoryManager) Mockito.mock(MemoryManager.class), (IOManager) Mockito.mock(IOManager.class), networkEnvironment, (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), DummyActorGateway.INSTANCE, DummyActorGateway.INSTANCE, new FiniteDuration(60L, TimeUnit.SECONDS), libraryCacheManager, (FileCache) Mockito.mock(FileCache.class), new TaskManagerRuntimeInfo("localhost", new Configuration()));
    }
}
