/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative.nephele.customdanglingpagerank;

import java.util.Set;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GenericCollectorMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.ConfigUtils;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
import org.apache.flink.util.Collector;

public class CustomCompensatingMap
extends AbstractRichFunction
implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
    private static final long serialVersionUID = 1L;
    private boolean isFailureIteration;
    private boolean isFailingWorker;
    private double uniformRank;
    private double rescaleFactor;

    public void open(Configuration parameters) throws Exception {
        int failingIteration;
        int currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
        this.isFailureIteration = currentIteration == (failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters)) + 1;
        int workerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
        this.isFailingWorker = failingWorkers.contains(workerIndex);
        long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
        if (currentIteration > 1) {
            PageRankStats stats = (PageRankStats)this.getIterationRuntimeContext().getPreviousIterationAggregate("pagerank.aggregator");
            this.uniformRank = 1.0 / (double)numVertices;
            double lostMassFactor = (double)(numVertices - stats.numVertices()) / (double)numVertices;
            this.rescaleFactor = (1.0 - lostMassFactor) / stats.rank();
        }
    }

    public void map(VertexWithRankAndDangling pageWithRank, Collector<VertexWithRankAndDangling> out) throws Exception {
        if (this.isFailureIteration) {
            double rank = pageWithRank.getRank();
            if (this.isFailingWorker) {
                pageWithRank.setRank(this.uniformRank);
            } else {
                pageWithRank.setRank(rank * this.rescaleFactor);
            }
        }
        out.collect((Object)pageWithRank);
    }
}

