/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.examples.java.graph;

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.util.Collector;

public class TransitiveClosureNaive
implements ProgramDescription {
    private static boolean fileOutput = false;
    private static String edgesPath = null;
    private static String outputPath = null;
    private static int maxIterations = 10;

    public static void main(String ... args) throws Exception {
        if (!TransitiveClosureNaive.parseParameters(args)) {
            return;
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, Long>> edges = TransitiveClosureNaive.getEdgeDataSet(env);
        IterativeDataSet paths = edges.iterate(maxIterations);
        SingleInputUdfOperator nextPaths = ((JoinOperator)((JoinOperator)paths.join(edges).where(new int[]{1}).equalTo(new int[]{0}).with((JoinFunction)new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>(){

            public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
                return new Tuple2(left.f0, right.f1);
            }
        }).withForwardedFieldsFirst(new String[]{"0"})).withForwardedFieldsSecond(new String[]{"1"})).union((DataSet)paths).groupBy(new int[]{0, 1}).reduceGroup((GroupReduceFunction)new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>(){

            public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
                out.collect(values.iterator().next());
            }
        }).withForwardedFields(new String[]{"0;1"});
        TwoInputUdfOperator newPaths = ((CoGroupOperator)paths.coGroup((DataSet)nextPaths).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>(){
            Set<Tuple2<Long, Long>> prevSet = new HashSet<Tuple2<Long, Long>>();

            public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
                for (Tuple2<Long, Long> prev : prevPaths) {
                    this.prevSet.add(prev);
                }
                for (Tuple2<Long, Long> next : nextPaths) {
                    if (this.prevSet.contains(next)) continue;
                    out.collect(next);
                }
            }
        }).withForwardedFieldsFirst(new String[]{"0"})).withForwardedFieldsSecond(new String[]{"0"});
        DataSet transitiveClosure = paths.closeWith((DataSet)nextPaths, (DataSet)newPaths);
        if (fileOutput) {
            transitiveClosure.writeAsCsv(outputPath, "\n", " ");
            env.execute("Transitive Closure Example");
        } else {
            transitiveClosure.print();
        }
    }

    public String getDescription() {
        return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
    }

    /*
     * Enabled aggressive block sorting
     */
    private static boolean parseParameters(String[] programArguments) {
        if (programArguments.length <= 0) {
            System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
            System.out.println("  Provide parameters to read input data from files.");
            System.out.println("  See the documentation for the correct format of input files.");
            System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
            return true;
        }
        fileOutput = true;
        if (programArguments.length == 3) {
            edgesPath = programArguments[0];
            outputPath = programArguments[1];
            maxIterations = Integer.parseInt(programArguments[2]);
            return true;
        }
        System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
        return false;
    }

    private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
        if (fileOutput) {
            return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class);
        }
        return ConnectedComponentsData.getDefaultEdgeDataSet(env);
    }
}

