/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerFactory;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestSpyWrapperStateBackend;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={StreamTask.class})
@PowerMockIgnore(value={"org.apache.log4j.*"})
public class StreamTaskTest
extends TestLogger {
    private static OneShotLatch syncLatch;

    @Test
    public void testEarlyCanceling() throws Exception {
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        cfg.setStreamOperator((StreamOperator)new SlowlyDeserializingOperator());
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        TaskManagerActions taskManagerActions = (TaskManagerActions)Mockito.spy((Object)new NoOpTaskManagerActions());
        Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, new Configuration(), taskManagerActions);
        TaskExecutionState state = new TaskExecutionState(task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING);
        task.startTaskThread();
        ((TaskManagerActions)Mockito.verify((Object)taskManagerActions, (VerificationMode)Mockito.timeout((long)2000L))).updateTaskExecutionState((TaskExecutionState)Matchers.eq((Object)state));
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertFalse((String)"Task did not cancel", (boolean)task.getExecutingThread().isAlive());
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testStateBackendLoadingAndClosing() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource streamSource = new TestStreamSource(new MockSourceFunction());
        cfg.setStreamOperator(streamSource);
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
        StateBackendTestSource.fail = false;
        task.startTaskThread();
        task.getExecutingThread().join();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
    }

    @Test
    public void testStateBackendClosingOnFailure() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource streamSource = new TestStreamSource(new MockSourceFunction());
        cfg.setStreamOperator(streamSource);
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
        StateBackendTestSource.fail = true;
        task.startTaskThread();
        task.getExecutingThread().join();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancellationNotBlockedOnLock() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig cfg = new StreamConfig(new Configuration());
        Task task = StreamTaskTest.createTask(CancelLockingTask.class, cfg, new Configuration());
        task.startTaskThread();
        syncLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancellationFailsWithBlockingLock() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig cfg = new StreamConfig(new Configuration());
        Task task = StreamTaskTest.createTask(CancelFailingTask.class, cfg, new Configuration());
        task.startTaskThread();
        syncLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testFailingCheckpointStreamOperator() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        TaskInfo mockTaskInfo = (TaskInfo)Mockito.mock(TaskInfo.class);
        Mockito.when((Object)mockTaskInfo.getTaskNameWithSubtasks()).thenReturn((Object)"foobar");
        Mockito.when((Object)mockTaskInfo.getIndexOfThisSubtask()).thenReturn((Object)0);
        MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
        EmptyStreamTask streamTask = new EmptyStreamTask((Environment)mockEnvironment);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        StreamOperator streamOperator1 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator2 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator3 = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorSnapshotFutures operatorSnapshotResult1 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult2 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        Exception testException = new Exception("Test exception");
        Mockito.when((Object)streamOperator1.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult1);
        Mockito.when((Object)streamOperator2.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult2);
        Mockito.when((Object)streamOperator3.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenThrow(new Throwable[]{testException});
        OperatorID operatorID1 = new OperatorID();
        OperatorID operatorID2 = new OperatorID();
        OperatorID operatorID3 = new OperatorID();
        Mockito.when((Object)streamOperator1.getOperatorID()).thenReturn((Object)operatorID1);
        Mockito.when((Object)streamOperator2.getOperatorID()).thenReturn((Object)operatorID2);
        Mockito.when((Object)streamOperator3.getOperatorID()).thenReturn((Object)operatorID3);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator1, streamOperator2, streamOperator3};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"checkpointStorage", (Object)new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
        CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
        CheckpointExceptionHandler checkpointExceptionHandler = checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, (Environment)mockEnvironment);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"synchronousCheckpointExceptionHandler", (Object)checkpointExceptionHandler);
        StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler = new StreamTask.AsyncCheckpointExceptionHandler((StreamTask)streamTask);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"asynchronousCheckpointExceptionHandler", (Object)asyncCheckpointExceptionHandler);
        try {
            streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
            Assert.fail((String)"Expected test exception here.");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)testException, (Object)e.getCause());
        }
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
    }

    @Test
    public void testFailingAsyncCheckpointRunnable() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
        StreamTask streamTask = (StreamTask)Mockito.spy((Object)((Object)new EmptyStreamTask((Environment)mockEnvironment)));
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        StreamOperator streamOperator1 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator2 = (StreamOperator)Mockito.mock(StreamOperator.class);
        StreamOperator streamOperator3 = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorSnapshotFutures operatorSnapshotResult1 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult2 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult3 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        RunnableFuture failingFuture = (RunnableFuture)Mockito.mock(RunnableFuture.class);
        Mockito.when(failingFuture.get()).thenThrow(new Throwable[]{new ExecutionException(new Exception("Test exception"))});
        Mockito.when((Object)operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn((Object)failingFuture);
        Mockito.when((Object)streamOperator1.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult1);
        Mockito.when((Object)streamOperator2.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult2);
        Mockito.when((Object)streamOperator3.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult3);
        OperatorID operatorID1 = new OperatorID();
        OperatorID operatorID2 = new OperatorID();
        OperatorID operatorID3 = new OperatorID();
        Mockito.when((Object)streamOperator1.getOperatorID()).thenReturn((Object)operatorID1);
        Mockito.when((Object)streamOperator2.getOperatorID()).thenReturn((Object)operatorID2);
        Mockito.when((Object)streamOperator3.getOperatorID()).thenReturn((Object)operatorID3);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator1, streamOperator2, streamOperator3};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        Whitebox.setInternalState((Object)streamTask, (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)streamTask, (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)streamTask, (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)streamTask, (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)streamTask, (String)"asyncOperationsThreadPool", (Object)new DirectExecutorService());
        Whitebox.setInternalState((Object)streamTask, (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)streamTask, (String)"checkpointStorage", (Object)new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
        CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
        CheckpointExceptionHandler checkpointExceptionHandler = checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, (Environment)mockEnvironment);
        Whitebox.setInternalState((Object)streamTask, (String)"synchronousCheckpointExceptionHandler", (Object)checkpointExceptionHandler);
        StreamTask.AsyncCheckpointExceptionHandler asyncCheckpointExceptionHandler = new StreamTask.AsyncCheckpointExceptionHandler(streamTask);
        Whitebox.setInternalState((Object)streamTask, (String)"asynchronousCheckpointExceptionHandler", (Object)asyncCheckpointExceptionHandler);
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
        ((StreamTask)Mockito.verify((Object)streamTask)).handleAsyncException(Matchers.anyString(), (Throwable)Matchers.any(Throwable.class));
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult3)).cancel();
    }

    @Test
    public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
        final OneShotLatch completeAcknowledge = new OneShotLatch();
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                acknowledgeCheckpointLatch.trigger();
                completeAcknowledge.await();
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, checkpointResponder);
        MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskName("mock-task").setTaskStateManager((TaskStateManager)taskStateManager).build();
        EmptyStreamTask streamTask = new EmptyStreamTask((Environment)mockEnvironment);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        StreamOperator streamOperator = (StreamOperator)Mockito.mock(StreamOperator.class);
        Mockito.when((Object)streamOperator.getOperatorID()).thenReturn((Object)new OperatorID(42L, 42L));
        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)));
        Mockito.when((Object)streamOperator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        MemoryBackendCheckpointStorage checkpointStorage = new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"asyncOperationsThreadPool", (Object)Executors.newFixedThreadPool(1));
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"checkpointStorage", (Object)checkpointStorage);
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
        acknowledgeCheckpointLatch.await();
        ArgumentCaptor subtaskStateCaptor = ArgumentCaptor.forClass(TaskStateSnapshot.class);
        ((CheckpointResponder)Mockito.verify((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.eq((long)42L), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)subtaskStateCaptor.capture());
        TaskStateSnapshot subtaskStates = (TaskStateSnapshot)subtaskStateCaptor.getValue();
        OperatorSubtaskState subtaskState = (OperatorSubtaskState)((Map.Entry)subtaskStates.getSubtaskStateMappings().iterator().next()).getValue();
        Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)managedKeyedStateHandle), (Object)subtaskState.getManagedKeyedState());
        Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)rawKeyedStateHandle), (Object)subtaskState.getRawKeyedState());
        Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)managedOperatorStateHandle), (Object)subtaskState.getManagedOperatorState());
        Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)rawOperatorStateHandle), (Object)subtaskState.getRawOperatorState());
        ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
        streamTask.cancel();
        completeAcknowledge.trigger();
        ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
    }

    @Test
    public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        OneShotLatch createSubtask = new OneShotLatch();
        OneShotLatch completeSubtask = new OneShotLatch();
        Environment mockEnvironment = (Environment)Mockito.spy((Object)new MockEnvironmentBuilder().build());
        PowerMockito.whenNew(OperatorSnapshotFinalizer.class).withAnyArguments().thenAnswer(invocation -> {
            createSubtask.trigger();
            completeSubtask.await();
            Object[] arguments = invocation.getArguments();
            return new OperatorSnapshotFinalizer((OperatorSnapshotFutures)arguments[0]);
        });
        EmptyStreamTask streamTask = new EmptyStreamTask(mockEnvironment);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        StreamOperator streamOperator = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorID operatorID = new OperatorID();
        Mockito.when((Object)streamOperator.getOperatorID()).thenReturn((Object)operatorID);
        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)));
        Mockito.when((Object)streamOperator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult);
        StreamOperator[] streamOperators = new StreamOperator[]{streamOperator};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        MemoryBackendCheckpointStorage checkpointStorage = new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE);
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"asyncOperationsThreadPool", (Object)executor);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"checkpointStorage", (Object)checkpointStorage);
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
        createSubtask.await();
        streamTask.cancel();
        completeSubtask.trigger();
        executor.shutdown();
        if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
            Assert.fail((String)"Executor did not shut down within the given timeout. This indicates that the checkpointing did not resume.");
        }
        ((Environment)Mockito.verify((Object)mockEnvironment, (VerificationMode)Mockito.never())).acknowledgeCheckpoint(Matchers.eq((long)42L), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));
        ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle)).discardState();
        ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle)).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle)).discardState();
        ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle)).discardState();
    }

    @Test
    public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
        long checkpointId = 42L;
        long timestamp = 1L;
        Environment mockEnvironment = (Environment)Mockito.spy((Object)new MockEnvironmentBuilder().build());
        final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
        final ArrayList checkpointResult = new ArrayList(1);
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                SubtaskState subtaskState = (SubtaskState)invocation.getArgument(4);
                checkpointResult.add(subtaskState);
                checkpointCompletedLatch.trigger();
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)ArgumentMatchers.nullable(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, checkpointResponder);
        Mockito.when((Object)mockEnvironment.getTaskStateManager()).thenReturn((Object)taskStateManager);
        EmptyStreamTask streamTask = new EmptyStreamTask(mockEnvironment);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(42L, 1L);
        StreamOperator statelessOperator = (StreamOperator)Mockito.mock(StreamOperator.class);
        OperatorID operatorID = new OperatorID();
        Mockito.when((Object)statelessOperator.getOperatorID()).thenReturn((Object)operatorID);
        OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
        Mockito.when((Object)statelessOperator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)statelessOperatorSnapshotResult);
        StreamOperator[] streamOperators = new StreamOperator[]{statelessOperator};
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn((Object)streamOperators);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"isRunning", (Object)true);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"lock", (Object)new Object());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"operatorChain", (Object)operatorChain);
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"cancelables", (Object)new CloseableRegistry());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"configuration", (Object)new StreamConfig(new Configuration()));
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"asyncOperationsThreadPool", (Object)Executors.newCachedThreadPool());
        Whitebox.setInternalState((Object)((Object)streamTask), (String)"checkpointStorage", (Object)new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
        streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
        checkpointCompletedLatch.await(30L, TimeUnit.SECONDS);
        streamTask.cancel();
        Assert.assertNull(checkpointResult.get(0));
    }

    @Test
    public void testOperatorClosingBeforeStopRunning() throws Throwable {
        Configuration taskConfiguration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperator((StreamOperator)new BlockingCloseStreamOperator());
        streamConfig.setOperatorID(new OperatorID());
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskName("Test Task").setMemorySize(32768L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(1).setTaskConfiguration(taskConfiguration).build();){
            NoOpStreamTask streamTask = new NoOpStreamTask((Environment)mockEnvironment);
            AtomicReference<Object> atomicThrowable = new AtomicReference<Object>(null);
            CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(() -> {
                try {
                    streamTask.invoke();
                }
                catch (Exception e) {
                    atomicThrowable.set(e);
                }
            }, TestingUtils.defaultExecutor());
            BlockingCloseStreamOperator.IN_CLOSE.await();
            Assert.assertTrue((boolean)streamTask.isRunning());
            BlockingCloseStreamOperator.FINISH_CLOSE.trigger();
            invokeFuture.get();
            Assert.assertFalse((boolean)streamTask.isRunning());
            if (atomicThrowable.get() != null) {
                throw (Throwable)atomicThrowable.get();
            }
        }
    }

    @Test
    public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
        syncLatch = new OneShotLatch();
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setUserCodeClassLoader((ClassLoader)new TestUserCodeClassLoader()).build();){
            TimeServiceTask timerServiceTask = new TimeServiceTask((Environment)mockEnvironment);
            CompletableFuture<Void> invokeFuture = CompletableFuture.runAsync(() -> {
                try {
                    timerServiceTask.invoke();
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, TestingUtils.defaultExecutor());
            invokeFuture.get();
            Assert.assertThat(timerServiceTask.getClassLoaders(), (Matcher)org.hamcrest.Matchers.hasSize((Matcher)org.hamcrest.Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(1))));
            Assert.assertThat(timerServiceTask.getClassLoaders(), (Matcher)org.hamcrest.Matchers.everyItem((Matcher)org.hamcrest.Matchers.instanceOf(TestUserCodeClassLoader.class)));
        }
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception {
        return StreamTaskTest.createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), (TaskManagerActions)Mockito.mock(TaskManagerActions.class));
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig, TaskManagerActions taskManagerActions) throws Exception {
        return StreamTaskTest.createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), taskManagerActions);
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig, TestTaskStateManager taskStateManager, TaskManagerActions taskManagerActions) throws Exception {
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)StreamTaskTest.class.getClassLoader());
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        NoOpResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = (Executor)Mockito.mock(Executor.class);
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)network.getResultPartitionManager()).thenReturn((Object)partitionManager);
        Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
        Mockito.when((Object)network.createKvStateTaskRegistry((JobID)Matchers.any(JobID.class), (JobVertexID)Matchers.any(JobVertexID.class))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        Mockito.when((Object)network.getTaskEventDispatcher()).thenReturn((Object)taskEventDispatcher);
        JobInformation jobInformation = new JobInformation(new JobID(), "Job Name", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, invokable.getName(), taskConfig.getConfiguration());
        return new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), network, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (TaskStateManager)taskStateManager, taskManagerActions, (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), blobService, libCache, (FileCache)Mockito.mock(FileCache.class), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[]{System.getProperty("java.io.tmpdir")}), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), (ResultPartitionConsumableNotifier)consumableNotifier, partitionProducerStateChecker, executor);
    }

    static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>>
    extends StreamSource<OUT, SRC> {
        static AbstractKeyedStateBackend<?> keyedStateBackend;
        static OperatorStateBackend operatorStateBackend;
        static CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        static CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        public TestStreamSource(SRC sourceFunction) {
            super(sourceFunction);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            keyedStateBackend = (AbstractKeyedStateBackend)this.getKeyedStateBackend();
            operatorStateBackend = this.getOperatorStateBackend();
            rawOperatorStateInputs = (CloseableIterable)context.getRawOperatorStateInputs();
            rawKeyedStateInputs = (CloseableIterable)context.getRawKeyedStateInputs();
            super.initializeState(context);
        }
    }

    private static final class LockHolder
    extends Thread
    implements Closeable {
        private final OneShotLatch trigger;
        private final Object lock;
        private volatile boolean canceled;

        private LockHolder(Object lock, OneShotLatch trigger) {
            this.lock = lock;
            this.trigger = trigger;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Object object = this.lock;
            synchronized (object) {
                while (!this.canceled) {
                    this.trigger.trigger();
                    try {
                        Thread.sleep(1000000000L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        @Override
        public void close() {
            this.canceled = true;
            this.interrupt();
        }
    }

    private static class TestUserCodeClassLoader
    extends ClassLoader {
        public TestUserCodeClassLoader() {
            super(ClassLoader.getSystemClassLoader());
        }
    }

    public static class TimeServiceTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList());

        public TimeServiceTask(Environment env) {
            super(env, null);
        }

        public List<ClassLoader> getClassLoaders() {
            return this.classLoaders;
        }

        protected void init() throws Exception {
            this.getProcessingTimeService().registerTimer(0L, new ProcessingTimeCallback(){

                public void onProcessingTime(long timestamp) throws Exception {
                    classLoaders.add(Thread.currentThread().getContextClassLoader());
                    syncLatch.trigger();
                }
            });
        }

        protected void run() throws Exception {
            syncLatch.await();
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }
    }

    public static class CancelFailingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        public CancelFailingTask(Environment env) {
            super(env);
        }

        protected void init() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void run() throws Exception {
            OneShotLatch latch = new OneShotLatch();
            Object lock = new Object();
            holder.start();
            try (LockHolder holder = new LockHolder(lock, latch);){
                this.getCancelables().registerCloseable((Closeable)holder);
                latch.await();
                syncLatch.trigger();
                Object object = lock;
                synchronized (object) {
                }
            }
        }

        protected void cleanup() {
        }

        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }
    }

    public static class CancelLockingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final OneShotLatch latch = new OneShotLatch();
        private LockHolder holder;

        public CancelLockingTask(Environment env) {
            super(env);
        }

        protected void init() {
        }

        protected void run() throws Exception {
            this.holder = new LockHolder(this.getCheckpointLock(), this.latch);
            this.holder.start();
            this.latch.await();
            syncLatch.trigger();
            try {
                Thread.sleep(100000000L);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }

        protected void cleanup() {
            this.holder.close();
        }

        protected void cancelTask() {
            this.holder.cancel();
        }
    }

    public static class StateBackendTestSource
    extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
        private static volatile boolean fail;

        public StateBackendTestSource(Environment env) {
            super(env);
        }

        protected void init() throws Exception {
        }

        protected void run() throws Exception {
            if (fail) {
                throw new RuntimeException();
            }
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }

        public StreamTaskStateInitializer createStreamTaskStateInitializer() {
            StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
            return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
                final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup);
                return new StreamOperatorStateContext(){

                    public boolean isRestored() {
                        return context.isRestored();
                    }

                    public OperatorStateBackend operatorStateBackend() {
                        return context.operatorStateBackend();
                    }

                    public AbstractKeyedStateBackend<?> keyedStateBackend() {
                        return context.keyedStateBackend();
                    }

                    public InternalTimeServiceManager<?> internalTimerServiceManager() {
                        InternalTimeServiceManager timeServiceManager = context.internalTimerServiceManager();
                        return timeServiceManager != null ? (InternalTimeServiceManager)Mockito.spy((Object)timeServiceManager) : null;
                    }

                    public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
                        return this.replaceWithSpy(context.rawOperatorStateInputs());
                    }

                    public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
                        return this.replaceWithSpy(context.rawKeyedStateInputs());
                    }

                    public <T extends Closeable> T replaceWithSpy(T closeable) {
                        Closeable spyCloseable = (Closeable)Mockito.spy(closeable);
                        if (closeableRegistry.unregisterCloseable(closeable)) {
                            try {
                                closeableRegistry.registerCloseable(spyCloseable);
                            }
                            catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        return (T)spyCloseable;
                    }
                };
            };
        }
    }

    private static class EmptyStreamTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        public EmptyStreamTask(Environment env) {
            super(env, null);
        }

        protected void init() throws Exception {
        }

        protected void run() throws Exception {
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }
    }

    public static final class TestMemoryStateBackendFactory
    implements StateBackendFactory<AbstractStateBackend> {
        private static final long serialVersionUID = 1L;

        public AbstractStateBackend createFromConfig(Configuration config) {
            return new TestSpyWrapperStateBackend(this.createInnerBackend(config));
        }

        protected AbstractStateBackend createInnerBackend(Configuration config) {
            return new MemoryStateBackend();
        }
    }

    private static class MockSourceFunction
    implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
        }

        public void cancel() {
        }
    }

    private static class SlowlyDeserializingOperator
    extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super((SourceFunction)new MockSourceFunction());
        }

        public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<Long>> collector) throws Exception {
            while (!this.canceled) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            long delay = 500L;
            long deadline = System.currentTimeMillis() + delay;
            do {
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((delay = deadline - System.currentTimeMillis()) > 0L);
        }
    }

    private static class BlockingCloseStreamOperator
    extends AbstractStreamOperator<Void> {
        private static final long serialVersionUID = -9042150529568008847L;
        public static final OneShotLatch IN_CLOSE = new OneShotLatch();
        public static final OneShotLatch FINISH_CLOSE = new OneShotLatch();

        private BlockingCloseStreamOperator() {
        }

        public void close() throws Exception {
            IN_CLOSE.trigger();
            FINISH_CLOSE.await();
            super.close();
        }
    }

    public static class NoOpStreamTask<T, OP extends StreamOperator<T>>
    extends StreamTask<T, OP> {
        public NoOpStreamTask(Environment environment) {
            super(environment, null);
        }

        protected void init() throws Exception {
        }

        protected void run() throws Exception {
        }

        protected void cleanup() throws Exception {
        }

        protected void cancelTask() throws Exception {
        }
    }
}

