/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.join;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoin {
    private static final String[] names = new String[]{"tom", "jerry", "alice", "bob", "john", "grace"};
    private static final int GRADE_COUNT = 5;
    private static final int SALARY_MAX = 10000;
    private static final int SLEEP_TIME = 10;
    private static boolean fileInput = false;
    private static boolean fileOutput = false;
    private static String gradesPath;
    private static String salariesPath;
    private static String outputPath;

    public static void main(String[] args) throws Exception {
        if (!WindowJoin.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = WindowJoin.getInputStreams(env);
        DataStream grades = (DataStream)input.f0;
        DataStream salaries = (DataStream)input.f1;
        grades = grades.assignTimestamps((TimestampExtractor)new MyTimestampExtractor());
        salaries = salaries.assignTimestamps((TimestampExtractor)new MyTimestampExtractor());
        DataStream joinedStream = grades.join(salaries).where((KeySelector)new NameKeySelector()).equalTo((KeySelector)new NameKeySelector()).window((WindowAssigner)TumblingTimeWindows.of((AbstractTime)Time.of((long)5L, (TimeUnit)TimeUnit.MILLISECONDS))).apply((JoinFunction)new MyJoinFunction());
        if (fileOutput) {
            joinedStream.writeAsText(outputPath, 1L);
        } else {
            joinedStream.print();
        }
        env.execute("Windowed Join Example");
    }

    /*
     * Enabled aggressive block sorting
     */
    private static boolean parseParameters(String[] args) {
        if (args.length <= 0) {
            System.out.println("Executing WindowJoin with generated data.");
            System.out.println("  Provide parameter to write to file.");
            System.out.println("  Usage: WindowJoin <result path>");
            return true;
        }
        if (args.length == 1) {
            fileOutput = true;
            outputPath = args[0];
            return true;
        }
        if (args.length == 3) {
            fileInput = true;
            fileOutput = true;
            gradesPath = args[0];
            salariesPath = args[1];
            outputPath = args[2];
            return true;
        }
        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> <result path>");
        return false;
    }

    private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(StreamExecutionEnvironment env) {
        DataStreamSource salaries;
        DataStreamSource grades;
        if (fileInput) {
            grades = env.readTextFile(gradesPath).map((MapFunction)new MySourceMap());
            salaries = env.readTextFile(salariesPath).map((MapFunction)new MySourceMap());
        } else {
            grades = env.addSource((SourceFunction)new GradeSource());
            salaries = env.addSource((SourceFunction)new SalarySource());
        }
        return Tuple2.of((Object)grades, (Object)salaries);
    }

    private static class NameKeySelector
    implements KeySelector<Tuple3<Long, String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        private NameKeySelector() {
        }

        public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
            return (String)value.f1;
        }
    }

    private static class MyTimestampExtractor
    implements TimestampExtractor<Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;

        private MyTimestampExtractor() {
        }

        public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
            return (Long)element.f0;
        }

        public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
            return (Long)element.f0 - 1L;
        }

        public long getCurrentWatermark() {
            return Long.MIN_VALUE;
        }
    }

    public static class MyJoinFunction
    implements JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private Tuple3<String, Integer, Integer> joined = new Tuple3();

        public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first, Tuple3<Long, String, Integer> second) throws Exception {
            this.joined.f0 = first.f1;
            this.joined.f1 = first.f2;
            this.joined.f2 = second.f2;
            return this.joined;
        }
    }

    public static class MySourceMap
    extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;
        private String[] record = new String[2];

        public Tuple3<Long, String, Integer> map(String line) throws Exception {
            this.record = line.substring(1, line.length() - 1).split(",");
            return new Tuple3((Object)Long.parseLong(this.record[0]), (Object)this.record[1], (Object)Integer.parseInt(this.record[2]));
        }
    }

    public static class SalarySource
    extends RichSourceFunction<Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;
        private transient Random rand;
        private transient Tuple3<Long, String, Integer> outTuple;
        private volatile boolean isRunning;
        private int counter;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.rand = new Random();
            this.outTuple = new Tuple3();
            this.isRunning = true;
        }

        public void run(SourceFunction.SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                this.outTuple.f0 = System.currentTimeMillis();
                this.outTuple.f1 = names[this.rand.nextInt(names.length)];
                this.outTuple.f2 = this.rand.nextInt(10000) + 1;
                Thread.sleep(this.rand.nextInt(10) + 1);
                ++this.counter;
                ctx.collect(this.outTuple);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class GradeSource
    implements SourceFunction<Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;
        private Random rand = new Random();
        private Tuple3<Long, String, Integer> outTuple = new Tuple3();
        private volatile boolean isRunning = true;
        private int counter;

        public void run(SourceFunction.SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                this.outTuple.f0 = System.currentTimeMillis();
                this.outTuple.f1 = names[this.rand.nextInt(names.length)];
                this.outTuple.f2 = this.rand.nextInt(5) + 1;
                Thread.sleep(this.rand.nextInt(10) + 1);
                ++this.counter;
                ctx.collect(this.outTuple);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

