/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.join;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;

public class WindowJoin {
    private static final String[] names = new String[]{"tom", "jerry", "alice", "bob", "john", "grace"};
    private static final int GRADE_COUNT = 5;
    private static final int SALARY_MAX = 10000;
    private static final int SLEEP_TIME = 10;
    private static boolean fileOutput = false;
    private static String outputPath;

    public static void main(String[] args) throws Exception {
        if (!WindowJoin.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource grades = env.addSource((SourceFunction)new GradeSource());
        DataStreamSource salaries = env.addSource((SourceFunction)new SalarySource());
        SingleOutputStreamOperator joinedStream = ((StreamJoinOperator.JoinWindow)grades.join((DataStream)salaries).onWindow(1L, TimeUnit.SECONDS)).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new MyJoinFunction());
        if (fileOutput) {
            joinedStream.writeAsText(outputPath, 1L);
        } else {
            joinedStream.print();
        }
        env.execute("Windowed Join Example");
    }

    /*
     * Enabled aggressive block sorting
     */
    private static boolean parseParameters(String[] args) {
        if (args.length <= 0) {
            System.out.println("Executing WindowJoin with generated data.");
            System.out.println("  Provide parameter to write to file.");
            System.out.println("  Usage: WindowJoin <result path>");
            return true;
        }
        fileOutput = true;
        if (args.length == 1) {
            outputPath = args[0];
            return true;
        }
        System.err.println("Usage: WindowJoin <result path>");
        return false;
    }

    public static class MyJoinFunction
    implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private Tuple3<String, Integer, Integer> joined = new Tuple3();

        public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
            this.joined.f0 = first.f0;
            this.joined.f1 = first.f1;
            this.joined.f2 = second.f1;
            return this.joined;
        }
    }

    public static class SalarySource
    extends RichSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private transient Random rand;
        private transient Tuple2<String, Integer> outTuple;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.rand = new Random();
            this.outTuple = new Tuple2();
        }

        public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
            while (true) {
                this.outTuple.f0 = names[this.rand.nextInt(names.length)];
                this.outTuple.f1 = this.rand.nextInt(10000) + 1;
                out.collect(this.outTuple);
                Thread.sleep(this.rand.nextInt(10) + 1);
            }
        }
    }

    public static class GradeSource
    implements SourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private Random rand = new Random();
        private Tuple2<String, Integer> outTuple = new Tuple2();

        public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
            while (true) {
                this.outTuple.f0 = names[this.rand.nextInt(names.length)];
                this.outTuple.f1 = this.rand.nextInt(5) + 1;
                out.collect(this.outTuple);
                Thread.sleep(this.rand.nextInt(10) + 1);
            }
        }
    }
}

