/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.library.similarity;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
    // The nested GenerateGroupSpans and GenerateGroupPairs functions use the literal 64.
    // NOTE(review): presumably that literal is this constant inlined at compile time — confirm.
    private static final int GROUP_SIZE = 64;
    // Name under which runInternal registers the broadcast set that ComputeScores.open reads.
    private static final String SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS = "sum of scores and number of vertices";
    // Passed to ComputeScores, which only emits pairs whose summed score is >= this value.
    private float minimumScore = 0.0f;
    // When positive, runInternal attaches the broadcast set and ComputeScores may raise
    // its threshold to (average score * this ratio).
    private float minimumRatio = 0.0f;
    // Parallelism applied to most operators built in runInternal; -1 is the value
    // setLittleParallelism accepts in addition to positive numbers.
    // NOTE(review): -1 presumably means "use the default parallelism" — confirm.
    private int littleParallelism = -1;

    /**
     * Sets the threshold a pair's summed score must reach to be emitted.
     *
     * @param score threshold value; anything below zero is rejected by
     *              {@code Preconditions.checkArgument}
     * @return this instance, so calls can be chained
     */
    public AdamicAdar<K, VV, EV> setMinimumScore(float score) {
        boolean nonNegative = score >= 0.0f;
        Preconditions.checkArgument(nonNegative, "Minimum score must be non-negative");
        this.minimumScore = score;
        return this;
    }

    /**
     * Sets the ratio that, when positive, is multiplied by the average score to
     * derive an additional lower bound on emitted scores.
     *
     * @param ratio ratio value; anything below zero is rejected by
     *              {@code Preconditions.checkArgument}
     * @return this instance, so calls can be chained
     */
    public AdamicAdar<K, VV, EV> setMinimumRatio(float ratio) {
        boolean nonNegative = ratio >= 0.0f;
        Preconditions.checkArgument(nonNegative, "Minimum ratio must be non-negative");
        this.minimumRatio = ratio;
        return this;
    }

    /**
     * Sets the parallelism used for the operators that {@code runInternal}
     * configures explicitly.
     *
     * @param littleParallelism a positive value, or {@code -1}; any other value is
     *                          rejected by {@code Preconditions.checkArgument}
     * @return this instance, so calls can be chained
     */
    public AdamicAdar<K, VV, EV> setLittleParallelism(int littleParallelism) {
        boolean acceptable = littleParallelism == -1 || littleParallelism > 0;
        Preconditions.checkArgument(acceptable, "The parallelism must be greater than zero.");
        this.littleParallelism = littleParallelism;
        return this;
    }

    /**
     * Returns the fully-qualified name of this class as the algorithm name.
     */
    @Override
    protected String getAlgorithmName() {
        final String algorithmName = AdamicAdar.class.getName();
        return algorithmName;
    }

    /**
     * Attempts to fold another algorithm's configuration into this one.
     *
     * <p>Merging succeeds only when {@code other} is an {@code AdamicAdar} with the
     * same minimum ratio and minimum score. On success the little parallelism becomes
     * the other's value if this one is unset (-1), stays unchanged if the other is
     * unset, and otherwise becomes the smaller of the two.
     *
     * @param other configuration to merge; must not be null
     * @return {@code true} if the configurations were merged, {@code false} otherwise
     */
    @Override
    protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
        Preconditions.checkNotNull(other);

        if (!(other instanceof AdamicAdar)) {
            return false;
        }
        AdamicAdar that = (AdamicAdar)other;

        boolean sameThresholds = this.minimumRatio == that.minimumRatio
            && this.minimumScore == that.minimumScore;
        if (!sameThresholds) {
            return false;
        }

        if (this.littleParallelism == -1) {
            // Nothing set locally: adopt whatever the other side has (possibly -1 too).
            this.littleParallelism = that.littleParallelism;
        } else if (that.littleParallelism != -1) {
            // Both sides set: keep the smaller value.
            this.littleParallelism = Math.min(this.littleParallelism, that.littleParallelism);
        }
        return true;
    }

    /**
     * Assembles the operator pipeline that produces scored vertex pairs for the
     * given graph.
     *
     * <p>All operators up to and including "Generate groups" run with
     * {@code littleParallelism}; "Generate group pairs" and "Compute scores" have no
     * parallelism set here.
     *
     * @param input graph whose edges and vertex degrees feed the pipeline
     * @return the output of the "Compute scores" operator
     * @throws Exception declared by the signature; nothing is thrown explicitly in
     *                   this body
     */
    @Override
    public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception {
        // Per-vertex score: run VertexDegree on the graph, then map each result
        // through VertexInverseLogDegree to get (vertex, degree, 1/log(degree)).
        Operator inverseLogDegree = ((MapOperator)((DataSet)input.run(new VertexDegree().setParallelism(this.littleParallelism))).map(new VertexInverseLogDegree()).setParallelism(this.littleParallelism)).name("Vertex score");
        // Join edges with the vertex scores on field 0 of both sides, keeping edge
        // fields 0 and 1 plus field 2 (the score) of the vertex tuple.
        Operator sourceInverseLogDegree = ((JoinOperator)input.getEdges().join((DataSet)inverseLogDegree, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND).where(new int[]{0}).equalTo(new int[]{0}).projectFirst(new int[]{0, 1}).projectSecond(new int[]{2}).setParallelism(this.littleParallelism)).name("Edge score");
        // Group by field 0, sort each group ascending on field 1, and let
        // GenerateGroupSpans prefix every record with a span counter.
        Operator groupSpans = ((GroupReduceOperator)sourceInverseLogDegree.groupBy(new int[]{0}).sortGroup(1, Order.ASCENDING).reduceGroup(new GenerateGroupSpans()).setParallelism(this.littleParallelism)).name("Generate group spans");
        // Rebalance, then have GenerateGroups emit each record once per span index.
        Operator groups = ((FlatMapOperator)((PartitionOperator)((PartitionOperator)groupSpans.rebalance().setParallelism(this.littleParallelism)).name("Rebalance")).flatMap(new GenerateGroups()).setParallelism(this.littleParallelism)).name("Generate groups");
        // Group on fields (0, 1), sort ascending on field 2, and emit pairs via
        // GenerateGroupPairs.
        Operator twoPaths = groups.groupBy(new int[]{0, 1}).sortGroup(2, Order.ASCENDING).reduceGroup(new GenerateGroupPairs()).name("Generate group pairs");
        // Sum the per-pair contributions; ComputeScores also applies the thresholds.
        GroupReduceOperator scores = (GroupReduceOperator)twoPaths.groupBy(new int[]{0, 1}).reduceGroup(new ComputeScores(this.minimumScore, this.minimumRatio)).name("Compute scores");
        if (this.minimumRatio > 0.0f) {
            // Only needed for the ratio threshold: aggregate (sum of scores, number of
            // neighbor pairs) over all vertices and broadcast it to ComputeScores.
            AggregateOperator sumOfScoresAndNumberOfNeighborPairs = ((MapOperator)((MapOperator)inverseLogDegree.map(new ComputeScoreFromVertex()).setParallelism(this.littleParallelism)).name("Average score")).sum(0).andSum(1);
            scores.withBroadcastSet((DataSet)sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
        }
        return scores;
    }

    public static class Result<T>
    extends Edge<T, FloatValue> {
        public static final int HASH_SEED = -469371183;
        private Murmur3_32 hasher = new Murmur3_32(-469371183);

        public Result() {
            this.f2 = new FloatValue();
        }

        public FloatValue getAdamicAdarScore() {
            return (FloatValue)this.f2;
        }

        public String toVerboseString() {
            return "Vertex IDs: (" + this.f0 + ", " + this.f1 + "), adamic-adar score: " + this.getAdamicAdarScore();
        }

        public int hashCode() {
            return this.hasher.reset().hash(this.f0.hashCode()).hash(this.f1.hashCode()).hash(((FloatValue)this.f2).getValue()).hash();
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0; 1"})
    private static class ComputeScores<T>
    extends RichGroupReduceFunction<Tuple3<T, T, FloatValue>, Result<T>> {
        private float minimumScore;
        private float minimumRatio;
        private Result<T> output = new Result();

        public ComputeScores(float minimumScore, float minimumRatio) {
            this.minimumScore = minimumScore;
            this.minimumRatio = minimumRatio;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (this.minimumRatio > 0.0f) {
                List var = this.getRuntimeContext().getBroadcastVariable(AdamicAdar.SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
                Tuple2 sumAndCount = (Tuple2)var.iterator().next();
                float averageScore = ((FloatValue)sumAndCount.f0).getValue() / (float)((LongValue)sumAndCount.f1).getValue();
                this.minimumScore = Math.max(this.minimumScore, averageScore * this.minimumRatio);
            }
        }

        public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Result<T>> out) throws Exception {
            float sum = 0.0f;
            Tuple3<T, T, FloatValue> edge = null;
            Iterator<Tuple3<T, T, FloatValue>> i$ = values.iterator();
            while (i$.hasNext()) {
                Tuple3<T, T, FloatValue> next;
                edge = next = i$.next();
                sum += ((FloatValue)next.f2).getValue();
            }
            if (sum >= this.minimumScore) {
                this.output.f0 = edge.f0;
                this.output.f1 = edge.f1;
                ((FloatValue)this.output.f2).setValue(sum);
                out.collect(this.output);
            }
        }
    }

    private static class ComputeScoreFromVertex<T>
    implements MapFunction<Tuple3<T, LongValue, FloatValue>, Tuple2<FloatValue, LongValue>> {
        private FloatValue sumOfScores = new FloatValue();
        private LongValue numberOfNeighborPairs = new LongValue();
        private Tuple2<FloatValue, LongValue> output = new Tuple2((Object)this.sumOfScores, (Object)this.numberOfNeighborPairs);

        private ComputeScoreFromVertex() {
        }

        public Tuple2<FloatValue, LongValue> map(Tuple3<T, LongValue, FloatValue> value) throws Exception {
            long degree = ((LongValue)value.f1).getValue();
            long neighborPairs = degree * (degree - 1L) / 2L;
            this.sumOfScores.setValue(((FloatValue)value.f2).getValue() * (float)neighborPairs);
            this.numberOfNeighborPairs.setValue(neighborPairs);
            return this.output;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"3->2"})
    private static class GenerateGroupPairs<T extends CopyableValue<T>>
    implements GroupReduceFunction<Tuple4<IntValue, T, T, FloatValue>, Tuple3<T, T, FloatValue>> {
        private Tuple3<T, T, FloatValue> output = new Tuple3();
        private boolean initialized = false;
        private List<T> visited = new ArrayList<T>(64);

        private GenerateGroupPairs() {
        }

        public void reduce(Iterable<Tuple4<IntValue, T, T, FloatValue>> values, Collector<Tuple3<T, T, FloatValue>> out) throws Exception {
            int visitedCount = 0;
            for (Tuple4<IntValue, T, T, FloatValue> edge : values) {
                int i;
                this.output.f1 = edge.f2;
                this.output.f2 = edge.f3;
                for (i = 0; i < visitedCount; ++i) {
                    this.output.f0 = this.visited.get(i);
                    out.collect(this.output);
                }
                if (visitedCount >= 64) continue;
                if (!this.initialized) {
                    this.initialized = true;
                    for (i = 0; i < 64; ++i) {
                        this.visited.add(((CopyableValue)edge.f2).copy());
                    }
                } else {
                    ((CopyableValue)edge.f2).copyTo(this.visited.get(visitedCount));
                }
                ++visitedCount;
            }
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"1; 2; 3"})
    private static class GenerateGroups<T>
    implements FlatMapFunction<Tuple4<IntValue, T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
        private GenerateGroups() {
        }

        public void flatMap(Tuple4<IntValue, T, T, FloatValue> value, Collector<Tuple4<IntValue, T, T, FloatValue>> out) throws Exception {
            int spans = ((IntValue)value.f0).getValue();
            for (int idx = 0; idx < spans; ++idx) {
                ((IntValue)value.f0).setValue(idx);
                out.collect(value);
            }
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0->1; 1->2 ; 2->3"})
    private static class GenerateGroupSpans<T>
    implements GroupReduceFunction<Tuple3<T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
        private IntValue groupSpansValue = new IntValue();
        private Tuple4<IntValue, T, T, FloatValue> output = new Tuple4((Object)this.groupSpansValue, null, null, null);

        private GenerateGroupSpans() {
        }

        public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Tuple4<IntValue, T, T, FloatValue>> out) throws Exception {
            int groupCount = 0;
            int groupSpans = 1;
            this.groupSpansValue.setValue(groupSpans);
            for (Tuple3<T, T, FloatValue> edge : values) {
                this.output.f1 = edge.f0;
                this.output.f2 = edge.f1;
                this.output.f3 = edge.f2;
                out.collect(this.output);
                if (++groupCount != 64) continue;
                groupCount = 0;
                this.groupSpansValue.setValue(++groupSpans);
            }
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0; 1"})
    private static class VertexInverseLogDegree<T>
    implements MapFunction<Vertex<T, LongValue>, Tuple3<T, LongValue, FloatValue>> {
        private Tuple3<T, LongValue, FloatValue> output = new Tuple3(null, null, (Object)new FloatValue());

        private VertexInverseLogDegree() {
        }

        public Tuple3<T, LongValue, FloatValue> map(Vertex<T, LongValue> value) throws Exception {
            this.output.f0 = value.f0;
            this.output.f1 = value.f1;
            long degree = ((LongValue)value.f1).getValue();
            float inverseLogDegree = degree == 1L ? 0.0f : 1.0f / (float)Math.log(((LongValue)value.f1).getValue());
            ((FloatValue)this.output.f2).setValue(inverseLogDegree);
            return this.output;
        }
    }
}

