/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.library;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.types.LongValue;

/**
 * PageRank implemented as a scatter-gather iteration.
 *
 * <p>Before iterating, every edge value is divided by the out-degree of the edge's
 * source vertex. In each superstep a vertex sends {@code rank * edgeValue} along each
 * of its edges, and each vertex that receives messages replaces its rank with
 * {@code beta * sum(messages) + (1 - beta) / numberOfVertices}.
 *
 * <p>The vertex values of the input graph are not used as starting ranks: in the first
 * superstep the messenger sets the vertex value to {@code 1 / numberOfVertices} before
 * computing its messages.
 *
 * @param <K> the vertex id type
 */
public class PageRank<K>
        implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    /** Damping factor applied to the sum of incoming rank messages. */
    private final double beta;
    /** Maximum number of supersteps passed to the scatter-gather iteration. */
    private final int maxIterations;

    /**
     * Creates a PageRank algorithm instance.
     *
     * @param beta the damping factor
     * @param maxIterations the maximum number of iterations to run
     */
    public PageRank(double beta, int maxIterations) {
        this.beta = beta;
        this.maxIterations = maxIterations;
    }

    /**
     * Runs the algorithm on the given graph.
     *
     * @param network the input graph; its edge values are scaled by the source
     *     out-degree before the iteration starts
     * @return the vertices of the graph produced by the scatter-gather iteration
     * @throws Exception as declared by {@link GraphAlgorithm#run}
     */
    @Override
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
        DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();

        // Scale each edge value by 1 / outDegree(source).
        Graph<K, Double, Double> networkWithWeights =
                network.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());

        // Both functions below call getNumberOfVertices(), so the option is enabled.
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        parameters.setOptNumVertices(true);

        // Explicit type arguments instead of raw types keep this call type-checked.
        return networkWithWeights
                .runScatterGatherIteration(
                        new RankMessenger<K>(),
                        new VertexRankUpdater<K>(this.beta),
                        this.maxIterations,
                        parameters)
                .getVertices();
    }

    /** Divides an edge value by the value joined onto it (the source out-degree in {@code run}). */
    private static final class InitWeights
            implements EdgeJoinFunction<Double, LongValue> {
        /**
         * @param edgeValue the current edge value
         * @param inputValue the value joined onto the edge
         * @return {@code edgeValue / inputValue}
         */
        @Override
        public Double edgeJoin(Double edgeValue, LongValue inputValue) {
            return edgeValue / (double) inputValue.getValue();
        }
    }

    /**
     * Gather function that sums the incoming rank messages and applies the damping
     * factor to produce the new vertex rank.
     *
     * @param <K> the vertex id type
     */
    public static final class VertexRankUpdater<K>
            extends GatherFunction<K, Double, Double> {
        private final double beta;

        /**
         * @param beta the damping factor
         */
        public VertexRankUpdater(double beta) {
            this.beta = beta;
        }

        /**
         * Sets the vertex value to
         * {@code beta * sum(inMessages) + (1 - beta) / numberOfVertices}.
         *
         * @param vertex the vertex being updated
         * @param inMessages the rank contributions received by the vertex
         */
        @Override
        public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
            double rankSum = 0.0;
            for (double msg : inMessages) {
                rankSum += msg;
            }
            double newRank =
                    this.beta * rankSum
                            + (1.0 - this.beta) / (double) this.getNumberOfVertices();
            this.setNewVertexValue(newRank);
        }
    }

    /**
     * Scatter function that sends {@code vertexValue * edgeValue} to the target of
     * every edge returned by {@code getEdges()}.
     *
     * @param <K> the vertex id type
     */
    public static final class RankMessenger<K>
            extends ScatterFunction<K, Double, Double, Double> {
        /**
         * Sends this vertex's rank contribution along each of its edges.
         *
         * @param vertex the vertex sending messages
         */
        @Override
        public void sendMessages(Vertex<K, Double> vertex) {
            if (this.getSuperstepNumber() == 1) {
                // First superstep: start from a uniform rank of 1 / numberOfVertices.
                vertex.setValue(1.0 / (double) this.getNumberOfVertices());
            }
            // Parameterized Edge keeps getTarget() typed as K and getValue() as Double.
            for (Edge<K, Double> edge : this.getEdges()) {
                this.sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
            }
        }
    }
}

