/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.scala;

import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.OutputTag;
import org.apache.flink.streaming.api.scala.OutputTag$;
import org.apache.flink.streaming.api.scala.SideOutputITCase$;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment$;
import org.apache.flink.streaming.api.scala.TestAssigner;
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0001\u00053A!\u0001\u0002\u0001\u001f\t\u00012+\u001b3f\u001fV$\b/\u001e;J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\tQa]2bY\u0006T!!\u0002\u0004\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\b\u0011\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u0013)\tQA\u001a7j].T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011C\u0001\u0001\u0011!\t\tb#D\u0001\u0013\u0015\t\u0019B#\u0001\u0003vi&d'BA\u000b\t\u0003\u0011!Xm\u001d;\n\u0005]\u0011\"\u0001E!cgR\u0014\u0018m\u0019;UKN$()Y:f\u0011\u0015I\u0002\u0001\"\u0001\u001b\u0003\u0019a\u0014N\\5u}Q\t1\u0004\u0005\u0002\u001d\u00015\t!\u0001C\u0003\u001f\u0001\u0011\u0005q$A\u000fuKN$\bK]8dKN\u001ch)\u001e8di&|gnU5eK>+H\u000f];u)\u0005\u0001\u0003CA\u0011$\u001b\u0005\u0011#\"A\u0002\n\u0005\u0011\u0012#\u0001B+oSRD#!\b\u0014\u0011\u0005\u001dRS\"\u0001\u0015\u000b\u0005%b\u0011!\u00026v]&$\u0018BA\u0016)\u0005\u0011!Vm\u001d;\t\u000b5\u0002A\u0011A\u0010\u0002EQ,7\u000f^&fs\u0016$\u0007K]8dKN\u001ch)\u001e8di&|gnU5eK>+H\u000f];uQ\tac\u0005C\u00031\u0001\u0011\u0005q$A\u0015uKN$\bK]8dKN\u001ch)\u001e8di&|gnU5eK>+H\u000f];u/&$\bn\u0016:p]\u001e$\u0016m\u001a\u0015\u0003_\u0019BQa\r\u0001\u0005\u0002}\tq\u0004^3ti\u0006cGnV5oI><H*\u0019;f\u0003J\u0014\u0018N^5oO\u00163XM\u001c;tQ\t\u0011d\u0005C\u00037\u0001\u0011\u0005q$A\u0011uKN$8*Z=fI^Kg\u000eZ8x\u0019\u0006$X-\u0011:sSZLgnZ#wK:$8\u000f\u000b\u00026M!)\u0011\b\u0001C\u0001?\u0005\u0019C/Z:u!J|7-Z:t/&tGm\\<Gk:\u001cG/[8o'&$WmT;uaV$\bF\u0001\u001d'\u0011\u0015a\u0004\u0001\"\u0001 \u0003\u0019\"Xm\u001d;Qe>\u001cWm]:BY2<\u0016N\u001c3po\u001a+hn\u0019;j_:\u001c\u0016\u000eZ3PkR\u0004X\u000f\u001e\u0015\u0003w\u0019BQa\u0010\u0001\u0005\u0002}\t\u0011\u0004^3tiVs\u0017n\u001c8PMR;xnU5eK>+H\u000f];ug\"\u0012aH\n")
public class SideOutputITCase
extends AbstractTestBase {
    /**
     * Checks that a {@code ProcessFunction} on a non-keyed stream can emit to both its
     * regular output and a side output.
     *
     * <p>The job runs with parallelism 3 over the integers 1, 2, 5, 3, 4. Each element is
     * forwarded unchanged to the main output and, as the string {@code "sideout-" + value},
     * to the side output tagged {@code "side"}. Both sinks are compared via their sorted
     * results, so the assertions do not depend on arrival order.
     */
    @Test
    public void testProcessFunctionSideOutput() {
        TestListResultSink sideOutputResultSink = new TestListResultSink();
        TestListResultSink resultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 5, 3, 4}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        OutputTag outputTag = OutputTag$.MODULE$.apply("side", (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // NOTE(review): the "(this, outputTag)" constructor arguments and the "outputTag$1" field
        // appear to be the decompiler's rendering of a captured variable — not hand-written Java.
        DataStream passThroughtStream = dataStream.process((ProcessFunction)new ProcessFunction<Object, Object>(this, outputTag){
            private final OutputTag outputTag$1;

            public void processElement(int value, ProcessFunction.Context ctx, Collector<Object> out) {
                // Main output: the element itself (boxed).
                out.collect((Object)BoxesRunTime.boxToInteger((int)value));
                // Side output: "sideout-<value>" routed to the captured tag.
                ctx.output((org.apache.flink.util.OutputTag)this.outputTag$1, (Object)new StringBuilder().append((Object)"sideout-").append((Object)String.valueOf(value)).toString());
            }
            {
                this.outputTag$1 = outputTag$1;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        // Attach one sink to the side output and one to the main output, then run the job.
        passThroughtStream.getSideOutput(outputTag, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).addSink((SinkFunction)sideOutputResultSink);
        passThroughtStream.addSink((SinkFunction)resultSink);
        env.execute();
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"}), (Object)sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 2, 3, 4, 5})), (Object)resultSink.getSortedResult());
    }

    /**
     * Checks that a {@code ProcessFunction} applied after {@code keyBy} can emit to both
     * its regular output and a side output.
     *
     * <p>The job runs with parallelism 3 over the integers 1, 2, 5, 3, 4, keyed by the
     * element itself. Each element is forwarded unchanged to the main output and, as
     * {@code "sideout-" + value}, to the side output tagged {@code "side"}. Both sinks are
     * compared via their sorted results.
     */
    @Test
    public void testKeyedProcessFunctionSideOutput() {
        TestListResultSink sideOutputResultSink = new TestListResultSink();
        TestListResultSink resultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 5, 3, 4}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        OutputTag outputTag = OutputTag$.MODULE$.apply("side", (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Key selector: identity on the int element (apply delegates to the specialised apply$mcII$sp).
        DataStream passThroughtStream = dataStream.keyBy((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final int apply(int x) {
                return this.apply$mcII$sp(x);
            }

            public int apply$mcII$sp(int x) {
                return x;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE)).process((ProcessFunction)new ProcessFunction<Object, Object>(this, outputTag){
            private final OutputTag outputTag$2;

            public void processElement(int value, ProcessFunction.Context ctx, Collector<Object> out) {
                // Main output: the element itself (boxed).
                out.collect((Object)BoxesRunTime.boxToInteger((int)value));
                // Side output: "sideout-<value>" routed to the captured tag.
                ctx.output((org.apache.flink.util.OutputTag)this.outputTag$2, (Object)new StringBuilder().append((Object)"sideout-").append((Object)String.valueOf(value)).toString());
            }
            {
                this.outputTag$2 = outputTag$2;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        // Attach one sink to the side output and one to the main output, then run the job.
        passThroughtStream.getSideOutput(outputTag, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).addSink((SinkFunction)sideOutputResultSink);
        passThroughtStream.addSink((SinkFunction)resultSink);
        env.execute();
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"sideout-1", "sideout-2", "sideout-3", "sideout-4", "sideout-5"}), (Object)sideOutputResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 2, 3, 4, 5})), (Object)resultSink.getSortedResult());
    }

    /**
     * Checks that reading a side output through a tag nothing was emitted to yields no data.
     *
     * <p>The {@code ProcessFunction} emits {@code "sideout-" + value} only to the tag named
     * {@code "other-side"} and never writes to its main output. The sink is attached to the
     * side output of the tag named {@code "side"}, and the test asserts that this sink
     * received nothing.
     */
    @Test
    public void testProcessFunctionSideOutputWithWrongTag() {
        TestListResultSink sideOutputResultSink = new TestListResultSink();
        // NOTE(review): resultSink is created but never attached to a stream or asserted on in this test.
        TestListResultSink resultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 5, 3, 4}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        // Two distinct tags: the function writes to "other-side", the sink reads from "side".
        OutputTag outputTag = OutputTag$.MODULE$.apply("side", (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        OutputTag otherOutputTag = OutputTag$.MODULE$.apply("other-side", (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        DataStream passThroughtStream = dataStream.process((ProcessFunction)new ProcessFunction<Object, Object>(this, otherOutputTag){
            private final OutputTag otherOutputTag$1;

            public void processElement(int value, ProcessFunction.Context ctx, Collector<Object> out) {
                // Only the side output for the "other" tag is written; "out" is left untouched.
                ctx.output((org.apache.flink.util.OutputTag)this.otherOutputTag$1, (Object)new StringBuilder().append((Object)"sideout-").append((Object)String.valueOf(value)).toString());
            }
            {
                this.otherOutputTag$1 = otherOutputTag$1;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        passThroughtStream.getSideOutput(outputTag, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).addSink((SinkFunction)sideOutputResultSink);
        env.execute();
        Assert.assertTrue((boolean)sideOutputResultSink.getSortedResult().isEmpty());
    }

    /**
     * Checks that late elements of a non-keyed (all-)window are routed to the side output
     * registered with {@code sideOutputLateData}.
     *
     * <p>The job runs with parallelism 1 over the tuples ("1",1), ("2",2), ("5",5), ("3",3),
     * ("4",4). Timestamps and watermarks come from {@code TestAssigner}, which is defined
     * outside this excerpt. Elements are grouped into 1 ms tumbling event-time windows and
     * the window function emits the first tuple field of every element. The assertions
     * expect "1", "2", "5" on the main output and the tuples ("3",3), ("4",4) on the late-data
     * side output, i.e. the two elements that follow ("5",5) are expected to be late.
     *
     * <p>Results are compared with {@code getResult()} rather than {@code getSortedResult()},
     * so presumably the order of arrival matters here — verify against
     * {@code TestListResultSink}.
     */
    @Test
    public void testAllWindowLateArrivingEvents() {
        TestListResultSink resultSink = new TestListResultSink();
        TestListResultSink lateResultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        // Source of (String, Int) tuples; the anonymous CaseClassTypeInfo below supplies the type information.
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"1", (Object)BoxesRunTime.boxToInteger((int)1)), new Tuple2((Object)"2", (Object)BoxesRunTime.boxToInteger((int)2)), new Tuple2((Object)"5", (Object)BoxesRunTime.boxToInteger((int)5)), new Tuple2((Object)"3", (Object)BoxesRunTime.boxToInteger((int)3)), new Tuple2((Object)"4", (Object)BoxesRunTime.boxToInteger((int)4))}), (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$12 x$1) {
                return x$1.types;
            }

            // Creates one serializer per tuple field and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // For i in [0, arity): fieldSerializers[i] = types[i].createSerializer(executionConfig).
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$12 $outer;
                    private final ExecutionConfig executionConfig$1;
                    private final TypeSerializer[] fieldSerializers$1;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$1[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$1);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$1 = executionConfig$1;
                        this.fieldSerializers$1 = fieldSerializers$1;
                    }
                });
                // NOTE(review): this subclass instance (whose createInstance rebuilds a Tuple2 from
                // fields[0] and fields[1]) is assigned to "unused" and is not what gets returned below;
                // this looks like a decompilation artifact — confirm against the Scala source.
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        // Tag named "late" for the late-data side output; same tuple type information pattern as the source above.
        OutputTag lateDataTag = OutputTag$.MODULE$.apply("late", (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$13 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$13 $outer;
                    private final ExecutionConfig executionConfig$2;
                    private final TypeSerializer[] fieldSerializers$2;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$2[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$2);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$2 = executionConfig$2;
                        this.fieldSerializers$2 = fieldSerializers$2;
                    }
                });
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        // Pipeline: assign timestamps/watermarks -> 1 ms tumbling all-window -> late data to lateDataTag -> emit each element's first field.
        DataStream windowOperator = dataStream.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new TestAssigner()).windowAll((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)1L))).sideOutputLateData(lateDataTag).process((ProcessAllWindowFunction)new ProcessAllWindowFunction<Tuple2<String, Object>, String, TimeWindow>(this){

            public void process(ProcessAllWindowFunction.Context context, Iterable<Tuple2<String, Object>> elements, Collector<String> out) {
                // Forward the first tuple field of every element in the window.
                elements.foreach((Function1)new Serializable(this, out){
                    public static final long serialVersionUID = 0L;
                    private final Collector out$1;

                    public final void apply(Tuple2<String, Object> in) {
                        this.out$1.collect(in._1());
                    }
                    {
                        this.out$1 = out$1;
                    }
                });
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Late elements are read back through the same tag and collected in lateResultSink.
        windowOperator.getSideOutput(lateDataTag, (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$15 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$15 $outer;
                    private final ExecutionConfig executionConfig$3;
                    private final TypeSerializer[] fieldSerializers$3;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$3[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$3);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$3 = executionConfig$3;
                        this.fieldSerializers$3 = fieldSerializers$3;
                    }
                });
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        }).addSink((SinkFunction)lateResultSink);
        windowOperator.addSink((SinkFunction)resultSink);
        env.execute();
        // On-time elements reach the main output; the two elements after ("5",5) are expected on the late side output.
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"1", "2", "5"}), (Object)resultSink.getResult());
        Assert.assertEquals(Arrays.asList((Object[])new Tuple2[]{new Tuple2((Object)"3", (Object)BoxesRunTime.boxToInteger((int)3)), new Tuple2((Object)"4", (Object)BoxesRunTime.boxToInteger((int)4))}), (Object)lateResultSink.getResult());
    }

    /**
     * Checks that late elements of a keyed window are routed to the side output registered
     * with {@code sideOutputLateData}.
     *
     * <p>The job runs with parallelism 1 over the tuples ("1",1), ("2",2), ("5",5), ("3",3),
     * ("4",4). Timestamps and watermarks come from {@code TestAssigner}, which is defined
     * outside this excerpt. The stream is keyed by the first tuple field, grouped into 1 ms
     * tumbling event-time windows, and the window function emits the first tuple field of
     * every element. The assertions expect "1", "2", "5" on the main output and the tuples
     * ("3",3), ("4",4) on the late-data side output.
     */
    @Test
    public void testKeyedWindowLateArrivingEvents() {
        TestListResultSink resultSink = new TestListResultSink();
        TestListResultSink lateResultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        // Source of (String, Int) tuples; the anonymous CaseClassTypeInfo below supplies the type information.
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"1", (Object)BoxesRunTime.boxToInteger((int)1)), new Tuple2((Object)"2", (Object)BoxesRunTime.boxToInteger((int)2)), new Tuple2((Object)"5", (Object)BoxesRunTime.boxToInteger((int)5)), new Tuple2((Object)"3", (Object)BoxesRunTime.boxToInteger((int)3)), new Tuple2((Object)"4", (Object)BoxesRunTime.boxToInteger((int)4))}), (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$16 x$1) {
                return x$1.types;
            }

            // Creates one serializer per tuple field and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // For i in [0, arity): fieldSerializers[i] = types[i].createSerializer(executionConfig).
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$16 $outer;
                    private final ExecutionConfig executionConfig$4;
                    private final TypeSerializer[] fieldSerializers$4;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$4[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$4);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$4 = executionConfig$4;
                        this.fieldSerializers$4 = fieldSerializers$4;
                    }
                });
                // NOTE(review): this subclass instance is assigned to "unused" and is not what gets
                // returned below; this looks like a decompilation artifact — confirm against the Scala source.
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        // Tag named "late" for the late-data side output; same tuple type information pattern as the source above.
        OutputTag lateDataTag = OutputTag$.MODULE$.apply("late", (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$17 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$17 $outer;
                    private final ExecutionConfig executionConfig$5;
                    private final TypeSerializer[] fieldSerializers$5;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$5[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$5);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$5 = executionConfig$5;
                        this.fieldSerializers$5 = fieldSerializers$5;
                    }
                });
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        // Pipeline: assign timestamps/watermarks -> key by first tuple field -> 1 ms tumbling window
        // -> late data to lateDataTag -> emit each element's first field.
        DataStream windowOperator = dataStream.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new TestAssigner()).keyBy((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(Tuple2<String, Object> i) {
                return (String)i._1();
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)1L))).sideOutputLateData(lateDataTag).process((ProcessWindowFunction)new ProcessWindowFunction<Tuple2<String, Object>, String, String, TimeWindow>(this){

            public void process(String key, ProcessWindowFunction.Context context, Iterable<Tuple2<String, Object>> elements, Collector<String> out) {
                // Forward the first tuple field of every element in the window.
                elements.foreach((Function1)new Serializable(this, out){
                    public static final long serialVersionUID = 0L;
                    private final Collector out$2;

                    public final void apply(Tuple2<String, Object> in) {
                        this.out$2.collect(in._1());
                    }
                    {
                        this.out$2 = out$2;
                    }
                });
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Late elements are read back through the same tag and collected in lateResultSink.
        windowOperator.getSideOutput(lateDataTag, (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$19 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$19 $outer;
                    private final ExecutionConfig executionConfig$6;
                    private final TypeSerializer[] fieldSerializers$6;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$6[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$6);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$6 = executionConfig$6;
                        this.fieldSerializers$6 = fieldSerializers$6;
                    }
                });
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        }).addSink((SinkFunction)lateResultSink);
        windowOperator.addSink((SinkFunction)resultSink);
        env.execute();
        // On-time elements reach the main output; the two elements after ("5",5) are expected on the late side output.
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"1", "2", "5"}), (Object)resultSink.getResult());
        Assert.assertEquals(Arrays.asList((Object[])new Tuple2[]{new Tuple2((Object)"3", (Object)BoxesRunTime.boxToInteger((int)3)), new Tuple2((Object)"4", (Object)BoxesRunTime.boxToInteger((int)4))}), (Object)lateResultSink.getResult());
    }

    /**
     * Checks that a {@code ProcessWindowFunction} on a keyed window can emit to a side
     * output through its context.
     *
     * <p>The job runs with parallelism 1 over the tuples ("1",1), ("2",2), ("5",5), ("3",3),
     * ("4",4). Timestamps and watermarks come from {@code TestAssigner}, which is defined
     * outside this excerpt. The stream is keyed by the first tuple field and grouped into
     * 1 ms tumbling event-time windows. For every element in a window the function emits
     * the first tuple field to the main output and {@code "sideout-" + field} to the side
     * output tagged {@code "side"}. The assertions expect "1", "2", "5" on the main output
     * and "sideout-1", "sideout-2", "sideout-5" on the side output; no late-data side output
     * is registered in this test.
     */
    @Test
    public void testProcessWindowFunctionSideOutput() {
        TestListResultSink resultSink = new TestListResultSink();
        TestListResultSink sideOutputResultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        // Source of (String, Int) tuples; the anonymous CaseClassTypeInfo below supplies the type information.
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"1", (Object)BoxesRunTime.boxToInteger((int)1)), new Tuple2((Object)"2", (Object)BoxesRunTime.boxToInteger((int)2)), new Tuple2((Object)"5", (Object)BoxesRunTime.boxToInteger((int)5)), new Tuple2((Object)"3", (Object)BoxesRunTime.boxToInteger((int)3)), new Tuple2((Object)"4", (Object)BoxesRunTime.boxToInteger((int)4))}), (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            public /* synthetic */ TypeInformation[] protected$types($anon$20 x$1) {
                return x$1.types;
            }

            // Creates one serializer per tuple field and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // For i in [0, arity): fieldSerializers[i] = types[i].createSerializer(executionConfig).
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$20 $outer;
                    private final ExecutionConfig executionConfig$7;
                    private final TypeSerializer[] fieldSerializers$7;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$7[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$7);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$7 = executionConfig$7;
                        this.fieldSerializers$7 = fieldSerializers$7;
                    }
                });
                // NOTE(review): this subclass instance is assigned to "unused" and is not what gets
                // returned below; this looks like a decompilation artifact — confirm against the Scala source.
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        OutputTag sideOutputTag = OutputTag$.MODULE$.apply("side", (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Pipeline: assign timestamps/watermarks -> key by first tuple field -> 1 ms tumbling window -> window function below.
        DataStream windowOperator = dataStream.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new TestAssigner()).keyBy((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(Tuple2<String, Object> i) {
                return (String)i._1();
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)1L))).process((ProcessWindowFunction)new ProcessWindowFunction<Tuple2<String, Object>, String, String, TimeWindow>(this, sideOutputTag){
            // Public (unlike the other captured fields in this file) because the nested closure reads it via $outer.
            public final OutputTag sideOutputTag$1;

            public void process(String key, ProcessWindowFunction.Context context, Iterable<Tuple2<String, Object>> elements, Collector<String> out) {
                elements.foreach((Function1)new Serializable(this, context, out){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$21 $outer;
                    private final ProcessWindowFunction.Context context$1;
                    private final Collector out$3;

                    public final void apply(Tuple2<String, Object> in) {
                        // Main output: the element's first field.
                        this.out$3.collect(in._1());
                        // Side output: "sideout-<first field>" through the window context.
                        this.context$1.output(this.$outer.sideOutputTag$1, (Object)new StringBuilder().append((Object)"sideout-").append(in._1()).toString());
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.context$1 = context$1;
                        this.out$3 = out$3;
                    }
                });
            }
            {
                this.sideOutputTag$1 = sideOutputTag$1;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Attach one sink to the side output and one to the main output, then run the job.
        windowOperator.getSideOutput(sideOutputTag, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).addSink((SinkFunction)sideOutputResultSink);
        windowOperator.addSink((SinkFunction)resultSink);
        env.execute();
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"1", "2", "5"}), (Object)resultSink.getResult());
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"sideout-1", "sideout-2", "sideout-5"}), (Object)sideOutputResultSink.getResult());
    }

    /**
     * Checks that a {@code ProcessAllWindowFunction} can emit to a side output in
     * addition to its regular output.
     *
     * <p>The job runs with parallelism 1 over five {@code (String, Int)} tuples, assigns
     * timestamps/watermarks with {@code TestAssigner}, applies a non-keyed 1 ms tumbling
     * event-time window, and for every element in a fired window emits the tuple's first
     * field to the main output and {@code "sideout-" + first field} to the side output
     * tagged {@code "side"}. Both outputs are collected in list sinks and compared against
     * fixed expected lists after {@code env.execute()}.
     *
     * <p>NOTE(review): this is decompiler output of Scala code; constructor arguments on
     * anonymous classes and the {@code $outer}/{@code name$N} fields are compiler-generated
     * closure captures, not hand-written Java.
     */
    @Test
    public void testProcessAllWindowFunctionSideOutput() {
        // Sinks that collect the main output and the side output respectively.
        TestListResultSink resultSink = new TestListResultSink();
        TestListResultSink sideOutputResultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(1);
        // Source elements in emission order: ("1",1), ("2",2), ("5",5), ("3",3), ("4",4).
        // The second argument is an inline CaseClassTypeInfo for Tuple2<String, Object>.
        DataStream dataStream = env.fromElements((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"1", (Object)BoxesRunTime.boxToInteger((int)1)), new Tuple2((Object)"2", (Object)BoxesRunTime.boxToInteger((int)2)), new Tuple2((Object)"5", (Object)BoxesRunTime.boxToInteger((int)5)), new Tuple2((Object)"3", (Object)BoxesRunTime.boxToInteger((int)3)), new Tuple2((Object)"4", (Object)BoxesRunTime.boxToInteger((int)4))}), (TypeInformation)new CaseClassTypeInfo<Tuple2<String, Object>>(this){

            // Synthetic accessor exposing the `types` field to the nested closure below.
            public /* synthetic */ TypeInformation[] protected$types($anon$22 x$1) {
                return x$1.types;
            }

            // Builds one serializer per tuple field (index 0 .. getArity()-1) and wraps
            // them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$22 $outer;
                    private final ExecutionConfig executionConfig$8;
                    private final TypeSerializer[] fieldSerializers$8;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    // Fills slot i with the serializer created by the i-th field's type information.
                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$8[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$8);
                    }
                    {
                        // Null check on the captured outer instance, then capture assignment.
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$8 = executionConfig$8;
                        this.fieldSerializers$8 = fieldSerializers$8;
                    }
                });
                // NOTE(review): `unused` is never read; the method returns a separate, plain
                // ScalaCaseClassSerializer below. This looks like a decompiler artifact of a
                // single anonymous-subclass expression in the Scala source - confirm against
                // the original before relying on which createInstance is actually used.
                ScalaCaseClassSerializer<Tuple2<String, Object>> unused = new ScalaCaseClassSerializer<Tuple2<String, Object>>(this, fieldSerializers){

                    // Rebuilds a tuple from its deserialized fields: fields[0] as String, fields[1] as boxed int.
                    public Tuple2<String, Object> createInstance(Object[] fields) {
                        return new Tuple2((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        });
        // Tag identifying the String-typed side output named "side".
        OutputTag sideOutputTag = OutputTag$.MODULE$.apply("side", (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Pipeline: TestAssigner timestamps/watermarks -> windowAll(1 ms tumbling event time) -> process.
        DataStream windowOperator = dataStream.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new TestAssigner()).windowAll((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)1L))).process((ProcessAllWindowFunction)new ProcessAllWindowFunction<Tuple2<String, Object>, String, TimeWindow>(this, sideOutputTag){
            // Captured copy of the enclosing method's sideOutputTag.
            public final OutputTag sideOutputTag$2;

            // For every element of the window: main output gets _1, side output gets "sideout-" + _1.
            public void process(ProcessAllWindowFunction.Context context, Iterable<Tuple2<String, Object>> elements, Collector<String> out) {
                elements.foreach((Function1)new Serializable(this, context, out){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$23 $outer;
                    private final ProcessAllWindowFunction.Context context$2;
                    private final Collector out$4;

                    public final void apply(Tuple2<String, Object> in) {
                        this.out$4.collect(in._1());
                        this.context$2.output(this.$outer.sideOutputTag$2, (Object)new StringBuilder().append((Object)"sideout-").append(in._1()).toString());
                    }
                    {
                        // Null check on the captured outer instance, then capture assignment.
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.context$2 = context$2;
                        this.out$4 = out$4;
                    }
                });
            }
            {
                this.sideOutputTag$2 = sideOutputTag$2;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(String.class));
        // Attach one sink to the side output and one to the main output, then run the job.
        windowOperator.getSideOutput(sideOutputTag, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).addSink((SinkFunction)sideOutputResultSink);
        windowOperator.addSink((SinkFunction)resultSink);
        env.execute();
        // Only "1", "2" and "5" are expected on either output, although "3" and "4" are also fed in.
        // NOTE(review): presumably "3" and "4" arrive behind the watermark produced by TestAssigner
        // and are dropped as late data; TestAssigner is not visible here - confirm.
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"1", "2", "5"}), (Object)resultSink.getResult());
        Assert.assertEquals(Arrays.asList((Object[])new String[]{"sideout-1", "sideout-2", "sideout-5"}), (Object)sideOutputResultSink.getResult());
    }

    /**
     * Checks that side-output streams can be unioned with each other, with themselves,
     * and with an unrelated stream.
     *
     * <p>The job runs with parallelism 3 over the integers 1..4. A {@code ProcessFunction}
     * forwards every value to the main output and additionally routes it to the
     * {@code "odds"} side output when {@code value % 2 != 0}, otherwise to the
     * {@code "even"} side output. Sinks are attached to the main stream, to each side
     * output, and to five unions; after {@code env.execute()} each sink's sorted contents
     * are compared against a fixed expected list.
     *
     * <p>NOTE(review): this is decompiler output of Scala code; constructor arguments on
     * the anonymous class and the {@code name$1} fields are compiler-generated captures.
     */
    @Test
    public void testUnionOfTwoSideOutputs() {
        // One sink per stream under test; names mirror the stream each is attached to below.
        TestListResultSink evensResultSink = new TestListResultSink();
        TestListResultSink oddsResultSink = new TestListResultSink();
        TestListResultSink oddsUEvensResultSink = new TestListResultSink();
        TestListResultSink evensUOddsResultSink = new TestListResultSink();
        TestListResultSink oddsUOddsResultSink = new TestListResultSink();
        TestListResultSink evensUEvensResultSink = new TestListResultSink();
        TestListResultSink oddsUEvensExternalResultSink = new TestListResultSink();
        TestListResultSink resultSink = new TestListResultSink();
        StreamExecutionEnvironment env = StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream input = env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1, 2, 3, 4}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        // Int-typed side-output tags; note the ids are "odds" and "even" (not "evens").
        OutputTag oddTag = OutputTag$.MODULE$.apply("odds", (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        OutputTag evenTag = OutputTag$.MODULE$.apply("even", (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        DataStream passThroughStream = input.process((ProcessFunction)new ProcessFunction<Object, Object>(this, oddTag, evenTag){
            // Captured copies of the enclosing method's tags.
            private final OutputTag oddTag$1;
            private final OutputTag evenTag$1;

            // Routes the value to exactly one side output by parity, and always to the main output.
            public void processElement(int value, ProcessFunction.Context ctx, Collector<Object> out) {
                if (value % 2 != 0) {
                    ctx.output((org.apache.flink.util.OutputTag)this.oddTag$1, (Object)BoxesRunTime.boxToInteger((int)value));
                } else {
                    ctx.output((org.apache.flink.util.OutputTag)this.evenTag$1, (Object)BoxesRunTime.boxToInteger((int)value));
                }
                out.collect((Object)BoxesRunTime.boxToInteger((int)value));
            }
            {
                this.oddTag$1 = oddTag$1;
                this.evenTag$1 = evenTag$1;
            }
        }, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        // Extract both side outputs from the processed stream.
        DataStream evens = passThroughStream.getSideOutput(evenTag, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        DataStream odds = passThroughStream.getSideOutput(oddTag, (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE));
        // Plain sinks: each side output on its own, plus the main pass-through output.
        evens.addSink((SinkFunction)evensResultSink);
        odds.addSink((SinkFunction)oddsResultSink);
        passThroughStream.addSink((SinkFunction)resultSink);
        // Unions under test: both orders of odds/evens, each side output with itself,
        // and odds with a separate source stream containing 2 and 4.
        odds.union((Seq)Predef$.MODULE$.wrapRefArray((Object[])new DataStream[]{evens})).addSink((SinkFunction)oddsUEvensResultSink);
        evens.union((Seq)Predef$.MODULE$.wrapRefArray((Object[])new DataStream[]{odds})).addSink((SinkFunction)evensUOddsResultSink);
        odds.union((Seq)Predef$.MODULE$.wrapRefArray((Object[])new DataStream[]{odds})).addSink((SinkFunction)oddsUOddsResultSink);
        evens.union((Seq)Predef$.MODULE$.wrapRefArray((Object[])new DataStream[]{evens})).addSink((SinkFunction)evensUEvensResultSink);
        odds.union((Seq)Predef$.MODULE$.wrapRefArray((Object[])new DataStream[]{env.fromElements((Seq)Predef$.MODULE$.wrapIntArray(new int[]{2, 4}), (TypeInformation)BasicTypeInfo.getInfoFor(Integer.TYPE))})).addSink((SinkFunction)oddsUEvensExternalResultSink);
        env.execute();
        // All comparisons use getSortedResult(), so they do not depend on arrival order.
        // NOTE(review): presumably required because parallelism is 3 - confirm.
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 3})), (Object)oddsResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{2, 4})), (Object)evensResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 2, 3, 4})), (Object)resultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 2, 3, 4})), (Object)oddsUEvensResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 2, 3, 4})), (Object)evensUOddsResultSink.getSortedResult());
        // A stream unioned with itself is expected to deliver every element twice.
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 1, 3, 3})), (Object)oddsUOddsResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{2, 2, 4, 4})), (Object)evensUEvensResultSink.getSortedResult());
        Assert.assertEquals(Arrays.asList(ScalaRunTime$.MODULE$.toObjectArray((Object)new int[]{1, 2, 3, 4})), (Object)oddsUEvensExternalResultSink.getSortedResult());
    }
}

