/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.library;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.types.LongValue;

/**
 * Rank computation expressed as a Gather-Sum-Apply (GSA) iteration.
 *
 * <p>Before iterating, every edge value is divided by the out-degree of the
 * edge's source vertex. In each superstep a vertex gathers
 * {@code neighborRank * edgeValue} from its neighbors, sums those
 * contributions, and replaces its own value with
 * {@code (1 - beta) / numberOfVertices + beta * sum}.
 *
 * <p>During the first superstep the incoming vertex values are ignored by the
 * gather step and {@code 1 / numberOfVertices} is used as every neighbor's rank.
 *
 * @param <K> the vertex key type
 */
public class GSAPageRank<K>
        implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    /** Weight given to the summed neighbor contributions in the rank update. */
    private final double beta;
    /** Iteration count handed to the Gather-Sum-Apply iteration. */
    private final int maxIterations;

    /**
     * Creates the algorithm.
     *
     * @param beta weight of the gathered rank sum; the remaining
     *     {@code 1 - beta} is spread evenly over all vertices
     * @param maxIterations value passed as the iteration limit to
     *     {@code runGatherSumApplyIteration}
     */
    public GSAPageRank(double beta, int maxIterations) {
        this.beta = beta;
        this.maxIterations = maxIterations;
    }

    /**
     * Runs the rank iteration on the given graph.
     *
     * @param network the input graph; its edge values are rescaled by the
     *     source vertex's out-degree before the iteration starts
     * @return the vertices of the graph produced by the iteration, carrying the computed ranks
     * @throws Exception propagated from the underlying graph operations
     */
    @Override
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
        DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();

        // Rescale each edge by the out-degree of its source vertex.
        Graph<K, Double, Double> networkWithWeights =
                network.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());

        // GatherRanks and UpdateRanks both call getNumberOfVertices(), so the
        // iteration is configured with the number-of-vertices option enabled.
        GSAConfiguration parameters = new GSAConfiguration();
        parameters.setOptNumVertices(true);

        // UpdateRanks is parameterized explicitly: a raw instance would force an
        // unchecked conversion and erase the result type of the iteration call.
        return networkWithWeights
                .runGatherSumApplyIteration(
                        new GatherRanks(),
                        new SumRanks(),
                        new UpdateRanks<K>(this.beta),
                        this.maxIterations,
                        parameters)
                .getVertices();
    }

    /** Divides an edge value by the value joined onto it (here: the source's out-degree). */
    private static final class InitWeights
            implements EdgeJoinFunction<Double, LongValue> {

        /**
         * @param edgeValue the current edge value
         * @param inputValue the joined value used as divisor
         * @return {@code edgeValue / inputValue}
         */
        @Override
        public Double edgeJoin(Double edgeValue, LongValue inputValue) {
            return edgeValue / (double) inputValue.getValue();
        }
    }

    /**
     * Apply step: combines the summed neighbor contributions with a uniform
     * share of {@code 1 - beta}.
     *
     * @param <K> the vertex key type
     */
    private static final class UpdateRanks<K>
            extends ApplyFunction<K, Double, Double> {
        private final double beta;

        public UpdateRanks(double beta) {
            this.beta = beta;
        }

        /**
         * Sets the new vertex value to
         * {@code (1 - beta) / numberOfVertices + beta * rankSum}.
         *
         * @param rankSum the summed contributions gathered for this vertex
         * @param currentValue the vertex's current value; not used by the update
         */
        @Override
        public void apply(Double rankSum, Double currentValue) {
            this.setResult((1.0 - this.beta) / (double) this.getNumberOfVertices() + this.beta * rankSum);
        }
    }

    /** Sum step: adds two partial contributions. */
    private static final class SumRanks
            extends SumFunction<Double, Double, Double> {

        @Override
        public Double sum(Double newValue, Double currentValue) {
            return newValue + currentValue;
        }
    }

    /** Gather step: produces a neighbor's rank scaled by the connecting edge's value. */
    private static final class GatherRanks
            extends GatherFunction<Double, Double, Double> {

        /**
         * @param neighbor the neighbor's vertex value together with the edge value
         * @return the neighbor's rank multiplied by the edge value
         */
        @Override
        public Double gather(Neighbor<Double, Double> neighbor) {
            double neighborRank = neighbor.getNeighborValue();
            // In the first superstep the stored vertex values are not used;
            // every neighbor is treated as having rank 1 / numberOfVertices.
            if (this.getSuperstepNumber() == 1) {
                neighborRank = 1.0 / (double) this.getNumberOfVertices();
            }
            return neighborRank * neighbor.getEdgeValue();
        }
    }
}

