/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.wordcount;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.client.LocalExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

public class WordCountAccumulators
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    /**
     * Assembles the word-count plan: a text-file source feeding a
     * {@link TokenizeLine} mapper, a {@link CountWords} reducer and a CSV file sink.
     *
     * <p>Arguments are positional and all optional:
     * <ol>
     *   <li>{@code args[0]} - default parallelism, parsed with {@link Integer#parseInt(String)};
     *       defaults to {@code 1}</li>
     *   <li>{@code args[1]} - input path; defaults to the empty string</li>
     *   <li>{@code args[2]} - output path; defaults to the empty string</li>
     * </ol>
     *
     * @param args positional arguments as described above
     * @return the plan named "WordCount Example" with its default parallelism set
     * @throws NumberFormatException if {@code args[0]} is present but not a valid integer
     */
    public Plan getPlan(String ... args) {
        // Resolve the positional arguments, falling back to defaults when absent.
        int parallelism = 1;
        if (args.length > 0) {
            parallelism = Integer.parseInt(args[0]);
        }
        String inputPath = "";
        if (args.length > 1) {
            inputPath = args[1];
        }
        String outputPath = "";
        if (args.length > 2) {
            outputPath = args[2];
        }

        FileDataSource lines = new FileDataSource(new TextInputFormat(), inputPath, "Input Lines");

        MapOperator tokenizer = MapOperator.builder(new TokenizeLine())
                .input(lines)
                .name("Tokenize Lines")
                .build();

        // Built with StringValue.class and position 0, i.e. the word column emitted by TokenizeLine.
        ReduceOperator counter = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
                .input(tokenizer)
                .name("Count Words")
                .build();

        FileDataSink sink = new FileDataSink(new CsvOutputFormat(), outputPath, counter, "Word Counts");

        // One "<word> <count>" pair per line.
        CsvOutputFormat.configureRecordFormat(sink)
                .recordDelimiter('\n')
                .fieldDelimiter(' ')
                .field(StringValue.class, 0)
                .field(IntValue.class, 1);

        Plan wordCountPlan = new Plan(sink, "WordCount Example");
        wordCountPlan.setDefaultParallelism(parallelism);
        return wordCountPlan;
    }

    /**
     * Returns a one-line usage string listing the positional parameters
     * accepted by {@code getPlan}.
     *
     * @return the usage description
     */
    public String getDescription() {
        // Parameter name spelled to match the "numSubTasks" argument read in getPlan.
        return "Parameters: [numSubTasks] [input] [output]";
    }

    /**
     * Command-line entry point. Requires at least three arguments; otherwise the
     * usage description is written to standard error and the JVM exits with status 1.
     * With enough arguments, the plan is executed through {@link LocalExecutor} and the
     * three accumulator results are printed to standard output.
     *
     * @param args positional arguments forwarded to {@code getPlan}
     * @throws Exception if plan execution fails
     */
    public static void main(String[] args) throws Exception {
        final WordCountAccumulators job = new WordCountAccumulators();

        final boolean enoughArguments = args.length >= 3;
        if (!enoughArguments) {
            System.err.println(job.getDescription());
            System.exit(1);
        }

        final JobExecutionResult outcome = LocalExecutor.execute(job.getPlan(args));

        // Each accumulator is looked up by the name it was registered under in TokenizeLine.
        final Object lineCount = outcome.getAccumulatorResult("accumulator.num-lines");
        System.out.println("Number of lines counter: " + lineCount);

        final Object wordsPerLine = outcome.getAccumulatorResult("accumulator.words-per-line");
        System.out.println("Words per line histogram: " + wordsPerLine);

        final Object distinctWords = outcome.getAccumulatorResult("accumulator.distinct-words");
        System.out.println("Distinct words: " + distinctWords);
    }

    /**
     * Accumulator that gathers the distinct values added to it in a {@link HashSet}.
     *
     * @param <T> the element type being collected
     */
    public static class SetAccumulator<T extends Value>
    implements Accumulator<T, HashSet<T>> {
        private static final long serialVersionUID = 1L;

        /** Backing set; duplicates are dropped by the set itself. */
        private final HashSet<T> set = new HashSet<T>();

        /** Adds {@code value} to the set. */
        public void add(T value) {
            set.add(value);
        }

        /**
         * Returns the backing set itself (not a copy), so later changes to this
         * accumulator are visible through the returned reference.
         */
        public HashSet<T> getLocalValue() {
            return set;
        }

        /** Empties the backing set. */
        public void resetLocal() {
            set.clear();
        }

        /** Adds every element of {@code other}'s local value to this accumulator. */
        public void merge(Accumulator<T, HashSet<T>> other) {
            HashSet<T> incoming = other.getLocalValue();
            set.addAll(incoming);
        }

        /**
         * Creates a new accumulator holding the same elements. The set is copied;
         * the elements themselves are shared, not cloned.
         */
        public Accumulator<T, HashSet<T>> clone() {
            SetAccumulator<T> copy = new SetAccumulator<T>();
            copy.set.addAll(set);
            return copy;
        }
    }

    /**
     * Reduce function that sums the {@link IntValue} in field 1 of every record in a
     * group and emits a single record carrying that sum in field 1.
     *
     * <p>The emitted record is the last record read from the group with its field 1
     * overwritten, so all other fields are passed through from that record.
     */
    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static class CountWords
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Holder for the computed sum, reused across calls to {@code reduce}. */
        private final IntValue cnt = new IntValue();

        /**
         * Sums field 1 over all {@code records} and collects one result record.
         *
         * @param records the records of one group
         * @param out collector receiving the single summed record; nothing is
         *            collected when {@code records} is empty
         */
        public void reduce(Iterator<Record> records, Collector<Record> out) {
            // Guard: with no input there is no record to carry the result, and
            // dereferencing a null record would throw a NullPointerException.
            if (!records.hasNext()) {
                return;
            }

            Record element;
            int sum = 0;
            do {
                element = records.next();
                sum += element.getField(1, IntValue.class).getValue();
            } while (records.hasNext());

            // Reuse the last record as the output, replacing only the count field.
            this.cnt.setValue(sum);
            element.setField(1, this.cnt);
            out.collect(element);
        }
    }

    /**
     * Map function that splits each input line into lower-cased words and emits one
     * {@code (word, 1)} record per word, while maintaining three accumulators:
     * a line counter, a words-per-line histogram and the set of distinct words.
     */
    public static class TokenizeLine
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Accumulator name for the number of lines processed. */
        public static final String ACCUM_NUM_LINES = "accumulator.num-lines";
        /** Accumulator name for the histogram of word counts per line. */
        public static final String ACCUM_WORDS_PER_LINE = "accumulator.words-per-line";
        /** Accumulator name for the set of distinct words seen. */
        public static final String ACCUM_DISTINCT_WORDS = "accumulator.distinct-words";

        /** Matches runs of non-word characters; compiled once instead of once per record. */
        private static final Pattern NON_WORD_RUN = Pattern.compile("\\W+");

        private final LongCounter numLines = new LongCounter();
        private final Histogram wordsPerLine = new Histogram();
        private final SetAccumulator<StringValue> distinctWords = new SetAccumulator<StringValue>();

        /**
         * Registers the three accumulators with the runtime context under their
         * {@code ACCUM_*} names.
         *
         * @param parameters configuration passed by the runtime (not read here)
         * @throws Exception declared for the runtime; this method itself only registers accumulators
         */
        public void open(Configuration parameters) throws Exception {
            this.getRuntimeContext().addAccumulator(ACCUM_NUM_LINES, this.numLines);
            this.getRuntimeContext().addAccumulator(ACCUM_WORDS_PER_LINE, this.wordsPerLine);
            this.getRuntimeContext().addAccumulator(ACCUM_DISTINCT_WORDS, this.distinctWords);
        }

        /**
         * Tokenizes the line held in field 0 of {@code record} and emits a
         * {@code (word, 1)} record for each token.
         *
         * @param record input record whose field 0 is read as a {@link StringValue} line
         * @param collector receives one record per word
         */
        public void map(Record record, Collector<Record> collector) {
            this.numLines.add(1L);

            String line = record.getField(0, StringValue.class).getValue();
            // Replace every run of non-word characters with a single space, then
            // lower-case with Locale.ROOT so the result does not depend on the JVM's
            // default locale (e.g. 'I' must not become a dotless 'i' under Turkish).
            String normalized = NON_WORD_RUN.matcher(line).replaceAll(" ").toLowerCase(Locale.ROOT);

            StringTokenizer tokenizer = new StringTokenizer(normalized);
            int numWords = 0;
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                // Separate StringValue instances for the accumulator and the emitted record.
                this.distinctWords.add(new StringValue(word));
                ++numWords;
                collector.collect(new Record(new StringValue(word), new IntValue(1)));
            }
            this.wordsPerLine.add(numWords);
        }
    }
}

