/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.accumulators;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
import org.apache.flink.streaming.api.legacy.io.TextOutputFormat;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
import org.junit.Assert;

public class AccumulatorITCase
extends JavaProgramTestBaseJUnit4 {
    private static final String INPUT = "one\ntwo two\nthree three three\n";
    private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
    private String dataPath;
    private String resultPath;
    private JobExecutionResult result;

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", INPUT);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        TestBaseUtils.compareResultsByLinesInMemory((String)EXPECTED, (String)this.resultPath);
        System.out.println("Accumulator results:");
        JobExecutionResult res = this.result;
        System.out.println(AccumulatorHelper.getResultsFormatted((Map)res.getAllAccumulatorResults()));
        Assert.assertEquals((Object)3, (Object)res.getAccumulatorResult("num-lines"));
        Assert.assertEquals((Object)this.getParallelism(), (Object)res.getAccumulatorResult("open-close-counter"));
        HashMap<Integer, Integer> dist = new HashMap<Integer, Integer>();
        dist.put(1, 1);
        dist.put(2, 1);
        dist.put(3, 1);
        Assert.assertEquals(dist, (Object)res.getAccumulatorResult("words-per-line"));
        HashSet<StringValue> distinctWords = new HashSet<StringValue>();
        distinctWords.add(new StringValue((CharSequence)"one"));
        distinctWords.add(new StringValue((CharSequence)"two"));
        distinctWords.add(new StringValue((CharSequence)"three"));
        Assert.assertEquals(distinctWords, (Object)res.getAccumulatorResult("distinct-words"));
    }

    protected void testProgram() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.createInput((InputFormat)new TextInputFormat(new Path(this.dataPath))).setParallelism(this.getParallelism());
        input.flatMap((FlatMapFunction)new TokenizeLine()).keyBy((KeySelector & Serializable)x -> (String)x.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of((Object)((String)value2.f0), (Object)((Integer)value1.f1 + (Integer)value2.f1));
            }
        }).map((MapFunction)new MapFunction<Tuple2<String, Integer>, String>(){

            public String map(Tuple2<String, Integer> value) throws Exception {
                return (String)value.f0 + " " + value.f1;
            }
        }).addSink((SinkFunction)new OutputFormatSinkFunction((OutputFormat)new TextOutputFormat(new Path(this.resultPath))));
        this.result = env.execute();
    }

    public static class SetAccumulator<T>
    implements Accumulator<T, HashSet<T>> {
        private static final long serialVersionUID = 1L;
        private HashSet<T> set = new HashSet();

        public void add(T value) {
            this.set.add(value);
        }

        public HashSet<T> getLocalValue() {
            return this.set;
        }

        public void resetLocal() {
            this.set.clear();
        }

        public void merge(Accumulator<T, HashSet<T>> other) {
            this.set.addAll((Collection)((Object)other.getLocalValue()));
        }

        public Accumulator<T, HashSet<T>> clone() {
            SetAccumulator<T> result = new SetAccumulator<T>();
            result.set.addAll(this.set);
            return result;
        }
    }

    private static class CountWords
    extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
    implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private IntCounter reduceCalls;
        private IntCounter combineCalls;

        private CountWords() {
        }

        public void open(OpenContext openContext) {
            this.reduceCalls = this.getRuntimeContext().getIntCounter("reduce-calls");
            this.combineCalls = this.getRuntimeContext().getIntCounter("combine-calls");
        }

        public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
            this.reduceCalls.add(1);
            this.reduceInternal(values, out);
        }

        public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
            this.combineCalls.add(1);
            this.reduceInternal(values, out);
        }

        private void reduceInternal(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
            int sum = 0;
            String key = null;
            for (Tuple2<String, Integer> e : values) {
                key = (String)e.f0;
                sum += ((Integer)e.f1).intValue();
            }
            out.collect((Object)new Tuple2(key, (Object)sum));
        }
    }

    private static class TokenizeLine
    extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        private IntCounter cntNumLines;
        private Histogram wordsPerLineDistribution;
        private DoubleCounter openCloseCounter = new DoubleCounter();
        private SetAccumulator<StringValue> distinctWords;

        private TokenizeLine() {
        }

        public void open(OpenContext openContext) {
            this.cntNumLines = this.getRuntimeContext().getIntCounter("num-lines");
            this.wordsPerLineDistribution = this.getRuntimeContext().getHistogram("words-per-line");
            this.getRuntimeContext().addAccumulator("open-close-counter", (Accumulator)this.openCloseCounter);
            this.distinctWords = new SetAccumulator();
            this.getRuntimeContext().addAccumulator("distinct-words", this.distinctWords);
            IntCounter simpleCounter = this.getRuntimeContext().getIntCounter("simple-counter");
            simpleCounter.add(1);
            Assert.assertEquals((long)simpleCounter.getLocalValue().intValue(), (long)1L);
            IntCounter simpleCounter2 = this.getRuntimeContext().getIntCounter("simple-counter");
            Assert.assertEquals((Object)simpleCounter.getLocalValue(), (Object)simpleCounter2.getLocalValue());
            try {
                DoubleCounter simpleCounter3 = this.getRuntimeContext().getDoubleCounter("simple-counter");
                Assert.fail((String)"Should not be able to obtain previously created counter with different type");
            }
            catch (UnsupportedOperationException unsupportedOperationException) {
                // empty catch block
            }
            this.openCloseCounter.add(0.5);
        }

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            this.cntNumLines.add(1);
            int wordsPerLine = 0;
            for (String token : value.toLowerCase().split("\\W+")) {
                this.distinctWords.add(new StringValue((CharSequence)token));
                out.collect((Object)new Tuple2((Object)token, (Object)1));
                ++wordsPerLine;
            }
            this.wordsPerLineDistribution.add(Integer.valueOf(wordsPerLine));
        }

        public void close() throws Exception {
            this.openCloseCounter.add(0.5);
            Assert.assertEquals((long)1L, (long)this.openCloseCounter.getLocalValue().intValue());
        }
    }
}

