package org.apache.flink.test.iterative.aggregators;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.Random;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase.class */
public class AggregatorsITCase extends MultipleProgramsTestBase {
    private static final int MAX_ITERATIONS = 20;
    private static final int parallelism = 2;
    private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
    private static String testString = "Et tu, Brute?";
    private static String testName = "testing_caesar";
    private static String testPath;
    private String resultPath;
    private String expected;

    @Rule
    public TemporaryFolder tempFolder;

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$AggregateMapDelta.class */
    public static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregator aggr;
        private LongValue previousAggr;
        private int superstep;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
            this.superstep = getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
                Assert.assertEquals(this.superstep - 1, this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) {
            if (((Integer) tuple2.f1).intValue() == this.superstep) {
                this.aggr.aggregate(1L);
            }
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$AggregateMapDeltaWithParam.class */
    public static final class AggregateMapDeltaWithParam extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregatorWithParameter aggr;
        private LongValue previousAggr;
        private int superstep;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
            this.superstep = getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
                switch (this.superstep) {
                    case AggregatorsITCase.parallelism /* 2 */:
                        Assert.assertEquals(6L, this.previousAggr.getValue());
                    case 3:
                        Assert.assertEquals(5L, this.previousAggr.getValue());
                    case 4:
                        Assert.assertEquals(3L, this.previousAggr.getValue());
                    case 5:
                        Assert.assertEquals(0L, this.previousAggr.getValue());
                        break;
                }
                Assert.assertEquals(this.superstep - 1, this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) {
            if (((Integer) tuple2.f1).intValue() < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$LongSumAggregatorWithParameter.class */
    public static class LongSumAggregatorWithParameter extends LongSumAggregator {
        private int value;

        public LongSumAggregatorWithParameter(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$NegativeElementsConvergenceCriterion.class */
    public static final class NegativeElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
        public boolean isConverged(int i, LongValue longValue) {
            return longValue.getValue() > 3;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$NegativeElementsConvergenceCriterionWithParam.class */
    public static final class NegativeElementsConvergenceCriterionWithParam implements ConvergenceCriterion<LongValue> {
        private int value;

        public NegativeElementsConvergenceCriterionWithParam(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }

        public boolean isConverged(int i, LongValue longValue) {
            return longValue.getValue() > ((long) this.value);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$ProjectSecondMapper.class */
    public static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
        public Integer map(Tuple2<Integer, Integer> tuple2) {
            return (Integer) tuple2.f1;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$SubtractOneMap.class */
    public static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
        private LongSumAggregator aggr;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
        }

        public Integer map(Integer num) {
            Integer valueOf = Integer.valueOf(num.intValue() - 1);
            if (valueOf.intValue() < 0) {
                this.aggr.aggregate(1L);
            }
            return valueOf;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$SubtractOneMapWithParam.class */
    public static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
        private LongSumAggregatorWithParameter aggr;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
        }

        public Integer map(Integer num) {
            Integer valueOf = Integer.valueOf(num.intValue() - 1);
            if (valueOf.intValue() < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return valueOf;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$TupleMakerMap.class */
    public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
        private Random rnd;

        public void open(Configuration configuration) {
            this.rnd = new Random((-4539650767900909907L) + getRuntimeContext().getIndexOfThisSubtask());
        }

        public Tuple2<Integer, Integer> map(Integer num) {
            return new Tuple2<>(Integer.valueOf(this.rnd.nextInt(100000)), num);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$UpdateFilter.class */
    public static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>> {
        private int superstep;

        public void open(Configuration configuration) {
            this.superstep = getIterationRuntimeContext().getSuperstepNumber();
        }

        public void flatMap(Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> tuple2, Collector<Tuple2<Integer, Integer>> collector) {
            if (((Integer) ((Tuple2) tuple2.f0).f1).intValue() > this.superstep) {
                collector.collect(tuple2.f0);
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>) obj, (Collector<Tuple2<Integer, Integer>>) collector);
        }
    }

    public AggregatorsITCase(MultipleProgramsTestBase.TestExecutionMode testExecutionMode) {
        super(testExecutionMode);
        this.tempFolder = new TemporaryFolder();
    }

    @Before
    public void before() throws Exception {
        File newFile = this.tempFolder.newFile();
        testPath = newFile.toString();
        this.resultPath = newFile.toURI().toString();
    }

    @After
    public void after() throws Exception {
        compareResultsByLinesInMemory(this.expected, this.resultPath);
    }

    @Test
    public void testDistributedCacheWithIterations() throws Exception {
        FileWriter fileWriter = new FileWriter(new File(testPath));
        fileWriter.write(testString);
        fileWriter.close();
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.registerCachedFile(this.resultPath, testName);
        IterativeDataSet iterate = executionEnvironment.fromElements(new Long[]{1L}).iterate(parallelism);
        iterate.closeWith(executionEnvironment.generateSequence(1L, 2L).filter(new RichFilterFunction<Long>() { // from class: org.apache.flink.test.iterative.aggregators.AggregatorsITCase.1
            public void open(Configuration configuration) throws Exception {
                BufferedReader bufferedReader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile(AggregatorsITCase.testName)));
                String readLine = bufferedReader.readLine();
                bufferedReader.close();
                Assert.assertEquals(readLine, AggregatorsITCase.testString);
            }

            public boolean filter(Long l) throws Exception {
                return false;
            }
        }).withBroadcastSet(iterate, "SOLUTION")).output(new DiscardingOutputFormat());
        executionEnvironment.execute();
        this.expected = testString;
    }

    @Test
    public void testAggregatorWithoutParameterForIterate() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(parallelism);
        IterativeDataSet iterate = CollectionDataSets.getIntegerDataSet(executionEnvironment).iterate(MAX_ITERATIONS);
        LongSumAggregator longSumAggregator = new LongSumAggregator();
        iterate.registerAggregator(NEGATIVE_ELEMENTS_AGGR, longSumAggregator);
        iterate.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, longSumAggregator, new NegativeElementsConvergenceCriterion());
        iterate.closeWith(iterate.map(new SubtractOneMap())).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
    }

    @Test
    public void testAggregatorWithParameterForIterate() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(parallelism);
        IterativeDataSet iterate = CollectionDataSets.getIntegerDataSet(executionEnvironment).iterate(MAX_ITERATIONS);
        LongSumAggregatorWithParameter longSumAggregatorWithParameter = new LongSumAggregatorWithParameter(0);
        iterate.registerAggregator(NEGATIVE_ELEMENTS_AGGR, longSumAggregatorWithParameter);
        iterate.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, longSumAggregatorWithParameter, new NegativeElementsConvergenceCriterion());
        iterate.closeWith(iterate.map(new SubtractOneMapWithParam())).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
    }

    @Test
    public void testConvergenceCriterionWithParameterForIterate() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(parallelism);
        IterativeDataSet iterate = CollectionDataSets.getIntegerDataSet(executionEnvironment).iterate(MAX_ITERATIONS);
        LongSumAggregator longSumAggregator = new LongSumAggregator();
        iterate.registerAggregator(NEGATIVE_ELEMENTS_AGGR, longSumAggregator);
        iterate.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, longSumAggregator, new NegativeElementsConvergenceCriterionWithParam(3));
        iterate.closeWith(iterate.map(new SubtractOneMap())).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
    }

    @Test
    public void testAggregatorWithoutParameterForIterateDelta() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(parallelism);
        MapOperator map = CollectionDataSets.getIntegerDataSet(executionEnvironment).map(new TupleMakerMap());
        DeltaIteration iterateDelta = map.iterateDelta(map, MAX_ITERATIONS, new int[]{0});
        iterateDelta.registerAggregator(NEGATIVE_ELEMENTS_AGGR, new LongSumAggregator());
        FlatMapOperator flatMap = iterateDelta.getWorkset().map(new AggregateMapDelta()).join(iterateDelta.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap(new UpdateFilter());
        iterateDelta.closeWith(flatMap, flatMap).map(new ProjectSecondMapper()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n";
    }

    @Test
    public void testAggregatorWithParameterForIterateDelta() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(parallelism);
        MapOperator map = CollectionDataSets.getIntegerDataSet(executionEnvironment).map(new TupleMakerMap());
        DeltaIteration iterateDelta = map.iterateDelta(map, MAX_ITERATIONS, new int[]{0});
        iterateDelta.registerAggregator(NEGATIVE_ELEMENTS_AGGR, new LongSumAggregatorWithParameter(4));
        FlatMapOperator flatMap = iterateDelta.getWorkset().map(new AggregateMapDelta()).join(iterateDelta.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap(new UpdateFilter());
        iterateDelta.closeWith(flatMap, flatMap).map(new ProjectSecondMapper()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n";
    }
}
