/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.ml;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.util.Collector;

public class IncrementalLearningSkeleton {
    private static DataStream<Integer> trainingData = null;
    private static DataStream<Integer> newData = null;
    private static boolean fileOutput = false;
    private static String outputPath;

    public static void main(String[] args) throws Exception {
        if (!IncrementalLearningSkeleton.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        IncrementalLearningSkeleton.createSourceStreams(env);
        DataStream model = trainingData.window((WindowingHelper)Time.of((long)5000L, (Timestamp)new LinearTimestamp())).mapWindow((WindowMapFunction)new PartialModelBuilder()).flatten();
        SingleOutputStreamOperator prediction = newData.connect(model).map((CoMapFunction)new Predictor());
        if (fileOutput) {
            prediction.writeAsText(outputPath, 1L);
        } else {
            prediction.print();
        }
        env.execute("Streaming Incremental Learning");
    }

    /*
     * Enabled aggressive block sorting
     */
    private static boolean parseParameters(String[] args) {
        if (args.length <= 0) {
            System.out.println("Executing IncrementalLearningSkeleton with generated data.");
            System.out.println("  Provide parameter to write to file.");
            System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
            return true;
        }
        fileOutput = true;
        if (args.length == 1) {
            outputPath = args[0];
            return true;
        }
        System.err.println("Usage: IncrementalLearningSkeleton <result path>");
        return false;
    }

    public static void createSourceStreams(StreamExecutionEnvironment env) {
        if (fileOutput) {
            trainingData = env.addSource((SourceFunction)new FiniteTrainingDataSource());
            newData = env.addSource((SourceFunction)new FiniteNewDataSource());
        } else {
            trainingData = env.addSource((SourceFunction)new TrainingDataSource());
            newData = env.addSource((SourceFunction)new NewDataSource());
        }
    }

    public static class Predictor
    implements CoMapFunction<Integer, Double[], Integer> {
        private static final long serialVersionUID = 1L;
        Double[] batchModel = null;
        Double[] partialModel = null;

        public Integer map1(Integer value) {
            return this.predict(value);
        }

        public Integer map2(Double[] value) {
            this.partialModel = value;
            this.batchModel = this.getBatchModel();
            return 1;
        }

        protected Double[] getBatchModel() {
            return new Double[]{0.0};
        }

        protected Integer predict(Integer inTuple) {
            return 0;
        }
    }

    public static class PartialModelBuilder
    implements WindowMapFunction<Integer, Double[]> {
        private static final long serialVersionUID = 1L;

        protected Double[] buildPartialModel(Iterable<Integer> values) {
            return new Double[]{1.0};
        }

        public void mapWindow(Iterable<Integer> values, Collector<Double[]> out) throws Exception {
            out.collect((Object)this.buildPartialModel(values));
        }
    }

    public static class LinearTimestamp
    implements Timestamp<Integer> {
        private static final long serialVersionUID = 1L;
        private long counter = 0L;

        public long getTimestamp(Integer value) {
            return this.counter += 10L;
        }
    }

    public static class FiniteTrainingDataSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private int counter = 0;

        public void run(Collector<Integer> collector) throws Exception {
            while (this.counter < 8200) {
                collector.collect((Object)this.getTrainingData());
            }
        }

        public void cancel() {
        }

        private Integer getTrainingData() throws InterruptedException {
            ++this.counter;
            return 1;
        }
    }

    public static class TrainingDataSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private static final int TRAINING_DATA_SLEEP_TIME = 10;

        public void run(Collector<Integer> collector) throws Exception {
            while (true) {
                collector.collect((Object)this.getTrainingData());
            }
        }

        private Integer getTrainingData() throws InterruptedException {
            Thread.sleep(10L);
            return 1;
        }

        public void cancel() {
        }
    }

    public static class FiniteNewDataSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private int counter;

        public void run(Collector<Integer> collector) throws Exception {
            Thread.sleep(15L);
            while (this.counter < 50) {
                collector.collect((Object)this.getNewData());
            }
        }

        public void cancel() {
        }

        private Integer getNewData() throws InterruptedException {
            Thread.sleep(5L);
            ++this.counter;
            return 1;
        }
    }

    public static class NewDataSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private static final int NEW_DATA_SLEEP_TIME = 1000;

        public void run(Collector<Integer> collector) throws Exception {
            while (true) {
                collector.collect((Object)this.getNewData());
            }
        }

        private Integer getNewData() throws InterruptedException {
            Thread.sleep(1000L);
            return 1;
        }

        public void cancel() {
        }
    }
}

