/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class AsyncWaitOperatorTest
extends TestLogger {
    // Timeout constant with value 1000. The visible code never references it by name; the
    // tests pass the literal 1000L to the harness factory instead.
    // NOTE(review): presumably milliseconds — confirm against AsyncWaitOperatorFactory.
    private static final long TIMEOUT = 1000L;
    // JUnit rule constructed with a limit of 10 seconds.
    @Rule
    public Timeout timeoutRule = new Timeout(10L, TimeUnit.SECONDS);

    /** Runs the event-time scenario with the operator in {@code ORDERED} output mode. */
    @Test
    public void testEventTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testEventTime(outputMode);
    }

    /** Runs the event-time scenario with the operator in {@code UNORDERED} output mode. */
    @Test
    public void testWaterMarkUnordered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testEventTime(outputMode);
    }

    /**
     * Sends three records (values 1, 2, 3 at timestamps 1, 2, 3) and a watermark with timestamp 2
     * (between the second and third record) through an operator backed by
     * {@code MyAsyncFunction}, then verifies what was emitted.
     *
     * <p>In {@code ORDERED} mode the output must equal the expected sequence exactly. In any
     * other mode the watermark must sit at index 2 and the record for input 3 at index 3; the
     * whole output is additionally compared through {@code assertOutputEqualsSorted}.
     *
     * @param mode output mode the operator under test is created with
     * @throws Exception if the harness fails
     */
    private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                createTestHarness(new MyAsyncFunction(), 1000L, 2, mode);
        harness.open();

        // All interaction with the harness happens while holding its checkpoint lock.
        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord<>(1, 1L));
            harness.processElement(new StreamRecord<>(2, 2L));
            harness.processWatermark(new Watermark(2L));
            harness.processElement(new StreamRecord<>(3, 3L));
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord<>(2, 1L));
        expected.add(new StreamRecord<>(4, 2L));
        expected.add(new Watermark(2L));
        expected.add(new StreamRecord<>(6, 3L));

        if (mode == AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEquals(
                    "Output with watermark was not correct.", expected, harness.getOutput());
            return;
        }

        // Not ORDERED: only the positions relative to the watermark are fixed.
        final Object[] emitted = harness.getOutput().toArray();
        Assert.assertEquals("Watermark should be at index 2", new Watermark(2L), emitted[2]);
        Assert.assertEquals(
                "StreamRecord 3 should be at the end", new StreamRecord<>(6, 3L), emitted[3]);
        TestHarnessUtil.assertOutputEqualsSorted(
                "Output for StreamRecords does not match",
                expected,
                harness.getOutput(),
                new StreamRecordComparator());
    }

    /** Runs the processing-time scenario with the operator in {@code ORDERED} output mode. */
    @Test
    public void testProcessingTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTime(outputMode);
    }

    /** Runs the processing-time scenario with the operator in {@code UNORDERED} output mode. */
    @Test
    public void testProcessingUnordered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTime(outputMode);
    }

    /**
     * Pushes the values 1..8 (each with a timestamp equal to its value) through an operator
     * with capacity 6 backed by {@code MyAsyncFunction} and expects the doubled values with
     * unchanged timestamps.
     *
     * <p>For {@code ORDERED} the output is compared as-is; for any other mode it is compared
     * through {@code assertOutputEqualsSorted} with a {@code StreamRecordComparator}.
     *
     * @param mode output mode the operator under test is created with
     * @throws Exception if the harness fails
     */
    private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
        final int elementCount = 8;
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                createTestHarness(new MyAsyncFunction(), 1000L, 6, mode);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            for (int value = 1; value <= elementCount; value++) {
                harness.processElement(new StreamRecord<>(value, (long) value));
            }
        }

        final ArrayDeque<Object> expected = new ArrayDeque<>();
        for (int value = 1; value <= elementCount; value++) {
            expected.add(new StreamRecord<>(2 * value, (long) value));
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        if (mode != AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEqualsSorted(
                    "UNORDERED Output was not correct.",
                    expected,
                    harness.getOutput(),
                    new StreamRecordComparator());
        } else {
            TestHarnessUtil.assertOutputEquals(
                    "ORDERED Output was not correct.", expected, harness.getOutput());
        }
    }

    /**
     * Runs the chained vertex built by {@code createChainedVertex} (two async wait operators
     * with a map in between) inside a {@code OneInputStreamTaskTestHarness} and checks the
     * values leaving the chain.
     *
     * <p>Inputs 5..9 with timestamps 0..4 are expected to come out as 22, 26, 30, 34 and 38
     * with unchanged timestamps, i.e. {@code ((v * 2) + 1) * 2}: doubled by
     * {@code MyAsyncFunction}, incremented by the map, doubled again. The result is compared
     * through {@code assertOutputEqualsSorted} with a {@code StreamRecordComparator}.
     */
    @Test
    public void testOperatorChainWithProcessingTime() throws Exception {
        JobVertex chainedVertex = this.createChainedVertex((AsyncFunction<Integer, Integer>)new MyAsyncFunction(), (AsyncFunction<Integer, Integer>)new MyAsyncFunction());
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        // Replace the harness' task configuration with the one of the chained vertex.
        testHarness.taskConfig = chainedVertex.getConfiguration();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
        // Install the operator factory read from the chained vertex' configuration.
        streamConfig.setStreamOperatorFactory(operatorChainStreamConfig.getStreamOperatorFactory(AsyncWaitOperatorTest.class.getClassLoader()));
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        long initialTimestamp = 0L;
        testHarness.processElement(new StreamRecord((Object)5, initialTimestamp));
        testHarness.processElement(new StreamRecord((Object)6, initialTimestamp + 1L));
        testHarness.processElement(new StreamRecord((Object)7, initialTimestamp + 2L));
        testHarness.processElement(new StreamRecord((Object)8, initialTimestamp + 3L));
        testHarness.processElement(new StreamRecord((Object)9, initialTimestamp + 4L));
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        LinkedList<Object> expectedOutput = new LinkedList<Object>();
        expectedOutput.add(new StreamRecord((Object)22, initialTimestamp));
        expectedOutput.add(new StreamRecord((Object)26, initialTimestamp + 1L));
        expectedOutput.add(new StreamRecord((Object)30, initialTimestamp + 2L));
        expectedOutput.add(new StreamRecord((Object)34, initialTimestamp + 3L));
        expectedOutput.add(new StreamRecord((Object)38, initialTimestamp + 4L));
        TestHarnessUtil.assertOutputEqualsSorted("Test for chained operator with AsyncWaitOperator failed", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
    }

    /**
     * Builds a small streaming job and returns the job vertex that holds the chained async
     * operators.
     *
     * <p>The pipeline is: source of the elements 1, 2, 3, then an async operator using
     * {@code firstFunction} (timeout 1000, capacity 6, ORDERED), then a map that adds 1, then
     * an async operator using {@code secondFunction} (timeout 1000, capacity 3, UNORDERED),
     * then an identity map that starts a new chain, then a discarding sink. The environment
     * parallelism is set to 2.
     *
     * @param firstFunction async function of the first async operator in the chain
     * @param secondFunction async function of the second async operator in the chain
     * @return the vertex at index 1 of the topologically sorted job graph vertices
     */
    private JobVertex createChainedVertex(AsyncFunction<Integer, Integer> firstFunction, AsyncFunction<Integer, Integer> secondFunction) {
        StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        chainEnv.setParallelism(2);
        SingleOutputStreamOperator<Integer> input = chainEnv.fromElements((Object[])new Integer[]{1, 2, 3});
        input = this.addAsyncOperatorLegacyChained((DataStream)input, (AsyncFunction)firstFunction, 1000L, 6, AsyncDataStream.OutputMode.ORDERED);
        input = input.map((MapFunction)new RichMapFunction<Integer, Integer>(){
            private static final long serialVersionUID = 1L;
            // Assigned in open(); map() adds it to every value.
            private Integer initialValue = null;

            public void open(Configuration parameters) throws Exception {
                this.initialValue = 1;
            }

            public Integer map(Integer value) throws Exception {
                return this.initialValue + value;
            }
        });
        input = this.addAsyncOperatorLegacyChained((DataStream)input, (AsyncFunction)secondFunction, 1000L, 3, AsyncDataStream.OutputMode.UNORDERED);
        // Identity map that starts a new chain, followed by a sink that discards everything.
        input.map((MapFunction)new MapFunction<Integer, Integer>(){
            private static final long serialVersionUID = 5162085254238405527L;

            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).startNewChain().addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        // The job graph is expected to consist of exactly three vertices.
        Assert.assertEquals((long)3L, (long)jobGraph.getVerticesSortedTopologicallyFromSources().size());
        return (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
    }

    /**
     * Checkpoints an async wait operator while its elements are still pending and verifies
     * that a second task restored from that checkpoint emits results for them.
     *
     * <p>Phase one runs the operator with a {@code LazyAsyncFunction}, feeds the values 1..4,
     * triggers checkpoint 1 and waits until the task state manager reports it; only then is the
     * latch released. Phase two restores a new task from the reported snapshot using
     * {@code MyAsyncFunction}, feeds the values 5..8 (with another checkpoint between 7 and 8)
     * and expects the doubled values 2..16 with timestamps 1..8, after checkpoint barriers are
     * removed from the output.
     */
    @Test
    public void testStateSnapshotAndRestore() throws Exception {
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        AsyncWaitOperatorFactory factory = new AsyncWaitOperatorFactory((AsyncFunction)new LazyAsyncFunction(), 1000L, 4, AsyncDataStream.OutputMode.ORDERED);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        // The same operator id is set on the restored task's configuration further below.
        OperatorID operatorID = new OperatorID(42L, 4711L);
        streamConfig.setStreamOperatorFactory((StreamOperatorFactory)factory);
        streamConfig.setOperatorID(operatorID);
        TestTaskStateManager taskStateManagerMock = testHarness.getTaskStateManager();
        // Latch awaited below, after the checkpoint has been triggered.
        taskStateManagerMock.setWaitForReportLatch(new OneShotLatch());
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        OneInputStreamTask task = testHarness.getTask();
        long initialTime = 0L;
        testHarness.processElement(new StreamRecord((Object)1, 1L));
        testHarness.processElement(new StreamRecord((Object)2, 2L));
        testHarness.processElement(new StreamRecord((Object)3, 3L));
        testHarness.processElement(new StreamRecord((Object)4, 4L));
        testHarness.waitForInputProcessing();
        long checkpointId = 1L;
        long checkpointTimestamp = 1L;
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(1L, 1L);
        task.triggerCheckpointAsync(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation(), false);
        taskStateManagerMock.getWaitForReportLatch().await();
        Assert.assertEquals((long)1L, (long)taskStateManagerMock.getReportedCheckpointId());
        // Release the LazyAsyncFunction latch only after the checkpoint has been reported.
        LazyAsyncFunction.countDown();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        // Second phase: a new task restored from the snapshot reported by the first one.
        TaskStateSnapshot subtaskStates = taskStateManagerMock.getLastJobManagerTaskStateSnapshot();
        OneInputStreamTaskTestHarness restoredTaskHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        restoredTaskHarness.setTaskStateSnapshot(1L, subtaskStates);
        restoredTaskHarness.setupOutputForSingletonOperatorChain();
        AsyncWaitOperatorFactory restoredOperator = new AsyncWaitOperatorFactory((AsyncFunction)new MyAsyncFunction(), 1000L, 6, AsyncDataStream.OutputMode.ORDERED);
        restoredTaskHarness.getStreamConfig().setStreamOperatorFactory((StreamOperatorFactory)restoredOperator);
        restoredTaskHarness.getStreamConfig().setOperatorID(operatorID);
        restoredTaskHarness.invoke();
        restoredTaskHarness.waitForTaskRunning();
        OneInputStreamTask restoredTask = restoredTaskHarness.getTask();
        restoredTaskHarness.processElement(new StreamRecord((Object)5, 5L));
        restoredTaskHarness.processElement(new StreamRecord((Object)6, 6L));
        restoredTaskHarness.processElement(new StreamRecord((Object)7, 7L));
        // Trigger another checkpoint on the restored task and block until its future completes.
        restoredTask.triggerCheckpointAsync(new CheckpointMetaData(1L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation(), false).get();
        restoredTaskHarness.processElement(new StreamRecord((Object)8, 8L));
        restoredTaskHarness.endInput();
        restoredTaskHarness.waitForTaskCompletion();
        // Values 1..4 come from the snapshot, 5..8 were fed to the restored task; all doubled.
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)2, 1L));
        expectedOutput.add(new StreamRecord((Object)4, 2L));
        expectedOutput.add(new StreamRecord((Object)6, 3L));
        expectedOutput.add(new StreamRecord((Object)8, 4L));
        expectedOutput.add(new StreamRecord((Object)10, 5L));
        expectedOutput.add(new StreamRecord((Object)12, 6L));
        expectedOutput.add(new StreamRecord((Object)14, 7L));
        expectedOutput.add(new StreamRecord((Object)16, 8L));
        // Checkpoint barriers are dropped from the output before it is compared.
        restoredTaskHarness.getOutput().removeIf(record -> record instanceof CheckpointBarrier);
        TestHarnessUtil.assertOutputEquals("StateAndRestored Test Output was not correct.", expectedOutput, restoredTaskHarness.getOutput());
    }

    /**
     * Timeout scenario with the default timeout handling: a {@code TimeoutException} is
     * expected as failure cause and only the record (2, 5) is expected as output.
     */
    @Test
    public void testAsyncTimeoutFailure() throws Exception {
        final Optional<Class<? extends Throwable>> expectedFailure =
                Optional.of(TimeoutException.class);
        testAsyncTimeout(new LazyAsyncFunction(), expectedFailure, new StreamRecord<>(2, 5L));
    }

    /**
     * Timeout scenario with a function that answers timeouts itself: no failure is expected
     * and the output must be the records (3, 0) and (2, 5).
     */
    @Test
    public void testAsyncTimeoutIgnore() throws Exception {
        final Optional<Class<? extends Throwable>> noFailure = Optional.empty();
        testAsyncTimeout(
                new IgnoreTimeoutLazyAsyncFunction(),
                noFailure,
                new StreamRecord<>(3, 0L),
                new StreamRecord<>(2, 5L));
    }

    /**
     * Shared body of the timeout tests. The operator is created with a timeout of 10 and
     * capacity 2 in ORDERED mode; element 1 is sent at processing time 0 and element 2 at
     * processing time 5. Processing time is then advanced to 11 before the function's latch is
     * released and the harness is closed.
     *
     * @param lazyAsyncFunction function under test; its results are held back until the latch
     *     is counted down
     * @param expectedException if present, the type that must be found in the external failure
     *     cause recorded by the mock environment; if empty, no failure check is performed
     * @param expectedRecords the exact output expected from the harness
     * @throws Exception if the harness fails
     */
    private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> expectedException, StreamRecord<Integer> ... expectedRecords) throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                createTestHarness(lazyAsyncFunction, 10L, 2, AsyncDataStream.OutputMode.ORDERED);
        final MockEnvironment environment = harness.getEnvironment();
        environment.setExpectedExternalFailureCause(Throwable.class);

        harness.open();
        harness.setProcessingTime(0L);

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord<>(1, 0L));
            harness.setProcessingTime(5L);
            harness.processElement(new StreamRecord<>(2, 5L));
        }

        // Advance processing time to 11 first, then let the pending invocations complete.
        harness.setProcessingTime(11L);
        LazyAsyncFunction.countDown();

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.addAll(Arrays.asList(expectedRecords));
        TestHarnessUtil.assertOutputEquals(
                "Output with watermark was not correct.", expected, harness.getOutput());

        if (!expectedException.isPresent()) {
            return;
        }
        Assert.assertTrue(environment.getActualExternalFailureCause().isPresent());
        final Throwable actualFailure = environment.getActualExternalFailureCause().get();
        Assert.assertTrue(
                ExceptionUtils.findThrowable(actualFailure, expectedException.get()).isPresent());
    }

    /**
     * Processes a single element (42 at timestamp 1) in UNORDERED mode with capacity 1 and
     * checks that the only output is the record (84, 1) and that the processing time service
     * reports zero active timers after the harness has been closed.
     */
    @Test
    public void testTimeoutCleanup() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                createTestHarness(new MyAsyncFunction(), 1000L, 1, AsyncDataStream.OutputMode.UNORDERED);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(42, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        final List<Object> emitted = new ArrayList<Object>(harness.getOutput());
        Assert.assertEquals(Collections.singletonList(new StreamRecord<>(84, 1L)), emitted);

        final long activeTimers = harness.getProcessingTimeService().getNumActiveTimers();
        Assert.assertEquals(0L, activeTimers);
    }

    /** Runs the user-exception scenario with the operator in {@code ORDERED} output mode. */
    @Test
    public void testOrderedWaitUserExceptionHandling() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testUserExceptionHandling(outputMode);
    }

    /** Runs the user-exception scenario with the operator in {@code UNORDERED} output mode. */
    @Test
    public void testUnorderedWaitUserExceptionHandling() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testUserExceptionHandling(outputMode);
    }

    /**
     * Processes one element with a function that completes its result future exceptionally and
     * asserts that the mock environment recorded an external failure cause.
     *
     * @param outputMode output mode the operator under test is created with
     * @throws Exception if the harness fails
     */
    private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                createTestHarness(new UserExceptionAsyncFunction(), 1000L, 2, outputMode);
        final MockEnvironment environment = harness.getEnvironment();
        environment.setExpectedExternalFailureCause(Throwable.class);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        final boolean failureRecorded = harness.getEnvironment().getActualExternalFailureCause().isPresent();
        Assert.assertTrue(failureRecorded);
    }

    /** Runs the timeout-exception scenario with the operator in {@code ORDERED} output mode. */
    @Test
    public void testOrderedWaitTimeoutHandling() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testTimeoutExceptionHandling(outputMode);
    }

    /** Runs the timeout-exception scenario with the operator in {@code UNORDERED} output mode. */
    @Test
    public void testUnorderedWaitTimeoutHandling() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testTimeoutExceptionHandling(outputMode);
    }

    /**
     * Processes one element with a function that never completes its result future, advances
     * processing time to the operator's timeout (10) and verifies that the resulting failure is
     * reported to the environment.
     *
     * <p>The final assertion mirrors {@code testUserExceptionHandling}; without it the test
     * would pass even if the timeout were silently dropped.
     *
     * @param outputMode output mode the operator under test is created with
     * @throws Exception if the harness fails
     */
    private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                createTestHarness(new NoOpAsyncFunction<Integer, Integer>(), 10L, 2, outputMode);
        harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        // The operator was created with a timeout of 10, so this move triggers the timeout.
        harness.setProcessingTime(10L);

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        Assert.assertTrue(
                "The timeout should have been reported as an external failure.",
                harness.getEnvironment().getActualExternalFailureCause().isPresent());
    }

    /**
     * Fills an operator's queue to its capacity (10) with elements whose results are held back
     * by an uncompleted trigger future, takes a snapshot, and then restores a second operator
     * from it. The restored operator uses an already completed trigger and must emit the values
     * 0..9 in order.
     */
    @Test(timeout=10000L)
    public void testRestartWithFullQueue() throws Exception {
        final int capacity = 10;
        final List<Integer> expectedValues = new ArrayList<>(capacity);
        final CompletableFuture<Void> trigger = new CompletableFuture<>();

        final OneInputStreamOperatorTestHarness<Integer, Integer> snapshotHarness =
                createTestHarness(
                        new ControllableAsyncFunction<>(trigger),
                        1000L,
                        capacity,
                        AsyncDataStream.OutputMode.ORDERED);
        snapshotHarness.open();

        final OperatorSubtaskState snapshot;
        try {
            synchronized (snapshotHarness.getCheckpointLock()) {
                for (int value = 0; value < capacity; value++) {
                    snapshotHarness.processElement(value, 0L);
                    expectedValues.add(value);
                }
            }

            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshot = snapshotHarness.snapshot(0L, 0L);
            }

            // Release the pending results only after the snapshot has been taken.
            trigger.complete(null);
        } finally {
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshotHarness.close();
            }
        }

        final CompletableFuture<Void> completedTrigger = CompletableFuture.completedFuture(null);
        final OneInputStreamOperatorTestHarness<Integer, Integer> recoverHarness =
                createTestHarness(
                        new ControllableAsyncFunction<>(completedTrigger),
                        1000L,
                        capacity,
                        AsyncDataStream.OutputMode.ORDERED);
        recoverHarness.initializeState(snapshot);

        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.open();
        }

        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.endInput();
            recoverHarness.close();
        }

        final List<Integer> emittedValues = recoverHarness.getOutput().stream()
                .map(element -> (StreamRecord<?>) element)
                .map(record -> (Integer) record.getValue())
                .collect(Collectors.toList());
        Assert.assertThat(emittedValues, Matchers.equalTo(expectedValues));
    }

    /**
     * Appends an async wait operator to {@code in} whose factory has its chaining strategy set
     * to {@code ChainingStrategy.ALWAYS}.
     *
     * @param in stream to attach the operator to
     * @param func async function to run; it is passed through the environment's {@code clean}
     * @param timeout timeout handed to the operator factory
     * @param bufSize capacity handed to the operator factory
     * @param mode output mode handed to the operator factory
     * @return the stream produced by the transformation named "async wait operator"
     */
    private <IN, OUT> SingleOutputStreamOperator<OUT> addAsyncOperatorLegacyChained(DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, int bufSize, AsyncDataStream.OutputMode mode) {
        final TypeInformation<OUT> resultType =
                TypeExtractor.getUnaryOperatorReturnType(
                        func,
                        AsyncFunction.class,
                        0,
                        1,
                        new int[] {1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),
                        true);

        final AsyncFunction<IN, OUT> cleanedFunction = in.getExecutionEnvironment().clean(func);
        final AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(cleanedFunction, timeout, bufSize, mode);
        operatorFactory.setChainingStrategy(ChainingStrategy.ALWAYS);

        return in.transform("async wait operator", resultType, operatorFactory);
    }

    /**
     * Creates an operator test harness around a new {@code AsyncWaitOperatorFactory}, using
     * {@code IntSerializer.INSTANCE} as the second constructor argument of the harness.
     *
     * @param function async function to run inside the operator
     * @param timeout timeout handed to the operator factory
     * @param capacity capacity handed to the operator factory
     * @param outputMode output mode handed to the operator factory
     * @return a harness that has been constructed but not yet opened
     * @throws Exception if the harness cannot be constructed
     */
    private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> createTestHarness(AsyncFunction<Integer, OUT> function, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) throws Exception {
        final AsyncWaitOperatorFactory<Integer, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(function, timeout, capacity, outputMode);
        return new OneInputStreamOperatorTestHarness<>(operatorFactory, IntSerializer.INSTANCE);
    }

    private static class NoOpAsyncFunction<IN, OUT>
    implements AsyncFunction<IN, OUT> {
        private static final long serialVersionUID = -3060481953330480694L;

        private NoOpAsyncFunction() {
        }

        public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        }
    }

    private static class ControllableAsyncFunction<IN>
    implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239267288636L;
        private transient CompletableFuture<Void> trigger;

        private ControllableAsyncFunction(CompletableFuture<Void> trigger) {
            this.trigger = (CompletableFuture)Preconditions.checkNotNull(trigger);
        }

        public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
            this.trigger.thenAccept(v -> resultFuture.complete(Collections.singleton(input)));
        }
    }

    private static class UserExceptionAsyncFunction
    implements AsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 6326568632967110990L;

        private UserExceptionAsyncFunction() {
        }

        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
            resultFuture.completeExceptionally((Throwable)new Exception("Test exception"));
        }
    }

    private class StreamRecordComparator
    implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            if (sr0.getTimestamp() != sr1.getTimestamp()) {
                return (int)(sr0.getTimestamp() - sr1.getTimestamp());
            }
            int comparison = ((Integer)sr0.getValue()).compareTo((Integer)sr1.getValue());
            if (comparison != 0) {
                return comparison;
            }
            return (Integer)sr0.getValue() - (Integer)sr1.getValue();
        }
    }

    private static class IgnoreTimeoutLazyAsyncFunction
    extends LazyAsyncFunction {
        private static final long serialVersionUID = 1428714561365346128L;

        private IgnoreTimeoutLazyAsyncFunction() {
        }

        public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
            resultFuture.complete(Collections.singletonList(input * 3));
        }
    }

    /**
     * Async function whose results are held back until {@link #countDown()} is called: each
     * invocation submits a task to the shared executor that waits on a latch and then completes
     * the result future with the unchanged input.
     *
     * <p>The latch is a static field that every constructor call replaces with a fresh
     * single-count latch, so only the most recently constructed instance's latch is in effect.
     */
    private static class LazyAsyncFunction
    extends MyAsyncFunction {
        private static final long serialVersionUID = 3537791752703154670L;
        private static CountDownLatch latch;

        public LazyAsyncFunction() {
            latch = new CountDownLatch(1);
        }

        /**
         * Submits a task that blocks on the latch and then completes {@code resultFuture} with
         * a singleton list holding {@code input}.
         *
         * @param input value to echo back
         * @param resultFuture future completed once the latch has been released
         */
        @Override
        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(() -> {
                try {
                    latch.await();
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the pool instead of swallowing it; the
                    // result is still completed below, exactly as before.
                    Thread.currentThread().interrupt();
                }
                resultFuture.complete(Collections.singletonList(input));
            });
        }

        /** Releases the tasks currently waiting on the latch. */
        public static void countDown() {
            latch.countDown();
        }
    }

    private static class MyAsyncFunction
    extends RichAsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 8522411971886428444L;
        private static final long TERMINATION_TIMEOUT = 5000L;
        private static final int THREAD_POOL_SIZE = 10;
        static ExecutorService executorService;
        static int counter;

        private MyAsyncFunction() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class<MyAsyncFunction> clazz = MyAsyncFunction.class;
            synchronized (MyAsyncFunction.class) {
                if (counter == 0) {
                    executorService = Executors.newFixedThreadPool(10);
                }
                ++counter;
                // ** MonitorExit[var2_2] (shouldn't be in output)
                return;
            }
        }

        public void close() throws Exception {
            super.close();
            this.freeExecutor();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void freeExecutor() {
            Class<MyAsyncFunction> clazz = MyAsyncFunction.class;
            synchronized (MyAsyncFunction.class) {
                if (--counter == 0) {
                    executorService.shutdown();
                    try {
                        if (!executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                            executorService.shutdownNow();
                        }
                    }
                    catch (InterruptedException interrupted) {
                        executorService.shutdownNow();
                        Thread.currentThread().interrupt();
                    }
                }
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
        }

        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    resultFuture.complete(Collections.singletonList(input * 2));
                }
            });
        }

        static {
            counter = 0;
        }
    }
}

