/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.join;

import java.util.Random;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;

public class WindowJoin {
    private static DataStream<Tuple2<String, Integer>> grades;
    private static DataStream<Tuple2<String, Integer>> salaries;
    private static final String[] names;
    private static final int GRADE_COUNT = 5;
    private static final int SALARY_MAX = 10000;
    private static final int SLEEP_TIME = 10;
    private static boolean fileInput;
    private static boolean fileOutput;
    private static String gradesPath;
    private static String salariesPath;
    private static String outputPath;

    public static void main(String[] args) throws Exception {
        if (!WindowJoin.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WindowJoin.setInputStreams(env);
        SingleOutputStreamOperator joinedStream = ((StreamJoinOperator.JoinWindow)grades.join(salaries).onWindow(1L, (Timestamp)new MyTimestamp(0), (Timestamp)new MyTimestamp(0))).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new MyJoinFunction());
        if (fileOutput) {
            joinedStream.writeAsText(outputPath, 1L);
        } else {
            joinedStream.print();
        }
        env.execute("Windowed Join Example");
    }

    /*
     * Enabled aggressive block sorting
     */
    private static boolean parseParameters(String[] args) {
        if (args.length <= 0) {
            System.out.println("Executing WindowJoin with generated data.");
            System.out.println("  Provide parameter to write to file.");
            System.out.println("  Usage: WindowJoin <result path>");
            return true;
        }
        if (args.length == 1) {
            fileOutput = true;
            outputPath = args[0];
            return true;
        }
        if (args.length == 3) {
            fileInput = true;
            fileOutput = true;
            gradesPath = args[0];
            salariesPath = args[1];
            outputPath = args[2];
            return true;
        }
        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> <result path>");
        return false;
    }

    private static void setInputStreams(StreamExecutionEnvironment env) {
        if (fileInput) {
            grades = env.readTextFile(gradesPath).map((MapFunction)new MySourceMap());
            salaries = env.readTextFile(salariesPath).map((MapFunction)new MySourceMap());
        } else {
            grades = env.addSource((SourceFunction)new GradeSource());
            salaries = env.addSource((SourceFunction)new SalarySource());
        }
    }

    static {
        names = new String[]{"tom", "jerry", "alice", "bob", "john", "grace"};
        fileInput = false;
        fileOutput = false;
    }

    public static class MyTimestamp
    implements Timestamp<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private int counter;

        public MyTimestamp(int starttime) {
            this.counter = starttime;
        }

        public long getTimestamp(Tuple2<String, Integer> value) {
            this.counter += 10;
            return this.counter;
        }
    }

    public static class MyJoinFunction
    implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private Tuple3<String, Integer, Integer> joined = new Tuple3();

        public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
            this.joined.f0 = first.f0;
            this.joined.f1 = first.f1;
            this.joined.f2 = second.f1;
            return this.joined;
        }
    }

    public static class MySourceMap
    extends RichMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private String[] record = new String[2];

        public Tuple2<String, Integer> map(String line) throws Exception {
            this.record = line.substring(1, line.length() - 1).split(",");
            return new Tuple2((Object)this.record[0], (Object)Integer.parseInt(this.record[1]));
        }
    }

    public static class SalarySource
    extends RichSourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private transient Random rand;
        private transient Tuple2<String, Integer> outTuple;
        private volatile boolean isRunning;
        private int counter;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.rand = new Random();
            this.outTuple = new Tuple2();
            this.isRunning = true;
        }

        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                this.outTuple.f0 = names[this.rand.nextInt(names.length)];
                this.outTuple.f1 = this.rand.nextInt(10000) + 1;
                Thread.sleep(this.rand.nextInt(10) + 1);
                ++this.counter;
                ctx.collect(this.outTuple);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class GradeSource
    implements SourceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
        private Random rand = new Random();
        private Tuple2<String, Integer> outTuple = new Tuple2();
        private volatile boolean isRunning = true;
        private int counter;

        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                this.outTuple.f0 = names[this.rand.nextInt(names.length)];
                this.outTuple.f1 = this.rand.nextInt(5) + 1;
                Thread.sleep(this.rand.nextInt(10) + 1);
                ++this.counter;
                ctx.collect(this.outTuple);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

