/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.util.Collector;

public class TopSpeedWindowingExample {
    private static int numOfCars = 2;
    private static int evictionSec = 10;
    private static double triggerMeters = 50.0;

    public static void main(String[] args) throws Exception {
        if (!TopSpeedWindowingExample.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator topSpeeds = env.addSource((SourceFunction)CarSource.create(numOfCars)).groupBy(new int[]{0}).window(new WindowingHelper[]{Time.of((long)evictionSec, (TimeUnit)TimeUnit.SECONDS)}).every(new WindowingHelper[]{Delta.of((double)triggerMeters, (DeltaFunction)new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>(){

            public double getDelta(Tuple4<Integer, Integer, Double, Long> oldDataPoint, Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                return (Double)newDataPoint.f2 - (Double)oldDataPoint.f2;
            }
        }, (Object)new Tuple4((Object)0, (Object)0, (Object)0.0, (Object)0L))}).maxBy(1);
        topSpeeds.print();
        env.execute("CarTopSpeedWindowingExample");
    }

    private static boolean parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length == 3) {
                numOfCars = Integer.valueOf(args[0]);
                evictionSec = Integer.valueOf(args[1]);
                triggerMeters = Double.valueOf(args[2]);
            } else {
                System.err.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
                return false;
            }
        }
        return true;
    }

    private static class CarSource
    implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
        private static final long serialVersionUID = 1L;
        private Integer[] speeds;
        private Double[] distances;
        private Random rand = new Random();

        private CarSource(int numOfCars) {
            this.speeds = new Integer[numOfCars];
            this.distances = new Double[numOfCars];
            Arrays.fill((Object[])this.speeds, (Object)50);
            Arrays.fill((Object[])this.distances, (Object)0.0);
        }

        public static CarSource create(int cars) {
            return new CarSource(cars);
        }

        public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector) throws Exception {
            block0: while (true) {
                Thread.sleep(1000L);
                int carId = 0;
                while (true) {
                    if (carId >= this.speeds.length) continue block0;
                    this.speeds[carId] = this.rand.nextBoolean() ? Integer.valueOf(Math.min(100, this.speeds[carId] + 5)) : Integer.valueOf(Math.max(0, this.speeds[carId] - 5));
                    Double[] doubleArray = this.distances;
                    int n = carId;
                    Double.valueOf(doubleArray[n] + (double)this.speeds[carId].intValue() / 3.6);
                    collector.collect((Object)new Tuple4((Object)carId, (Object)this.speeds[carId], (Object)this.distances[carId], (Object)System.currentTimeMillis()));
                    ++carId;
                }
                break;
            }
        }
    }
}

