/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerTest;
import org.apache.flink.streaming.runtime.tasks.ExceptionallyDoneFuture;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestSpyWrapperStateBackend;
import org.apache.flink.streaming.util.StreamTaskUtil;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class StreamTaskTest
extends TestLogger {
    private static OneShotLatch syncLatch;
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds((long)30L);

    @Test
    public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
        StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = new StreamTask.StreamTaskAsyncExceptionHandler((Environment)mockEnvironment);
        mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
        String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";
        asyncExceptionHandler.handleAsyncException("EXPECTED_ERROR MESSAGE", (Throwable)expectedException);
        Optional actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
        Throwable actualException = (Throwable)actualExternalFailureCause.orElseThrow(() -> new AssertionError((Object)"Expected exceptional completion"));
        Assert.assertThat((Object)actualException, (Matcher)org.hamcrest.Matchers.instanceOf(AsynchronousException.class));
        Assert.assertThat((Object)actualException.getMessage(), (Matcher)org.hamcrest.Matchers.is((Object)"EXPECTED_ERROR MESSAGE"));
        Assert.assertThat((Object)actualException.getCause(), (Matcher)org.hamcrest.Matchers.is((Object)expectedException));
    }

    @Test
    public void testEarlyCanceling() throws Exception {
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        cfg.setStreamOperator((StreamOperator)new SlowlyDeserializingOperator());
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        TaskManagerActions taskManagerActions = (TaskManagerActions)Mockito.spy((Object)new NoOpTaskManagerActions());
        Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, new Configuration(), taskManagerActions);
        TaskExecutionState state = new TaskExecutionState(task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING);
        task.startTaskThread();
        ((TaskManagerActions)Mockito.verify((Object)taskManagerActions, (VerificationMode)Mockito.timeout((long)2000L))).updateTaskExecutionState((TaskExecutionState)Matchers.eq((Object)state));
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertFalse((String)"Task did not cancel", (boolean)task.getExecutingThread().isAlive());
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testStateBackendLoadingAndClosing() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource streamSource = new TestStreamSource(new MockSourceFunction());
        cfg.setStreamOperator(streamSource);
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
        StateBackendTestSource.fail = false;
        task.startTaskThread();
        task.getExecutingThread().join();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
    }

    @Test
    public void testStateBackendClosingOnFailure() throws Exception {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
        StreamConfig cfg = new StreamConfig(new Configuration());
        cfg.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        cfg.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource streamSource = new TestStreamSource(new MockSourceFunction());
        cfg.setStreamOperator(streamSource);
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Task task = StreamTaskTest.createTask(StateBackendTestSource.class, cfg, taskManagerConfig);
        StateBackendTestSource.fail = true;
        task.startTaskThread();
        task.getExecutingThread().join();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
        ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
        ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
        ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancellationNotBlockedOnLock() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig cfg = new StreamConfig(new Configuration());
        Task task = StreamTaskTest.createTask(CancelLockingTask.class, cfg, new Configuration());
        task.startTaskThread();
        syncLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancellationFailsWithBlockingLock() throws Exception {
        syncLatch = new OneShotLatch();
        StreamConfig cfg = new StreamConfig(new Configuration());
        Task task = StreamTaskTest.createTask(CancelFailingTask.class, cfg, new Configuration());
        task.startTaskThread();
        syncLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testDecliningCheckpointStreamOperator() throws Exception {
        CheckpointExceptionHandlerTest.DeclineDummyEnvironment declineDummyEnvironment = new CheckpointExceptionHandlerTest.DeclineDummyEnvironment();
        OperatorSnapshotFutures operatorSnapshotResult1 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult2 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        Exception testException = new Exception("Test exception");
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)declineDummyEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult1), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult2), StreamTaskTest.streamOperatorWithSnapshotException(testException))));
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
        StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);
        streamTask.triggerCheckpoint(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
        Assert.assertEquals((Object)testException, (Object)declineDummyEnvironment.getLastDeclinedCheckpointCause());
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
        ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
        ((MockStreamTask)((Object)task.streamTask)).finishInput();
        task.waitForTaskCompletion(false);
    }

    @Test
    public void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception {
        RuntimeException failingCause = new RuntimeException("Test exception");
        FailingDummyEnvironment failingDummyEnvironment = new FailingDummyEnvironment(failingCause);
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(ExceptionallyDoneFuture.of(failingCause), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()));
        TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        RunningTask task = StreamTaskTest.runTask(() -> new MockStreamTask((Environment)failingDummyEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult)), (Thread.UncaughtExceptionHandler)uncaughtExceptionHandler));
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
        StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);
        streamTask.triggerCheckpoint(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
        Throwable uncaughtException = uncaughtExceptionHandler.waitForUncaughtException();
        Assert.assertThat((Object)uncaughtException, (Matcher)org.hamcrest.Matchers.is((Object)failingCause));
        streamTask.finishInput();
        task.waitForTaskCompletion(false);
    }

    @Test
    public void testFailingAsyncCheckpointRunnable() throws Exception {
        OperatorSnapshotFutures operatorSnapshotResult1 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult2 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures operatorSnapshotResult3 = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        RunnableFuture failingFuture = (RunnableFuture)Mockito.mock(RunnableFuture.class);
        Mockito.when(failingFuture.get()).thenThrow(new Throwable[]{new ExecutionException(new Exception("Test exception"))});
        Mockito.when((Object)operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn((Object)failingFuture);
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult1), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult2), StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult3))));
            MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
            StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);
            mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
            streamTask.triggerCheckpoint(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
            ExecutorService executor = streamTask.getAsyncOperationsThreadPool();
            executor.shutdown();
            if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                Assert.fail((String)"Executor did not shut down within the given timeout. This indicates that the checkpointing did not resume.");
            }
            Assert.assertTrue((boolean)mockEnvironment.getActualExternalFailureCause().isPresent());
            ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult1)).cancel();
            ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult2)).cancel();
            ((OperatorSnapshotFutures)Mockito.verify((Object)operatorSnapshotResult3)).cancel();
            streamTask.finishInput();
            task.waitForTaskCompletion(false);
        }
    }

    @Test
    public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
        final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
        final OneShotLatch completeAcknowledge = new OneShotLatch();
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                acknowledgeCheckpointLatch.trigger();
                while (true) {
                    try {
                        completeAcknowledge.await();
                    }
                    catch (InterruptedException interruptedException) {
                        continue;
                    }
                    break;
                }
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, checkpointResponder);
        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawKeyedStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)));
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskName("mock-task").setTaskStateManager((TaskStateManager)taskStateManager).build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult))));
            MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
            StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);
            long checkpointId = 42L;
            streamTask.triggerCheckpoint(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
            acknowledgeCheckpointLatch.await();
            ArgumentCaptor subtaskStateCaptor = ArgumentCaptor.forClass(TaskStateSnapshot.class);
            ((CheckpointResponder)Mockito.verify((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.eq((long)42L), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)subtaskStateCaptor.capture());
            TaskStateSnapshot subtaskStates = (TaskStateSnapshot)subtaskStateCaptor.getValue();
            OperatorSubtaskState subtaskState = (OperatorSubtaskState)((Map.Entry)subtaskStates.getSubtaskStateMappings().iterator().next()).getValue();
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)managedKeyedStateHandle), (Object)subtaskState.getManagedKeyedState());
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)rawKeyedStateHandle), (Object)subtaskState.getRawKeyedState());
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)managedOperatorStateHandle), (Object)subtaskState.getManagedOperatorState());
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)rawOperatorStateHandle), (Object)subtaskState.getRawOperatorState());
            ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            streamTask.cancel();
            completeAcknowledge.trigger();
            ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            task.waitForTaskCompletion(true);
        }
    }

    @Test
    public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
        TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle();
        TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle();
        TestingOperatorStateHandle managedOperatorStateHandle = new TestingOperatorStateHandle();
        TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle();
        BlockingRunnableFuture rawKeyedStateHandleFuture = new BlockingRunnableFuture(2, SnapshotResult.of((StateObject)rawKeyedStateHandle));
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)), rawKeyedStateHandleFuture, (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)));
        StreamOperator<?> streamOperator = StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult);
        AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(streamOperator)));
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
        long checkpointId = 42L;
        ((MockStreamTask)((Object)task.streamTask)).triggerCheckpoint(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
        rawKeyedStateHandleFuture.awaitRun();
        ((MockStreamTask)((Object)task.streamTask)).cancel();
        FutureUtils.ConjunctFuture discardFuture = FutureUtils.waitForAll(Arrays.asList(managedKeyedStateHandle.getDiscardFuture(), rawKeyedStateHandle.getDiscardFuture(), managedOperatorStateHandle.getDiscardFuture(), rawOperatorStateHandle.getDiscardFuture()));
        discardFuture.get();
        try {
            mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"The checkpoint should not get acknowledged.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        task.waitForTaskCompletion(true);
    }

    @Test
    public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
        final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
        final ArrayList checkpointResult = new ArrayList(1);
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                SubtaskState subtaskState = (SubtaskState)invocation.getArgument(4);
                checkpointResult.add(subtaskState);
                checkpointCompletedLatch.trigger();
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)ArgumentMatchers.nullable(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, checkpointResponder);
        StreamOperator<?> statelessOperator = StreamTaskTest.streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager((TaskStateManager)taskStateManager).build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(statelessOperator)));
            StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
            ((MockStreamTask)((Object)task.streamTask)).triggerCheckpoint(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
            checkpointCompletedLatch.await(30L, TimeUnit.SECONDS);
            Assert.assertNull(checkpointResult.get(0));
            ((MockStreamTask)((Object)task.streamTask)).cancel();
            task.waitForTaskCompletion(true);
        }
    }

    @Test
    public void testOperatorClosingBeforeStopRunning() throws Throwable {
        BlockingCloseStreamOperator.resetLatches();
        Configuration taskConfiguration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperator((StreamOperator)new BlockingCloseStreamOperator());
        streamConfig.setOperatorID(new OperatorID());
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskName("Test Task").setMemorySize(32768L).setInputSplitProvider(new MockInputSplitProvider()).setBufferSize(1).setTaskConfiguration(taskConfiguration).build();){
            RunningTask task = StreamTaskTest.runTask(() -> new NoOpStreamTask((Environment)mockEnvironment));
            BlockingCloseStreamOperator.inClose.await();
            Assert.assertTrue((boolean)task.streamTask.isRunning());
            BlockingCloseStreamOperator.finishClose.trigger();
            task.waitForTaskCompletion(false);
            Assert.assertFalse((boolean)task.streamTask.isRunning());
        }
    }

    @Test
    public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable {
        syncLatch = new OneShotLatch();
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setUserCodeClassLoader((ClassLoader)new TestUserCodeClassLoader()).build();){
            RunningTask task = StreamTaskTest.runTask(() -> new TimeServiceTask((Environment)mockEnvironment));
            task.waitForTaskCompletion(false);
            Assert.assertThat(((TimeServiceTask)((Object)task.streamTask)).getClassLoaders(), (Matcher)org.hamcrest.Matchers.hasSize((Matcher)org.hamcrest.Matchers.greaterThanOrEqualTo((Comparable)Integer.valueOf(1))));
            Assert.assertThat(((TimeServiceTask)((Object)task.streamTask)).getClassLoaders(), (Matcher)org.hamcrest.Matchers.everyItem((Matcher)org.hamcrest.Matchers.instanceOf(TestUserCodeClassLoader.class)));
        }
    }

    private static StreamOperator<?> streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception {
        StreamOperator operator = (StreamOperator)Mockito.mock(StreamOperator.class);
        Mockito.when((Object)operator.getOperatorID()).thenReturn((Object)new OperatorID());
        Mockito.when((Object)operator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenReturn((Object)operatorSnapshotResult);
        return operator;
    }

    private static StreamOperator<?> streamOperatorWithSnapshotException(Exception exception) throws Exception {
        StreamOperator operator = (StreamOperator)Mockito.mock(StreamOperator.class);
        Mockito.when((Object)operator.getOperatorID()).thenReturn((Object)new OperatorID());
        Mockito.when((Object)operator.snapshotState(Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions)Matchers.any(CheckpointOptions.class), (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class))).thenThrow(new Throwable[]{exception});
        return operator;
    }

    private static <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(StreamOperator<?> ... streamOperators) {
        OperatorChain operatorChain = (OperatorChain)Mockito.mock(OperatorChain.class);
        Mockito.when((Object)operatorChain.getAllOperators()).thenReturn(streamOperators);
        return operatorChain;
    }

    private static <T extends StreamTask<?, ?>> RunningTask<T> runTask(SupplierWithException<T, Exception> taskFactory) throws Exception {
        CompletableFuture taskCreationFuture = new CompletableFuture();
        CompletableFuture<Void> invocationFuture = CompletableFuture.runAsync(() -> {
            StreamTask task;
            try {
                task = (StreamTask)taskFactory.get();
                taskCreationFuture.complete(task);
            }
            catch (Exception e) {
                taskCreationFuture.completeExceptionally(e);
                return;
            }
            try {
                task.invoke();
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, Executors.newSingleThreadExecutor());
        return new RunningTask<StreamTask>((StreamTask)taskCreationFuture.get(), invocationFuture);
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception {
        return StreamTaskTest.createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), (TaskManagerActions)Mockito.mock(TaskManagerActions.class));
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig, TaskManagerActions taskManagerActions) throws Exception {
        return StreamTaskTest.createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), taskManagerActions);
    }

    public static Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig, TestTaskStateManager taskStateManager, TaskManagerActions taskManagerActions) throws Exception {
        BlobCacheService blobService = new BlobCacheService((PermanentBlobCache)Mockito.mock(PermanentBlobCache.class), (TransientBlobCache)Mockito.mock(TransientBlobCache.class));
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)StreamTaskTest.class.getClassLoader());
        NoOpResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = (Executor)Mockito.mock(Executor.class);
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        JobInformation jobInformation = new JobInformation(new JobID(), "Job Name", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, invokable.getName(), taskConfig.getConfiguration());
        return new Task(jobInformation, taskInformation, new ExecutionAttemptID(), new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), (ShuffleEnvironment)shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), new TaskEventDispatcher(), (TaskStateManager)taskStateManager, taskManagerActions, (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), (GlobalAggregateManager)new TestGlobalAggregateManager(), blobService, libCache, (FileCache)Mockito.mock(FileCache.class), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[]{System.getProperty("java.io.tmpdir")}), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), (ResultPartitionConsumableNotifier)consumableNotifier, partitionProducerStateChecker, executor);
    }

    private static MockStreamTask createMockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) {
        return new MockStreamTask(env, operatorChain, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE);
    }

    private static class FailingDummyEnvironment
    extends DummyEnvironment {
        final RuntimeException failingCause;

        private FailingDummyEnvironment(RuntimeException failingCause) {
            this.failingCause = failingCause;
        }

        public void declineCheckpoint(long checkpointId, Throwable cause) {
            throw this.failingCause;
        }

        public void failExternally(Throwable cause) {
            throw this.failingCause;
        }
    }

    private static final class BlockingRunnableFuture<V>
    implements RunnableFuture<V> {
        private final CompletableFuture<V> future = new CompletableFuture();
        private final OneShotLatch signalRunLatch = new OneShotLatch();
        private final CountDownLatch continueRunLatch;
        private final V value;

        private BlockingRunnableFuture(int parties, V value) {
            this.continueRunLatch = new CountDownLatch(parties);
            this.value = value;
        }

        @Override
        public void run() {
            this.signalRunLatch.trigger();
            this.continueRunLatch.countDown();
            try {
                this.continueRunLatch.await();
            }
            catch (InterruptedException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
            this.future.complete(this.value);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.future.get(timeout, unit);
        }

        void awaitRun() throws InterruptedException {
            this.signalRunLatch.await();
        }
    }

    private static class AcknowledgeDummyEnvironment
    extends DummyEnvironment {
        private final CompletableFuture<Long> acknowledgeCheckpointFuture = new CompletableFuture();

        private AcknowledgeDummyEnvironment() {
        }

        public CompletableFuture<Long> getAcknowledgeCheckpointFuture() {
            return this.acknowledgeCheckpointFuture;
        }

        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
            this.acknowledgeCheckpointFuture.complete(checkpointId);
        }

        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
            this.acknowledgeCheckpointFuture.complete(checkpointId);
        }
    }

    private static class TestingOperatorStateHandle
    implements OperatorStateHandle {
        private static final long serialVersionUID = 923794934187614088L;
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture();

        private TestingOperatorStateHandle() {
        }

        public CompletableFuture<Void> getDiscardFuture() {
            return this.discardFuture;
        }

        public Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets() {
            return Collections.emptyMap();
        }

        public FSDataInputStream openInputStream() throws IOException {
            throw new IOException("Cannot open input streams in testing implementation.");
        }

        public StreamStateHandle getDelegateStateHandle() {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void discardState() throws Exception {
            this.discardFuture.complete(null);
        }

        public long getStateSize() {
            return 0L;
        }
    }

    private static class TestingKeyedStateHandle
    implements KeyedStateHandle {
        private static final long serialVersionUID = -2473861305282291582L;
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture();

        private TestingKeyedStateHandle() {
        }

        public CompletableFuture<Void> getDiscardFuture() {
            return this.discardFuture;
        }

        public KeyGroupRange getKeyGroupRange() {
            return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        }

        public TestingKeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
            return this;
        }

        public void registerSharedStates(SharedStateRegistry stateRegistry) {
        }

        public void discardState() {
            this.discardFuture.complete(null);
        }

        public long getStateSize() {
            return 0L;
        }
    }

    static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>>
    extends StreamSource<OUT, SRC> {
        static AbstractKeyedStateBackend<?> keyedStateBackend;
        static OperatorStateBackend operatorStateBackend;
        static CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        static CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        public TestStreamSource(SRC sourceFunction) {
            super(sourceFunction);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            keyedStateBackend = (AbstractKeyedStateBackend)this.getKeyedStateBackend();
            operatorStateBackend = this.getOperatorStateBackend();
            rawOperatorStateInputs = (CloseableIterable)context.getRawOperatorStateInputs();
            rawKeyedStateInputs = (CloseableIterable)context.getRawKeyedStateInputs();
            super.initializeState(context);
        }
    }

    private static final class LockHolder
    extends Thread
    implements Closeable {
        private final OneShotLatch trigger;
        private final Object lock;
        private volatile boolean canceled;

        private LockHolder(Object lock, OneShotLatch trigger) {
            this.lock = lock;
            this.trigger = trigger;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Object object = this.lock;
            synchronized (object) {
                while (!this.canceled) {
                    this.trigger.trigger();
                    try {
                        Thread.sleep(1000000000L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        @Override
        public void close() {
            this.canceled = true;
            this.interrupt();
        }
    }

    private static class TestUserCodeClassLoader
    extends ClassLoader {
        public TestUserCodeClassLoader() {
            super(ClassLoader.getSystemClassLoader());
        }
    }

    public static class TimeServiceTask
    extends NoOpStreamTask<String, AbstractStreamOperator<String>> {
        private final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList());

        public TimeServiceTask(Environment env) {
            super(env);
        }

        public List<ClassLoader> getClassLoaders() {
            return this.classLoaders;
        }

        @Override
        protected void init() throws Exception {
            this.getProcessingTimeService().registerTimer(0L, new ProcessingTimeCallback(){

                public void onProcessingTime(long timestamp) throws Exception {
                    classLoaders.add(Thread.currentThread().getContextClassLoader());
                    syncLatch.trigger();
                }
            });
        }

        @Override
        protected void processInput(StreamTask.ActionContext context) throws Exception {
            syncLatch.await();
            super.processInput(context);
        }
    }

    public static class CancelFailingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        public CancelFailingTask(Environment env) {
            super(env);
        }

        protected void init() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void processInput(StreamTask.ActionContext context) throws Exception {
            OneShotLatch latch = new OneShotLatch();
            Object lock = new Object();
            holder.start();
            try (LockHolder holder = new LockHolder(lock, latch);){
                this.getCancelables().registerCloseable((Closeable)holder);
                latch.await();
                syncLatch.trigger();
                Object object = lock;
                synchronized (object) {
                }
            }
            context.allActionsCompleted();
        }

        protected void cleanup() {
        }

        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }
    }

    public static class CancelLockingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final OneShotLatch latch = new OneShotLatch();
        private LockHolder holder;

        public CancelLockingTask(Environment env) {
            super(env);
        }

        protected void init() {
        }

        protected void processInput(StreamTask.ActionContext context) throws Exception {
            this.holder = new LockHolder(this.getCheckpointLock(), this.latch);
            this.holder.start();
            this.latch.await();
            syncLatch.trigger();
            try {
                Thread.sleep(100000000L);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            context.allActionsCompleted();
        }

        protected void cleanup() {
            this.holder.close();
        }

        protected void cancelTask() {
            this.holder.cancel();
        }
    }

    public static class StateBackendTestSource
    extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
        private static volatile boolean fail;

        public StateBackendTestSource(Environment env) {
            super(env);
        }

        protected void init() throws Exception {
        }

        protected void processInput(StreamTask.ActionContext context) throws Exception {
            if (fail) {
                throw new RuntimeException();
            }
            context.allActionsCompleted();
        }

        protected void cleanup() throws Exception {
        }

        public StreamTaskStateInitializer createStreamTaskStateInitializer() {
            StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
            return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
                final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup);
                return new StreamOperatorStateContext(){

                    public boolean isRestored() {
                        return context.isRestored();
                    }

                    public OperatorStateBackend operatorStateBackend() {
                        return context.operatorStateBackend();
                    }

                    public AbstractKeyedStateBackend<?> keyedStateBackend() {
                        return context.keyedStateBackend();
                    }

                    public InternalTimeServiceManager<?> internalTimerServiceManager() {
                        InternalTimeServiceManager timeServiceManager = context.internalTimerServiceManager();
                        return timeServiceManager != null ? (InternalTimeServiceManager)Mockito.spy((Object)timeServiceManager) : null;
                    }

                    public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
                        return this.replaceWithSpy(context.rawOperatorStateInputs());
                    }

                    public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
                        return this.replaceWithSpy(context.rawKeyedStateInputs());
                    }

                    public <T extends Closeable> T replaceWithSpy(T closeable) {
                        Closeable spyCloseable = (Closeable)Mockito.spy(closeable);
                        if (closeableRegistry.unregisterCloseable(closeable)) {
                            try {
                                closeableRegistry.registerCloseable(spyCloseable);
                            }
                            catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        return (T)spyCloseable;
                    }
                };
            };
        }
    }

    private static class MockStreamTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;
        private volatile boolean inputFinished;

        MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
            super(env, null, uncaughtExceptionHandler);
            this.overrideOperatorChain = operatorChain;
        }

        protected void init() {
            this.operatorChain = this.overrideOperatorChain;
            this.headOperator = this.operatorChain.getHeadOperator();
        }

        protected void processInput(StreamTask.ActionContext context) {
            if (this.isCanceled() || this.inputFinished) {
                context.allActionsCompleted();
            }
        }

        protected void cleanup() throws Exception {
        }

        void finishInput() {
            this.inputFinished = true;
        }
    }

    public static final class TestMemoryStateBackendFactory
    implements StateBackendFactory<AbstractStateBackend> {
        private static final long serialVersionUID = 1L;

        public AbstractStateBackend createFromConfig(Configuration config, ClassLoader classLoader) {
            return new TestSpyWrapperStateBackend(this.createInnerBackend(config));
        }

        protected AbstractStateBackend createInnerBackend(Configuration config) {
            return new MemoryStateBackend();
        }
    }

    private static class MockSourceFunction
    implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
        }

        public void cancel() {
        }
    }

    private static class SlowlyDeserializingOperator
    extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super((SourceFunction)new MockSourceFunction());
        }

        public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<Long>> collector, OperatorChain<?, ?> operatorChain) throws Exception {
            while (!this.canceled) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            long delay = 500L;
            long deadline = System.currentTimeMillis() + delay;
            do {
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((delay = deadline - System.currentTimeMillis()) > 0L);
        }
    }

    private static class BlockingCloseStreamOperator
    extends AbstractStreamOperator<Void> {
        private static final long serialVersionUID = -9042150529568008847L;
        private static volatile OneShotLatch inClose;
        private static volatile OneShotLatch finishClose;

        private BlockingCloseStreamOperator() {
        }

        public void close() throws Exception {
            this.checkLatches();
            inClose.trigger();
            finishClose.await();
            super.close();
        }

        private void checkLatches() {
            Preconditions.checkNotNull((Object)inClose);
            Preconditions.checkNotNull((Object)finishClose);
        }

        private static void resetLatches() {
            inClose = new OneShotLatch();
            finishClose = new OneShotLatch();
        }
    }

    public static class NoOpStreamTask<T, OP extends StreamOperator<T>>
    extends StreamTask<T, OP> {
        public NoOpStreamTask(Environment environment) {
            super(environment, null);
        }

        protected void init() throws Exception {
        }

        protected void processInput(StreamTask.ActionContext context) throws Exception {
            context.allActionsCompleted();
        }

        protected void cleanup() throws Exception {
        }
    }

    private static class RunningTask<T extends StreamTask<?, ?>> {
        final T streamTask;
        final CompletableFuture<Void> invocationFuture;

        RunningTask(T streamTask, CompletableFuture<Void> invocationFuture) {
            this.streamTask = streamTask;
            this.invocationFuture = invocationFuture;
        }

        void waitForTaskCompletion(boolean cancelled) throws Exception {
            try {
                this.invocationFuture.get();
            }
            catch (Exception e) {
                if (cancelled) {
                    Assert.assertThat((Object)e.getCause(), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(CancelTaskException.class)));
                }
                throw e;
            }
            Assert.assertThat((Object)this.streamTask.isCanceled(), (Matcher)org.hamcrest.Matchers.is((Object)cancelled));
        }
    }
}

