/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.graph.pageRankUtil;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageRankStats;
import org.apache.flink.test.recordJobs.graph.pageRankUtil.PageRankStatsAggregator;
import org.apache.flink.test.recordJobs.util.ConfigUtils;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

@FunctionAnnotation.ConstantFieldsFirst(value={0})
public class DotProductCoGroup
extends CoGroupFunction
implements Serializable {
    private static final long serialVersionUID = 1L;
    public static final String NUM_VERTICES_PARAMETER = "pageRank.numVertices";
    public static final String NUM_DANGLING_VERTICES_PARAMETER = "pageRank.numDanglingVertices";
    public static final String AGGREGATOR_NAME = "pagerank.aggregator";
    private static final double BETA = 0.85;
    private PageRankStatsAggregator aggregator;
    private long numVertices;
    private long numDanglingVertices;
    private double dampingFactor;
    private double danglingRankFactor;
    private Record accumulator = new Record();
    private final DoubleValue newRank = new DoubleValue();
    private BooleanValue isDangling = new BooleanValue();
    private LongValue vertexID = new LongValue();
    private DoubleValue doubleInstance = new DoubleValue();

    public void open(Configuration parameters) throws Exception {
        int currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
        this.numVertices = ConfigUtils.asLong(NUM_VERTICES_PARAMETER, parameters);
        this.numDanglingVertices = ConfigUtils.asLong(NUM_DANGLING_VERTICES_PARAMETER, parameters);
        this.dampingFactor = 0.15000000000000002 / (double)this.numVertices;
        this.aggregator = (PageRankStatsAggregator)this.getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
        if (currentIteration == 1) {
            this.danglingRankFactor = 0.85 * (double)this.numDanglingVertices / ((double)this.numVertices * (double)this.numVertices);
        } else {
            PageRankStats previousAggregate = (PageRankStats)this.getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
            this.danglingRankFactor = 0.85 * previousAggregate.danglingRank() / (double)this.numVertices;
        }
    }

    public void coGroup(Iterator<Record> currentPageRankIterator, Iterator<Record> partialRanks, Collector<Record> collector) {
        if (!currentPageRankIterator.hasNext()) {
            long missingVertex = ((LongValue)partialRanks.next().getField(0, LongValue.class)).getValue();
            throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
        }
        Record currentPageRank = currentPageRankIterator.next();
        long edges = 0L;
        double summedRank = 0.0;
        while (partialRanks.hasNext()) {
            summedRank += ((DoubleValue)partialRanks.next().getField(1, (Value)this.doubleInstance)).getValue();
            ++edges;
        }
        double rank = 0.85 * summedRank + this.dampingFactor + this.danglingRankFactor;
        double currentRank = ((DoubleValue)currentPageRank.getField(1, (Value)this.doubleInstance)).getValue();
        this.isDangling = (BooleanValue)currentPageRank.getField(2, (Value)this.isDangling);
        double danglingRankToAggregate = this.isDangling.get() ? rank : 0.0;
        long danglingVerticesToAggregate = this.isDangling.get() ? 1L : 0L;
        double diff = Math.abs(currentRank - rank);
        this.aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1L, edges);
        this.newRank.setValue(rank);
        this.accumulator.setField(0, currentPageRank.getField(0, (Value)this.vertexID));
        this.accumulator.setField(1, (Value)this.newRank);
        this.accumulator.setField(2, (Value)this.isDangling);
        collector.collect((Object)this.accumulator);
    }
}

