/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class CoGroupJoinITCase
extends AbstractTestBase {
    private static List<String> testResults;

    /**
     * Co-groups two timestamped {@code Tuple2<String, Integer>} streams by their string key over
     * 3 ms tumbling event-time windows and checks the concatenated per-window output.
     *
     * <p>Each emitted record has the form {@code "F:<first-side tuples> S:<second-side tuples>"}.
     * The second tuple field is used as the event timestamp, so the elements fall into the
     * windows [0,3), [3,6) and [6,9). Results are collected into the static {@code testResults}
     * list by the sink and compared after sorting, so emission order does not matter.
     *
     * @throws Exception if the job fails to execute
     */
    @Test
    public void testCoGroup() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps a single sink instance writing into the shared result list.
        env.setParallelism(1);

        DataStream<Tuple2<String, Integer>> source1 =
                env.addSource(
                                new SourceFunction<Tuple2<String, Integer>>() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public void run(
                                            SourceFunction.SourceContext<Tuple2<String, Integer>> ctx)
                                            throws Exception {
                                        ctx.collect(Tuple2.of("a", 0));
                                        ctx.collect(Tuple2.of("a", 1));
                                        ctx.collect(Tuple2.of("a", 2));

                                        ctx.collect(Tuple2.of("b", 3));
                                        ctx.collect(Tuple2.of("b", 4));
                                        ctx.collect(Tuple2.of("b", 5));

                                        ctx.collect(Tuple2.of("a", 6));
                                        ctx.collect(Tuple2.of("a", 7));
                                        ctx.collect(Tuple2.of("a", 8));
                                    }

                                    @Override
                                    public void cancel() {
                                        // Nothing to cancel: run() emits a fixed set and returns.
                                    }
                                })
                        .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

        DataStream<Tuple2<String, Integer>> source2 =
                env.addSource(
                                new SourceFunction<Tuple2<String, Integer>>() {

                                    @Override
                                    public void run(
                                            SourceFunction.SourceContext<Tuple2<String, Integer>> ctx)
                                            throws Exception {
                                        ctx.collect(Tuple2.of("a", 0));
                                        ctx.collect(Tuple2.of("a", 1));

                                        ctx.collect(Tuple2.of("b", 3));

                                        ctx.collect(Tuple2.of("c", 6));
                                        ctx.collect(Tuple2.of("c", 7));
                                        ctx.collect(Tuple2.of("c", 8));
                                    }

                                    @Override
                                    public void cancel() {
                                        // Nothing to cancel: run() emits a fixed set and returns.
                                    }
                                })
                        .assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

        source1.coGroup(source2)
                .where(new Tuple2KeyExtractor())
                .equalTo(new Tuple2KeyExtractor())
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .apply(
                        new CoGroupFunction<
                                Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {

                            @Override
                            public void coGroup(
                                    Iterable<Tuple2<String, Integer>> first,
                                    Iterable<Tuple2<String, Integer>> second,
                                    Collector<String> out)
                                    throws Exception {
                                StringBuilder result = new StringBuilder();
                                result.append("F:");
                                for (Tuple2<String, Integer> t : first) {
                                    result.append(t.toString());
                                }
                                result.append(" S:");
                                for (Tuple2<String, Integer> t : second) {
                                    result.append(t.toString());
                                }
                                out.collect(result.toString());
                            }
                        })
                .addSink(
                        new SinkFunction<String>() {

                            @Override
                            public void invoke(String value) throws Exception {
                                testResults.add(value);
                            }
                        });

        env.execute("CoGroup Test");

        List<String> expectedResult =
                Arrays.asList(
                        "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
                        "F:(b,3)(b,4)(b,5) S:(b,3)",
                        "F:(a,6)(a,7)(a,8) S:",
                        "F: S:(c,6)(c,7)(c,8)");

        // Compare as sorted lists so the assertion is independent of window firing order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    /**
     * Joins two timestamped {@code Tuple3<String, String, Integer>} streams on their first field
     * over 3 ms tumbling event-time windows and checks the produced pairs.
     *
     * <p>Each joined pair is rendered as {@code "<first>:<second>"}. The third tuple field is used
     * as the event timestamp. Results are collected into the static {@code testResults} list by
     * the sink and compared after sorting, so emission order does not matter.
     *
     * @throws Exception if the job fails to execute
     */
    @Test
    public void testJoin() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps a single sink instance writing into the shared result list.
        env.setParallelism(1);

        DataStream<Tuple3<String, String, Integer>> source1 =
                env.addSource(
                                new SourceFunction<Tuple3<String, String, Integer>>() {

                                    @Override
                                    public void run(
                                            SourceFunction.SourceContext<
                                                            Tuple3<String, String, Integer>>
                                                    ctx)
                                            throws Exception {
                                        ctx.collect(Tuple3.of("a", "x", 0));
                                        ctx.collect(Tuple3.of("a", "y", 1));
                                        ctx.collect(Tuple3.of("a", "z", 2));

                                        ctx.collect(Tuple3.of("b", "u", 3));
                                        ctx.collect(Tuple3.of("b", "w", 5));

                                        ctx.collect(Tuple3.of("a", "i", 6));
                                        ctx.collect(Tuple3.of("a", "j", 7));
                                        ctx.collect(Tuple3.of("a", "k", 8));
                                    }

                                    @Override
                                    public void cancel() {
                                        // Nothing to cancel: run() emits a fixed set and returns.
                                    }
                                })
                        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

        DataStream<Tuple3<String, String, Integer>> source2 =
                env.addSource(
                                new SourceFunction<Tuple3<String, String, Integer>>() {

                                    @Override
                                    public void run(
                                            SourceFunction.SourceContext<
                                                            Tuple3<String, String, Integer>>
                                                    ctx)
                                            throws Exception {
                                        ctx.collect(Tuple3.of("a", "u", 0));
                                        ctx.collect(Tuple3.of("a", "w", 1));

                                        ctx.collect(Tuple3.of("b", "i", 3));
                                        ctx.collect(Tuple3.of("b", "k", 5));

                                        ctx.collect(Tuple3.of("a", "x", 6));
                                        ctx.collect(Tuple3.of("a", "z", 8));
                                    }

                                    @Override
                                    public void cancel() {
                                        // Nothing to cancel: run() emits a fixed set and returns.
                                    }
                                })
                        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

        source1.join(source2)
                .where(new Tuple3KeyExtractor())
                .equalTo(new Tuple3KeyExtractor())
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .apply(
                        new JoinFunction<
                                Tuple3<String, String, Integer>,
                                Tuple3<String, String, Integer>,
                                String>() {

                            @Override
                            public String join(
                                    Tuple3<String, String, Integer> first,
                                    Tuple3<String, String, Integer> second)
                                    throws Exception {
                                return first + ":" + second;
                            }
                        })
                .addSink(
                        new SinkFunction<String>() {

                            @Override
                            public void invoke(String value) throws Exception {
                                testResults.add(value);
                            }
                        });

        env.execute("Join Test");

        List<String> expectedResult =
                Arrays.asList(
                        "(a,x,0):(a,u,0)",
                        "(a,x,0):(a,w,1)",
                        "(a,y,1):(a,u,0)",
                        "(a,y,1):(a,w,1)",
                        "(a,z,2):(a,u,0)",
                        "(a,z,2):(a,w,1)",
                        "(b,u,3):(b,i,3)",
                        "(b,u,3):(b,k,5)",
                        "(b,w,5):(b,i,3)",
                        "(b,w,5):(b,k,5)",
                        "(a,i,6):(a,x,6)",
                        "(a,i,6):(a,z,8)",
                        "(a,j,7):(a,x,6)",
                        "(a,j,7):(a,z,8)",
                        "(a,k,8):(a,x,6)",
                        "(a,k,8):(a,z,8)");

        // Compare as sorted lists so the assertion is independent of window firing order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    /**
     * Joins a timestamped {@code Tuple3<String, String, Integer>} stream with itself on the first
     * field over 3 ms tumbling event-time windows and checks the produced pairs.
     *
     * <p>Because both join inputs are the same stream, every element is expected to pair with
     * every element (including itself) that shares its key and window. Each pair is rendered as
     * {@code "<first>:<second>"}. Results are collected into the static {@code testResults} list
     * by the sink and compared after sorting, so emission order does not matter.
     *
     * @throws Exception if the job fails to execute
     */
    @Test
    public void testSelfJoin() throws Exception {
        testResults = new ArrayList<>();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps a single sink instance writing into the shared result list.
        env.setParallelism(1);

        DataStream<Tuple3<String, String, Integer>> source1 =
                env.addSource(
                                new SourceFunction<Tuple3<String, String, Integer>>() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public void run(
                                            SourceFunction.SourceContext<
                                                            Tuple3<String, String, Integer>>
                                                    ctx)
                                            throws Exception {
                                        ctx.collect(Tuple3.of("a", "x", 0));
                                        ctx.collect(Tuple3.of("a", "y", 1));
                                        ctx.collect(Tuple3.of("a", "z", 2));

                                        ctx.collect(Tuple3.of("b", "u", 3));
                                        ctx.collect(Tuple3.of("b", "w", 5));

                                        ctx.collect(Tuple3.of("a", "i", 6));
                                        ctx.collect(Tuple3.of("a", "j", 7));
                                        ctx.collect(Tuple3.of("a", "k", 8));
                                    }

                                    @Override
                                    public void cancel() {
                                        // Nothing to cancel: run() emits a fixed set and returns.
                                    }
                                })
                        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

        source1.join(source1)
                .where(new Tuple3KeyExtractor())
                .equalTo(new Tuple3KeyExtractor())
                .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                .apply(
                        new JoinFunction<
                                Tuple3<String, String, Integer>,
                                Tuple3<String, String, Integer>,
                                String>() {

                            @Override
                            public String join(
                                    Tuple3<String, String, Integer> first,
                                    Tuple3<String, String, Integer> second)
                                    throws Exception {
                                return first + ":" + second;
                            }
                        })
                .addSink(
                        new SinkFunction<String>() {

                            @Override
                            public void invoke(String value) throws Exception {
                                testResults.add(value);
                            }
                        });

        env.execute("Self-Join Test");

        List<String> expectedResult =
                Arrays.asList(
                        "(a,x,0):(a,x,0)",
                        "(a,x,0):(a,y,1)",
                        "(a,x,0):(a,z,2)",
                        "(a,y,1):(a,x,0)",
                        "(a,y,1):(a,y,1)",
                        "(a,y,1):(a,z,2)",
                        "(a,z,2):(a,x,0)",
                        "(a,z,2):(a,y,1)",
                        "(a,z,2):(a,z,2)",
                        "(b,u,3):(b,u,3)",
                        "(b,u,3):(b,w,5)",
                        "(b,w,5):(b,u,3)",
                        "(b,w,5):(b,w,5)",
                        "(a,i,6):(a,i,6)",
                        "(a,i,6):(a,j,7)",
                        "(a,i,6):(a,k,8)",
                        "(a,j,7):(a,i,6)",
                        "(a,j,7):(a,j,7)",
                        "(a,j,7):(a,k,8)",
                        "(a,k,8):(a,i,6)",
                        "(a,k,8):(a,j,7)",
                        "(a,k,8):(a,k,8)");

        // Compare as sorted lists so the assertion is independent of window firing order.
        Collections.sort(expectedResult);
        Collections.sort(testResults);

        Assert.assertEquals(expectedResult, testResults);
    }

    /**
     * Builds a windowed co-group pipeline, extracts the operator behind it, and verifies that the
     * operator can be opened and snapshotted inside a keyed test harness.
     *
     * <p>The job itself is never executed and no elements are pushed through the harness; the test
     * passes when {@code open()} and {@code snapshot(0L, 0L)} complete without throwing.
     *
     * @throws Exception if building the pipeline, opening the harness, or taking the snapshot fails
     */
    @Test
    public void testCoGroupOperatorWithCheckpoint() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Integer>> source1 =
                env.fromData(Tuple2.of("a", 0), Tuple2.of("b", 3));
        DataStream<Tuple2<String, Integer>> source2 =
                env.fromData(Tuple2.of("a", 1), Tuple2.of("b", 6));

        DataStream<String> coGroupWindow =
                source1.coGroup(source2)
                        .where(new Tuple2KeyExtractor())
                        .equalTo(new Tuple2KeyExtractor())
                        .window(TumblingEventTimeWindows.of(Time.of(3L, TimeUnit.MILLISECONDS)))
                        .apply(
                                new CoGroupFunction<
                                        Tuple2<String, Integer>,
                                        Tuple2<String, Integer>,
                                        String>() {

                                    @Override
                                    public void coGroup(
                                            Iterable<Tuple2<String, Integer>> first,
                                            Iterable<Tuple2<String, Integer>> second,
                                            Collector<String> out)
                                            throws Exception {
                                        out.collect(first + ":" + second);
                                    }
                                });

        // NOTE(review): the operator's real input type is presumably the co-group's internal
        // union type rather than Tuple2; this unchecked cast is only harmless because no elements
        // are processed through the harness below - confirm before feeding records.
        @SuppressWarnings("unchecked")
        OneInputTransformation<Tuple2<String, Integer>, String> transform =
                (OneInputTransformation<Tuple2<String, Integer>, String>)
                        coGroupWindow.getTransformation();

        OneInputStreamOperator<Tuple2<String, Integer>, String> operator = transform.getOperator();

        // try-with-resources closes the harness (and the operator it manages) even when open()
        // or snapshot() throws, instead of leaking it as the previous version did.
        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, String>
                testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(
                                operator,
                                new Tuple2KeyExtractor(),
                                BasicTypeInfo.STRING_TYPE_INFO)) {
            testHarness.open();
            testHarness.snapshot(0L, 0L);
        }
    }

    private static class Tuple3KeyExtractor
    implements KeySelector<Tuple3<String, String, Integer>, String> {
        private Tuple3KeyExtractor() {
        }

        public String getKey(Tuple3<String, String, Integer> value) throws Exception {
            return (String)value.f0;
        }
    }

    private static class Tuple2KeyExtractor
    implements KeySelector<Tuple2<String, Integer>, String> {
        private Tuple2KeyExtractor() {
        }

        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return (String)value.f0;
        }
    }

    private static class Tuple3TimestampExtractor
    implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Integer>> {
        private Tuple3TimestampExtractor() {
        }

        public long extractTimestamp(Tuple3<String, String, Integer> element, long previousTimestamp) {
            return ((Integer)element.f2).intValue();
        }

        public Watermark checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement, long extractedTimestamp) {
            return new Watermark((long)((Integer)lastElement.f2 - 1));
        }
    }

    private static class Tuple2TimestampExtractor
    implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
        private Tuple2TimestampExtractor() {
        }

        public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
            return ((Integer)element.f1).intValue();
        }

        public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> element, long extractedTimestamp) {
            return new Watermark(extractedTimestamp - 1L);
        }
    }
}

