package org.apache.flink.test.iterative.aggregators;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Random;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase.class */
public class AggregatorsITCase extends JavaProgramTestBase {
    private static final int NUM_PROGRAMS = 5;
    private static final int MAX_ITERATIONS = 20;
    private static final int DOP = 2;
    private int curProgId;
    private String resultPath;
    private String expectedResult;

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$AggregateMapDelta.class */
    public static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregator aggr;
        private LongValue previousAggr;
        private int superstep;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator("count.negative.elements");
            this.superstep = getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate("count.negative.elements");
                Assert.assertEquals(this.superstep - 1, this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) {
            if (((Integer) tuple2.f1).intValue() == this.superstep) {
                this.aggr.aggregate(1L);
            }
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$AggregateMapDeltaWithParam.class */
    public static final class AggregateMapDeltaWithParam extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregatorWithParameter aggr;
        private LongValue previousAggr;
        private int superstep;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator("count.negative.elements");
            this.superstep = getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = getIterationRuntimeContext().getPreviousIterationAggregate("count.negative.elements");
                switch (this.superstep) {
                    case AggregatorsITCase.DOP /* 2 */:
                        Assert.assertEquals(6L, this.previousAggr.getValue());
                    case 3:
                        Assert.assertEquals(5L, this.previousAggr.getValue());
                    case 4:
                        Assert.assertEquals(3L, this.previousAggr.getValue());
                    case AggregatorsITCase.NUM_PROGRAMS /* 5 */:
                        Assert.assertEquals(0L, this.previousAggr.getValue());
                        break;
                }
                Assert.assertEquals(this.superstep - 1, this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) {
            if (((Integer) tuple2.f1).intValue() < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$AggregatorProgs.class */
    private static class AggregatorProgs {
        private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";

        private AggregatorProgs() {
        }

        public static String runProgram(int i, String str) throws Exception {
            switch (i) {
                case 1:
                    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment.setDegreeOfParallelism(AggregatorsITCase.DOP);
                    IterativeDataSet iterate = CollectionDataSets.getIntegerDataSet(executionEnvironment).iterate(20);
                    LongSumAggregator longSumAggregator = new LongSumAggregator();
                    iterate.registerAggregator(NEGATIVE_ELEMENTS_AGGR, longSumAggregator);
                    iterate.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, longSumAggregator, new NegativeElementsConvergenceCriterion());
                    iterate.closeWith(iterate.map(new SubtractOneMap())).writeAsText(str);
                    executionEnvironment.execute();
                    return "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
                case AggregatorsITCase.DOP /* 2 */:
                    ExecutionEnvironment executionEnvironment2 = ExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment2.setDegreeOfParallelism(AggregatorsITCase.DOP);
                    IterativeDataSet iterate2 = CollectionDataSets.getIntegerDataSet(executionEnvironment2).iterate(20);
                    LongSumAggregatorWithParameter longSumAggregatorWithParameter = new LongSumAggregatorWithParameter(0);
                    iterate2.registerAggregator(NEGATIVE_ELEMENTS_AGGR, longSumAggregatorWithParameter);
                    iterate2.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, longSumAggregatorWithParameter, new NegativeElementsConvergenceCriterion());
                    iterate2.closeWith(iterate2.map(new SubtractOneMapWithParam())).writeAsText(str);
                    executionEnvironment2.execute();
                    return "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
                case 3:
                    ExecutionEnvironment executionEnvironment3 = ExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment3.setDegreeOfParallelism(AggregatorsITCase.DOP);
                    IterativeDataSet iterate3 = CollectionDataSets.getIntegerDataSet(executionEnvironment3).iterate(20);
                    LongSumAggregator longSumAggregator2 = new LongSumAggregator();
                    iterate3.registerAggregator(NEGATIVE_ELEMENTS_AGGR, longSumAggregator2);
                    iterate3.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, longSumAggregator2, new NegativeElementsConvergenceCriterionWithParam(3));
                    iterate3.closeWith(iterate3.map(new SubtractOneMap())).writeAsText(str);
                    executionEnvironment3.execute();
                    return "-3\n-2\n-2\n-1\n-1\n-1\n0\n0\n0\n0\n1\n1\n1\n1\n1\n";
                case 4:
                    ExecutionEnvironment executionEnvironment4 = ExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment4.setDegreeOfParallelism(AggregatorsITCase.DOP);
                    MapOperator map = CollectionDataSets.getIntegerDataSet(executionEnvironment4).map(new TupleMakerMap());
                    DeltaIteration iterateDelta = map.iterateDelta(map, 20, new int[]{0});
                    iterateDelta.registerAggregator(NEGATIVE_ELEMENTS_AGGR, new LongSumAggregator());
                    FlatMapOperator flatMap = iterateDelta.getWorkset().map(new AggregateMapDelta()).join(iterateDelta.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap(new UpdateFilter());
                    iterateDelta.closeWith(flatMap, flatMap).map(new ProjectSecondMapper()).writeAsText(str);
                    executionEnvironment4.execute();
                    return "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n";
                case AggregatorsITCase.NUM_PROGRAMS /* 5 */:
                    ExecutionEnvironment executionEnvironment5 = ExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment5.setDegreeOfParallelism(AggregatorsITCase.DOP);
                    MapOperator map2 = CollectionDataSets.getIntegerDataSet(executionEnvironment5).map(new TupleMakerMap());
                    DeltaIteration iterateDelta2 = map2.iterateDelta(map2, 20, new int[]{0});
                    iterateDelta2.registerAggregator(NEGATIVE_ELEMENTS_AGGR, new LongSumAggregatorWithParameter(4));
                    FlatMapOperator flatMap2 = iterateDelta2.getWorkset().map(new AggregateMapDelta()).join(iterateDelta2.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap(new UpdateFilter());
                    iterateDelta2.closeWith(flatMap2, flatMap2).map(new ProjectSecondMapper()).writeAsText(str);
                    executionEnvironment5.execute();
                    return "1\n2\n2\n3\n3\n3\n4\n4\n4\n4\n5\n5\n5\n5\n5\n";
                default:
                    throw new IllegalArgumentException("Invalid program id");
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$LongSumAggregatorWithParameter.class */
    public static class LongSumAggregatorWithParameter extends LongSumAggregator {
        private int value;

        public LongSumAggregatorWithParameter(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$NegativeElementsConvergenceCriterion.class */
    public static final class NegativeElementsConvergenceCriterion implements ConvergenceCriterion<LongValue> {
        public boolean isConverged(int i, LongValue longValue) {
            return longValue.getValue() > 3;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$NegativeElementsConvergenceCriterionWithParam.class */
    public static final class NegativeElementsConvergenceCriterionWithParam implements ConvergenceCriterion<LongValue> {
        private int value;

        public NegativeElementsConvergenceCriterionWithParam(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }

        public boolean isConverged(int i, LongValue longValue) {
            return longValue.getValue() > ((long) this.value);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$ProjectSecondMapper.class */
    public static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
        public Integer map(Tuple2<Integer, Integer> tuple2) {
            return (Integer) tuple2.f1;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$SubtractOneMap.class */
    public static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
        private LongSumAggregator aggr;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator("count.negative.elements");
        }

        public Integer map(Integer num) {
            Integer num2 = new Integer(num.intValue() - 1);
            if (num2.intValue() < 0) {
                this.aggr.aggregate(1L);
            }
            return num2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$SubtractOneMapWithParam.class */
    public static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
        private LongSumAggregatorWithParameter aggr;

        public void open(Configuration configuration) {
            this.aggr = getIterationRuntimeContext().getIterationAggregator("count.negative.elements");
        }

        public Integer map(Integer num) {
            Integer num2 = new Integer(num.intValue() - 1);
            if (num2.intValue() < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return num2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$TupleMakerMap.class */
    public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
        public Tuple2<Integer, Integer> map(Integer num) throws Exception {
            return new Tuple2<>(new Integer(new Random().nextInt(100000)), num);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/aggregators/AggregatorsITCase$UpdateFilter.class */
    public static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>> {
        private int superstep;

        public void open(Configuration configuration) {
            this.superstep = getIterationRuntimeContext().getSuperstepNumber();
        }

        public void flatMap(Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> tuple2, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
            if (((Integer) ((Tuple2) tuple2.f0).f1).intValue() > this.superstep) {
                collector.collect(tuple2.f0);
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>) obj, (Collector<Tuple2<Integer, Integer>>) collector);
        }
    }

    public AggregatorsITCase(Configuration configuration) {
        super(configuration);
        this.curProgId = this.config.getInteger("ProgramId", -1);
    }

    protected void preSubmit() throws Exception {
        this.resultPath = getTempDirPath("result");
    }

    protected void testProgram() throws Exception {
        this.expectedResult = AggregatorProgs.runProgram(this.curProgId, this.resultPath);
    }

    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
        LinkedList linkedList = new LinkedList();
        for (int i = 1; i <= NUM_PROGRAMS; i++) {
            Configuration configuration = new Configuration();
            configuration.setInteger("ProgramId", i);
            linkedList.add(configuration);
        }
        return toParameterList(linkedList);
    }
}
