/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative.aggregators;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;

/**
 * Integration test that runs a connected-components bulk iteration whose termination is
 * controlled by a parametrizable convergence criterion.
 *
 * <p>Each vertex starts with its own id as component id. In every superstep the component ids
 * are sent along the edges, the minimum candidate per vertex is selected, and a vertex adopts
 * the candidate if it is smaller than its current component id. The number of vertices updated
 * in a superstep is summed by a {@link LongSumAggregator}; the iteration stops once
 * {@link UpdatedElementsConvergenceCriterion} reports convergence or after
 * {@link #MAX_ITERATIONS} supersteps.
 *
 * <p>NOTE(review): the expected output labels vertex 5 with component 2 although the edge list
 * links it (via 6, 4 and 2) to vertex 1 -- presumably because the criterion stops the iteration
 * before the labels have fully propagated. Confirm before changing the threshold or the data.
 */
public class ConnectedComponentsWithParametrizableConvergenceITCase
extends JavaProgramTestBase {
    /** Upper bound on the number of supersteps of the bulk iteration. */
    private static final int MAX_ITERATIONS = 10;

    /** Parallelism the test program is executed with. */
    private static final int PARALLELISM = 1;

    /**
     * Name under which the aggregator is registered on the iteration and looked up again inside
     * {@link MinimumIdFilter}; both sides must use the same name.
     */
    private static final String UPDATED_ELEMENTS = "updated.elements.aggr";

    /** Number of vertices of the test graph; vertex ids are {@code 1..VERTEX_COUNT}. */
    private static final long VERTEX_COUNT = 9L;

    /** Directed edges of the test graph as {@code {source, target}} pairs, in insertion order. */
    private static final long[][] EDGES = {
        {1, 2}, {1, 3},
        {2, 3}, {2, 4}, {2, 1},
        {3, 1}, {3, 2},
        {4, 2}, {4, 6},
        {5, 6},
        {6, 4}, {6, 5},
        {7, 8}, {7, 9},
        {8, 7}, {8, 9},
        {9, 7}, {9, 8}
    };

    /** Initial solution set: {@code (vertexId, componentId)} pairs, refilled by {@link #preSubmit()}. */
    protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();

    /** Edge list as {@code (source, target)} pairs, refilled by {@link #preSubmit()}. */
    protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();

    private String resultPath;
    private String expectedResult;

    /**
     * Rebuilds the static input collections, allocates the result directory and sets the
     * expected output.
     */
    @Override
    protected void preSubmit() throws Exception {
        // Every vertex starts out in its own component: (id, id).
        verticesInput.clear();
        for (long vertexId = 1L; vertexId <= VERTEX_COUNT; vertexId++) {
            verticesInput.add(new Tuple2<Long, Long>(vertexId, vertexId));
        }

        edgesInput.clear();
        for (long[] edge : EDGES) {
            edgesInput.add(new Tuple2<Long, Long>(edge[0], edge[1]));
        }

        this.resultPath = this.getTempDirPath("result");
        this.expectedResult = "(1,1)\n(2,1)\n(3,1)\n(4,1)\n(5,2)\n(6,1)\n(7,7)\n(8,7)\n(9,7)\n";
    }

    /** Runs the connected-components program, writing its output to the result path. */
    @Override
    protected void testProgram() throws Exception {
        ConnectedComponentsWithConvergenceProgram.runProgram(this.resultPath);
    }

    /** Compares the written result with the expected lines. */
    @Override
    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
    }

    /**
     * Convergence criterion that reports convergence as soon as the aggregated value of a
     * superstep is strictly below a configurable threshold.
     */
    public static final class UpdatedElementsConvergenceCriterion
    implements ConvergenceCriterion<LongValue> {
        private static final long serialVersionUID = 1L;

        private final long threshold;

        /**
         * @param u_threshold aggregated values strictly below this bound count as converged
         */
        public UpdatedElementsConvergenceCriterion(long u_threshold) {
            this.threshold = u_threshold;
        }

        /** @return the threshold this criterion was created with */
        public long getThreshold() {
            return this.threshold;
        }

        /**
         * @param iteration number of the superstep (not used by this criterion)
         * @param value aggregated value of the superstep
         * @return {@code true} if {@code value} is strictly smaller than the threshold
         */
        @Override
        public boolean isConverged(int iteration, LongValue value) {
            return value.getValue() < this.threshold;
        }
    }

    /**
     * Chooses between a vertex's candidate and current component id and counts updates.
     *
     * <p>The input pairs the candidate {@code (vertexId, newComponentId)} in {@code f0} with the
     * current {@code (vertexId, oldComponentId)} in {@code f1}. If the candidate id is strictly
     * smaller, the candidate is emitted and the aggregator is incremented by one; otherwise the
     * current tuple is emitted unchanged.
     */
    public static final class MinimumIdFilter
    extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        // Per-instance runtime handle obtained in open(). It must not be static: a static field
        // would be shared (and overwritten) by every instance of this function in the same JVM.
        // It is transient because it is only valid after open() and never needs to be shipped.
        private transient LongSumAggregator aggr;

        /** Looks up the aggregator registered on the iteration under {@code UPDATED_ELEMENTS}. */
        @Override
        public void open(Configuration conf) {
            this.aggr = this.getIterationRuntimeContext().getIterationAggregator(UPDATED_ELEMENTS);
        }

        /**
         * @param vertexWithNewAndOldId candidate tuple in {@code f0}, current tuple in {@code f1}
         * @param out receives exactly one tuple per input
         */
        @Override
        public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId, Collector<Tuple2<Long, Long>> out) throws Exception {
            Tuple2<Long, Long> candidate = vertexWithNewAndOldId.f0;
            Tuple2<Long, Long> current = vertexWithNewAndOldId.f1;
            if (candidate.f1 < current.f1) {
                out.collect(candidate);
                // One more vertex changed its component id in this superstep.
                this.aggr.aggregate(1L);
            } else {
                out.collect(current);
            }
        }
    }

    /**
     * Reduces a group of {@code (vertexId, candidateComponentId)} tuples to a single tuple
     * carrying the smallest candidate component id of the group.
     */
    public static final class MinimumReduce
    extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        // Reused output object; it is overwritten on every call to reduce().
        final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();

        /**
         * Emits {@code (vertexId, minimumCandidate)} where {@code vertexId} is taken from the
         * last tuple seen. For an empty group this emits {@code (0, Long.MAX_VALUE)}.
         */
        @Override
        public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
            Long vertexId = 0L;
            long minimumCompId = Long.MAX_VALUE;
            for (Tuple2<Long, Long> value : values) {
                vertexId = value.f0;
                long candidateCompId = value.f1;
                if (candidateCompId < minimumCompId) {
                    minimumCompId = candidateCompId;
                }
            }
            this.resultVertex.f0 = vertexId;
            this.resultVertex.f1 = minimumCompId;
            out.collect(this.resultVertex);
        }
    }

    /**
     * Joins a {@code (vertexId, componentId)} tuple with an edge and produces
     * {@code (edgeTarget, componentId)}, i.e. offers the vertex's component id to its neighbor.
     */
    public static final class NeighborWithComponentIDJoin
    extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        /**
         * Overwrites field 0 of {@code vertexWithCompId} with the edge's second field and
         * returns that same (mutated) tuple instead of allocating a new one.
         */
        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithCompId, Tuple2<Long, Long> edge) throws Exception {
            vertexWithCompId.f0 = edge.f1;
            return vertexWithCompId;
        }
    }

    /** Builds and executes the connected-components data flow used by this test. */
    private static class ConnectedComponentsWithConvergenceProgram {
        /** Threshold handed to {@link UpdatedElementsConvergenceCriterion}. */
        private static final long CONVERGENCE_THRESHOLD = 3L;

        private ConnectedComponentsWithConvergenceProgram() {
        }

        /**
         * Assembles the bulk iteration over the static vertex and edge collections, writes the
         * final {@code (vertexId, componentId)} tuples as text and executes the job.
         *
         * @param resultPath location the result is written to
         * @return {@code resultPath}, unchanged
         * @throws Exception if the job execution fails
         */
        public static String runProgram(String resultPath) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(PARALLELISM);

            DataSet<Tuple2<Long, Long>> initialSolutionSet = env.fromCollection(verticesInput);
            DataSet<Tuple2<Long, Long>> edges = env.fromCollection(edgesInput);

            IterativeDataSet<Tuple2<Long, Long>> iteration = initialSolutionSet.iterate(MAX_ITERATIONS);

            // The aggregator counts updated vertices per superstep; the criterion inspects the sum.
            iteration.registerAggregationConvergenceCriterion(
                    UPDATED_ELEMENTS,
                    new LongSumAggregator(),
                    new UpdatedElementsConvergenceCriterion(CONVERGENCE_THRESHOLD));

            // Offer every vertex's component id to its neighbors and keep the minimum per vertex.
            DataSet<Tuple2<Long, Long>> verticesWithNewComponents = iteration
                    .join(edges).where(0).equalTo(0)
                    .with(new NeighborWithComponentIDJoin())
                    .groupBy(0)
                    .reduceGroup(new MinimumReduce());

            // Pair each candidate with the current state and keep the smaller component id.
            DataSet<Tuple2<Long, Long>> updatedComponentId = verticesWithNewComponents
                    .join(iteration).where(0).equalTo(0)
                    .flatMap(new MinimumIdFilter());

            iteration.closeWith(updatedComponentId).writeAsText(resultPath);
            env.execute();
            return resultPath;
        }
    }
}

