/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.ml;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class IncrementalLearningSkeleton {
    /**
     * Assembles and executes the "Streaming Incremental Learning" job.
     *
     * <p>The job uses event time. Training data is timestamped by {@link LinearTimestamp},
     * grouped into non-keyed 5000 ms windows and turned into a partial model per window by
     * {@link PartialModelBuilder}. The stream of new data is connected with the model stream
     * and both are fed into {@link Predictor}.
     *
     * <p>If {@code --output <path>} is given the result is written as text to that path,
     * otherwise it is printed to stdout.
     *
     * @param args command-line arguments, parsed with {@link ParameterTool}
     * @throws Exception propagated from the job execution
     */
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        System.out.println("Usage: IncrementalLearningSkeleton --output <path>");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Typed streams instead of raw types: the compiler now checks that each stage's
        // output type matches the next stage's input type.
        DataStream<Integer> trainingData = env.addSource(new FiniteTrainingDataSource());
        DataStream<Integer> newData = env.addSource(new FiniteNewDataSource());

        // One partial model per 5000 ms event-time window of training data.
        DataStream<Double[]> model = trainingData
                .assignTimestampsAndWatermarks(new LinearTimestamp())
                .timeWindowAll(Time.of(5000L, TimeUnit.MILLISECONDS))
                .apply(new PartialModelBuilder());

        // New data goes to Predictor.map1, model updates go to Predictor.map2.
        DataStream<Integer> prediction = newData.connect(model).map(new Predictor());

        if (params.has("output")) {
            prediction.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            prediction.print();
        }

        env.execute("Streaming Incremental Learning");
    }

    public static class Predictor
    implements CoMapFunction<Integer, Double[], Integer> {
        private static final long serialVersionUID = 1L;
        Double[] batchModel = null;
        Double[] partialModel = null;

        public Integer map1(Integer value) {
            return this.predict(value);
        }

        public Integer map2(Double[] value) {
            this.partialModel = value;
            this.batchModel = this.getBatchModel();
            return 1;
        }

        protected Double[] getBatchModel() {
            return new Double[]{0.0};
        }

        protected Integer predict(Integer inTuple) {
            return 0;
        }
    }

    public static class PartialModelBuilder
    implements AllWindowFunction<Integer, Double[], TimeWindow> {
        private static final long serialVersionUID = 1L;

        protected Double[] buildPartialModel(Iterable<Integer> values) {
            return new Double[]{1.0};
        }

        public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
            out.collect((Object)this.buildPartialModel(values));
        }
    }

    public static class LinearTimestamp
    implements AssignerWithPunctuatedWatermarks<Integer> {
        private static final long serialVersionUID = 1L;
        private long counter = 0L;

        public long extractTimestamp(Integer element, long previousElementTimestamp) {
            return this.counter += 10L;
        }

        public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
            return new Watermark(this.counter - 1L);
        }
    }

    public static class FiniteTrainingDataSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private int counter = 0;

        public void run(SourceFunction.SourceContext<Integer> collector) throws Exception {
            while (this.counter < 8200) {
                collector.collect((Object)this.getTrainingData());
            }
        }

        public void cancel() {
        }

        private Integer getTrainingData() throws InterruptedException {
            ++this.counter;
            return 1;
        }
    }

    public static class FiniteNewDataSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private int counter;

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            Thread.sleep(15L);
            while (this.counter < 50) {
                ctx.collect((Object)this.getNewData());
            }
        }

        public void cancel() {
        }

        private Integer getNewData() throws InterruptedException {
            Thread.sleep(5L);
            ++this.counter;
            return 1;
        }
    }
}

