/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.join;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoin {
    /** Pool of names the built-in sources draw from at random; the name is the join key. */
    private static final String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};

    /*
     * The three constants below are not referenced by name anywhere in this
     * decompiled file; the built-in sources use literals with the same values
     * (presumably because the compiler inlined them - confirm against the
     * original sources before changing one without the other).
     */

    /** Same value as the bound GradeSource passes to {@code Random.nextInt} (grades 1..5). */
    private static final int GRADE_COUNT = 5;
    /** Same value as the bound SalarySource passes to {@code Random.nextInt} (salaries 1..10000). */
    private static final int SALARY_MAX = 10000;
    /** Same value as the bound both sources use for their random sleep (1..10 ms per record). */
    private static final int SLEEP_TIME = 10;

    /**
     * Builds and executes the windowed join example job.
     *
     * <p>Two streams of {@code (timestamp, name, value)} tuples - grades and
     * salaries - are stamped with event time taken from field {@code f0},
     * keyed by {@link NameKeySelector}, joined inside 5 millisecond tumbling
     * event-time windows, and combined by {@link MyJoinFunction}. The result is
     * written as text to {@code --output} when given, otherwise printed.
     *
     * @param args command-line arguments; the options read here and in the
     *             input helpers are {@code --grades}, {@code --salaries} and
     *             {@code --output}
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        System.out.println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Expose the parsed parameters through the job's execution config.
        env.getConfig().setGlobalJobParameters(params);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Declared as DataStream: the helpers return DataStream, which is not
        // assignable to the SingleOutputStreamOperator subclass without a cast.
        DataStream<Tuple3<Long, String, Integer>> grades = getGradesPath(env, params);
        DataStream<Tuple3<Long, String, Integer>> salaries = getSalariesPath(env, params);

        // Both inputs carry their event timestamp in f0.
        grades = grades.assignTimestampsAndWatermarks(new MyTimestampExtractor());
        salaries = salaries.assignTimestampsAndWatermarks(new MyTimestampExtractor());

        DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.join(salaries)
                .where(new NameKeySelector())
                .equalTo(new NameKeySelector())
                .window(TumblingEventTimeWindows.of(Time.of(5L, TimeUnit.MILLISECONDS)))
                .apply(new MyJoinFunction());

        if (params.has("output")) {
            joinedStream.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            joinedStream.print();
        }

        env.execute("Windowed Join Example");
    }

    /**
     * Creates the grades input stream.
     *
     * <p>When {@code --grades} is present, the file at that path is read line
     * by line and each line is parsed by {@link MySourceMap}; otherwise the
     * built-in {@link GradeSource} is used and a hint is printed to stdout.
     *
     * @param env    environment the source is registered with
     * @param params parsed command-line parameters
     * @return stream of grade tuples
     */
    private static DataStream<Tuple3<Long, String, Integer>> getGradesPath(StreamExecutionEnvironment env, ParameterTool params) {
        if (params.has("grades")) {
            // No raw MapFunction cast: the compiler checks the element type of the result.
            return env.readTextFile(params.get("grades")).map(new MySourceMap());
        }
        System.out.println("Executing WindowJoin example with default grades data set.");
        System.out.println("Use --grades to specify file input.");
        return env.addSource(new GradeSource());
    }

    /**
     * Creates the salaries input stream.
     *
     * <p>When {@code --salaries} is present, the file at that path is read line
     * by line and each line is parsed by {@link MySourceMap}; otherwise the
     * built-in {@link SalarySource} is used and a hint is printed to stdout.
     *
     * @param env    environment the source is registered with
     * @param params parsed command-line parameters
     * @return stream of salary tuples
     */
    private static DataStream<Tuple3<Long, String, Integer>> getSalariesPath(StreamExecutionEnvironment env, ParameterTool params) {
        if (params.has("salaries")) {
            // No raw MapFunction cast: the compiler checks the element type of the result.
            return env.readTextFile(params.get("salaries")).map(new MySourceMap());
        }
        System.out.println("Executing WindowJoin example with default salaries data set.");
        System.out.println("Use --salaries to specify file input.");
        return env.addSource(new SalarySource());
    }

    private static class NameKeySelector
    implements KeySelector<Tuple3<Long, String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        private NameKeySelector() {
        }

        public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
            return (String)value.f1;
        }
    }

    private static class MyTimestampExtractor
    extends AscendingTimestampExtractor<Tuple3<Long, String, Integer>> {
        private MyTimestampExtractor() {
        }

        public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
            return (Long)element.f0;
        }
    }

    public static class MyJoinFunction
    implements JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private Tuple3<String, Integer, Integer> joined = new Tuple3();

        public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first, Tuple3<Long, String, Integer> second) throws Exception {
            this.joined.f0 = first.f1;
            this.joined.f1 = first.f2;
            this.joined.f2 = second.f2;
            return this.joined;
        }
    }

    public static class MySourceMap
    extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;
        private String[] record = new String[2];

        public Tuple3<Long, String, Integer> map(String line) throws Exception {
            this.record = line.substring(1, line.length() - 1).split(",");
            return new Tuple3((Object)Long.parseLong(this.record[0]), (Object)this.record[1], (Object)Integer.parseInt(this.record[2]));
        }
    }

    public static class SalarySource
    extends RichSourceFunction<Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;
        private transient Random rand;
        private transient Tuple3<Long, String, Integer> outTuple;
        private volatile boolean isRunning;
        private int counter;

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.rand = new Random();
            this.outTuple = new Tuple3();
            this.isRunning = true;
        }

        public void run(SourceFunction.SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                this.outTuple.f0 = System.currentTimeMillis();
                this.outTuple.f1 = names[this.rand.nextInt(names.length)];
                this.outTuple.f2 = this.rand.nextInt(10000) + 1;
                Thread.sleep(this.rand.nextInt(10) + 1);
                ++this.counter;
                ctx.collect(this.outTuple);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class GradeSource
    implements SourceFunction<Tuple3<Long, String, Integer>> {
        private static final long serialVersionUID = 1L;
        private Random rand = new Random();
        private Tuple3<Long, String, Integer> outTuple = new Tuple3();
        private volatile boolean isRunning = true;
        private int counter;

        public void run(SourceFunction.SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                this.outTuple.f0 = System.currentTimeMillis();
                this.outTuple.f1 = names[this.rand.nextInt(names.length)];
                this.outTuple.f2 = this.rand.nextInt(5) + 1;
                Thread.sleep(this.rand.nextInt(10) + 1);
                ++this.counter;
                ctx.collect(this.outTuple);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

