/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class AsyncWaitOperatorTest
extends TestLogger {
    private static final long TIMEOUT = 1000L;

    /**
     * Runs the shared event-time scenario with the operator in {@code ORDERED} output mode.
     */
    @Test
    public void testEventTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode orderedMode = AsyncDataStream.OutputMode.ORDERED;
        testEventTime(orderedMode);
    }

    /**
     * Runs the shared event-time scenario with the operator in {@code UNORDERED} output mode.
     */
    @Test
    public void testWaterMarkUnordered() throws Exception {
        final AsyncDataStream.OutputMode unorderedMode = AsyncDataStream.OutputMode.UNORDERED;
        testEventTime(unorderedMode);
    }

    /**
     * Sends three records and one watermark through an {@link AsyncWaitOperator} and checks
     * the emitted elements for the given output mode.
     *
     * <p>The expected record values (2, 4, 6) are twice the inputs (1, 2, 3), with the input
     * timestamps retained. In {@code ORDERED} mode the output must equal the expected queue
     * exactly. In {@code UNORDERED} mode only the watermark position (index 2) and the last
     * record (index 3) are pinned; the rest is compared after sorting.
     *
     * @param mode output mode the operator under test is created with
     * @throws Exception if the harness or the operator fails
     */
    private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
        final AsyncWaitOperator<Integer, Integer> operator =
            new AsyncWaitOperator<>(new MyAsyncFunction(), TIMEOUT, 2, mode);
        final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
            new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);

        testHarness.open();

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.processElement(new StreamRecord<>(1, 1L));
            testHarness.processElement(new StreamRecord<>(2, 2L));
            testHarness.processWatermark(new Watermark(2L));
            testHarness.processElement(new StreamRecord<>(3, 3L));
        }

        // close() is called under the checkpoint lock, just like the element processing above
        synchronized (testHarness.getCheckpointLock()) {
            testHarness.close();
        }

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>(2, 1L));
        expectedOutput.add(new StreamRecord<>(4, 2L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new StreamRecord<>(6, 3L));

        if (AsyncDataStream.OutputMode.ORDERED == mode) {
            TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
        } else {
            final Object[] jobOutputQueue = testHarness.getOutput().toArray();

            // Check the size first so a short output fails with an assertion message
            // rather than an ArrayIndexOutOfBoundsException on the indexed checks below.
            Assert.assertEquals("Unexpected number of output elements", expectedOutput.size(), jobOutputQueue.length);
            Assert.assertEquals("Watermark should be at index 2", new Watermark(2L), jobOutputQueue[2]);
            Assert.assertEquals("StreamRecord 3 should be at the end", new StreamRecord<>(6, 3L), jobOutputQueue[3]);

            TestHarnessUtil.assertOutputEqualsSorted("Output for StreamRecords does not match", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
        }
    }

    /**
     * Runs the shared processing-time scenario with {@code ORDERED} output.
     */
    @Test
    public void testProcessingTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode orderedMode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTime(orderedMode);
    }

    /**
     * Runs the shared processing-time scenario with {@code UNORDERED} output.
     */
    @Test
    public void testProcessingUnordered() throws Exception {
        final AsyncDataStream.OutputMode unorderedMode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTime(unorderedMode);
    }

    /**
     * Sends eight records (values 1..8, timestamp equal to the value) through an
     * {@link AsyncWaitOperator} with capacity 6 and checks that each value comes out doubled
     * with its original timestamp.
     *
     * <p>{@code ORDERED} mode requires the exact input order; {@code UNORDERED} mode compares
     * the output after sorting.
     *
     * @param mode output mode the operator under test is created with
     * @throws Exception if the harness or the operator fails
     */
    private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
        final int numElements = 8;

        final AsyncWaitOperator<Integer, Integer> operator =
            new AsyncWaitOperator<>(new MyAsyncFunction(), TIMEOUT, 6, mode);
        final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
            new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);

        testHarness.open();

        synchronized (testHarness.getCheckpointLock()) {
            for (int value = 1; value <= numElements; value++) {
                testHarness.processElement(new StreamRecord<>(value, (long) value));
            }
        }

        final ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (int value = 1; value <= numElements; value++) {
            expectedOutput.add(new StreamRecord<>(2 * value, (long) value));
        }

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.close();
        }

        if (mode == AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
        } else {
            TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
        }
    }

    /**
     * Runs a {@link OneInputStreamTask} configured from the vertex returned by
     * {@code createChainedVertex(false)} and checks that inputs 5..9 come out as
     * 22, 26, 30, 34, 38 with their timestamps unchanged (compared after sorting).
     */
    @Test
    public void testOperatorChainWithProcessingTime() throws Exception {
        final JobVertex chainedVertex = createChainedVertex(false);

        final OneInputStreamTaskTestHarness<Integer, Integer> testHarness = new OneInputStreamTaskTestHarness<>(
            OneInputStreamTask::new,
            1,
            1,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        // the task runs with the configuration of the chained vertex
        testHarness.taskConfig = chainedVertex.getConfiguration();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
        final AsyncWaitOperator<Integer, Integer> headOperator =
            operatorChainStreamConfig.getStreamOperator(AsyncWaitOperatorTest.class.getClassLoader());
        streamConfig.setStreamOperator(headOperator);

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        final long initialTimestamp = 0L;
        final int firstValue = 5;
        final int numElements = 5;

        for (int offset = 0; offset < numElements; offset++) {
            testHarness.processElement(new StreamRecord<>(firstValue + offset, initialTimestamp + offset));
        }

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // expected values are 22, 26, 30, 34, 38 for inputs 5, 6, 7, 8, 9
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (int offset = 0; offset < numElements; offset++) {
            expectedOutput.add(new StreamRecord<>(22 + 4 * offset, initialTimestamp + offset));
        }

        TestHarnessUtil.assertOutputEqualsSorted(
            "Test for chained operator with AsyncWaitOperator failed",
            expectedOutput,
            testHarness.getOutput(),
            new StreamRecordComparator());
    }

    /**
     * Builds a small streaming job and returns the second vertex (index 1) of its job graph
     * in topological order from the sources.
     *
     * <p>The pipeline is: source of 1, 2, 3, then an ordered async wait (capacity 6), a rich
     * map adding a value set in {@code open()}, an unordered async wait (capacity 3), an
     * identity map that starts a new chain, and a discarding sink. The job graph is asserted
     * to have exactly three vertices.
     *
     * @param withLazyFunction if {@code true} the first async stage uses
     *     {@code LazyAsyncFunction}, otherwise {@code MyAsyncFunction}
     * @return the job vertex at topological index 1
     */
    private JobVertex createChainedVertex(boolean withLazyFunction) {
        final StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Declared as DataStream because it is reassigned to the results of the async-wait
        // and map transformations, which are not DataStreamSource instances.
        DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);

        if (withLazyFunction) {
            input = AsyncDataStream.orderedWait(input, new LazyAsyncFunction(), TIMEOUT, TimeUnit.MILLISECONDS, 6);
        } else {
            input = AsyncDataStream.orderedWait(input, new MyAsyncFunction(), TIMEOUT, TimeUnit.MILLISECONDS, 6);
        }

        // the added value is only set in open(), so map() relies on open() having run first
        input = input.map(new RichMapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            private Integer initialValue = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                this.initialValue = 1;
            }

            @Override
            public Integer map(Integer value) throws Exception {
                return this.initialValue + value;
            }
        });

        input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), TIMEOUT, TimeUnit.MILLISECONDS, 3);

        input.map(new MapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 5162085254238405527L;

            @Override
            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).startNewChain().addSink(new DiscardingSink<Integer>());

        final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();

        Assert.assertEquals(
            "The job graph should consist of exactly 3 vertices",
            3,
            jobGraph.getVerticesSortedTopologicallyFromSources().size());

        return jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
    }

    /**
     * Snapshots an {@link AsyncWaitOperator} with four in-flight elements, restores the
     * snapshot into a second task and checks the restored task's output.
     *
     * <p>The first task uses {@code LazyAsyncFunction}. The checkpoint is triggered before
     * {@code LazyAsyncFunction.countDown()} is called. The restored task uses
     * {@code MyAsyncFunction}, receives four more elements, and must emit the doubled values
     * of all eight inputs (1..8) in order. Checkpoint barriers are dropped from the restored
     * output before comparing.
     */
    @Test
    public void testStateSnapshotAndRestore() throws Exception {
        final OneInputStreamTaskTestHarness<Integer, Integer> testHarness = new OneInputStreamTaskTestHarness<>(
            OneInputStreamTask::new,
            1,
            1,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final AsyncWaitOperator<Integer, Integer> operator =
            new AsyncWaitOperator<>(new LazyAsyncFunction(), TIMEOUT, 3, AsyncDataStream.OutputMode.ORDERED);

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final OperatorID operatorID = new OperatorID(42L, 4711L);
        streamConfig.setStreamOperator(operator);
        streamConfig.setOperatorID(operatorID);

        final TestTaskStateManager taskStateManagerMock = testHarness.getTaskStateManager();
        taskStateManagerMock.setWaitForReportLatch(new OneShotLatch());

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        final OneInputStreamTask<Integer, Integer> task = testHarness.getTask();

        testHarness.processElement(new StreamRecord<>(1, 1L));
        testHarness.processElement(new StreamRecord<>(2, 2L));
        testHarness.processElement(new StreamRecord<>(3, 3L));
        testHarness.processElement(new StreamRecord<>(4, 4L));

        testHarness.waitForInputProcessing();

        final long checkpointId = 1L;
        final long checkpointTimestamp = 1L;

        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
        task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());

        // wait until the task state manager has received the checkpoint report
        taskStateManagerMock.getWaitForReportLatch().await();
        Assert.assertEquals(checkpointId, taskStateManagerMock.getReportedCheckpointId());

        LazyAsyncFunction.countDown();

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // set up a second task that starts from the reported snapshot
        final TaskStateSnapshot subtaskStates = taskStateManagerMock.getLastJobManagerTaskStateSnapshot();

        final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness = new OneInputStreamTaskTestHarness<>(
            OneInputStreamTask::new,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO);
        restoredTaskHarness.setTaskStateSnapshot(checkpointId, subtaskStates);
        restoredTaskHarness.setupOutputForSingletonOperatorChain();

        final AsyncWaitOperator<Integer, Integer> restoredOperator =
            new AsyncWaitOperator<>(new MyAsyncFunction(), TIMEOUT, 6, AsyncDataStream.OutputMode.ORDERED);
        restoredTaskHarness.getStreamConfig().setStreamOperator(restoredOperator);
        restoredTaskHarness.getStreamConfig().setOperatorID(operatorID);

        restoredTaskHarness.invoke();
        restoredTaskHarness.waitForTaskRunning();

        final OneInputStreamTask<Integer, Integer> restoredTask = restoredTaskHarness.getTask();

        restoredTaskHarness.processElement(new StreamRecord<>(5, 5L));
        restoredTaskHarness.processElement(new StreamRecord<>(6, 6L));
        restoredTaskHarness.processElement(new StreamRecord<>(7, 7L));

        // trigger a checkpoint on the restored task while it is processing elements
        restoredTask.triggerCheckpoint(
            new CheckpointMetaData(checkpointId, checkpointTimestamp),
            CheckpointOptions.forCheckpointWithDefaultLocation());

        restoredTaskHarness.processElement(new StreamRecord<>(8, 8L));

        restoredTaskHarness.endInput();
        restoredTaskHarness.waitForTaskCompletion();

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>(2, 1L));
        expectedOutput.add(new StreamRecord<>(4, 2L));
        expectedOutput.add(new StreamRecord<>(6, 3L));
        expectedOutput.add(new StreamRecord<>(8, 4L));
        expectedOutput.add(new StreamRecord<>(10, 5L));
        expectedOutput.add(new StreamRecord<>(12, 6L));
        expectedOutput.add(new StreamRecord<>(14, 7L));
        expectedOutput.add(new StreamRecord<>(16, 8L));

        // checkpoint barriers are not part of the expected records; drop them before comparing
        restoredTaskHarness.getOutput().removeIf(element -> element instanceof CheckpointBarrier);

        TestHarnessUtil.assertOutputEquals("StateAndRestored Test Output was not correct.", expectedOutput, restoredTaskHarness.getOutput());
    }

    /**
     * Timeout scenario with {@code LazyAsyncFunction}: a {@link TimeoutException} is expected
     * as the failure cause, and the only expected output record is (2, timestamp 5).
     */
    @Test
    public void testAsyncTimeoutFailure() throws Exception {
        final LazyAsyncFunction asyncFunction = new LazyAsyncFunction();
        final StreamRecord<Integer> onlyExpectedRecord = new StreamRecord<>(2, 5L);

        testAsyncTimeout(asyncFunction, Optional.of(TimeoutException.class), onlyExpectedRecord);
    }

    /**
     * Timeout scenario with {@code IgnoreTimeoutLazyAsyncFunction}: no failure is expected,
     * and the expected output is (3, timestamp 0) followed by (2, timestamp 5).
     */
    @Test
    public void testAsyncTimeoutIgnore() throws Exception {
        final IgnoreTimeoutLazyAsyncFunction asyncFunction = new IgnoreTimeoutLazyAsyncFunction();
        final StreamRecord<Integer> firstExpectedRecord = new StreamRecord<>(3, 0L);
        final StreamRecord<Integer> secondExpectedRecord = new StreamRecord<>(2, 5L);

        testAsyncTimeout(asyncFunction, Optional.empty(), firstExpectedRecord, secondExpectedRecord);
    }

    /**
     * Drives an ordered {@link AsyncWaitOperator} with a 10 ms timeout through a timeout
     * scenario and checks both the output and the reported failure.
     *
     * <p>Element 1 is processed at processing time 0 and element 2 at processing time 5.
     * Time is then advanced to 11 before {@code LazyAsyncFunction.countDown()} is called
     * and the harness is closed.
     *
     * @param lazyAsyncFunction async function under test
     * @param expectedException if present, the throwable type that must be found in the
     *     environment's external failure cause; if empty, the failure cause is not inspected
     * @param expectedRecords the exact records the operator must have emitted, in order
     * @throws Exception if the harness or the operator fails
     */
    private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> expectedException, StreamRecord<Integer> ... expectedRecords) throws Exception {
        final long timeout = 10L;
        final long initialTime = 0L;

        final AsyncWaitOperator<Integer, Integer> operator =
            new AsyncWaitOperator<>(lazyAsyncFunction, timeout, 2, AsyncDataStream.OutputMode.ORDERED);

        final MockEnvironment mockEnvironment = createMockEnvironment();
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);

        final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
            new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment);

        testHarness.open();
        testHarness.setProcessingTime(initialTime);

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.processElement(new StreamRecord<>(1, initialTime));
            testHarness.setProcessingTime(initialTime + 5L);
            testHarness.processElement(new StreamRecord<>(2, initialTime + 5L));
        }

        // move processing time past the first element's timeout (0 + 10) but not the second's (5 + 10)
        testHarness.setProcessingTime(initialTime + timeout + 1L);

        // countDown() is static (it is also called as LazyAsyncFunction.countDown() elsewhere in this class)
        LazyAsyncFunction.countDown();

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.close();
        }

        // Object element type so the queue matches the harness output queue
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(Arrays.asList(expectedRecords));

        TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());

        if (expectedException.isPresent()) {
            Assert.assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
            Assert.assertTrue(
                ExceptionUtils.findThrowable(
                    mockEnvironment.getActualExternalFailureCause().get(),
                    expectedException.get()).isPresent());
        }
    }

    /**
     * Builds the {@link MockEnvironment} shared by the tests in this class: task name
     * {@code "foobarTask"}, 1 MiB memory size, a {@link MockInputSplitProvider} and a
     * buffer size of 4096.
     *
     * @return a newly built mock environment
     */
    @Nonnull
    private MockEnvironment createMockEnvironment() {
        final long memorySize = 1024L * 1024L;
        final int bufferSize = 4 * 1024;

        return new MockEnvironmentBuilder()
            .setTaskName("foobarTask")
            .setMemorySize(memorySize)
            .setInputSplitProvider(new MockInputSplitProvider())
            .setBufferSize(bufferSize)
            .build();
    }

    /**
     * Checks that closing the operator completes even while the output's {@code collect}
     * call is blocked.
     *
     * <p>The mocked {@link Output} asserts that {@code collect} runs under the checkpoint
     * lock, signals {@code outputLatch}, and then waits on the lock until
     * {@code closingLatch} is triggered. {@code TestAsyncWaitOperator.close()} triggers that
     * latch and notifies the lock before delegating to the real {@code close()}. The test
     * is bounded by a 10 second timeout.
     */
    @Test(timeout=10000L)
    public void testClosingWithBlockedEmitter() throws Exception {
        final Object lock = new Object();

        final MockEnvironment environment = createMockEnvironment();

        final StreamTask<?, ?> containingTask = Mockito.mock(StreamTask.class);
        Mockito.when(containingTask.getEnvironment()).thenReturn(environment);
        Mockito.when(containingTask.getCheckpointLock()).thenReturn(lock);
        Mockito.when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService());

        final MockStreamConfig streamConfig = new MockStreamConfig();
        streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);

        final OneShotLatch closingLatch = new OneShotLatch();
        final OneShotLatch outputLatch = new OneShotLatch();

        @SuppressWarnings("unchecked") // Mockito.mock cannot produce a parameterized type
        final Output<StreamRecord<Integer>> output = Mockito.mock(Output.class);

        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                Assert.assertTrue("Output should happen under the checkpoint lock.", Thread.holdsLock(lock));

                outputLatch.trigger();

                // block the emitter until the operator's close() has been entered
                while (!closingLatch.isTriggered()) {
                    lock.wait();
                }

                return null;
            }
        }).when(output).collect(Matchers.any(StreamRecord.class));

        final TestAsyncWaitOperator<Integer, Integer> operator = new TestAsyncWaitOperator<>(
            new MyAsyncFunction(),
            TIMEOUT,
            1,
            AsyncDataStream.OutputMode.ORDERED,
            closingLatch);

        operator.setup(containingTask, streamConfig, output);
        operator.open();

        synchronized (lock) {
            operator.processElement(new StreamRecord<>(42));
        }

        // wait until the emitter is inside the blocking collect() call
        outputLatch.await();

        synchronized (lock) {
            operator.close();
        }
    }

    /**
     * Checks that the timeout timer registered for an element is cancelled once the
     * element's result has been emitted.
     *
     * <p>The {@link ProcessingTimeService} is a mock that reports processing time
     * {@code timestamp} and returns a mocked {@link ScheduledFuture} from
     * {@code registerTimer}. The async function completes immediately with its input. The
     * test verifies that the record was collected, that a timer was registered for
     * {@code timestamp + timeout}, and that the returned future was cancelled with
     * {@code true}.
     */
    @Test
    public void testTimeoutCleanup() throws Exception {
        final Object lock = new Object();

        final long timeout = 100000L;
        final long timestamp = 1L;

        final MockEnvironment environment = createMockEnvironment();

        final ScheduledFuture<?> scheduledFuture = Mockito.mock(ScheduledFuture.class);

        final ProcessingTimeService processingTimeService = Mockito.mock(ProcessingTimeService.class);
        Mockito.when(processingTimeService.getCurrentProcessingTime()).thenReturn(timestamp);
        Mockito.doReturn(scheduledFuture).when(processingTimeService).registerTimer(Matchers.anyLong(), Matchers.any(ProcessingTimeCallback.class));

        final StreamTask<?, ?> containingTask = Mockito.mock(StreamTask.class);
        Mockito.when(containingTask.getEnvironment()).thenReturn(environment);
        Mockito.when(containingTask.getCheckpointLock()).thenReturn(lock);
        Mockito.when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);

        final MockStreamConfig streamConfig = new MockStreamConfig();
        streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);

        @SuppressWarnings("unchecked") // Mockito.mock cannot produce a parameterized type
        final Output<StreamRecord<Integer>> output = Mockito.mock(Output.class);

        final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
            new AsyncFunction<Integer, Integer>() {
                private static final long serialVersionUID = -3718276118074877073L;

                @Override
                public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
                    resultFuture.complete(Collections.singletonList(input));
                }
            },
            timeout,
            1,
            AsyncDataStream.OutputMode.UNORDERED);

        operator.setup(containingTask, streamConfig, output);
        operator.open();

        final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, timestamp);

        synchronized (lock) {
            operator.processElement(streamRecord);
        }

        synchronized (lock) {
            operator.close();
        }

        // the result of the single input must have been emitted
        Mockito.verify(output).collect(Matchers.eq(streamRecord));

        // Compute the expected timer time up front. Calling
        // processingTimeService.getCurrentProcessingTime() inside the argument list would
        // happen after verify(...) has started, so Mockito would apply the verification to
        // that call instead of registerTimer(...).
        final long expectedTimerTimestamp = timestamp + timeout;
        Mockito.verify(processingTimeService).registerTimer(Matchers.eq(expectedTimerTimestamp), Matchers.any(ProcessingTimeCallback.class));

        // the registered timeout must have been cancelled
        Mockito.verify(scheduledFuture).cancel(Matchers.eq(true));
    }

    /**
     * Runs the user-exception scenario in {@code ORDERED} mode; bounded by a 2 second timeout.
     */
    @Test(timeout=2000L)
    public void testOrderedWaitUserExceptionHandling() throws Exception {
        final AsyncDataStream.OutputMode orderedMode = AsyncDataStream.OutputMode.ORDERED;
        testUserExceptionHandling(orderedMode);
    }

    /**
     * Runs the user-exception scenario in {@code UNORDERED} mode; bounded by a 2 second timeout.
     */
    @Test(timeout=2000L)
    public void testUnorderedWaitUserExceptionHandling() throws Exception {
        final AsyncDataStream.OutputMode unorderedMode = AsyncDataStream.OutputMode.UNORDERED;
        testUserExceptionHandling(unorderedMode);
    }

    /**
     * Processes one element with an async function that completes its result future
     * exceptionally, then asserts that the environment recorded an external failure cause.
     *
     * @param outputMode output mode the operator under test is created with
     * @throws Exception if the harness or the operator fails
     */
    private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
        final UserExceptionAsyncFunction asyncWaitFunction = new UserExceptionAsyncFunction();

        final AsyncWaitOperator<Integer, Integer> asyncWaitOperator =
            new AsyncWaitOperator<>(asyncWaitFunction, TIMEOUT, 2, outputMode);

        final MockEnvironment mockEnvironment = createMockEnvironment();
        // record the failure in the environment so it can be asserted on below
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);

        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
            new OneInputStreamOperatorTestHarness<>(asyncWaitOperator, IntSerializer.INSTANCE, mockEnvironment);

        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        Assert.assertTrue(harness.getEnvironment().getActualExternalFailureCause().isPresent());
    }

    /**
     * Runs the timeout-exception scenario in {@code ORDERED} mode.
     */
    @Test
    public void testOrderedWaitTimeoutHandling() throws Exception {
        final AsyncDataStream.OutputMode orderedMode = AsyncDataStream.OutputMode.ORDERED;
        testTimeoutExceptionHandling(orderedMode);
    }

    /**
     * Runs the timeout-exception scenario in {@code UNORDERED} mode.
     */
    @Test
    public void testUnorderedWaitTimeoutHandling() throws Exception {
        final AsyncDataStream.OutputMode unorderedMode = AsyncDataStream.OutputMode.UNORDERED;
        testTimeoutExceptionHandling(unorderedMode);
    }

    /**
     * Processes one element with an async function that never completes its result future,
     * advances processing time to the timeout, and asserts that the environment recorded an
     * external failure cause.
     *
     * <p>The final assertion mirrors {@code testUserExceptionHandling}; without it the test
     * would pass even if the timeout never reported a failure.
     *
     * @param outputMode output mode the operator under test is created with
     * @throws Exception if the harness or the operator fails
     */
    private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
        final AsyncFunction<Integer, Integer> asyncFunction = new NoOpAsyncFunction<>();
        final long timeout = 10L;

        final AsyncWaitOperator<Integer, Integer> asyncWaitOperator =
            new AsyncWaitOperator<>(asyncFunction, timeout, 2, outputMode);

        final MockEnvironment mockEnvironment = createMockEnvironment();
        // record the failure in the environment so it can be asserted on below
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);

        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
            new OneInputStreamOperatorTestHarness<>(asyncWaitOperator, IntSerializer.INSTANCE, mockEnvironment);

        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        // advance processing time to the configured timeout
        harness.setProcessingTime(10L);

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        Assert.assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
    }

    /**
     * Snapshots an operator whose queue holds {@code capacity} elements while one further
     * element is being written by a separate thread, then restores the snapshot into a new
     * operator and checks that all {@code capacity + 1} elements are emitted in order.
     *
     * <p>The first operator uses a {@code ControllableAsyncFunction} whose results are held
     * back until {@code trigger} is completed. The recovering operator uses an already
     * completed trigger. The test is bounded by a 10 second timeout.
     */
    @Test(timeout=10000L)
    public void testRestartWithFullQueue() throws Exception {
        final int capacity = 10;

        // 1. create a snapshot while capacity elements are queued and one more is being written
        final CompletableFuture<Void> trigger = new CompletableFuture<>();
        final ControllableAsyncFunction<Integer> controllableAsyncFunction = new ControllableAsyncFunction<>(trigger);

        final OneInputStreamOperatorTestHarness<Integer, Integer> snapshotHarness = new OneInputStreamOperatorTestHarness<>(
            new AsyncWaitOperator<>(
                controllableAsyncFunction,
                TIMEOUT,
                capacity,
                AsyncDataStream.OutputMode.ORDERED),
            IntSerializer.INSTANCE);

        snapshotHarness.open();

        final OperatorSubtaskState snapshot;
        final ArrayList<Integer> expectedOutput = new ArrayList<>(capacity + 1);

        try {
            synchronized (snapshotHarness.getCheckpointLock()) {
                for (int i = 0; i < capacity; ++i) {
                    snapshotHarness.processElement(i, 0L);
                    expectedOutput.add(i);
                }
            }

            expectedOutput.add(capacity);

            final OneShotLatch lastElement = new OneShotLatch();

            // writes the (capacity + 1)-th element from a separate thread
            final CheckedThread lastElementWriter = new CheckedThread() {
                @Override
                public void go() throws Exception {
                    synchronized (snapshotHarness.getCheckpointLock()) {
                        lastElement.trigger();
                        snapshotHarness.processElement(capacity, 0L);
                    }
                }
            };

            lastElementWriter.start();

            // the latch is triggered once the writer holds the checkpoint lock
            lastElement.await();

            synchronized (snapshotHarness.getCheckpointLock()) {
                // NOTE(review): the lock can only be taken here once the writer has released
                // it, which presumably means the writer has handed over the extra element
                // and is waiting for queue space - confirm against AsyncWaitOperator.
                snapshot = snapshotHarness.snapshot(0L, 0L);
            }

            // release the held-back results so that close() can finish
            trigger.complete(null);
        } finally {
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshotHarness.close();
            }
        }

        // 2. restore the snapshot into an operator whose async function completes immediately
        final OneInputStreamOperatorTestHarness<Integer, Integer> recoverHarness = new OneInputStreamOperatorTestHarness<>(
            new AsyncWaitOperator<>(
                new ControllableAsyncFunction<Integer>(CompletableFuture.<Void>completedFuture(null)),
                TIMEOUT,
                capacity,
                AsyncDataStream.OutputMode.ORDERED),
            IntSerializer.INSTANCE);

        recoverHarness.initializeState(snapshot);

        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.open();
        }

        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.close();
        }

        final ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput();

        Assert.assertThat(output.size(), org.hamcrest.Matchers.equalTo(capacity + 1));

        final ArrayList<Integer> outputElements = new ArrayList<>(capacity + 1);

        for (int i = 0; i < capacity + 1; ++i) {
            @SuppressWarnings("unchecked") // the harness output queue is untyped
            final StreamRecord<Integer> streamRecord = (StreamRecord<Integer>) output.poll();
            outputElements.add(streamRecord.getValue());
        }

        Assert.assertThat(outputElements, org.hamcrest.Matchers.equalTo(expectedOutput));
    }

    private static class NoOpAsyncFunction<IN, OUT>
    implements AsyncFunction<IN, OUT> {
        private static final long serialVersionUID = -3060481953330480694L;

        private NoOpAsyncFunction() {
        }

        public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        }
    }

    /**
     * Async function that hands each input back as a singleton result once the externally
     * supplied {@code trigger} future has completed normally.
     *
     * @param <IN> element type, used for both input and output
     */
    private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239267288636L;

        /** Future gating every result; declared transient, so default serialization skips it. */
        private transient CompletableFuture<Void> trigger;

        /**
         * @param trigger future whose completion releases the results; must not be null
         */
        private ControllableAsyncFunction(CompletableFuture<Void> trigger) {
            CompletableFuture<Void> checkedTrigger = Preconditions.checkNotNull(trigger);
            this.trigger = checkedTrigger;
        }

        /**
         * Registers a callback on the trigger that completes {@code resultFuture} with a
         * singleton collection holding {@code input}.
         */
        public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
            // The value produced by the trigger (Void) is irrelevant; only its completion matters.
            trigger.thenAccept(unused -> resultFuture.complete(Collections.singleton(input)));
        }
    }

    /**
     * Async function that fails every invocation: the result future is completed exceptionally
     * with a plain {@link Exception} carrying the message {@code "Test exception"}.
     */
    private static class UserExceptionAsyncFunction implements AsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 6326568632967110990L;

        private UserExceptionAsyncFunction() {
        }

        /** Ignores {@code input} and immediately reports a failure on {@code resultFuture}. */
        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
            Throwable failure = new Exception("Test exception");
            resultFuture.completeExceptionally(failure);
        }
    }

    /**
     * {@link AsyncWaitOperator} that makes the start of {@link #close()} observable: it triggers
     * a {@link OneShotLatch} and notifies all waiters on the checkpointing lock before delegating
     * to the superclass.
     *
     * @param <IN> input element type
     * @param <OUT> output element type
     */
    private static final class TestAsyncWaitOperator<IN, OUT> extends AsyncWaitOperator<IN, OUT> {
        private static final long serialVersionUID = -8528791694746625560L;

        /** Triggered as the very first step of {@link #close()}. */
        private final transient OneShotLatch closingLatch;

        /**
         * Forwards the first four arguments to the {@link AsyncWaitOperator} constructor.
         *
         * @param closingLatch latch triggered when closing begins; must not be null
         */
        public TestAsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode outputMode, OneShotLatch closingLatch) {
            super(asyncFunction, timeout, capacity, outputMode);
            final OneShotLatch checkedLatch = Preconditions.checkNotNull(closingLatch);
            this.closingLatch = checkedLatch;
        }

        /**
         * Triggers the closing latch, wakes every thread waiting on the checkpointing lock, then
         * runs the regular close logic.
         */
        public void close() throws Exception {
            closingLatch.trigger();
            // Object.notifyAll() throws IllegalMonitorStateException unless the calling thread
            // owns the monitor. NOTE(review): presumably callers invoke close() while
            // synchronized on the checkpoint lock -- confirm against the call sites.
            this.checkpointingLock.notifyAll();
            super.close();
        }
    }

    /**
     * Comparator for the elements of an operator output queue.
     *
     * <p>If either argument is a {@link Watermark} the pair compares as equal (0). Otherwise both
     * arguments are cast to {@link StreamRecord} and ordered by timestamp first, then by their
     * {@link Integer} value.
     */
    private class StreamRecordComparator implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        /**
         * @param o1 a {@link Watermark} or a {@link StreamRecord} holding an {@link Integer}
         * @param o2 a {@link Watermark} or a {@link StreamRecord} holding an {@link Integer}
         * @return 0 if either argument is a watermark; otherwise a negative, zero or positive
         *     value following the (timestamp, value) ordering
         * @throws ClassCastException if a non-watermark argument is not a {@link StreamRecord}
         *     or its value is not an {@link Integer}
         */
        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            // Long.compare instead of casting the long difference to int: the cast could
            // truncate or overflow and report 0 or the wrong sign for distinct timestamps.
            int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            // Equal timestamps: fall back to the natural ordering of the Integer payloads.
            return ((Integer)sr0.getValue()).compareTo((Integer)sr1.getValue());
        }
    }

    /**
     * {@link LazyAsyncFunction} variant whose {@code timeout} callback completes the result
     * future with a single element equal to three times the input.
     */
    private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction {
        private static final long serialVersionUID = 1428714561365346128L;

        private IgnoreTimeoutLazyAsyncFunction() {
        }

        /**
         * Completes {@code resultFuture} with a one-element list containing {@code input * 3}.
         */
        public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
            int tripled = input * 3;
            resultFuture.complete(Collections.singletonList(tripled));
        }
    }

    /**
     * {@link MyAsyncFunction} variant whose submitted tasks block on a shared latch before
     * completing the result future with the unmodified input. Call {@link #countDown()} to
     * release the waiting tasks.
     */
    private static class LazyAsyncFunction extends MyAsyncFunction {
        private static final long serialVersionUID = 3537791752703154670L;

        /**
         * Latch shared by all instances. It is static so that {@link #countDown()} can be static,
         * and it is replaced by a fresh single-count latch every time an instance is constructed.
         */
        private static CountDownLatch latch;

        public LazyAsyncFunction() {
            latch = new CountDownLatch(1);
        }

        /**
         * Submits a task to the shared executor that waits for the latch and then completes
         * {@code resultFuture} with a one-element list containing {@code input}.
         */
        @Override
        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(() -> {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // Stop waiting but keep the interrupt visible to the executor thread
                    // instead of silently swallowing it; the result is still delivered below.
                    Thread.currentThread().interrupt();
                }
                resultFuture.complete(Collections.singletonList(input));
            });
        }

        /** Counts the current latch down, releasing every task waiting on it. */
        public static void countDown() {
            latch.countDown();
        }
    }

    /**
     * Async function that doubles its input on a thread pool shared by all instances.
     *
     * <p>The pool is reference counted under the class monitor: the first {@code open} creates
     * it, every {@code open} increments {@link #counter}, every {@code close} decrements it, and
     * the pool is shut down when the counter returns to zero.
     */
    private static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 8522411971886428444L;

        /** How long, in milliseconds, to wait for the pool to terminate before forcing it. */
        private static final long TERMINATION_TIMEOUT = 5000L;

        /** Number of threads in the shared pool. */
        private static final int THREAD_POOL_SIZE = 10;

        /** Pool shared by all instances; created by the first {@code open}. */
        static ExecutorService executorService;

        /** Number of currently open instances; guarded by the class monitor. */
        static int counter = 0;

        private MyAsyncFunction() {
        }

        /** Opens the function and registers this instance as a user of the shared pool. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            synchronized (MyAsyncFunction.class) {
                if (counter == 0) {
                    executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
                }
                ++counter;
            }
        }

        /** Closes the function and releases this instance's share of the pool. */
        public void close() throws Exception {
            super.close();
            freeExecutor();
        }

        /**
         * Decrements the user count and, when it reaches zero, shuts the pool down: orderly
         * first, forcibly if it has not terminated within {@link #TERMINATION_TIMEOUT}
         * milliseconds or if the waiting thread is interrupted.
         */
        private void freeExecutor() {
            synchronized (MyAsyncFunction.class) {
                --counter;
                if (counter != 0) {
                    return;
                }
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                        executorService.shutdownNow();
                    }
                } catch (InterruptedException interrupted) {
                    executorService.shutdownNow();
                    // Preserve the interrupt for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Submits a task to the shared pool that completes {@code resultFuture} with a
         * one-element list containing {@code input * 2}.
         */
        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(() -> {
                resultFuture.complete(Collections.singletonList(input * 2));
            });
        }
    }
}

