/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.EOFException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class InterruptSensitiveRestoreTest {
    private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();
    private static final int OPERATOR_MANAGED = 0;
    private static final int OPERATOR_RAW = 1;
    private static final int KEYED_MANAGED = 2;
    private static final int KEYED_RAW = 3;

    @Test
    public void testRestoreWithInterruptOperatorManaged() throws Exception {
        this.testRestoreWithInterrupt(0);
    }

    @Test
    public void testRestoreWithInterruptOperatorRaw() throws Exception {
        this.testRestoreWithInterrupt(1);
    }

    @Test
    public void testRestoreWithInterruptKeyedManaged() throws Exception {
        this.testRestoreWithInterrupt(2);
    }

    @Test
    public void testRestoreWithInterruptKeyedRaw() throws Exception {
        this.testRestoreWithInterrupt(3);
    }

    private void testRestoreWithInterrupt(int mode) throws Exception {
        IN_RESTORE_LATCH.reset();
        Configuration taskConfig = new Configuration();
        StreamConfig cfg = new StreamConfig(taskConfig);
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        switch (mode) {
            case 0: 
            case 1: 
            case 2: 
            case 3: {
                cfg.setStateKeySerializer((TypeSerializer)IntSerializer.INSTANCE);
                cfg.setStreamOperator((StreamOperator)new StreamSource((SourceFunction)new TestSource(mode)));
                break;
            }
            default: {
                throw new IllegalArgumentException();
            }
        }
        InterruptLockingStateHandle lockingHandle = new InterruptLockingStateHandle();
        Task task = InterruptSensitiveRestoreTest.createTask(cfg, taskConfig, lockingHandle, mode);
        task.startTaskThread();
        IN_RESTORE_LATCH.await();
        task.cancelExecution();
        task.getExecutingThread().join(30000L);
        if (task.getExecutionState() == ExecutionState.CANCELING) {
            Assert.fail((String)"Task is stuck and not canceling");
        }
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        Assert.assertNull((Object)task.getFailureCause());
    }

    private static Task createTask(StreamConfig streamConfig, Configuration taskConfig, StreamStateHandle state, int mode) throws IOException {
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        NetworkEnvironment networkEnvironment = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)networkEnvironment.createKvStateTaskRegistry((JobID)Mockito.any(JobID.class), (JobVertexID)Mockito.any(JobVertexID.class))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        Mockito.when((Object)networkEnvironment.getTaskEventDispatcher()).thenReturn((Object)taskEventDispatcher);
        List<Object> keyedStateFromBackend = Collections.emptyList();
        List<Object> keyedStateFromStream = Collections.emptyList();
        List<Object> operatorStateBackend = Collections.emptyList();
        List<Object> operatorStateStream = Collections.emptyList();
        HashMap<String, OperatorStateHandle.StateMetaInfo> operatorStateMetadata = new HashMap<String, OperatorStateHandle.StateMetaInfo>(1);
        OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        operatorStateMetadata.put("_default_", metaInfo);
        KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(new KeyGroupRange(0, 0));
        List<OperatorStreamStateHandle> operatorStateHandles = Collections.singletonList(new OperatorStreamStateHandle(operatorStateMetadata, state));
        List<KeyGroupsStateHandle> keyedStateHandles = Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state));
        switch (mode) {
            case 0: {
                operatorStateBackend = operatorStateHandles;
                break;
            }
            case 1: {
                operatorStateStream = operatorStateHandles;
                break;
            }
            case 2: {
                keyedStateFromBackend = keyedStateHandles;
                break;
            }
            case 3: {
                keyedStateFromStream = keyedStateHandles;
                break;
            }
            default: {
                throw new IllegalArgumentException();
            }
        }
        OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState(new StateObjectCollection(operatorStateBackend), new StateObjectCollection(operatorStateStream), new StateObjectCollection(keyedStateFromBackend), new StateObjectCollection(keyedStateFromStream));
        JobVertexID jobVertexID = new JobVertexID();
        OperatorID operatorID = OperatorID.fromJobVertexID((JobVertexID)jobVertexID);
        streamConfig.setOperatorID(operatorID);
        TaskStateSnapshot stateSnapshot = new TaskStateSnapshot();
        stateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
        JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, stateSnapshot);
        JobInformation jobInformation = new JobInformation(new JobID(), "test job name", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(jobVertexID, "test task name", 1, 1, SourceStreamTask.class.getName(), taskConfig);
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        TestTaskStateManager taskStateManager = new TestTaskStateManager();
        taskStateManager.setReportedCheckpointId(taskRestore.getRestoreCheckpointId());
        taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(Collections.singletonMap(taskRestore.getRestoreCheckpointId(), taskRestore.getTaskStateSnapshot()));
        return new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), networkEnvironment, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (TaskStateManager)taskStateManager, (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), blobService, (LibraryCacheManager)new BlobLibraryCacheManager((PermanentBlobService)blobService.getPermanentBlobService(), FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]), new FileCache(new String[]{EnvironmentInformation.getTemporaryFileDirectory()}, (PermanentBlobService)blobService.getPermanentBlobService()), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), (ResultPartitionConsumableNotifier)new NoOpResultPartitionConsumableNotifier(), (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class), (Executor)Mockito.mock(Executor.class));
    }

    private static class TestSource
    implements SourceFunction<Object>,
    CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        private final int testType;

        public TestSource(int testType) {
            this.testType = testType;
        }

        public void run(SourceFunction.SourceContext<Object> ctx) throws Exception {
            Assert.fail((String)"should never be called");
        }

        public void cancel() {
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            Assert.fail((String)"should never be called");
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            ((StatePartitionStreamProvider)((StateInitializationContext)context).getRawOperatorStateInputs().iterator().next()).getStream().read();
        }
    }

    private static class InterruptLockingStateHandle
    implements StreamStateHandle {
        private static final long serialVersionUID = 1L;
        private volatile boolean closed;

        private InterruptLockingStateHandle() {
        }

        public FSDataInputStream openInputStream() throws IOException {
            this.closed = false;
            FSDataInputStream is = new FSDataInputStream(){

                public void seek(long desired) {
                }

                public long getPos() {
                    return 0L;
                }

                public int read() throws IOException {
                    this.block();
                    throw new EOFException();
                }

                public void close() throws IOException {
                    super.close();
                    closed = true;
                }
            };
            return is;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void block() {
            IN_RESTORE_LATCH.trigger();
            try {
                InterruptLockingStateHandle interruptLockingStateHandle = this;
                synchronized (interruptLockingStateHandle) {
                    this.wait();
                }
            }
            catch (InterruptedException e) {
                while (!this.closed) {
                    try {
                        InterruptLockingStateHandle interruptLockingStateHandle = this;
                        synchronized (interruptLockingStateHandle) {
                            this.wait();
                        }
                    }
                    catch (InterruptedException interruptedException) {
                    }
                }
            }
        }

        public void discardState() throws Exception {
        }

        public long getStateSize() {
            return 0L;
        }
    }
}

