/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Integration test that starts a real {@link Task} running a minimal {@link StreamTask}
 * subclass, triggers a {@link CheckpointType#SYNC_SAVEPOINT} on it and then notifies it of the
 * checkpoint's completion.
 *
 * <p>The task records an {@link Event} before and after each intercepted call. The test thread
 * consumes these events in order to verify that both calls return and that the task is still
 * {@link ExecutionState#RUNNING} afterwards.
 */
public class SynchronousCheckpointITCase {
    /**
     * Event log shared between the task under test (producer) and the test thread (consumer).
     * The test thread uses the blocking {@link LinkedBlockingQueue#take()} to wait for the task
     * to reach a given point.
     */
    private static final LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    /** Bounds every test so that a blocking {@code take()} cannot hang the build forever. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(10L);

    @Test
    public void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
        // The queue is static: discard anything a previous run in the same JVM left behind.
        eventQueue.clear();

        Task task = createTask(SynchronousCheckpointTestingTask.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();

            assertNextEvent(Event.TASK_IS_RUNNING);
            Assert.assertTrue(eventQueue.isEmpty());
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());

            task.triggerCheckpointBarrier(
                    42L,
                    156865867234L,
                    new CheckpointOptions(
                            CheckpointType.SYNC_SAVEPOINT,
                            CheckpointStorageLocationReference.getDefault()),
                    true);

            assertNextEvent(Event.PRE_TRIGGER_CHECKPOINT);
            assertNextEvent(Event.POST_TRIGGER_CHECKPOINT);
            Assert.assertTrue(eventQueue.isEmpty());

            task.notifyCheckpointComplete(42L);

            assertNextEvent(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
            assertNextEvent(Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
            Assert.assertTrue(eventQueue.isEmpty());

            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
        }
    }

    /**
     * Blocks until the task publishes its next event and asserts that it is the expected one.
     *
     * @param expected the event that must be at the head of the queue
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private static void assertNextEvent(Event expected) throws InterruptedException {
        Assert.assertThat(eventQueue.take(), Matchers.is(expected));
    }

    /**
     * Builds a {@link Task} for the given invokable, backed by Mockito mocks and test
     * implementations for its collaborators.
     *
     * @param invokableClass the invokable the task should instantiate and run
     * @return a task that has not been started yet
     * @throws Exception if any of the collaborators cannot be created
     */
    private Task createTask(Class<? extends AbstractInvokable> invokableClass) throws Exception {
        BlobCacheService blobService = new BlobCacheService(
                Mockito.mock(PermanentBlobCache.class),
                Mockito.mock(TransientBlobCache.class));

        // The invokable lives on the test classpath, so the system class loader can resolve it.
        LibraryCacheManager libCache = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libCache.getClassLoader(ArgumentMatchers.any(JobID.class)))
                .thenReturn(ClassLoader.getSystemClassLoader());

        ResultPartitionConsumableNotifier consumableNotifier =
                new NoOpResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionProducerStateChecker =
                Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = Mockito.mock(Executor.class);
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        TaskMetricGroup taskMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();

        JobInformation jobInformation = new JobInformation(
                new JobID(),
                "Job Name",
                new SerializedValue<>(new ExecutionConfig()),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());

        TaskInformation taskInformation = new TaskInformation(
                new JobVertexID(),
                "Test Task",
                1,
                1,
                invokableClass.getName(),
                new Configuration());

        return new Task(
                jobInformation,
                taskInformation,
                new ExecutionAttemptID(),
                new AllocationID(),
                0,
                0,
                Collections.emptyList(),
                Collections.emptyList(),
                0,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                shuffleEnvironment,
                new KvStateService(new KvStateRegistry(), null, null),
                Mockito.mock(BroadcastVariableManager.class),
                new TaskEventDispatcher(),
                new TestTaskStateManager(),
                Mockito.mock(TaskManagerActions.class),
                Mockito.mock(InputSplitProvider.class),
                Mockito.mock(CheckpointResponder.class),
                new TestGlobalAggregateManager(),
                blobService,
                libCache,
                Mockito.mock(FileCache.class),
                new TestingTaskManagerRuntimeInfo(),
                taskMetricGroup,
                consumableNotifier,
                partitionProducerStateChecker,
                executor);
    }

    /**
     * Try-with-resources helper that cancels the task and waits for its executing thread when
     * the test body exits, whether normally or through a failed assertion.
     */
    private static class TaskCleaner implements AutoCloseable {
        private final Task task;

        private TaskCleaner(Task task) {
            this.task = task;
        }

        @Override
        public void close() throws Exception {
            this.task.cancelExecution();
            // Bounded wait: returns after at most 5 seconds even if the thread is still alive.
            this.task.getExecutingThread().join(5000L);
        }
    }

    /** Milestones published by the task, listed in the order the test expects to see them. */
    private enum Event {
        TASK_IS_RUNNING,
        PRE_TRIGGER_CHECKPOINT,
        POST_TRIGGER_CHECKPOINT,
        PRE_NOTIFY_CHECKPOINT_COMPLETE,
        POST_NOTIFY_CHECKPOINT_COMPLETE
    }

    /**
     * Minimal {@link StreamTask} that publishes an {@link Event} around the checkpoint-related
     * calls the test is interested in and otherwise does no work.
     */
    public static class SynchronousCheckpointTestingTask
    extends StreamTask {
        /** Set on the first {@code processInput} call so the running event is sent only once. */
        private boolean isRunning;

        public SynchronousCheckpointTestingTask(Environment environment) {
            super(environment);
        }

        /**
         * Announces {@link Event#TASK_IS_RUNNING} on the first invocation. Afterwards it reports
         * all actions completed once the task is canceled, and no action available otherwise.
         */
        @Override
        protected void processInput(StreamTask.ActionContext context) throws Exception {
            if (!this.isRunning) {
                this.isRunning = true;
                eventQueue.put(Event.TASK_IS_RUNNING);
            }
            if (this.isCanceled()) {
                context.allActionsCompleted();
            } else {
                context.actionsUnavailable();
            }
        }

        /** Delegates to the superclass, publishing an event before and after the call. */
        @Override
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
            eventQueue.put(Event.PRE_TRIGGER_CHECKPOINT);
            boolean result = super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
            eventQueue.put(Event.POST_TRIGGER_CHECKPOINT);
            return result;
        }

        /** Delegates to the superclass, publishing an event before and after the call. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            eventQueue.put(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
            super.notifyCheckpointComplete(checkpointId);
            eventQueue.put(Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
        }

        /** Nothing to initialise for this test task. */
        @Override
        protected void init() {
        }

        /** Barrier-driven checkpoints are not part of this test; any call is a failure. */
        @Override
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Barrier-driven checkpoints are not part of this test; any call is a failure. */
        @Override
        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Nothing to clean up for this test task. */
        @Override
        protected void cleanup() {
        }
    }
}

