/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative.aggregators;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.Random;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class AggregatorsITCase
extends MultipleProgramsTestBase {
    private static final int MAX_ITERATIONS = 20;
    private static final int parallelism = 2;
    private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
    private static String testString = "Et tu, Brute?";
    private static String testName = "testing_caesar";
    private static String testPath;
    private String resultPath;
    private String expected;
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    public AggregatorsITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Before
    public void before() throws Exception {
        File tempFile = this.tempFolder.newFile();
        testPath = tempFile.toString();
        this.resultPath = tempFile.toURI().toString();
    }

    @After
    public void after() throws Exception {
        AggregatorsITCase.compareResultsByLinesInMemory((String)this.expected, (String)this.resultPath);
    }

    @Test
    public void testDistributedCacheWithIterations() throws Exception {
        File tempFile = new File(testPath);
        FileWriter writer = new FileWriter(tempFile);
        writer.write(testString);
        writer.close();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.registerCachedFile(this.resultPath, testName);
        IterativeDataSet solution = env.fromElements((Object[])new Long[]{1L}).iterate(2);
        solution.closeWith((DataSet)env.generateSequence(1L, 2L).filter((FilterFunction)new RichFilterFunction<Long>(){

            public void open(Configuration parameters) throws Exception {
                File file = this.getRuntimeContext().getDistributedCache().getFile(testName);
                BufferedReader reader = new BufferedReader(new FileReader(file));
                String output = reader.readLine();
                reader.close();
                Assert.assertEquals((Object)output, (Object)testString);
            }

            public boolean filter(Long value) throws Exception {
                return false;
            }
        }).withBroadcastSet((DataSet)solution, "SOLUTION")).output((OutputFormat)new DiscardingOutputFormat());
        env.execute();
        this.expected = testString;
    }

    @Test
    public void testAggregatorWithoutParameterForIterate() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
        IterativeDataSet iteration = initialSolutionSet.iterate(20);
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterion());
        MapOperator updatedDs = iteration.map((MapFunction)new SubtractOneMap());
        iteration.closeWith((DataSet)updatedDs).writeAsText(this.resultPath);
        env.execute();
        this.expected = "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
    }

    @Test
    public void testAggregatorWithParameterForIterate() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
        IterativeDataSet iteration = initialSolutionSet.iterate(20);
        LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0);
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterion());
        MapOperator updatedDs = iteration.map((MapFunction)new SubtractOneMapWithParam());
        iteration.closeWith((DataSet)updatedDs).writeAsText(this.resultPath);
        env.execute();
        this.expected = "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
    }

    @Test
    public void testConvergenceCriterionWithParameterForIterate() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
        IterativeDataSet iteration = initialSolutionSet.iterate(20);
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterionWithParam(3));
        MapOperator updatedDs = iteration.map((MapFunction)new SubtractOneMap());
        iteration.closeWith((DataSet)updatedDs).writeAsText(this.resultPath);
        env.execute();
        this.expected = "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
    }

    @Test
    public void testAggregatorWithoutParameterForIterateDelta() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        MapOperator initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map((MapFunction)new TupleMakerMap());
        DeltaIteration iteration = initialSolutionSet.iterateDelta((DataSet)initialSolutionSet, 20, new int[]{0});
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        MapOperator updatedDs = iteration.getWorkset().map((MapFunction)new AggregateMapDelta());
        FlatMapOperator newElements = updatedDs.join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap((FlatMapFunction)new UpdateFilter());
        DataSet iterationRes = iteration.closeWith((DataSet)newElements, (DataSet)newElements);
        MapOperator result = iterationRes.map((MapFunction)new ProjectSecondMapper());
        result.writeAsText(this.resultPath);
        env.execute();
        this.expected = "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n";
    }

    @Test
    public void testAggregatorWithParameterForIterateDelta() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        MapOperator initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map((MapFunction)new TupleMakerMap());
        DeltaIteration iteration = initialSolutionSet.iterateDelta((DataSet)initialSolutionSet, 20, new int[]{0});
        LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(4);
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        MapOperator updatedDs = iteration.getWorkset().map((MapFunction)new AggregateMapDelta());
        FlatMapOperator newElements = updatedDs.join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap((FlatMapFunction)new UpdateFilter());
        DataSet iterationRes = iteration.closeWith((DataSet)newElements, (DataSet)newElements);
        MapOperator result = iterationRes.map((MapFunction)new ProjectSecondMapper());
        result.writeAsText(this.resultPath);
        env.execute();
        this.expected = "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n";
    }

    public static final class AggregateMapDeltaWithParam
    extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregatorWithParameter aggr;
        private LongValue previousAggr;
        private int superstep;

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregatorWithParameter)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
            this.superstep = this.getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = (LongValue)this.getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
                switch (this.superstep) {
                    case 2: {
                        Assert.assertEquals((long)6L, (long)this.previousAggr.getValue());
                    }
                    case 3: {
                        Assert.assertEquals((long)5L, (long)this.previousAggr.getValue());
                    }
                    case 4: {
                        Assert.assertEquals((long)3L, (long)this.previousAggr.getValue());
                    }
                    case 5: {
                        Assert.assertEquals((long)0L, (long)this.previousAggr.getValue());
                    }
                }
                Assert.assertEquals((long)(this.superstep - 1), (long)this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
            if ((Integer)value.f1 < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return value;
        }
    }

    public static final class ProjectSecondMapper
    extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
        public Integer map(Tuple2<Integer, Integer> value) {
            return (Integer)value.f1;
        }
    }

    public static final class UpdateFilter
    extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>> {
        private int superstep;

        public void open(Configuration conf) {
            this.superstep = this.getIterationRuntimeContext().getSuperstepNumber();
        }

        public void flatMap(Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> value, Collector<Tuple2<Integer, Integer>> out) {
            if ((Integer)((Tuple2)value.f0).f1 > this.superstep) {
                out.collect(value.f0);
            }
        }
    }

    public static final class AggregateMapDelta
    extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregator aggr;
        private LongValue previousAggr;
        private int superstep;

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
            this.superstep = this.getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = (LongValue)this.getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
                Assert.assertEquals((long)(this.superstep - 1), (long)this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
            if ((Integer)value.f1 == this.superstep) {
                this.aggr.aggregate(1L);
            }
            return value;
        }
    }

    public static final class TupleMakerMap
    extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
        private Random rnd;

        public void open(Configuration parameters) {
            this.rnd = new Random(-4539650767900909907L + (long)this.getRuntimeContext().getIndexOfThisSubtask());
        }

        public Tuple2<Integer, Integer> map(Integer value) {
            Integer nodeId = this.rnd.nextInt(100000);
            return new Tuple2((Object)nodeId, (Object)value);
        }
    }

    public static class LongSumAggregatorWithParameter
    extends LongSumAggregator {
        private int value;

        public LongSumAggregatorWithParameter(int val) {
            this.value = val;
        }

        public int getValue() {
            return this.value;
        }
    }

    public static final class SubtractOneMapWithParam
    extends RichMapFunction<Integer, Integer> {
        private LongSumAggregatorWithParameter aggr;

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregatorWithParameter)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
        }

        public Integer map(Integer value) {
            Integer newValue = value - 1;
            if (newValue < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return newValue;
        }
    }

    public static final class SubtractOneMap
    extends RichMapFunction<Integer, Integer> {
        private LongSumAggregator aggr;

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
        }

        public Integer map(Integer value) {
            Integer newValue = value - 1;
            if (newValue < 0) {
                this.aggr.aggregate(1L);
            }
            return newValue;
        }
    }

    public static final class NegativeElementsConvergenceCriterionWithParam
    implements ConvergenceCriterion<LongValue> {
        private int value;

        public NegativeElementsConvergenceCriterionWithParam(int val) {
            this.value = val;
        }

        public int getValue() {
            return this.value;
        }

        public boolean isConverged(int iteration, LongValue value) {
            return value.getValue() > (long)this.value;
        }
    }

    public static final class NegativeElementsConvergenceCriterion
    implements ConvergenceCriterion<LongValue> {
        public boolean isConverged(int iteration, LongValue value) {
            return value.getValue() > 3L;
        }
    }
}

