/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.MockIndexedInputGate;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.ExceptionallyDoneFuture;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.OperatorChainTest;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamConfigChainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TestSpyWrapperStateBackend;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.StreamTaskUtil;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class StreamTaskTest
extends TestLogger {
    // Shared latch. Within this part of the file it is re-created by
    // testCanceleablesCanceledOnCancelTaskError and awaited there before the task is cancelled.
    // NOTE(review): presumably triggered by the task class under test (not visible here) — confirm.
    private static OneShotLatch syncLatch;
    // JUnit rule applying a 30 second timeout to each test of this class.
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds((long)30L);

    /**
     * Runs a one-input task around a {@code FailingTwiceOperator}, feeds it a single record and
     * waits for the task to complete. A failure of the task is tolerated only when an
     * {@link ExpectedTestException} can be found in its cause chain; any other failure is
     * rethrown.
     */
    @Test
    public void testCleanUpExceptionSuppressing() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator((StreamOperator)new FailingTwiceOperator());
        config.setOperatorID(new OperatorID());

        harness.invoke();
        harness.waitForTaskRunning();
        harness.processElement(new StreamRecord((Object)"Doesn't matter", 0L));

        try {
            harness.waitForTaskCompletion();
        }
        catch (Exception failure) {
            // Only the expected test exception may terminate the task; everything else is a real error.
            boolean causedByExpectedException = ExceptionUtils.findThrowable((Throwable)failure, ExpectedTestException.class).isPresent();
            if (!causedByExpectedException) {
                throw failure;
            }
        }
    }

    /**
     * Checks that {@code StreamTask.StreamTaskAsyncExceptionHandler#handleAsyncException} reports
     * the failure to the environment as an {@link AsynchronousException} that carries the given
     * message and has the original throwable as its cause.
     */
    @Test
    public void streamTaskAsyncExceptionHandler_handleException_forwardsMessageProperly() {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        RuntimeException expectedException = new RuntimeException("RUNTIME EXCEPTION");
        StreamTask.StreamTaskAsyncExceptionHandler asyncExceptionHandler = new StreamTask.StreamTaskAsyncExceptionHandler((Environment)mockEnvironment);
        mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
        String expectedErrorMessage = "EXPECTED_ERROR MESSAGE";

        asyncExceptionHandler.handleAsyncException(expectedErrorMessage, (Throwable)expectedException);

        // Keep the Optional parameterized: on a raw Optional the signature of orElseThrow is erased
        // to "throws Throwable", which this method does not declare.
        Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
        Throwable actualException = actualExternalFailureCause.orElseThrow(() -> new AssertionError("Expected exceptional completion"));

        Assert.assertThat(actualException, org.hamcrest.Matchers.instanceOf(AsynchronousException.class));
        Assert.assertThat(actualException.getMessage(), org.hamcrest.Matchers.is(expectedErrorMessage));
        Assert.assertThat(actualException.getCause(), org.hamcrest.Matchers.<Throwable>is(expectedException));
    }

    /**
     * Starts a {@link SourceStreamTask} configured with a {@code SlowlyDeserializingOperator},
     * waits (up to two seconds) until the RUNNING state has been reported to the task manager
     * actions, cancels the task and expects the task thread to end with state {@code CANCELED}.
     */
    @Test
    public void testEarlyCanceling() throws Exception {
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setOperatorID(new OperatorID(4711L, 42L));
        streamConfig.setStreamOperator((StreamOperator)new SlowlyDeserializingOperator());
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Spy on the actions so the state updates sent by the task can be verified.
        TaskManagerActions spiedActions = (TaskManagerActions)Mockito.spy((Object)new NoOpTaskManagerActions());

        try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = new TestTaskBuilder((ShuffleEnvironment)shuffleEnv)
                .setInvokable(SourceStreamTask.class)
                .setTaskConfig(streamConfig.getConfiguration())
                .setTaskManagerActions(spiedActions)
                .build();
            TaskExecutionState runningState = new TaskExecutionState(task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING);

            task.startTaskThread();
            VerificationMode withinTwoSeconds = (VerificationMode)Mockito.timeout(2000L);
            ((TaskManagerActions)Mockito.verify((Object)spiedActions, withinTwoSeconds)).updateTaskExecutionState((TaskExecutionState)Matchers.eq((Object)runningState));

            // Cancel as soon as the task is known to be running, then wait for its thread to finish.
            task.cancelExecution();
            task.getExecutingThread().join();

            Assert.assertFalse("Task did not cancel", task.getExecutingThread().isAlive());
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        }
    }

    /**
     * Runs a {@code StateBackendTestSource} task to completion (with its {@code fail} flag
     * cleared) using the state backend factory configured via
     * {@link CheckpointingOptions#STATE_BACKEND}, then verifies that the state backends and raw
     * state inputs recorded by {@code TestStreamSource} were closed, that both backends were
     * disposed, and that the task ended in {@code FINISHED}.
     */
    @Test
    public void testStateBackendLoadingAndClosing() throws Exception {
        Configuration tmConfig = new Configuration();
        tmConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        streamConfig.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource sourceOperator = new TestStreamSource(new MockSourceFunction());
        streamConfig.setStreamOperator(sourceOperator);
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = StreamTaskTest.createTask(StateBackendTestSource.class, (ShuffleEnvironment)shuffleEnv, streamConfig, tmConfig);

            // Successful run: the source must not fail.
            StateBackendTestSource.fail = false;
            task.startTaskThread();
            task.getExecutingThread().join();

            // Everything handed to the operator must have been closed ...
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
            // ... and the backends disposed.
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();

            Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
        }
    }

    /**
     * Runs a {@code StateBackendTestSource} task with its {@code fail} flag set, then verifies
     * that the state backends and raw state inputs recorded by {@code TestStreamSource} were
     * still closed, that both backends were disposed, and that the task ended in {@code FAILED}.
     */
    @Test
    public void testStateBackendClosingOnFailure() throws Exception {
        Configuration tmConfig = new Configuration();
        tmConfig.setString(CheckpointingOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateKeySerializer((TypeSerializer)Mockito.mock(TypeSerializer.class));
        streamConfig.setOperatorID(new OperatorID(4711L, 42L));
        TestStreamSource sourceOperator = new TestStreamSource(new MockSourceFunction());
        streamConfig.setStreamOperator(sourceOperator);
        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = StreamTaskTest.createTask(StateBackendTestSource.class, (ShuffleEnvironment)shuffleEnv, streamConfig, tmConfig);

            // Failing run: ask the source to fail.
            StateBackendTestSource.fail = true;
            task.startTaskThread();
            task.getExecutingThread().join();

            // Clean-up must happen even though the task failed.
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).close();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawOperatorStateInputs)).close();
            ((CloseableIterable)Mockito.verify(TestStreamSource.rawKeyedStateInputs)).close();
            ((OperatorStateBackend)Mockito.verify((Object)TestStreamSource.operatorStateBackend)).dispose();
            ((AbstractKeyedStateBackend)Mockito.verify(TestStreamSource.keyedStateBackend)).dispose();

            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        }
    }

    /**
     * Starts a {@code CancelFailingTask}, waits on {@code syncLatch}, cancels the task and
     * expects it to end in {@code CANCELED}.
     */
    @Test
    public void testCanceleablesCanceledOnCancelTaskError() throws Exception {
        // Fresh latch for this run; it is awaited below before the cancel call is issued.
        syncLatch = new OneShotLatch();
        StreamConfig streamConfig = new StreamConfig(new Configuration());

        try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = StreamTaskTest.createTask(CancelFailingTask.class, (ShuffleEnvironment)shuffleEnv, streamConfig, new Configuration());
            task.startTaskThread();

            syncLatch.await();

            task.cancelExecution();
            task.getExecutingThread().join();

            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        }
    }

    /**
     * Runs a {@code CancelThrowingTask} until its thread terminates and expects the task to end
     * in {@code CANCELED} without any explicit cancel call from the test.
     */
    @Test
    public void testCancelTaskExceptionHandling() throws Exception {
        StreamConfig streamConfig = new StreamConfig(new Configuration());

        try (NettyShuffleEnvironment shuffleEnv = new NettyShuffleEnvironmentBuilder().build()) {
            Task task = StreamTaskTest.createTask(CancelThrowingTask.class, (ShuffleEnvironment)shuffleEnv, streamConfig, new Configuration());

            task.startTaskThread();
            task.getExecutingThread().join();

            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        }
    }

    /**
     * Triggers checkpoint 42 on a mock stream task and waits for the task to complete. A task
     * failure is tolerated only when it contains an {@link ExpectedTestException}. Afterwards
     * both mocked {@link OperatorSnapshotFutures} must have been cancelled.
     */
    @Test
    public void testDecliningCheckpointStreamOperator() throws Exception {
        DeclineDummyEnvironment declineDummyEnvironment = new DeclineDummyEnvironment();
        OperatorSnapshotFutures firstSnapshot = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures secondSnapshot = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        ExpectedTestException testException = new ExpectedTestException();

        // The task factory lives in a synthetic lambda method left behind by the decompiler.
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.lambda$testDecliningCheckpointStreamOperator$1(declineDummyEnvironment, (Exception)testException, firstSnapshot, secondSnapshot));
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
        StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);

        streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);

        try {
            task.waitForTaskCompletion(false);
        }
        catch (Exception failure) {
            boolean causedByExpectedException = ExceptionUtils.findThrowable((Throwable)failure, ExpectedTestException.class).isPresent();
            if (!causedByExpectedException) {
                throw failure;
            }
        }

        ((OperatorSnapshotFutures)Mockito.verify((Object)firstSnapshot)).cancel();
        ((OperatorSnapshotFutures)Mockito.verify((Object)secondSnapshot)).cancel();
    }

    /**
     * Triggers checkpoint 42 on a task whose operator snapshot is built from
     * {@code ExceptionallyDoneFuture.of(failingCause)} and expects the task's uncaught exception
     * handler to receive exactly {@code failingCause}. The task is then finished normally.
     */
    @Test
    public void testUncaughtExceptionInAsynchronousCheckpointingOperation() throws Exception {
        RuntimeException failingCause = new RuntimeException("Test exception");
        FailingDummyEnvironment failingEnvironment = new FailingDummyEnvironment(failingCause);

        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures(
            ExceptionallyDoneFuture.of(failingCause),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()),
            (Future)DoneFuture.of((Object)SnapshotResult.empty()),
            (Future)DoneFuture.of((Object)SnapshotResult.empty()));
        TestingUncaughtExceptionHandler exceptionHandler = new TestingUncaughtExceptionHandler();

        RunningTask task = StreamTaskTest.runTask(() -> new MockStreamTask((Environment)failingEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(snapshotFutures)), (Thread.UncaughtExceptionHandler)exceptionHandler));
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
        StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);

        streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);

        // The handler must see the very same exception instance that was planted above.
        Throwable reported = exceptionHandler.waitForUncaughtException();
        Assert.assertThat((Object)reported, (Matcher)org.hamcrest.Matchers.is((Object)failingCause));

        streamTask.finishInput();
        task.waitForTaskCompletion(false);
    }

    /**
     * Chains three operators with mocked {@link OperatorSnapshotFutures}; the raw operator state
     * future of the third one throws an {@link ExecutionException} from {@code get()}. After
     * checkpoint 42 has been triggered and the async thread pool has terminated, the environment
     * must have recorded an external failure and all three snapshot results must have been
     * cancelled.
     */
    @Test
    public void testFailingAsyncCheckpointRunnable() throws Exception {
        OperatorSnapshotFutures firstSnapshot = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures secondSnapshot = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);
        OperatorSnapshotFutures failingSnapshot = (OperatorSnapshotFutures)Mockito.mock(OperatorSnapshotFutures.class);

        RunnableFuture failingFuture = (RunnableFuture)Mockito.mock(RunnableFuture.class);
        Mockito.when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception")));
        Mockito.when((Object)failingSnapshot.getOperatorStateRawFuture()).thenReturn((Object)failingFuture);

        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) {
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(firstSnapshot), StreamTaskTest.streamOperatorWithSnapshot(secondSnapshot), StreamTaskTest.streamOperatorWithSnapshot(failingSnapshot))));
            MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
            StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);

            mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
            streamTask.triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false).get();

            // Drain the async pool so that the checkpoint runnable has definitely finished.
            ExecutorService asyncPool = streamTask.getAsyncOperationsThreadPool();
            asyncPool.shutdown();
            boolean terminated = asyncPool.awaitTermination(10000L, TimeUnit.MILLISECONDS);
            if (!terminated) {
                Assert.fail("Executor did not shut down within the given timeout. This indicates that the checkpointing did not resume.");
            }

            Assert.assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
            ((OperatorSnapshotFutures)Mockito.verify((Object)firstSnapshot)).cancel();
            ((OperatorSnapshotFutures)Mockito.verify((Object)secondSnapshot)).cancel();
            ((OperatorSnapshotFutures)Mockito.verify((Object)failingSnapshot)).cancel();

            streamTask.finishInput();
            task.waitForTaskCompletion(false);
        }
    }

    /**
     * Cancels the stream task while the (mocked) checkpoint acknowledgement for checkpoint 42 is
     * still blocked inside the responder. The acknowledged snapshot must contain the four state
     * handles produced by the operator, and none of them may be discarded, neither before the
     * cancel call nor after the acknowledgement has been released.
     */
    @Test
    public void testAsyncCheckpointingConcurrentCloseAfterAcknowledge() throws Exception {
        final OneShotLatch acknowledgeCheckpointLatch = new OneShotLatch();
        final OneShotLatch completeAcknowledge = new OneShotLatch();
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            @Override
            public Object answer(InvocationOnMock invocation) {
                // Signal that the acknowledgement was reached, then block until the test releases it.
                acknowledgeCheckpointLatch.trigger();
                boolean released = false;
                while (!released) {
                    try {
                        completeAcknowledge.await();
                        released = true;
                    }
                    catch (InterruptedException ignored) {
                        // Interruptions are deliberately ignored; keep waiting for the release.
                    }
                }
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)Matchers.any(TaskStateSnapshot.class));

        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, checkpointResponder);

        KeyedStateHandle managedKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedStateHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        OperatorStateHandle managedOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorStateHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);

        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures(
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawKeyedStateHandle)),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)),
            (Future)DoneFuture.of((Object)SnapshotResult.empty()),
            (Future)DoneFuture.of((Object)SnapshotResult.empty()));

        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskName("mock-task").setTaskStateManager((TaskStateManager)taskStateManager).build()) {
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(StreamTaskTest.streamOperatorWithSnapshot(snapshotFutures))));
            MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);
            StreamTaskUtil.waitTaskIsRunning(streamTask, task.invocationFuture);

            long checkpointId = 42L;
            streamTask.triggerCheckpointAsync(new CheckpointMetaData(checkpointId, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
            acknowledgeCheckpointLatch.await();

            // Capture what was handed to the responder for this checkpoint.
            ArgumentCaptor snapshotCaptor = ArgumentCaptor.forClass(TaskStateSnapshot.class);
            ((CheckpointResponder)Mockito.verify((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.eq(checkpointId), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)snapshotCaptor.capture());
            TaskStateSnapshot acknowledgedSnapshot = (TaskStateSnapshot)snapshotCaptor.getValue();
            OperatorSubtaskState operatorState = (OperatorSubtaskState)((Map.Entry)acknowledgedSnapshot.getSubtaskStateMappings().iterator().next()).getValue();

            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)managedKeyedStateHandle), (Object)operatorState.getManagedKeyedState());
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)rawKeyedStateHandle), (Object)operatorState.getRawKeyedState());
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)managedOperatorStateHandle), (Object)operatorState.getManagedOperatorState());
            Assert.assertEquals((Object)StateObjectCollection.singleton((StateObject)rawOperatorStateHandle), (Object)operatorState.getRawOperatorState());

            // Nothing may have been discarded while the acknowledgement is pending ...
            ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();

            streamTask.cancel();
            completeAcknowledge.trigger();

            // ... nor after the task was cancelled and the acknowledgement released.
            ((KeyedStateHandle)Mockito.verify((Object)managedKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((KeyedStateHandle)Mockito.verify((Object)rawKeyedStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)managedOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();
            ((OperatorStateHandle)Mockito.verify((Object)rawOperatorStateHandle, (VerificationMode)Mockito.never())).discardState();

            task.waitForTaskCompletion(true);
        }
    }

    /**
     * Cancels the stream task while the raw keyed state future of checkpoint 42 is still
     * running. All four state handles must then complete their discard futures, and the
     * environment's acknowledge future must not complete within 10 milliseconds.
     */
    @Test
    public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exception {
        TestingKeyedStateHandle managedKeyedStateHandle = new TestingKeyedStateHandle();
        TestingKeyedStateHandle rawKeyedStateHandle = new TestingKeyedStateHandle();
        TestingOperatorStateHandle managedOperatorStateHandle = new TestingOperatorStateHandle();
        TestingOperatorStateHandle rawOperatorStateHandle = new TestingOperatorStateHandle();

        BlockingRunnableFuture blockingRawKeyedFuture = new BlockingRunnableFuture(2, SnapshotResult.of((StateObject)rawKeyedStateHandle));
        OperatorSnapshotFutures snapshotFutures = new OperatorSnapshotFutures(
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedKeyedStateHandle)),
            blockingRawKeyedFuture,
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)managedOperatorStateHandle)),
            (RunnableFuture)DoneFuture.of((Object)SnapshotResult.of((StateObject)rawOperatorStateHandle)),
            (Future)DoneFuture.of((Object)SnapshotResult.empty()),
            (Future)DoneFuture.of((Object)SnapshotResult.empty()));
        OneInputStreamOperator streamOperator = StreamTaskTest.streamOperatorWithSnapshot(snapshotFutures);

        AcknowledgeDummyEnvironment mockEnvironment = new AcknowledgeDummyEnvironment();
        RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(streamOperator)));
        StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
        MockStreamTask streamTask = (MockStreamTask)((Object)task.streamTask);

        long checkpointId = 42L;
        streamTask.triggerCheckpointAsync(new CheckpointMetaData(checkpointId, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);

        // Cancel only once the blocking future has started to run.
        blockingRawKeyedFuture.awaitRun();
        streamTask.cancel();

        FutureUtils.ConjunctFuture allDiscarded = FutureUtils.waitForAll(Arrays.asList(
            managedKeyedStateHandle.getDiscardFuture(),
            rawKeyedStateHandle.getDiscardFuture(),
            managedOperatorStateHandle.getDiscardFuture(),
            rawOperatorStateHandle.getDiscardFuture()));
        allDiscarded.get();

        try {
            mockEnvironment.getAcknowledgeCheckpointFuture().get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("The checkpoint should not get acknowledged.");
        }
        catch (TimeoutException expected) {
            // Expected: the acknowledge future must still be incomplete.
        }

        task.waitForTaskCompletion(true);
    }

    /**
     * Triggers checkpoint 42 on a task whose only operator returns an empty
     * {@link OperatorSnapshotFutures} and expects the checkpoint responder to be called with a
     * {@code null} task state snapshot.
     */
    @Test
    public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception {
        final OneShotLatch checkpointCompletedLatch = new OneShotLatch();
        final ArrayList<TaskStateSnapshot> checkpointResult = new ArrayList<>(1);
        CheckpointResponder checkpointResponder = (CheckpointResponder)Mockito.mock(CheckpointResponder.class);
        ((CheckpointResponder)Mockito.doAnswer((Answer)new Answer(){

            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // The fifth parameter of acknowledgeCheckpoint is a TaskStateSnapshot (see the matcher
                // below). Reading it with that type lets a non-null snapshot reach the assertNull
                // check instead of failing here with a ClassCastException.
                TaskStateSnapshot reportedSnapshot = (TaskStateSnapshot)invocation.getArgument(4);
                checkpointResult.add(reportedSnapshot);
                checkpointCompletedLatch.trigger();
                return null;
            }
        }).when((Object)checkpointResponder)).acknowledgeCheckpoint((JobID)Matchers.any(JobID.class), (ExecutionAttemptID)Matchers.any(ExecutionAttemptID.class), Matchers.anyLong(), (CheckpointMetrics)Matchers.any(CheckpointMetrics.class), (TaskStateSnapshot)ArgumentMatchers.nullable(TaskStateSnapshot.class));
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(1L, 2L), new ExecutionAttemptID(1L, 2L), (TaskLocalStateStore)Mockito.mock(TaskLocalStateStoreImpl.class), null, checkpointResponder);
        OneInputStreamOperator statelessOperator = StreamTaskTest.streamOperatorWithSnapshot(new OperatorSnapshotFutures());
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager((TaskStateManager)taskStateManager).build();){
            RunningTask task = StreamTaskTest.runTask(() -> StreamTaskTest.createMockStreamTask((Environment)mockEnvironment, StreamTaskTest.operatorChain(statelessOperator)));
            StreamTaskUtil.waitTaskIsRunning(task.streamTask, task.invocationFuture);
            ((MockStreamTask)((Object)task.streamTask)).triggerCheckpointAsync(new CheckpointMetaData(42L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false);
            checkpointCompletedLatch.await(30L, TimeUnit.SECONDS);
            // A stateless operator chain must be acknowledged without any state.
            Assert.assertNull(checkpointResult.get(0));
            ((MockStreamTask)((Object)task.streamTask)).cancel();
            task.waitForTaskCompletion(true);
        }
    }

    /**
     * Runs a {@link NoOpStreamTask} configured with a {@link BlockingCloseStreamOperator} and checks
     * {@code isRunning()} at two points: once {@code BlockingCloseStreamOperator.inClose} has been
     * awaited (expected {@code true}), and once {@code finishClose} has been triggered and the task
     * has completed (expected {@code false}).
     */
    @Test
    public void testOperatorClosingBeforeStopRunning() throws Throwable {
        BlockingCloseStreamOperator.resetLatches();

        Configuration configuration = new Configuration();
        StreamConfig operatorConfig = new StreamConfig(configuration);
        operatorConfig.setStreamOperator((StreamOperator)new BlockingCloseStreamOperator());
        operatorConfig.setOperatorID(new OperatorID());

        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskName("Test Task")
                .setManagedMemorySize(32768L)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1)
                .setTaskConfiguration(configuration)
                .build()) {
            RunningTask runningTask = StreamTaskTest.runTask(() -> new NoOpStreamTask((Environment)environment));

            // First checkpoint of the test: the inClose latch has fired, finishClose has not.
            BlockingCloseStreamOperator.inClose.await();
            Assert.assertTrue((boolean)runningTask.streamTask.isRunning());

            // Release finishClose and let the task run to completion.
            BlockingCloseStreamOperator.finishClose.trigger();
            runningTask.waitForTaskCompletion(false);
            Assert.assertFalse((boolean)runningTask.streamTask.isRunning());
        }
    }

    /**
     * Checks that a checkpoint-complete notification reaches a {@link ClosingOperator} only while
     * it is still open: the counter is 1 after the first notification and remains 1 after the
     * operators were closed and a second notification was issued.
     */
    @Test
    public void testNotifyCheckpointOnClosedOperator() throws Throwable {
        ClosingOperator closingOperator = new ClosingOperator();
        StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
            .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
            .setupOutputForSingletonOperatorChain((StreamOperator<?>)closingOperator)
            .build();
        testHarness.setAutoProcess(false);
        testHarness.processElement(new StreamRecord((Object)1));

        // Notification while the operator is still open.
        testHarness.streamTask.notifyCheckpointCompleteAsync(1L);
        testHarness.streamTask.runMailboxStep();
        Assert.assertEquals((long)1L, (long)ClosingOperator.notified.get());
        Assert.assertFalse(ClosingOperator.closed.get());

        // Close the chain, then notify again: the counter must not move.
        testHarness.streamTask.operatorChain.closeOperators(testHarness.streamTask.getActionExecutor());
        testHarness.streamTask.notifyCheckpointCompleteAsync(2L);
        testHarness.streamTask.runMailboxStep();
        Assert.assertEquals((long)1L, (long)ClosingOperator.notified.get());
        Assert.assertTrue(ClosingOperator.closed.get());
    }

    /**
     * Runs {@code testFailToConfirmCheckpointMessage} with an action that issues a
     * checkpoint-complete notification for checkpoint 1.
     */
    @Test
    public void testFailToConfirmCheckpointCompleted() throws Exception {
        Consumer<StreamTask<?, ?>> notifyCompleted = task -> task.notifyCheckpointCompleteAsync(1L);
        this.testFailToConfirmCheckpointMessage(notifyCompleted);
    }

    /**
     * Runs {@code testFailToConfirmCheckpointMessage} with an action that issues a
     * checkpoint-abort notification for checkpoint 1.
     */
    @Test
    public void testFailToConfirmCheckpointAborted() throws Exception {
        Consumer<StreamTask<?, ?>> notifyAborted = task -> task.notifyCheckpointAbortAsync(1L);
        this.testFailToConfirmCheckpointMessage(notifyAborted);
    }

    /**
     * Builds a harness around a {@link StreamMap} wrapping a {@link FailOnNotifyCheckpointMapper},
     * applies {@code consumer} to the stream task and runs one mailbox step. The test passes only if
     * an {@link ExpectedTestException} is thrown by those two calls.
     *
     * @param consumer action that enqueues the checkpoint notification on the given task
     */
    private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
        StreamMap failingMap = new StreamMap(new FailOnNotifyCheckpointMapper());
        StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
            .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
            .setupOutputForSingletonOperatorChain((StreamOperator<?>)failingMap)
            .build();
        try {
            consumer.accept(testHarness.streamTask);
            testHarness.streamTask.runMailboxStep();
            // Reaching this line means the notification did not fail.
            Assert.fail();
        }
        catch (ExpectedTestException expected) {
            // This is the outcome the test is looking for.
        }
    }

    /**
     * Closes the operator chain of a harness built around a {@link ClosingOperator}, triggers a
     * checkpoint on barrier afterwards, and expects exactly one decline report on the harness'
     * checkpoint responder.
     */
    @Test
    public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
        ClosingOperator closingOperator = new ClosingOperator();
        StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
            .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
            .setupOutputForSingletonOperatorChain((StreamOperator<?>)closingOperator)
            .build();
        testHarness.setAutoProcess(false);
        testHarness.processElement(new StreamRecord((Object)1));

        testHarness.streamTask.operatorChain.closeOperators(testHarness.streamTask.getActionExecutor());
        Assert.assertTrue(ClosingOperator.closed.get());

        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(1L, 0L);
        testHarness.streamTask.triggerCheckpointOnBarrier(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetrics());
        Assert.assertEquals((long)1L, (long)testHarness.getCheckpointResponder().getDeclineReports().size());
    }

    /**
     * Runs an anonymous {@link StreamTask} whose {@code processInput} first enqueues a mail that
     * triggers a latch and then signals {@code allActionsCompleted()}. The test blocks until the
     * latch has been triggered and the task has completed.
     */
    @Test
    public void testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws Exception {
        final OneShotLatch mailExecuted = new OneShotLatch();
        try (MockEnvironment environment = new MockEnvironmentBuilder().build()) {
            RunningTask runningTask = StreamTaskTest.runTask(() -> new StreamTask<Object, StreamOperator<Object>>((Environment)environment){

                protected void init() throws Exception {
                }

                protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
                    // Enqueue the mail before declaring the default action finished.
                    this.mailboxProcessor.getMailboxExecutor(0).execute(() -> mailExecuted.trigger(), "trigger");
                    controller.allActionsCompleted();
                }
            });
            mailExecuted.await();
            runningTask.waitForTaskCompletion(false);
        }
    }

    /**
     * Checks the effect of {@code beforeInvoke()} on a task whose environment uses the default task
     * state manager of {@link MockEnvironmentBuilder}: before the call neither recovery nor
     * partition requests have happened; after it, partitions are requested but no gate or partition
     * reports recovered state.
     */
    @Test
    public void testBeforeInvokeWithoutChannelStates() throws Exception {
        int numWriters = 2;
        int numGates = 2;
        RecoveryResultPartition[] partitions = new RecoveryResultPartition[numWriters];
        for (int i = 0; i < numWriters; ++i) {
            partitions[i] = new RecoveryResultPartition();
        }
        RecoveryInputGate[] gates = new RecoveryInputGate[numGates];
        for (int i = 0; i < numGates; ++i) {
            gates[i] = new RecoveryInputGate(partitions);
        }
        // Close the environment when the test ends, like the other tests in this class do.
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();){
            mockEnvironment.addOutputs(Arrays.asList(partitions));
            mockEnvironment.addInputs(Arrays.asList(gates));
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)mockEnvironment).build();
            try {
                this.verifyResults(gates, partitions, false, false);
                task.beforeInvoke();
                this.verifyResults(gates, partitions, false, true);
            }
            finally {
                // Always release what beforeInvoke() may have set up, even if an assertion failed.
                task.cleanUpInvoke();
            }
        }
    }

    /**
     * Checks the effect of {@code beforeInvoke()} on a task whose state manager carries a
     * {@code FiniteChannelStateReader}: right after the call all gates and partitions report
     * recovered state while partitions are not yet requested; the request is observed once the
     * mailbox has been drained.
     */
    @Test
    public void testBeforeInvokeWithChannelStates() throws Exception {
        int numWriters = 2;
        int numGates = 2;
        RecoveryResultPartition[] partitions = new RecoveryResultPartition[numWriters];
        for (int i = 0; i < numWriters; ++i) {
            partitions[i] = new RecoveryResultPartition();
        }
        RecoveryInputGate[] gates = new RecoveryInputGate[numGates];
        for (int i = 0; i < numGates; ++i) {
            gates[i] = new RecoveryInputGate(partitions);
        }
        ResultPartitionTest.FiniteChannelStateReader reader = new ResultPartitionTest.FiniteChannelStateReader(1, new int[]{0});
        TaskStateManagerImpl taskStateManager = new TaskStateManagerImpl(new JobID(), new ExecutionAttemptID(), (TaskLocalStateStore)new TestTaskLocalStateStore(), null, (CheckpointResponder)NoOpCheckpointResponder.INSTANCE, (ChannelStateReader)reader);
        // Close the environment when the test ends, like the other tests in this class do.
        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager((TaskStateManager)taskStateManager).build();){
            mockEnvironment.addOutputs(Arrays.asList(partitions));
            mockEnvironment.addInputs(Arrays.asList(gates));
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)mockEnvironment).build();
            try {
                this.verifyResults(gates, partitions, false, false);
                task.beforeInvoke();
                this.verifyResults(gates, partitions, true, false);
                // The partition request only becomes visible after the pending mails have run.
                task.mailboxProcessor.drain();
                for (RecoveryInputGate inputGate : gates) {
                    Assert.assertTrue((boolean)inputGate.isPartitionRequested());
                }
            }
            finally {
                // Always release what beforeInvoke() may have set up, even if an assertion failed.
                task.cleanUpInvoke();
            }
        }
    }

    /**
     * Asserts the recovery and request flags of all given partitions and gates.
     *
     * @param gates input gates whose {@code isStateRecovered()} and {@code isPartitionRequested()} are checked
     * @param partitions result partitions whose {@code isStateRecovered()} is checked
     * @param recoveryExpected value every {@code isStateRecovered()} must return
     * @param requestExpected value every gate's {@code isPartitionRequested()} must return
     */
    private void verifyResults(RecoveryInputGate[] gates, RecoveryResultPartition[] partitions, boolean recoveryExpected, boolean requestExpected) {
        for (int p = 0; p < partitions.length; ++p) {
            boolean recovered = partitions[p].isStateRecovered();
            Assert.assertEquals((Object)recoveryExpected, (Object)recovered);
        }
        for (int g = 0; g < gates.length; ++g) {
            RecoveryInputGate gate = gates[g];
            boolean recovered = gate.isStateRecovered();
            Assert.assertEquals((Object)recoveryExpected, (Object)recovered);
            boolean requested = gate.isPartitionRequested();
            Assert.assertEquals((Object)requestExpected, (Object)requested);
        }
    }

    /**
     * Creates a {@link ThreadInspectingTask} on a thread whose context class loader was set to a
     * {@link TestUserCodeClassLoader}, waits for the invocation to finish, and asserts that the task
     * captured that very class loader instance.
     */
    @Test
    public void testThreadInvariants() throws Throwable {
        Configuration configuration = new Configuration();
        StreamConfig operatorConfig = new StreamConfig(configuration);
        operatorConfig.setStreamOperator((StreamOperator)new StreamMap((MapFunction & Serializable)value -> value));
        operatorConfig.setOperatorID(new OperatorID());

        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskConfiguration(configuration)
                .build()) {
            TestUserCodeClassLoader expectedClassLoader = new TestUserCodeClassLoader();
            RunningTask inspectedTask = StreamTaskTest.runTask(() -> {
                // Install the class loader on the thread that constructs (and later invokes) the task.
                Thread.currentThread().setContextClassLoader(expectedClassLoader);
                return new ThreadInspectingTask((Environment)environment);
            });
            inspectedTask.invocationFuture.get();

            ThreadInspectingTask finishedTask = (ThreadInspectingTask)((Object)inspectedTask.streamTask);
            Assert.assertThat((Object)finishedTask.getTaskClassLoader(), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.sameInstance((Object)expectedClassLoader)));
        }
    }

    /**
     * Overwrites the {@code "serializedUDF"} entry of the task configuration with 42 zero bytes,
     * invokes a {@link NoOpStreamTask} and expects the failure to contain a
     * {@link StreamCorruptedException}. Afterwards no live thread may have a name starting with
     * {@code "OutputFlusher"}.
     */
    @Test
    public void testRecordWriterClosedOnStreamOperatorFactoryDeserializationError() throws Exception {
        Configuration configuration = new Configuration();
        StreamConfig headConfig = new StreamConfig(configuration);
        headConfig.setStreamOperatorFactory((StreamOperatorFactory)new UnusedOperatorFactory());

        StreamConfigChainer<StreamTaskTest> chainer = new StreamConfigChainer<StreamTaskTest>(new OperatorID(42L, 42L), headConfig, this);
        chainer.chain(new OperatorID(44L, 44L), new UnusedOperatorFactory(), StringSerializer.INSTANCE, StringSerializer.INSTANCE, false);
        chainer.finish();

        // Written after the chain is finished so that it replaces whatever was stored under this key.
        configuration.setBytes("serializedUDF", new byte[42]);

        try (MockEnvironment environment = new MockEnvironmentBuilder()
                .setTaskConfiguration(configuration)
                .build()) {
            environment.addOutput(new ArrayList());
            NoOpStreamTask task = new NoOpStreamTask((Environment)environment);
            try {
                task.invoke();
                Assert.fail((String)"Should have failed with an exception!");
            }
            catch (Exception failure) {
                boolean causedByCorruptStream = ExceptionUtils.findThrowable((Throwable)failure, StreamCorruptedException.class).isPresent();
                if (!causedByCorruptStream) {
                    // Any other failure is unexpected and must surface.
                    throw failure;
                }
            }
        }

        boolean flusherStillAlive = false;
        for (Thread liveThread : Thread.getAllStackTraces().keySet()) {
            if (liveThread.getName().startsWith("OutputFlusher")) {
                flusherStillAlive = true;
                break;
            }
        }
        Assert.assertFalse((String)"OutputFlusher thread is still running", flusherStillAlive);
    }

    /**
     * Invokes a mock stream task whose two outputs are both set up as available and asserts that
     * the input processor was called the configured number of times.
     */
    @Test
    public void testProcessWithAvailableOutput() throws Exception {
        try (MockEnvironment environment = this.setupEnvironment(new boolean[]{true, true});){
            // Single source of truth for both the processor setup and the final assertion.
            final int numberOfProcessCalls = 10;
            AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(numberOfProcessCalls);
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)environment).setStreamInputProcessor(inputProcessor).build();
            task.invoke();
            Assert.assertEquals((long)numberOfProcessCalls, (long)inputProcessor.currentNumProcessCalls);
        }
    }

    /**
     * Invokes a mock stream task whose second output is set up as unavailable. A mail submitted
     * through a second mail checks that exactly one process call happened and that the default
     * action is reported unavailable, then completes the writer's availability future. The task is
     * expected to finish with the configured number of process calls.
     */
    @Test
    public void testProcessWithUnAvailableOutput() throws Exception {
        try (MockEnvironment environment = this.setupEnvironment(new boolean[]{true, false});){
            // Single source of truth for both the processor setup and the final assertion.
            final int numberOfProcessCalls = 10;
            AvailabilityTestInputProcessor inputProcessor = new AvailabilityTestInputProcessor(numberOfProcessCalls);
            org.apache.flink.streaming.util.MockStreamTask task = new MockStreamTaskBuilder((Environment)environment).setStreamInputProcessor(inputProcessor).build();
            MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor();
            RunnableWithException completeFutureTask = () -> {
                Assert.assertEquals((long)1L, (long)inputProcessor.currentNumProcessCalls);
                Assert.assertTrue((boolean)task.mailboxProcessor.isDefaultActionUnavailable());
                // Completing the future of writer 1 (the one set up as unavailable) resumes processing.
                environment.getWriter(1).getAvailableFuture().complete(null);
            };
            executor.submit(() -> executor.submit(completeFutureTask, "This task will complete the future to resume process input action."), "This task will submit another task to execute after processing input once.");
            task.invoke();
            Assert.assertEquals((long)numberOfProcessCalls, (long)inputProcessor.currentNumProcessCalls);
        }
    }

    /**
     * Builds a {@link MockEnvironment} with one {@link AvailabilityTestResultPartitionWriter} per
     * entry of {@code outputAvailabilities}.
     *
     * @param outputAvailabilities flag passed to the constructor of each writer, in order
     * @return the environment with all writers added as outputs
     */
    private MockEnvironment setupEnvironment(boolean[] outputAvailabilities) {
        int numOutputs = outputAvailabilities.length;
        Configuration taskConfiguration = new Configuration();
        // NOTE(review): the instance is discarded; presumably the constructor writes into the
        // configuration it is given - confirm against MockStreamConfig.
        new MockStreamConfig(taskConfiguration, numOutputs);

        ArrayList<AvailabilityTestResultPartitionWriter> partitionWriters = new ArrayList<AvailabilityTestResultPartitionWriter>(numOutputs);
        for (boolean available : outputAvailabilities) {
            partitionWriters.add(new AvailabilityTestResultPartitionWriter(available));
        }

        MockEnvironment mockEnvironment = new MockEnvironmentBuilder()
            .setTaskConfiguration(taskConfiguration)
            .build();
        mockEnvironment.addOutputs(partitionWriters);
        return mockEnvironment;
    }

    /**
     * Creates a Mockito mock of {@link OneInputStreamOperator} that reports a fresh
     * {@link OperatorID} and returns the given futures from {@code snapshotState}.
     *
     * @param operatorSnapshotResult value the mocked {@code snapshotState} returns
     * @return the stubbed operator mock
     */
    private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception {
        OneInputStreamOperator mockedOperator = (OneInputStreamOperator)Mockito.mock(OneInputStreamOperator.class);

        Mockito.when((Object)mockedOperator.getOperatorID())
            .thenReturn((Object)new OperatorID());

        Mockito.when((Object)mockedOperator.snapshotState(
                Matchers.anyLong(),
                Matchers.anyLong(),
                (CheckpointOptions)Matchers.any(CheckpointOptions.class),
                (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class)))
            .thenReturn((Object)operatorSnapshotResult);

        return mockedOperator;
    }

    /**
     * Creates a Mockito mock of {@link OneInputStreamOperator} that reports a fresh
     * {@link OperatorID} and throws the given exception from {@code snapshotState}.
     *
     * @param exception the exception the mocked {@code snapshotState} throws
     * @return the stubbed operator mock
     */
    private static <T> OneInputStreamOperator<T, T> streamOperatorWithSnapshotException(Exception exception) throws Exception {
        OneInputStreamOperator mockedOperator = (OneInputStreamOperator)Mockito.mock(OneInputStreamOperator.class);

        Mockito.when((Object)mockedOperator.getOperatorID())
            .thenReturn((Object)new OperatorID());

        Mockito.when((Object)mockedOperator.snapshotState(
                Matchers.anyLong(),
                Matchers.anyLong(),
                (CheckpointOptions)Matchers.any(CheckpointOptions.class),
                (CheckpointStreamFactory)Matchers.any(CheckpointStreamFactory.class)))
            .thenThrow(new Throwable[]{exception});

        return mockedOperator;
    }

    /**
     * Wraps the given operators into an operator chain by delegating to
     * {@code OperatorChainTest.setupOperatorChain}.
     *
     * @param streamOperators operators handed to the delegate, in the given order
     * @return the chain produced by the delegate
     */
    private static <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T> ... streamOperators) throws Exception {
        OperatorChain<T, AbstractStreamOperator<T>> chain = OperatorChainTest.setupOperatorChain(streamOperators);
        return chain;
    }

    /**
     * Creates a task through {@code taskFactory} on a dedicated thread and invokes it there.
     *
     * <p>The method blocks until the task has been created, not until it has finished. The returned
     * {@link RunningTask} carries the created task and the future of the asynchronous invocation.
     *
     * @param taskFactory factory called on the new thread to create the task
     * @return handle to the created task and its running invocation
     * @throws Exception if waiting for the task creation fails, including the case where the
     *     factory itself threw (reported through the creation future)
     */
    private static <T extends StreamTask<?, ?>> RunningTask<T> runTask(SupplierWithException<T, Exception> taskFactory) throws Exception {
        CompletableFuture<T> taskCreationFuture = new CompletableFuture<T>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> invocationFuture;
        try {
            invocationFuture = CompletableFuture.runAsync(() -> {
                T task;
                try {
                    task = taskFactory.get();
                    taskCreationFuture.complete(task);
                }
                catch (Exception e) {
                    // Hand the creation failure to the waiting caller and skip the invocation.
                    taskCreationFuture.completeExceptionally(e);
                    return;
                }
                try {
                    task.invoke();
                }
                catch (RuntimeException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executor);
        }
        finally {
            // shutdown() lets the already submitted invocation run to completion but allows the
            // worker thread to terminate afterwards instead of lingering for every started task.
            executor.shutdown();
        }
        return new RunningTask<T>(taskCreationFuture.get(), invocationFuture);
    }

    /**
     * Builds a {@link Task} through {@link TestTaskBuilder}.
     *
     * @param invokable invokable class set on the builder
     * @param shuffleEnvironment shuffle environment the builder is created with
     * @param taskConfig stream config whose underlying configuration becomes the task config
     * @param taskManagerConfig task manager configuration set on the builder
     * @return the built task
     */
    public static Task createTask(Class<? extends AbstractInvokable> invokable, ShuffleEnvironment shuffleEnvironment, StreamConfig taskConfig, Configuration taskManagerConfig) throws Exception {
        return new TestTaskBuilder(shuffleEnvironment)
            .setTaskManagerConfig(taskManagerConfig)
            .setInvokable(invokable)
            .setTaskConfig(taskConfig.getConfiguration())
            .build();
    }

    /**
     * Creates a {@link MockStreamTask} for the given environment and operator chain, using
     * {@code FatalExitExceptionHandler.INSTANCE} as its uncaught exception handler.
     */
    private static MockStreamTask createMockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain) throws Exception {
        Thread.UncaughtExceptionHandler fatalHandler = (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE;
        return new MockStreamTask(env, operatorChain, fatalHandler);
    }

    /**
     * Compiler-generated lambda body: creates a mock stream task whose chain consists of one
     * operator that throws {@code testException} on snapshot, followed by two operators that return
     * the given snapshot futures.
     */
    private static /* synthetic */ MockStreamTask lambda$testDecliningCheckpointStreamOperator$1(DeclineDummyEnvironment declineDummyEnvironment, Exception testException, OperatorSnapshotFutures operatorSnapshotResult1, OperatorSnapshotFutures operatorSnapshotResult2) throws Exception {
        OneInputStreamOperator<String, String> failingOperator = StreamTaskTest.streamOperatorWithSnapshotException(testException);
        OneInputStreamOperator<String, String> firstOperator = StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult1);
        OneInputStreamOperator<String, String> secondOperator = StreamTaskTest.streamOperatorWithSnapshot(operatorSnapshotResult2);
        OperatorChain<String, AbstractStreamOperator<String>> chain = StreamTaskTest.operatorChain(failingOperator, firstOperator, secondOperator);
        return StreamTaskTest.createMockStreamTask((Environment)declineDummyEnvironment, chain);
    }

    /**
     * Identity {@link MapFunction} that throws an {@link ExpectedTestException} from both of its
     * checkpoint notification methods.
     */
    private static class FailOnNotifyCheckpointMapper<T> implements MapFunction<T, T>, CheckpointListener {
        private static final long serialVersionUID = 1L;

        private FailOnNotifyCheckpointMapper() {
        }

        /** Throws unconditionally. */
        public void notifyCheckpointComplete(long checkpointId) {
            throw new ExpectedTestException();
        }

        /** Throws unconditionally. */
        public void notifyCheckpointAborted(long checkpointId) {
            throw new ExpectedTestException();
        }

        /** Returns the input unchanged. */
        public T map(T value) throws Exception {
            return value;
        }
    }

    /**
     * Operator that records in static fields whether it has been closed and how many
     * checkpoint-complete notifications it received. Both fields are reset in {@code open()}.
     */
    private static class ClosingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        /** Set to {@code false} in {@code open()} and to {@code true} in {@code close()}. */
        static AtomicBoolean closed = new AtomicBoolean();
        /** Number of {@code notifyCheckpointComplete} calls since the last {@code open()}. */
        static AtomicInteger notified = new AtomicInteger();

        private ClosingOperator() {
        }

        /** Elements are ignored. */
        public void processElement(StreamRecord<T> element) throws Exception {
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            notified.getAndIncrement();
        }

        public void open() throws Exception {
            super.open();
            closed.set(false);
            notified.set(0);
        }

        public void close() throws Exception {
            super.close();
            closed.set(true);
        }
    }

    /**
     * Input gate that records whether its state was recovered and whether partitions were
     * requested, and checks the expected ordering of both steps while recovering.
     */
    private static class RecoveryInputGate extends MockIndexedInputGate {
        private final RecoveryResultPartition[] partitions;
        private boolean isStateRecovered;
        private boolean isPartitionRequested;

        /**
         * @param partitions result partitions that must already be recovered when this gate recovers;
         *     must not be {@code null}
         */
        RecoveryInputGate(RecoveryResultPartition[] partitions) {
            this.partitions = (RecoveryResultPartition[])Preconditions.checkNotNull((Object)partitions);
        }

        boolean isStateRecovered() {
            return isStateRecovered;
        }

        boolean isPartitionRequested() {
            return isPartitionRequested;
        }

        /**
         * Marks this gate as recovered after checking, once per partition, that the partition is
         * already recovered and that no partition request has happened yet.
         *
         * @return an already completed future
         */
        @Override
        public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
            for (int i = 0; i < partitions.length; ++i) {
                boolean outputRecovered = partitions[i].isStateRecovered();
                Preconditions.checkState(outputRecovered, (Object)"The output state recovery should happen before input state recovery.");
                boolean notYetRequested = !isPartitionRequested;
                Preconditions.checkState(notYetRequested, (Object)"The partition request should happen after completing all input gates recovery.");
            }
            isStateRecovered = true;
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public void requestPartitions() {
            isPartitionRequested = true;
        }
    }

    /** Result partition writer that remembers whether {@code readRecoveredState} was called. */
    private static class RecoveryResultPartition extends MockResultPartitionWriter {
        /** Becomes {@code true} on the first {@code readRecoveredState} call and never resets. */
        private boolean isStateRecovered;

        RecoveryResultPartition() {
        }

        boolean isStateRecovered() {
            return isStateRecovered;
        }

        /** Only records the call; the reader is not used. */
        public void readRecoveredState(ChannelStateReader stateReader) {
            isStateRecovered = true;
        }
    }

    /**
     * Operator factory that is not supposed to be used: creating an operator or asking for the
     * operator class throws {@link UnsupportedOperationException}; setting the chaining strategy is
     * a no-op.
     */
    private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private UnusedOperatorFactory() {
        }

        /** Ignores the given strategy. */
        public void setChainingStrategy(ChainingStrategy strategy) {
        }

        /** Always throws. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }

        /** Always throws. */
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            throw new UnsupportedOperationException("This shouldn't be called");
        }
    }

    /**
     * {@link DummyEnvironment} that remembers the id and cause of the most recent
     * {@code declineCheckpoint} call.
     */
    static final class DeclineDummyEnvironment extends DummyEnvironment {
        /** {@code Long.MIN_VALUE} until the first decline. */
        private long lastDeclinedCheckpointId = Long.MIN_VALUE;
        /** {@code null} until the first decline. */
        private Throwable lastDeclinedCheckpointCause;

        DeclineDummyEnvironment() {
            super("test", 1, 0);
        }

        long getLastDeclinedCheckpointId() {
            return lastDeclinedCheckpointId;
        }

        Throwable getLastDeclinedCheckpointCause() {
            return lastDeclinedCheckpointCause;
        }

        /** Overwrites the recorded id and cause with the given values. */
        public void declineCheckpoint(long checkpointId, Throwable cause) {
            lastDeclinedCheckpointId = checkpointId;
            lastDeclinedCheckpointCause = cause;
        }
    }

    /**
     * {@link DummyEnvironment} whose {@code declineCheckpoint} and {@code failExternally} both throw
     * the exception supplied at construction time, ignoring their arguments.
     */
    private static class FailingDummyEnvironment extends DummyEnvironment {
        /** Exception thrown by both overridden methods. */
        final RuntimeException failingCause;

        private FailingDummyEnvironment(RuntimeException failingCause) {
            this.failingCause = failingCause;
        }

        public void failExternally(Throwable cause) {
            throw failingCause;
        }

        public void declineCheckpoint(long checkpointId, Throwable cause) {
            throw failingCause;
        }
    }

    /**
     * {@link RunnableFuture} whose {@code run()} signals that it has started, counts down a latch
     * created with {@code parties} and then waits for that latch to reach zero before completing
     * with the configured value. Cancellation is not supported: {@code cancel} always returns
     * {@code false}.
     */
    private static final class BlockingRunnableFuture<V> implements RunnableFuture<V> {
        private final CompletableFuture<V> future;
        private final OneShotLatch signalRunLatch;
        private final CountDownLatch continueRunLatch;
        private final V value;

        /**
         * @param parties initial count of the latch that {@code run()} counts down once and then awaits
         * @param value value the future completes with at the end of {@code run()}
         */
        private BlockingRunnableFuture(int parties, V value) {
            this.future = new CompletableFuture();
            this.signalRunLatch = new OneShotLatch();
            this.continueRunLatch = new CountDownLatch(parties);
            this.value = value;
        }

        /** Blocks until {@code run()} has been entered. */
        void awaitRun() throws InterruptedException {
            signalRunLatch.await();
        }

        @Override
        public void run() {
            signalRunLatch.trigger();
            continueRunLatch.countDown();
            try {
                continueRunLatch.await();
            }
            catch (InterruptedException interrupted) {
                ExceptionUtils.rethrow((Throwable)interrupted);
            }
            future.complete(value);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }
    }

    /**
     * {@link DummyEnvironment} that completes a future with the checkpoint id as soon as either
     * {@code acknowledgeCheckpoint} variant is called.
     */
    private static class AcknowledgeDummyEnvironment extends DummyEnvironment {
        private final CompletableFuture<Long> acknowledgeCheckpointFuture = new CompletableFuture();

        private AcknowledgeDummyEnvironment() {
        }

        /** Returns the future that is completed by the acknowledge methods. */
        public CompletableFuture<Long> getAcknowledgeCheckpointFuture() {
            return acknowledgeCheckpointFuture;
        }

        /** Completes the future with {@code checkpointId}; the metrics are ignored. */
        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics) {
            acknowledgeCheckpoint(checkpointId, checkpointMetrics, null);
        }

        /** Completes the future with {@code checkpointId}; metrics and state are ignored. */
        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
            acknowledgeCheckpointFuture.complete(checkpointId);
        }
    }

    /**
     * {@link OperatorStateHandle} without any content whose only observable behavior is that
     * {@code discardState()} completes a future exposed through {@code getDiscardFuture()}.
     */
    private static class TestingOperatorStateHandle implements OperatorStateHandle {
        private static final long serialVersionUID = 923794934187614088L;
        /** Completed with {@code null} by {@code discardState()}. */
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture();

        private TestingOperatorStateHandle() {
        }

        public CompletableFuture<Void> getDiscardFuture() {
            return discardFuture;
        }

        public void discardState() throws Exception {
            discardFuture.complete(null);
        }

        /** Always zero. */
        public long getStateSize() {
            return 0L;
        }

        /** Always an empty map. */
        public Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets() {
            return Collections.emptyMap();
        }

        /** Always empty. */
        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }

        /** Not supported; always throws {@link IOException}. */
        public FSDataInputStream openInputStream() throws IOException {
            throw new IOException("Cannot open input streams in testing implementation.");
        }

        /** Not supported; always throws {@link UnsupportedOperationException}. */
        public StreamStateHandle getDelegateStateHandle() {
            throw new UnsupportedOperationException("Not implemented.");
        }
    }

    /**
     * {@link KeyedStateHandle} without any content whose only observable behavior is that
     * {@code discardState()} completes a future exposed through {@code getDiscardFuture()}.
     */
    private static class TestingKeyedStateHandle implements KeyedStateHandle {
        private static final long serialVersionUID = -2473861305282291582L;
        /** Completed with {@code null} by {@code discardState()}. */
        private final transient CompletableFuture<Void> discardFuture = new CompletableFuture();

        private TestingKeyedStateHandle() {
        }

        public CompletableFuture<Void> getDiscardFuture() {
            return discardFuture;
        }

        public void discardState() {
            discardFuture.complete(null);
        }

        /** Always zero. */
        public long getStateSize() {
            return 0L;
        }

        /** Always {@code KeyGroupRange.EMPTY_KEY_GROUP_RANGE}. */
        public KeyGroupRange getKeyGroupRange() {
            return KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        }

        /** Returns this handle regardless of the requested range. */
        public TestingKeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
            return this;
        }

        /** Does nothing. */
        public void registerSharedStates(SharedStateRegistry stateRegistry) {
        }
    }

    /**
     * {@link StreamSource} that, during {@code initializeState}, stores its state backends and the
     * raw state inputs of the initialization context in static fields before delegating to the
     * super implementation.
     */
    static class TestStreamSource<OUT, SRC extends SourceFunction<OUT>> extends StreamSource<OUT, SRC> {
        // Static so that the values captured during initialization stay reachable from outside the
        // operator instance.
        static AbstractKeyedStateBackend<?> keyedStateBackend;
        static OperatorStateBackend operatorStateBackend;
        static CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        static CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        public TestStreamSource(SRC sourceFunction) {
            super(sourceFunction);
        }

        /** Captures backends and raw inputs, then runs the regular initialization. */
        public void initializeState(StateInitializationContext controller) throws Exception {
            // Backends come from the operator itself ...
            keyedStateBackend = (AbstractKeyedStateBackend)getKeyedStateBackend();
            operatorStateBackend = getOperatorStateBackend();

            // ... raw inputs from the initialization context.
            rawOperatorStateInputs = (CloseableIterable)controller.getRawOperatorStateInputs();
            rawKeyedStateInputs = (CloseableIterable)controller.getRawKeyedStateInputs();

            super.initializeState(controller);
        }
    }

    /** Class loader without own behavior whose parent is the system class loader. */
    private static class TestUserCodeClassLoader extends ClassLoader {
        public TestUserCodeClassLoader() {
            super(getSystemClassLoader());
        }
    }

    /**
     * {@link StreamTask} that captures the thread id and context class loader of the thread it was
     * constructed on and verifies, in {@code init()}, {@code processInput()}, {@code cleanup()} and
     * in a processing-time timer callback, that it is still called from that thread with that
     * context class loader.
     */
    private static class ThreadInspectingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        /** Id of the thread that ran the constructor. */
        private final long taskThreadId;
        /** Context class loader of the constructing thread at construction time. */
        private final ClassLoader taskClassLoader;
        /** Set by the timer registered in {@code init()}; ends the input loop once {@code true}. */
        private transient boolean hasTimerTriggered;

        ThreadInspectingTask(Environment env) throws Exception {
            super(env);
            Thread currentThread = Thread.currentThread();
            this.taskThreadId = currentThread.getId();
            this.taskClassLoader = currentThread.getContextClassLoader();
        }

        /** Returns the context class loader captured in the constructor. */
        @Nullable
        ClassLoader getTaskClassLoader() {
            return this.taskClassLoader;
        }

        protected void init() throws Exception {
            this.checkTaskThreadInfo();
            ((AbstractStreamOperator)this.getHeadOperator()).getProcessingTimeService().registerTimer(0L, new ProcessingTimeCallback(){

                public void onProcessingTime(long timestamp) throws Exception {
                    // Inside the anonymous callback a plain "this" is the callback itself, which has
                    // no checkTaskThreadInfo(); the enclosing task must be addressed explicitly.
                    ThreadInspectingTask.this.checkTaskThreadInfo();
                    ThreadInspectingTask.this.hasTimerTriggered = true;
                }
            });
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            this.checkTaskThreadInfo();
            if (this.hasTimerTriggered) {
                controller.allActionsCompleted();
            }
        }

        protected void cleanup() throws Exception {
            this.checkTaskThreadInfo();
        }

        /**
         * Fails with an {@link IllegalStateException} (via {@code Preconditions.checkState}) if the
         * calling thread or its context class loader differs from what the constructor captured.
         */
        private void checkTaskThreadInfo() {
            Thread currentThread = Thread.currentThread();
            Preconditions.checkState(this.taskThreadId == currentThread.getId(), (Object)"Task's method was called in non task thread.");
            // The value compared here is the thread's *context* class loader, so the message says so.
            Preconditions.checkState(this.taskClassLoader == currentThread.getContextClassLoader(), (Object)"Task's context class loader has been changed during invocation.");
        }
    }

    /**
     * Source stream task whose state initializer wraps every operator state context produced by
     * the default initializer: backends are passed through unchanged, while the timer service
     * manager and the raw operator/keyed state inputs are handed out as Mockito spies.
     */
    public static class StateBackendTestSource
    extends StreamTask<Long, StreamSource<Long, SourceFunction<Long>>> {
        /** When set, {@code processInput} throws instead of completing. */
        private static volatile boolean fail;

        public StateBackendTestSource(Environment env) throws Exception {
            super(env);
        }

        protected void init() throws Exception {
        }

        /** Completes all actions, unless the static failure switch is set. */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!fail) {
                controller.allActionsCompleted();
                return;
            }
            throw new RuntimeException();
        }

        protected void cleanup() throws Exception {
        }

        /**
         * Decorates the initializer of the superclass so that each created state context returns
         * spies for its timer service manager and raw state inputs.
         */
        public StreamTaskStateInitializer createStreamTaskStateInitializer() {
            StreamTaskStateInitializer defaultInitializer = super.createStreamTaskStateInitializer();
            return (operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup, isUsingCustomRawKeyedState) -> {
                final StreamOperatorStateContext delegate = defaultInitializer.streamOperatorStateContext(operatorID, operatorClassName, processingTimeService, keyContext, keySerializer, closeableRegistry, metricGroup, isUsingCustomRawKeyedState);
                return new StreamOperatorStateContext(){

                    public boolean isRestored() {
                        return delegate.isRestored();
                    }

                    public OperatorStateBackend operatorStateBackend() {
                        return delegate.operatorStateBackend();
                    }

                    public AbstractKeyedStateBackend<?> keyedStateBackend() {
                        return delegate.keyedStateBackend();
                    }

                    /** Returns a spy of the delegate's manager, or {@code null} if it has none. */
                    public InternalTimeServiceManager<?> internalTimerServiceManager() {
                        InternalTimeServiceManager original = delegate.internalTimerServiceManager();
                        if (original == null) {
                            return null;
                        }
                        return (InternalTimeServiceManager)Mockito.spy((Object)original);
                    }

                    public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
                        return this.replaceWithSpy(delegate.rawOperatorStateInputs());
                    }

                    public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
                        return this.replaceWithSpy(delegate.rawKeyedStateInputs());
                    }

                    /**
                     * Creates a spy for {@code closeable}. If the original was registered with
                     * the closeable registry, the registration is moved over to the spy.
                     */
                    public <T extends Closeable> T replaceWithSpy(T closeable) {
                        T spied = Mockito.spy(closeable);
                        if (!closeableRegistry.unregisterCloseable(closeable)) {
                            // The original was not registered, so there is nothing to swap.
                            return spied;
                        }
                        try {
                            closeableRegistry.registerCloseable(spied);
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                        return spied;
                    }
                };
            };
        }
    }

    /**
     * Input processor that never produces records. It reports {@code END_OF_INPUT} once it is
     * finished and {@code NOTHING_AVAILABLE} until then.
     */
    private static class EmptyInputProcessor implements StreamInputProcessor {
        private volatile boolean isFinished;

        /** Creates a processor that is finished from the start. */
        public EmptyInputProcessor() {
            this(true);
        }

        /**
         * @param startFinished whether the processor reports end of input right away
         */
        public EmptyInputProcessor(boolean startFinished) {
            this.isFinished = startFinished;
        }

        public InputStatus processInput() throws Exception {
            if (this.isFinished) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        /** Has nothing to snapshot; returns an already completed future. */
        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) {
            return FutureUtils.completedVoidFuture();
        }

        public void close() throws IOException {
            // holds no resources
        }

        public CompletableFuture<?> getAvailableFuture() {
            return AVAILABLE;
        }

        /** Marks the input as finished; later {@code processInput} calls report end of input. */
        public void finishInput() {
            this.isFinished = true;
        }
    }

    /**
     * Stream task that installs a caller-supplied operator chain in {@code init} and uses an
     * {@link EmptyInputProcessor} that stays unfinished until {@link #finishInput()} is called.
     */
    private static class MockStreamTask extends StreamTask<String, AbstractStreamOperator<String>> {
        /** Operator chain installed by {@code init}. */
        private final OperatorChain<String, AbstractStreamOperator<String>> overrideOperatorChain;

        MockStreamTask(Environment env, OperatorChain<String, AbstractStreamOperator<String>> operatorChain, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
            super(env, null, uncaughtExceptionHandler);
            this.overrideOperatorChain = operatorChain;
        }

        protected void init() {
            this.operatorChain = this.overrideOperatorChain;
            this.headOperator = this.overrideOperatorChain.getHeadOperator();
            // Start unfinished so the task keeps running until finishInput() is called.
            this.inputProcessor = new EmptyInputProcessor(false);
        }

        /** Finishes the input processor; only valid after {@code init} has installed it. */
        void finishInput() {
            Preconditions.checkState(this.inputProcessor != null, "Tried to finishInput before MockStreamTask was started");
            EmptyInputProcessor processor = (EmptyInputProcessor)this.inputProcessor;
            processor.finishInput();
        }
    }

    /**
     * State backend factory that wraps a {@code MemoryStateBackend} in a
     * {@code TestSpyWrapperStateBackend}.
     */
    public static final class TestMemoryStateBackendFactory implements StateBackendFactory<AbstractStateBackend> {
        private static final long serialVersionUID = 1L;

        /** Builds the inner backend and returns it wrapped; {@code classLoader} is not used. */
        public AbstractStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader) {
            AbstractStateBackend inner = this.createInnerBackend(config);
            return new TestSpyWrapperStateBackend(inner);
        }

        /** Creates the backend to be wrapped; {@code config} is not consulted here. */
        protected AbstractStateBackend createInnerBackend(ReadableConfig config) {
            return new MemoryStateBackend();
        }
    }

    /** Source function whose {@code run} and {@code cancel} do nothing. */
    private static class MockSourceFunction implements SourceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private MockSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) {
            // emits nothing
        }

        public void cancel() {
            // nothing to cancel
        }
    }

    /**
     * Source operator that spends at least 500 ms in Java deserialization and whose {@code run}
     * loop sleeps until {@link #cancel()} has been called.
     */
    private static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
        private static final long serialVersionUID = 1L;
        private volatile boolean canceled = false;

        public SlowlyDeserializingOperator() {
            super((SourceFunction)new MockSourceFunction());
        }

        /** Sleeps in 500 ms steps until the operator is canceled; none of the arguments is used. */
        public void run(Object lockingObject, StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<Long>> collector, OperatorChain<?, ?> operatorChain) throws Exception {
            for (;;) {
                if (this.canceled) {
                    return;
                }
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ignored) {
                    // An interrupt only cuts the sleep short; the canceled flag ends the loop.
                }
            }
        }

        public void cancel() {
            this.canceled = true;
        }

        /**
         * Reads the default serialized form and then waits until 500 ms have passed, resuming the
         * wait after every interrupt.
         */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            long remaining = 500L;
            final long deadline = System.currentTimeMillis() + remaining;
            while (true) {
                try {
                    Thread.sleep(remaining);
                }
                catch (InterruptedException ignored) {
                    // Keep waiting for whatever time is left until the deadline.
                }
                remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0L) {
                    return;
                }
            }
        }
    }

    /**
     * Operator whose {@code close} signals that it has been entered and then blocks until it is
     * released. Both latches are static and must be created with {@link #resetLatches()} before
     * {@code close} runs.
     */
    private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
        private static final long serialVersionUID = -9042150529568008847L;
        /** Triggered by {@code close} as soon as it is entered. */
        private static volatile OneShotLatch inClose;
        /** Awaited by {@code close} before it delegates to the superclass. */
        private static volatile OneShotLatch finishClose;

        private BlockingCloseStreamOperator() {
        }

        public void close() throws Exception {
            this.checkLatches();
            // Announce that close() is running, then wait for permission to finish.
            inClose.trigger();
            finishClose.await();
            super.close();
        }

        /** Verifies that both latches have been initialized. */
        private void checkLatches() {
            Preconditions.checkNotNull(inClose);
            Preconditions.checkNotNull(finishClose);
        }

        /** Replaces both latches with fresh, untriggered instances. */
        private static void resetLatches() {
            inClose = new OneShotLatch();
            finishClose = new OneShotLatch();
        }
    }

    /**
     * Input processor that counts its {@code processInput} calls. It reports
     * {@code MORE_AVAILABLE} while the count is below the configured total and
     * {@code END_OF_INPUT} from then on.
     */
    private static class AvailabilityTestInputProcessor implements StreamInputProcessor {
        private final int totalProcessCalls;
        private int currentNumProcessCalls;

        /**
         * @param totalProcessCalls call count at which end of input is reported
         */
        AvailabilityTestInputProcessor(int totalProcessCalls) {
            this.totalProcessCalls = totalProcessCalls;
        }

        public InputStatus processInput() {
            this.currentNumProcessCalls++;
            if (this.currentNumProcessCalls >= this.totalProcessCalls) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.MORE_AVAILABLE;
        }

        /** Has nothing to snapshot; returns an already completed future. */
        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) {
            return FutureUtils.completedVoidFuture();
        }

        public void close() throws IOException {
            // holds no resources
        }

        public CompletableFuture<?> getAvailableFuture() {
            return AVAILABLE;
        }
    }

    /**
     * Stream task whose {@code init} only installs an already finished
     * {@link EmptyInputProcessor} and whose {@code cleanup} does nothing.
     *
     * @param <T> first type argument passed on to {@code StreamTask}
     * @param <OP> operator type passed on to {@code StreamTask}
     */
    public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
        public NoOpStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() throws Exception {
            // The no-arg processor starts out finished.
            this.inputProcessor = new EmptyInputProcessor();
        }

        protected void cleanup() throws Exception {
            // nothing to clean up
        }
    }

    /**
     * Pairs a stream task with the future that tracks its invocation.
     *
     * @param <T> concrete type of the stream task
     */
    private static class RunningTask<T extends StreamTask<?, ?>> {
        final T streamTask;
        final CompletableFuture<Void> invocationFuture;

        RunningTask(T streamTask, CompletableFuture<Void> invocationFuture) {
            this.streamTask = streamTask;
            this.invocationFuture = invocationFuture;
        }

        /**
         * Blocks until the invocation future completes and then asserts that the task's canceled
         * flag matches the expectation.
         *
         * <p>If the future fails and a cancellation was expected, the failure's cause must be a
         * {@code CancelTaskException}; the failure is then consumed so that the canceled flag is
         * still checked. Any failure while no cancellation was expected is rethrown.
         *
         * @param cancelled whether the task is expected to have been canceled
         * @throws Exception the failure of the invocation future if no cancellation was expected
         */
        void waitForTaskCompletion(boolean cancelled) throws Exception {
            try {
                this.invocationFuture.get();
            }
            catch (Exception e) {
                if (!cancelled) {
                    throw e;
                }
                // Expected cancellation: verify its cause instead of failing the caller.
                Assert.assertThat((Object)e.getCause(), (Matcher)org.hamcrest.Matchers.is((Matcher)org.hamcrest.Matchers.instanceOf(CancelTaskException.class)));
            }
            Assert.assertThat((Object)this.streamTask.isCanceled(), (Matcher)org.hamcrest.Matchers.is((Object)cancelled));
        }
    }

    /** Stream task whose {@code processInput} always throws a {@code CancelTaskException}. */
    public static class CancelThrowingTask extends StreamTask<String, AbstractStreamOperator<String>> {
        public CancelThrowingTask(Environment env) throws Exception {
            super(env);
        }

        protected void init() {
            // nothing to set up
        }

        protected void processInput(MailboxDefaultAction.Controller controller) {
            throw new CancelTaskException();
        }
    }

    /**
     * Stream task whose {@code cancelTask} always throws. Its {@code processInput} blocks on a
     * monitor held by a helper thread, and that thread only lets go once it is closed through
     * the task's registered cancelables (or by the {@code finally} block).
     */
    public static class CancelFailingTask
    extends StreamTask<String, AbstractStreamOperator<String>> {
        public CancelFailingTask(Environment env) throws Exception {
            super(env);
        }

        protected void init() {
        }

        /**
         * Starts a {@link LockHolder} thread, waits until it owns the lock, triggers
         * {@code syncLatch} (declared outside this class) and then tries to take the lock itself,
         * which blocks until the holder has been closed.
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            OneShotLatch latch = new OneShotLatch();
            Object lock = new Object();
            // The holder has to exist and be running before it is registered and awaited.
            LockHolder holder = new LockHolder(lock, latch);
            holder.start();
            try {
                // Registering the holder lets closing the cancelables release the lock.
                this.getCancelables().registerCloseable((Closeable)holder);
                // Wait until the holder thread has entered its synchronized block.
                latch.await();
                syncLatch.trigger();
                synchronized (lock) {
                    // Nothing to do: entering the monitor is only possible once the holder left it.
                }
            }
            finally {
                holder.close();
            }
            controller.allActionsCompleted();
        }

        protected void cleanup() {
        }

        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }

        /**
         * Thread that takes the monitor of {@code lock}, triggers the given latch and keeps the
         * monitor until it is canceled.
         */
        private static final class LockHolder
        extends Thread
        implements Closeable {
            private final OneShotLatch trigger;
            private final Object lock;
            private volatile boolean canceled;

            private LockHolder(Object lock, OneShotLatch trigger) {
                this.lock = lock;
                this.trigger = trigger;
            }

            /** Holds the lock, sleeping, until the canceled flag is observed. */
            @Override
            public void run() {
                synchronized (this.lock) {
                    while (!this.canceled) {
                        this.trigger.trigger();
                        try {
                            Thread.sleep(1000000000L);
                        }
                        catch (InterruptedException ignored) {
                            // An interrupt only ends the sleep; the loop re-checks the flag.
                        }
                    }
                }
            }

            /** Sets the canceled flag without interrupting the thread. */
            public void cancel() {
                this.canceled = true;
            }

            /** Sets the canceled flag and interrupts the thread so it leaves its sleep. */
            @Override
            public void close() {
                this.canceled = true;
                this.interrupt();
            }
        }
    }

    /**
     * One-input operator that fails in two places: {@code processElement} always throws an
     * {@code ExpectedTestException}, and {@code dispose} always fails through
     * {@code Assert.fail}.
     */
    private static class FailingTwiceOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;

        private FailingTwiceOperator() {
        }

        /** Always throws; the element is never looked at. */
        public void processElement(StreamRecord<String> element) throws Exception {
            throw new ExpectedTestException();
        }

        /** Always fails with the message below. */
        public void dispose() throws Exception {
            Assert.fail((String)"This exception should be suppressed");
        }
    }
}

