/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;

public class ConnectedComponentsITCase
extends JavaProgramTestBaseJUnit4 {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    protected void preSubmit() throws Exception {
        this.verticesPath = this.createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices((int)1000));
        this.edgesPath = this.createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges((int)10000, (int)1000, (long)3287269182979823L));
        this.resultPath = this.getTempFilePath("results");
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource vertices = env.readCsvFile(this.verticesPath).types(Long.class);
        FlatMapOperator edges = env.readCsvFile(this.edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).flatMap((FlatMapFunction)new ConnectedComponents.UndirectEdge());
        MapOperator verticesWithInitialId = vertices.map(new DuplicateValue());
        DeltaIteration iteration = verticesWithInitialId.iterateDelta((DataSet)verticesWithInitialId, 100, new int[]{0});
        JoinOperator.EquiJoin changes = iteration.getWorkset().join((DataSet)edges).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new ConnectedComponents.NeighborWithComponentIDJoin()).groupBy(new int[]{0}).aggregate(Aggregations.MIN, 1).join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with((FlatJoinFunction)new ConnectedComponents.ComponentIdFilter());
        DataSet result = iteration.closeWith((DataSet)changes, (DataSet)changes);
        result.writeAsCsv(this.resultPath, "\n", " ");
        env.execute("Connected Components Example");
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader reader : TestBaseUtils.getResultReader((String)this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult((BufferedReader)reader);
        }
    }

    public static final class DuplicateValue<T>
    implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
        public Tuple2<T, T> map(Tuple1<T> vertex) {
            return new Tuple2(vertex.f0, vertex.f0);
        }
    }
}

