/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.util.Collector;

public class StockPrices {
    /** Ticker symbols: {@link TweetSource} picks random entries from it and tweet words are filtered against it. */
    private static final ArrayList<String> SYMBOLS = new ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));
    /** Price every {@link StockSource} starts from; also the price of {@link #DEFAULT_STOCK_PRICE}. */
    private static final Double DEFAULT_PRICE = 1000.0;
    /** Initial value passed to the delta window policy built in {@code main}. */
    private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);
    /** Set to true by {@code parseParameters} when three arguments (including an output path) are given. */
    private static boolean fileOutput = false;
    /** Host of the socket text stream; first command-line argument. */
    private static String hostName;
    /** Port of the socket text stream; second command-line argument. */
    private static int port;
    /** Target path for the text output; third command-line argument, only assigned when present. */
    private static String outputPath;

    /**
     * Builds and executes the "Stock stream" job.
     *
     * <p>The job merges a socket stream of {@code symbol,price} lines with four generated
     * stock streams, derives windowed statistics and price-change warnings, counts warnings
     * and tweet mentions per symbol over 30-second windows, joins the two counts per symbol
     * and finally emits a rolling correlation of the joined count pairs.
     *
     * <p>Stream variables are intentionally left raw-typed and the API-facing casts are
     * kept as in the original, because the generic signatures of the streaming API are
     * not visible from this file.
     *
     * @param args {@code <hostname> <port> [<output path>]}; see {@code parseParameters}
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        if (!StockPrices.parseParameters(args)) {
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 1: read stock prices from several sources and merge them into one stream.
        // Socket lines are expected to look like "symbol,price".
        SingleOutputStreamOperator socketStockStream = env.socketTextStream(hostName, port).map((MapFunction)new MapFunction<String, StockPrice>(){

            @Override
            public StockPrice map(String value) throws Exception {
                String[] tokens = value.split(",");
                return new StockPrice(tokens[0], Double.parseDouble(tokens[1]));
            }
        });
        DataStreamSource SPX_stream = env.addSource((SourceFunction)new StockSource("SPX", 10));
        DataStreamSource FTSE_stream = env.addSource((SourceFunction)new StockSource("FTSE", 20));
        DataStreamSource DJI_stream = env.addSource((SourceFunction)new StockSource("DJI", 30));
        DataStreamSource BUX_stream = env.addSource((SourceFunction)new StockSource("BUX", 40));
        DataStream stockStream = socketStockStream.merge(new DataStream[]{SPX_stream, FTSE_stream, DJI_stream, BUX_stream});

        // Step 2: simple statistics over a 10-second window evaluated every 5 seconds.
        // These three streams are defined but not connected to any output in this method.
        WindowedDataStream windowedStream = stockStream.window((WindowingHelper)Time.of(10L, TimeUnit.SECONDS)).every((WindowingHelper)Time.of(5L, TimeUnit.SECONDS));
        DataStream lowest = windowedStream.minBy("price").flatten();
        DataStream maxByStock = windowedStream.groupBy(new String[]{"symbol"}).maxBy("price").flatten();
        DataStream rollingMean = windowedStream.groupBy(new String[]{"symbol"}).mapWindow((WindowMapFunction)new WindowMean()).flatten();

        // Step 3: per-symbol delta windows (threshold 0.05 on the absolute price difference)
        // produce warnings, which are then counted per symbol over 30 seconds.
        DataStream priceWarnings = stockStream.groupBy(new String[]{"symbol"}).window((WindowingHelper)Delta.of(0.05, (DeltaFunction)new DeltaFunction<StockPrice>(){

            @Override
            public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
                return Math.abs(oldDataPoint.price - newDataPoint.price);
            }
        }, (Object)DEFAULT_STOCK_PRICE)).mapWindow((WindowMapFunction)new SendWarning()).flatten();
        DataStream warningsPerStock = priceWarnings.map((MapFunction)new MapFunction<String, Count>(){

            @Override
            public Count map(String value) throws Exception {
                return new Count(value, 1);
            }
        }).groupBy(new String[]{"symbol"}).window((WindowingHelper)Time.of(30L, TimeUnit.SECONDS)).sum("count").flatten();

        // Step 4: split tweets into upper-cased words, keep only known symbols and count
        // mentions per symbol over 30 seconds.
        DataStreamSource tweetStream = env.addSource((SourceFunction)new TweetSource());
        SingleOutputStreamOperator mentionedSymbols = tweetStream.flatMap((FlatMapFunction)new FlatMapFunction<String, String>(){

            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(word.toUpperCase());
                }
            }
        }).filter((FilterFunction)new FilterFunction<String>(){

            @Override
            public boolean filter(String value) throws Exception {
                return SYMBOLS.contains(value);
            }
        });
        DataStream tweetsPerStock = mentionedSymbols.map((MapFunction)new MapFunction<String, Count>(){

            @Override
            public Count map(String value) throws Exception {
                return new Count(value, 1);
            }
        }).groupBy(new String[]{"symbol"}).window((WindowingHelper)Time.of(30L, TimeUnit.SECONDS)).sum("count").flatten();

        // Step 5: join warning counts with tweet counts per symbol over 30 seconds, keep
        // only the two counts, and compute their rolling correlation.
        SingleOutputStreamOperator tweetsAndWarning = ((StreamJoinOperator.JoinWindow)warningsPerStock.join(tweetsPerStock).onWindow(30L, TimeUnit.SECONDS)).where(new String[]{"symbol"}).equalTo(new String[]{"symbol"}).with((JoinFunction)new JoinFunction<Count, Count, Tuple2<Integer, Integer>>(){

            @Override
            public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
                return new Tuple2<Integer, Integer>(first.count, second.count);
            }
        });
        DataStream rollingCorrelation = tweetsAndWarning.window((WindowingHelper)Time.of(30L, TimeUnit.SECONDS)).mapWindow((WindowMapFunction)new WindowCorrelation()).flatten();

        if (fileOutput) {
            rollingCorrelation.writeAsText(outputPath, 1L);
        } else {
            rollingCorrelation.print();
        }
        env.execute("Stock stream");
    }

    /**
     * Parses the command line into the static configuration fields.
     *
     * <p>Accepts {@code <hostname> <port>} or {@code <hostname> <port> <output path>}.
     * With three arguments file output is enabled and the output path is stored.
     * Nothing is assigned unless all arguments are valid.
     *
     * @param args the raw command-line arguments
     * @return {@code true} if the arguments were accepted; {@code false} if the argument
     *         count is wrong or the port is not an integer (a message is printed to
     *         {@code System.err} in both cases)
     */
    private static boolean parseParameters(String[] args) {
        if (args.length != 2 && args.length != 3) {
            System.err.println("Usage: StockPrices <hostname> <port> [<output path>]");
            return false;
        }
        int parsedPort;
        try {
            parsedPort = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            // Report a malformed port like any other usage error instead of crashing.
            System.err.println("Invalid port '" + args[1] + "': expected an integer");
            System.err.println("Usage: StockPrices <hostname> <port> [<output path>]");
            return false;
        }
        hostName = args[0];
        port = parsedPort;
        if (args.length == 3) {
            fileOutput = true;
            outputPath = args[2];
        }
        return true;
    }

    public static final class WindowCorrelation
    implements WindowMapFunction<Tuple2<Integer, Integer>, Double> {
        private static final long serialVersionUID = 1L;
        private Integer leftSum;
        private Integer rightSum;
        private Integer count;
        private Double leftMean;
        private Double rightMean;
        private Double cov;
        private Double leftSd;
        private Double rightSd;

        public void mapWindow(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out) throws Exception {
            this.leftSum = 0;
            this.rightSum = 0;
            this.count = 0;
            this.cov = 0.0;
            this.leftSd = 0.0;
            this.rightSd = 0.0;
            for (Tuple2<Integer, Integer> pair : values) {
                this.leftSum = this.leftSum + (Integer)pair.f0;
                this.rightSum = this.rightSum + (Integer)pair.f1;
                Integer n = this.count;
                Integer n2 = this.count = Integer.valueOf(this.count + 1);
            }
            this.leftMean = this.leftSum.doubleValue() / (double)this.count.intValue();
            this.rightMean = this.rightSum.doubleValue() / (double)this.count.intValue();
            for (Tuple2<Integer, Integer> pair : values) {
                this.cov = this.cov + ((double)((Integer)pair.f0).intValue() - this.leftMean) * ((double)((Integer)pair.f1).intValue() - this.rightMean) / (double)this.count.intValue();
            }
            for (Tuple2<Integer, Integer> pair : values) {
                this.leftSd = this.leftSd + Math.pow((double)((Integer)pair.f0).intValue() - this.leftMean, 2.0) / (double)this.count.intValue();
                this.rightSd = this.rightSd + Math.pow((double)((Integer)pair.f1).intValue() - this.rightMean, 2.0) / (double)this.count.intValue();
            }
            this.leftSd = Math.sqrt(this.leftSd);
            this.rightSd = Math.sqrt(this.rightSd);
            out.collect((Object)(this.cov / (this.leftSd * this.rightSd)));
        }
    }

    public static final class SendWarning
    implements WindowMapFunction<StockPrice, String> {
        private static final long serialVersionUID = 1L;

        public void mapWindow(Iterable<StockPrice> values, Collector<String> out) throws Exception {
            if (values.iterator().hasNext()) {
                out.collect((Object)values.iterator().next().symbol);
            }
        }
    }

    public static final class TweetSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;
        Random random;
        StringBuilder stringBuilder;

        public void run(Collector<String> collector) throws Exception {
            this.random = new Random();
            this.stringBuilder = new StringBuilder();
            while (true) {
                this.stringBuilder.setLength(0);
                for (int i = 0; i < 3; ++i) {
                    this.stringBuilder.append(" ");
                    this.stringBuilder.append((String)SYMBOLS.get(this.random.nextInt(SYMBOLS.size())));
                }
                collector.collect((Object)this.stringBuilder.toString());
                Thread.sleep(500L);
            }
        }

        public void cancel() {
        }
    }

    public static final class WindowMean
    implements WindowMapFunction<StockPrice, StockPrice> {
        private static final long serialVersionUID = 1L;
        private Double sum = 0.0;
        private Integer count = 0;
        private String symbol = "";

        public void mapWindow(Iterable<StockPrice> values, Collector<StockPrice> out) throws Exception {
            if (values.iterator().hasNext()) {
                for (StockPrice sp : values) {
                    this.sum = this.sum + sp.price;
                    this.symbol = sp.symbol;
                    Integer n = this.count;
                    Integer n2 = this.count = Integer.valueOf(this.count + 1);
                }
                out.collect((Object)new StockPrice(this.symbol, this.sum / (double)this.count.intValue()));
            }
        }
    }

    public static final class StockSource
    implements SourceFunction<StockPrice> {
        private static final long serialVersionUID = 1L;
        private Double price;
        private String symbol;
        private Integer sigma;

        public StockSource(String symbol, Integer sigma) {
            this.symbol = symbol;
            this.sigma = sigma;
        }

        public void run(Collector<StockPrice> collector) throws Exception {
            this.price = DEFAULT_PRICE;
            Random random = new Random();
            while (true) {
                this.price = this.price + random.nextGaussian() * (double)this.sigma.intValue();
                collector.collect((Object)new StockPrice(this.symbol, this.price));
                Thread.sleep(random.nextInt(200));
            }
        }

        public void cancel() {
        }
    }

    /**
     * Serializable pair of a symbol and an occurrence count.
     *
     * <p>The job in {@code main} refers to the fields by the names {@code "symbol"} and
     * {@code "count"}, so both stay public and keep their names.
     */
    public static class Count
    implements Serializable {
        private static final long serialVersionUID = 1L;
        public String symbol;
        public Integer count;

        /** Creates an instance with both fields unset ({@code null}). */
        public Count() {
        }

        /**
         * @param symbol the symbol being counted
         * @param count the number of occurrences
         */
        public Count(String symbol, Integer count) {
            this.symbol = symbol;
            this.count = count;
        }

        /** Renders as {@code Count{symbol='<symbol>', count=<count>}}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Count{symbol='");
            text.append(this.symbol);
            text.append('\'');
            text.append(", count=");
            text.append(this.count);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Serializable pair of a stock symbol and its price.
     *
     * <p>The job in {@code main} refers to the fields by the names {@code "symbol"} and
     * {@code "price"}, so both stay public and keep their names.
     */
    public static class StockPrice
    implements Serializable {
        private static final long serialVersionUID = 1L;
        public String symbol;
        public Double price;

        /** Creates an instance with both fields unset ({@code null}). */
        public StockPrice() {
        }

        /**
         * @param symbol the stock symbol
         * @param price the price of the stock
         */
        public StockPrice(String symbol, Double price) {
            this.symbol = symbol;
            this.price = price;
        }

        /** Renders as {@code StockPrice{symbol='<symbol>', price=<price>}}. */
        @Override
        public String toString() {
            // The label is "price": the previous "count=" was a copy-paste slip from Count.
            return "StockPrice{symbol='" + this.symbol + '\'' + ", price=" + this.price + '}';
        }
    }
}

