package org.apache.flink.test.iterative.nephele.customdanglingpagerank;

import java.util.Random;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.ConfigUtils;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.class */
public class CustomCompensatableDotProductMatch extends AbstractRichFunction implements FlatJoinFunction<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank> {
    private static final long serialVersionUID = 1;
    private VertexWithRank record = new VertexWithRank();
    private Random random = new Random();
    private double messageLoss;
    private boolean isFailure;

    public void open(Configuration configuration) throws Exception {
        this.isFailure = getIterationRuntimeContext().getSuperstepNumber() == ConfigUtils.asInteger("compensation.failingIteration", configuration) && ConfigUtils.asIntSet("compensation.failingWorker", configuration).contains(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        this.messageLoss = ConfigUtils.asDouble("compensation.messageLoss", configuration);
    }

    public void join(VertexWithRankAndDangling vertexWithRankAndDangling, VertexWithAdjacencyList vertexWithAdjacencyList, Collector<VertexWithRank> collector) throws Exception {
        double rank = vertexWithRankAndDangling.getRank();
        long[] targets = vertexWithAdjacencyList.getTargets();
        int numTargets = vertexWithAdjacencyList.getNumTargets();
        this.record.setRank(rank / numTargets);
        for (int i = 0; i < numTargets; i++) {
            this.record.setVertexID(targets[i]);
            if (!this.isFailure) {
                collector.collect(this.record);
            } else if (this.random.nextDouble() >= this.messageLoss) {
                collector.collect(this.record);
            }
        }
    }

    public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
        join((VertexWithRankAndDangling) obj, (VertexWithAdjacencyList) obj2, (Collector<VertexWithRank>) collector);
    }
}
