/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;

public class TopSpeedWindowing {
    private static final int NUM_CAR_EVENTS = 100;
    private static boolean fileInput = false;
    private static boolean fileOutput = false;
    private static int numOfCars = 2;
    private static int evictionSec = 10;
    private static double triggerMeters = 50.0;
    private static String inputPath;
    private static String outputPath;

    public static void main(String[] args) throws Exception {
        if (!TopSpeedWindowing.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Object carData = fileInput ? env.readTextFile(inputPath).map((MapFunction)new ParseCarData()) : env.addSource((SourceFunction)CarSource.create(numOfCars));
        SingleOutputStreamOperator topSpeeds = carData.assignTimestamps((TimestampExtractor)new CarTimestamp()).keyBy(new int[]{0}).window((WindowAssigner)GlobalWindows.create()).evictor((Evictor)TimeEvictor.of((AbstractTime)Time.of((long)evictionSec, (TimeUnit)TimeUnit.SECONDS))).trigger((Trigger)DeltaTrigger.of((double)triggerMeters, (DeltaFunction)new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>(){
            private static final long serialVersionUID = 1L;

            public double getDelta(Tuple4<Integer, Integer, Double, Long> oldDataPoint, Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                return (Double)newDataPoint.f2 - (Double)oldDataPoint.f2;
            }
        })).maxBy(1);
        if (fileOutput) {
            topSpeeds.print();
            topSpeeds.writeAsText(outputPath);
        } else {
            topSpeeds.print();
        }
        env.execute("CarTopSpeedWindowingExample");
    }

    private static boolean parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length == 2) {
                fileInput = true;
                fileOutput = true;
                inputPath = args[0];
                outputPath = args[1];
            } else {
                System.err.println("Usage: TopSpeedWindowingExample <input path> <output path>");
                return false;
            }
        }
        return true;
    }

    private static class CarTimestamp
    implements TimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
        private static final long serialVersionUID = 1L;

        private CarTimestamp() {
        }

        public long extractTimestamp(Tuple4<Integer, Integer, Double, Long> element, long currentTimestamp) {
            return (Long)element.f3;
        }

        public long extractWatermark(Tuple4<Integer, Integer, Double, Long> element, long currentTimestamp) {
            return (Long)element.f3 - 1L;
        }

        public long getCurrentWatermark() {
            return Long.MIN_VALUE;
        }
    }

    private static class ParseCarData
    extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
        private static final long serialVersionUID = 1L;

        private ParseCarData() {
        }

        public Tuple4<Integer, Integer, Double, Long> map(String record) {
            String rawData = record.substring(1, record.length() - 1);
            String[] data = rawData.split(",");
            return new Tuple4((Object)Integer.valueOf(data[0]), (Object)Integer.valueOf(data[1]), (Object)Double.valueOf(data[2]), (Object)Long.valueOf(data[3]));
        }
    }

    private static class CarSource
    implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
        private static final long serialVersionUID = 1L;
        private Integer[] speeds;
        private Double[] distances;
        private Random rand = new Random();
        private volatile boolean isRunning = true;
        private int counter;

        private CarSource(int numOfCars) {
            this.speeds = new Integer[numOfCars];
            this.distances = new Double[numOfCars];
            Arrays.fill((Object[])this.speeds, (Object)50);
            Arrays.fill((Object[])this.distances, (Object)0.0);
        }

        public static CarSource create(int cars) {
            return new CarSource(cars);
        }

        public void run(SourceFunction.SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
            while (this.isRunning && this.counter < 100) {
                Thread.sleep(100L);
                for (int carId = 0; carId < this.speeds.length; ++carId) {
                    this.speeds[carId] = this.rand.nextBoolean() ? Integer.valueOf(Math.min(100, this.speeds[carId] + 5)) : Integer.valueOf(Math.max(0, this.speeds[carId] - 5));
                    Double[] doubleArray = this.distances;
                    int n = carId;
                    Double.valueOf(doubleArray[n] + (double)this.speeds[carId].intValue() / 3.6);
                    Tuple4 record = new Tuple4((Object)carId, (Object)this.speeds[carId], (Object)this.distances[carId], (Object)System.currentTimeMillis());
                    ctx.collect((Object)record);
                    ++this.counter;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

