/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.graph;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.DanglingPageRankInputFormat;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.DiffL1NormConvergenceCriterion;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.DotProductCoGroup;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.DotProductMatch;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.ImprovedAdjacencyListInputFormat;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageRankStatsAggregator;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
import org.apache.flink.types.LongValue;

public class DanglingPageRank
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;
    public static final String NUM_VERTICES_CONFIG_PARAM = "pageRank.numVertices";

    public Plan getPlan(String ... args) {
        int parallelism = 1;
        String pageWithRankInputPath = "";
        String adjacencyListInputPath = "";
        String outputPath = "";
        int numIterations = 25;
        long numVertices = 5L;
        long numDanglingVertices = 1L;
        if (args.length >= 7) {
            parallelism = Integer.parseInt(args[0]);
            pageWithRankInputPath = args[1];
            adjacencyListInputPath = args[2];
            outputPath = args[3];
            numIterations = Integer.parseInt(args[4]);
            numVertices = Long.parseLong(args[5]);
            numDanglingVertices = Long.parseLong(args[6]);
        }
        FileDataSource pageWithRankInput = new FileDataSource((FileInputFormat)new DanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput");
        pageWithRankInput.getParameters().setLong(NUM_VERTICES_CONFIG_PARAM, numVertices);
        BulkIteration iteration = new BulkIteration("Page Rank Loop");
        iteration.setInput((Operator)pageWithRankInput);
        FileDataSource adjacencyListInput = new FileDataSource((FileInputFormat)new ImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput");
        JoinOperator join = JoinOperator.builder((JoinFunction)new DotProductMatch(), LongValue.class, (int)0, (int)0).input1(iteration.getPartialSolution()).input2((Operator)adjacencyListInput).name("Join with Edges").build();
        CoGroupOperator rankAggregation = CoGroupOperator.builder((CoGroupFunction)new DotProductCoGroup(), LongValue.class, (int)0, (int)0).input1(iteration.getPartialSolution()).input2((Operator)join).name("Rank Aggregation").build();
        rankAggregation.getParameters().setLong(NUM_VERTICES_CONFIG_PARAM, numVertices);
        rankAggregation.getParameters().setLong("pageRank.numDanglingVertices", numDanglingVertices);
        iteration.setNextPartialSolution((Operator)rankAggregation);
        iteration.setMaximumNumberOfIterations(numIterations);
        iteration.getAggregators().registerAggregationConvergenceCriterion("pagerank.aggregator", (Aggregator)new PageRankStatsAggregator(), (ConvergenceCriterion)new DiffL1NormConvergenceCriterion());
        FileDataSink out = new FileDataSink((FileOutputFormat)new PageWithRankOutFormat(), outputPath, (Operator)iteration, "Final Ranks");
        Plan p = new Plan((GenericDataSinkBase)out, "Dangling PageRank");
        p.setDefaultParallelism(parallelism);
        return p;
    }

    public String getDescription() {
        return "Parameters: <parallelism> <pages-input-path> <edges-input-path> <output-path> <max-iterations> <num-vertices> <num-dangling-vertices>";
    }
}

