/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative.nephele.customdanglingpagerank;

import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.ConfigUtils;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
import org.apache.flink.util.Collector;

public class CustomCompensatableDotProductCoGroup
extends AbstractRichFunction
implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
    private static final long serialVersionUID = 1L;
    public static final String AGGREGATOR_NAME = "pagerank.aggregator";
    private VertexWithRankAndDangling accumulator = new VertexWithRankAndDangling();
    private PageRankStatsAggregator aggregator;
    private long numVertices;
    private long numDanglingVertices;
    private double dampingFactor;
    private double danglingRankFactor;
    private static final double BETA = 0.85;
    private int workerIndex;
    private int currentIteration;
    private int failingIteration;
    private Set<Integer> failingWorkers;

    public void open(Configuration parameters) throws Exception {
        this.workerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        this.currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
        this.failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
        this.failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
        this.numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
        this.numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);
        this.dampingFactor = 0.15000000000000002 / (double)this.numVertices;
        this.aggregator = (PageRankStatsAggregator)this.getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
        if (this.currentIteration == 1) {
            this.danglingRankFactor = 0.85 * (double)this.numDanglingVertices / ((double)this.numVertices * (double)this.numVertices);
        } else {
            PageRankStats previousAggregate = (PageRankStats)this.getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
            this.danglingRankFactor = 0.85 * previousAggregate.danglingRank() / (double)this.numVertices;
        }
    }

    public void coGroup(Iterable<VertexWithRankAndDangling> currentPageRankIterable, Iterable<VertexWithRank> partialRanks, Collector<VertexWithRankAndDangling> collector) {
        Iterator<VertexWithRankAndDangling> currentPageRankIterator = currentPageRankIterable.iterator();
        if (!currentPageRankIterator.hasNext()) {
            long missingVertex = partialRanks.iterator().next().getVertexID();
            throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
        }
        VertexWithRankAndDangling currentPageRank = currentPageRankIterator.next();
        long edges = 0L;
        double summedRank = 0.0;
        for (VertexWithRank pr : partialRanks) {
            summedRank += pr.getRank();
            ++edges;
        }
        double rank = 0.85 * summedRank + this.dampingFactor + this.danglingRankFactor;
        double currentRank = currentPageRank.getRank();
        boolean isDangling = currentPageRank.isDangling();
        double danglingRankToAggregate = isDangling ? rank : 0.0;
        long danglingVerticesToAggregate = isDangling ? 1L : 0L;
        double diff = Math.abs(currentRank - rank);
        this.aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1L, edges, summedRank, 0.0);
        this.accumulator.setVertexID(currentPageRank.getVertexID());
        this.accumulator.setRank(rank);
        this.accumulator.setDangling(isDangling);
        collector.collect((Object)this.accumulator);
    }

    public void close() throws Exception {
        if (this.currentIteration == this.failingIteration && this.failingWorkers.contains(this.workerIndex)) {
            this.aggregator.reset();
        }
    }
}

