/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.scala;

import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.scala.AsyncDataStream$;
import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase$;
import org.apache.flink.streaming.api.scala.AsyncFunctionWithTimeoutExpired;
import org.apache.flink.streaming.api.scala.AsyncFunctionWithoutTimeoutExpired;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.MyRichAsyncFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment$;
import org.apache.flink.streaming.api.scala.async.AsyncFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Function0;
import scala.Function2;
import scala.Predef$;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u0005=r!B\u0001\u0003\u0011\u0003y\u0011!F!ts:\u001cG)\u0019;b'R\u0014X-Y7J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\tQa]2bY\u0006T!!\u0002\u0004\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\b\u0011\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u0013)\tQA\u001a7j].T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u0001\u0001C\u0001\t\u0012\u001b\u0005\u0011a!\u0002\n\u0003\u0011\u0003\u0019\"!F!ts:\u001cG)\u0019;b'R\u0014X-Y7J)\u000e\u000b7/Z\n\u0003#Q\u0001\"!F\f\u000e\u0003YQ\u0011aA\u0005\u00031Y\u0011a!\u00118z%\u00164\u0007\"\u0002\u000e\u0012\t\u0003Y\u0012A\u0002\u001fj]&$h\bF\u0001\u0010\u0011%i\u0012\u00031AA\u0002\u0013%a$\u0001\u0006uKN$(+Z:vYR,\u0012a\b\t\u0004A\u0015:S\"A\u0011\u000b\u0005\t\u001a\u0013aB7vi\u0006\u0014G.\u001a\u0006\u0003IY\t!bY8mY\u0016\u001cG/[8o\u0013\t1\u0013EA\u0006BeJ\f\u0017PQ;gM\u0016\u0014\bCA\u000b)\u0013\tIcCA\u0002J]RD\u0011bK\tA\u0002\u0003\u0007I\u0011\u0002\u0017\u0002\u001dQ,7\u000f\u001e*fgVdGo\u0018\u0013fcR\u0011Q\u0006\r\t\u0003+9J!a\f\f\u0003\tUs\u0017\u000e\u001e\u0005\bc)\n\t\u00111\u0001 \u0003\rAH%\r\u0005\ngE\u0001\r\u0011!Q!\n}\t1\u0002^3tiJ+7/\u001e7uA!)Q'\u0005C\u0001m\u0005Q\u0001/\u0019:b[\u0016$XM]:\u0016\u0003]\u00022\u0001O\u001f@\u001b\u0005I$B\u0001\u001e<\u0003\u0011)H/\u001b7\u000b\u0003q\nAA[1wC&\u0011a(\u000f\u0002\u000b\u0007>dG.Z2uS>t\u0007CA\u000bA\u0013\t\teCA\u0004C_>dW-\u00198)\tQ\u001au\u000b\u0017\t\u0003\tRs!!R)\u000f\u0005\u0019seBA$M\u001d\tA5*D\u0001J\u0015\tQe\"\u0001\u0004=e>|GOP\u0005\u0002\u001b%\u0011Q\nD\u0001\u0006UVt\u0017\u000e^\u0005\u0003\u001fB\u000bqA];o]\u0016\u00148O\u0003\u0002N\u0019%\u0011!kU\u0001\u000e!\u0006\u0014\u0018-\\3uKJL'0\u001a3\u000b\u0005=\u0003\u0016BA+W\u0005)\u0001\u0016M]1nKR,'o\u001d\u0006\u0003%N\u000bAA\\1nK\u0006\n\u0011,A\u0007pe\u0012,'/\u001a3!{\u0001Z\b' \u0004\u0005%\t\u00011l\u0005\u0002[9B\u0011Q,Y\u0007\u0002=*\u0011!h\u0018\u0006\u0003A\"\tA\u0001^3ti&\u0011!M\u0018\u0002\u0011\u0003\n\u001cHO]1diR+7\u000f\u001e\"bg\u0016D\u0001\u0002\u001a.\u0003\u0002\u0003\u0006IaP\u0001\b_J$WM]3e\u0011\u0015Q\"\f\"\u0001g)\t9\u0007\u000e\u0005\u0002\u00115\")A-\u001aa\u0001\u007f!)!N\u0017C\u0001W\u0006!B/Z:u\u0003NLhnY,ji\"$\u0016.\\3pkR$\u0012!\f\u0015\u0003S6\u0004\"A\\8\u000e\u0003AK!\u0001\u001d)\u0003\tQ+7\u000f\u001e\u0005\u0006ej#\ta[\u0001\u0018i\u0016\u001cH/Q:z]\u000e<\u0016\u000e\u001e5pkR$\u0016.\\3pkRD#!]7\t\u000bUTF\u0011\u0002<\u0002%\u0015DXmY;uK\u0006sGMV1mS\u0012\fG/\u001a\u000b\u0007[]DX0!\u0002\t\u000b\u0011$\b\u0019A \t\u000be$\b\u0019\u0001>\u0002\u0007\u0015tg\u000f\u0005\u0002\u0011w&\u0011AP\u0001\u0002\u001b'R\u0014X-Y7Fq\u0016\u001cW\u000f^5p]\u0016sg/\u001b:p]6,g\u000e\u001e\u0005\u0006}R\u0004\ra`\u0001\u000bI\u0006$\u0018m\u0015;sK\u0006l\u0007\u0003\u0002\t\u0002\u0002\u001dJ1!a\u0001\u0003\u0005)!\u0015\r^1TiJ,\u0017-\u001c\u0005\u0007\u0003\u000f!\b\u0019A\u0010\u0002\u001d\u0015D\b/Z2uK\u0012\u0014Vm];mi\"1\u00111\u0002.\u0005\u0002-\f1\u0005^3tiJK7\r[!ts:\u001cg)\u001e8di&|gNU;oi&lWmQ8oi\u0016DH\u000fK\u0002\u0002\n5Da!!\u0005[\t\u0003Y\u0017a\t;fgR\f5/\u001f8d/\u0006LG/V:j]\u001e\fen\u001c8z[>,8OR;oGRLwN\u001c\u0015\u0004\u0003\u001fi\u0007f\u0002.\u0002\u0018\u0005\r\u0012Q\u0005\t\u0005\u00033\ty\"\u0004\u0002\u0002\u001c)\u0019\u0011Q\u0004)\u0002\rI,hN\\3s\u0013\u0011\t\t#a\u0007\u0003\u000fI+hnV5uQ\u0006)a/\u00197vK\u000e\u0012\u0011q\u0005\t\u0005\u0003S\tY#D\u0001T\u0013\r\tic\u0015\u0002\u000e!\u0006\u0014\u0018-\\3uKJL'0\u001a3")
public class AsyncDataStreamITCase
extends AbstractTestBase {
    private final boolean ordered;

    @Parameterized.Parameters(name="ordered = {0}")
    public static Collection<Object> parameters() {
        return AsyncDataStreamITCase$.MODULE$.parameters();
    }

    @Test
    public void testAsyncWithTimeout() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream source = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        long timeout = 1L;
        DataStream asyncMapped = this.ordered ? AsyncDataStream$.MODULE$.orderedWait(source, (AsyncFunction)new AsyncFunctionWithTimeoutExpired(), timeout, TimeUnit.MILLISECONDS, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE)) : AsyncDataStream$.MODULE$.unorderedWait(source, (AsyncFunction)new AsyncFunctionWithTimeoutExpired(), timeout, TimeUnit.MILLISECONDS, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        this.executeAndValidate(this.ordered, env, (DataStream<Object>)asyncMapped, (ArrayBuffer<Object>)((ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{3}))));
    }

    @Test
    public void testAsyncWithoutTimeout() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream source = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        long timeout = 1L;
        DataStream asyncMapped = this.ordered ? AsyncDataStream$.MODULE$.orderedWait(source, (AsyncFunction)new AsyncFunctionWithoutTimeoutExpired(), timeout, TimeUnit.MILLISECONDS, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE)) : AsyncDataStream$.MODULE$.unorderedWait(source, (AsyncFunction)new AsyncFunctionWithoutTimeoutExpired(), timeout, TimeUnit.MILLISECONDS, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        this.executeAndValidate(this.ordered, env, (DataStream<Object>)asyncMapped, (ArrayBuffer<Object>)((ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{2}))));
    }

    private void executeAndValidate(boolean ordered, StreamExecutionEnvironment env, DataStream<Object> dataStream, ArrayBuffer<Object> expectedResult) {
        AsyncDataStreamITCase$.MODULE$.org$apache$flink$streaming$api$scala$AsyncDataStreamITCase$$testResult_$eq((ArrayBuffer<Object>)((ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Nil$.MODULE$)));
        dataStream.addSink((SinkFunction)new SinkFunction<Object>(null){

            public void invoke(Object x$1, SinkFunction.Context x$2) throws Exception {
                super.invoke(x$1, x$2);
            }

            public void writeWatermark(Watermark x$1) throws Exception {
                super.writeWatermark(x$1);
            }

            public void finish() throws Exception {
                super.finish();
            }

            public void invoke(int value) {
                AsyncDataStreamITCase$.MODULE$.org$apache$flink$streaming$api$scala$AsyncDataStreamITCase$$testResult().$plus$eq((Object)BoxesRunTime.boxToInteger((int)value));
            }
        });
        env.execute("testAsyncDataStream");
        if (ordered) {
            Assert.assertEquals(expectedResult, AsyncDataStreamITCase$.MODULE$.org$apache$flink$streaming$api$scala$AsyncDataStreamITCase$$testResult());
        } else {
            Assert.assertEquals(expectedResult, (Object)AsyncDataStreamITCase$.MODULE$.org$apache$flink$streaming$api$scala$AsyncDataStreamITCase$$testResult().sorted((Ordering)Ordering.Int$.MODULE$));
        }
    }

    @Test
    public void testRichAsyncFunctionRuntimeContext() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream source = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        long timeout = 10000L;
        MyRichAsyncFunction richAsyncFunction = new MyRichAsyncFunction();
        DataStream asyncMapped = this.ordered ? AsyncDataStream$.MODULE$.orderedWait(source, (AsyncFunction)richAsyncFunction, timeout, TimeUnit.MILLISECONDS, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE)) : AsyncDataStream$.MODULE$.unorderedWait(source, (AsyncFunction)richAsyncFunction, timeout, TimeUnit.MILLISECONDS, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        this.executeAndValidate(false, env, (DataStream<Object>)asyncMapped, (ArrayBuffer<Object>)((ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{2}))));
    }

    @Test
    public void testAsyncWaitUsingAnonymousFunction() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream source = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1, 2}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        Function2 & Serializable & scala.Serializable asyncFunction = (Function2 & Serializable & scala.Serializable)(input, collector) -> {
            Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> collector.complete((Iterable)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{BoxesRunTime.unboxToInt((Object)input) * 2}))), (ExecutionContext)ExecutionContext$.MODULE$.global());
            return BoxedUnit.UNIT;
        };
        long timeout = 10000L;
        DataStream asyncMapped = this.ordered ? AsyncDataStream$.MODULE$.orderedWait(source, timeout, TimeUnit.MILLISECONDS, (Function2)asyncFunction, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE)) : AsyncDataStream$.MODULE$.unorderedWait(source, timeout, TimeUnit.MILLISECONDS, (Function2)asyncFunction, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        this.executeAndValidate(this.ordered, env, (DataStream<Object>)asyncMapped, (ArrayBuffer<Object>)((ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{2, 4}))));
    }

    public AsyncDataStreamITCase(boolean ordered) {
        this.ordered = ordered;
    }
}

