/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.library.linkanalysis;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.UnaryResultBase;
import org.apache.flink.graph.library.linkanalysis.Functions;
import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class HITS<K, VV, EV>
extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
    private static final String CHANGE_IN_SCORES = "change in scores";
    private static final String HUBBINESS_SUM_SQUARED = "hubbiness sum squared";
    private static final String AUTHORITY_SUM_SQUARED = "authority sum squared";
    private int maxIterations;
    private double convergenceThreshold;

    /**
     * Creates a HITS run that is bounded only by an iteration count.
     *
     * <p>Delegates to {@link #HITS(int, double)} with a convergence threshold of
     * {@code Double.MAX_VALUE}. {@code runInternal} only registers a convergence
     * criterion when the threshold is below that value, so no convergence check
     * is installed for instances created this way.
     *
     * @param iterations maximum number of iterations; must be greater than zero
     */
    public HITS(int iterations) {
        this(iterations, Double.MAX_VALUE);
    }

    /**
     * Creates a HITS run that is bounded by a convergence threshold.
     *
     * <p>Delegates to {@link #HITS(int, double)} with an iteration limit of
     * {@code Integer.MAX_VALUE}.
     *
     * @param convergenceThreshold upper bound on the summed change in scores at
     *        which the iteration is considered converged; must be greater than zero
     */
    public HITS(double convergenceThreshold) {
        this(Integer.MAX_VALUE, convergenceThreshold);
    }

    /**
     * Creates a HITS run limited by both an iteration count and a convergence
     * threshold.
     *
     * <p>Both arguments are validated with {@code Preconditions.checkArgument}
     * before being stored.
     *
     * @param maxIterations maximum number of iterations; must be greater than zero
     * @param convergenceThreshold threshold on the summed change in scores; must be
     *        greater than zero (a NaN value fails the {@code > 0.0} check as well)
     */
    public HITS(int maxIterations, double convergenceThreshold) {
        Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
        Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");

        this.convergenceThreshold = convergenceThreshold;
        this.maxIterations = maxIterations;
    }

    /**
     * Merges the configuration of another {@code HITS} instance into this one.
     *
     * <p>After delegating to the superclass, this instance keeps the larger of
     * the two iteration limits and the smaller of the two convergence thresholds.
     *
     * @param other the algorithm to merge from; cast to {@code HITS} without a
     *        prior type check
     */
    @Override
    protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
        super.mergeConfiguration(other);

        HITS<?, ?, ?> that = (HITS<?, ?, ?>) other;
        maxIterations = Math.max(maxIterations, that.maxIterations);
        convergenceThreshold = Math.min(convergenceThreshold, that.convergenceThreshold);
    }

    /**
     * Builds the iterative data flow that computes hub and authority scores.
     *
     * <p>Steps, in the order they are wired below:
     * <ol>
     *   <li>Edges are reduced to (source ID, target ID) pairs.</li>
     *   <li>Every edge emits (target ID, hub 0.0, authority 1.0); these are summed
     *       per vertex ID to form the initial scores.</li>
     *   <li>Inside the bulk iteration, each vertex's authority score is sent to the
     *       source of every edge targeting it and summed per source (hubbiness);
     *       each hubbiness value is then sent to the target of every edge leaving
     *       that vertex and summed per target (authority).</li>
     *   <li>Both score sets are full-outer-joined and each score is divided by the
     *       square root of the sum of squares of its set, which is supplied as a
     *       broadcast set.</li>
     *   <li>If a convergence threshold below {@code Double.MAX_VALUE} was
     *       configured, the new scores are additionally joined with the previous
     *       ones to aggregate the change in scores, and a convergence criterion on
     *       that aggregate is registered.</li>
     * </ol>
     *
     * @param input the graph to analyze; only the edge source and target IDs are read
     * @return one {@link Result} per vertex ID emitted by the iteration
     * @throws Exception declared by the overridden method
     */
    @Override
    public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input) throws Exception {
        // (source ID, target ID)
        DataSet<Tuple2<K, K>> edges = input
            .getEdges()
            .map(new ExtractEdgeIDs<K, EV>())
                .setParallelism(parallelism)
                .name("Extract edge IDs");

        // (vertex ID, hub score, authority score)
        DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges
            .map(new InitializeScores<K>())
                .setParallelism(parallelism)
                .name("Initial scores")
            .groupBy(0)
            .reduce(new SumScores<K>())
            .setCombineHint(ReduceOperatorBase.CombineHint.HASH)
                .setParallelism(parallelism)
                .name("Sum");

        IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
            .iterate(maxIterations)
            .setParallelism(parallelism);

        // (vertex ID, hubbiness): vertex ID (field 0) is matched against the edge target (field 1)
        DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative
            .coGroup(edges)
            .where(0)
            .equalTo(1)
            .with(new Hubbiness<K>())
                .setParallelism(parallelism)
                .name("Hub")
            .groupBy(0)
            .reduce(new Functions.SumScore<K>())
            .setCombineHint(ReduceOperatorBase.CombineHint.HASH)
                .setParallelism(parallelism)
                .name("Sum");

        // single value: sum of squared hubbiness
        DataSet<DoubleValue> hubbinessSumSquared = hubbiness
            .map(new Square<K>())
                .setParallelism(parallelism)
                .name("Square")
            .reduce(new Sum())
                .setParallelism(parallelism)
                .name("Sum");

        // (vertex ID, authority): vertex ID (field 0) is matched against the edge source (field 0)
        DataSet<Tuple2<K, DoubleValue>> authority = hubbiness
            .coGroup(edges)
            .where(0)
            .equalTo(0)
            .with(new Authority<K>())
                .setParallelism(parallelism)
                .name("Authority")
            .groupBy(0)
            .reduce(new Functions.SumScore<K>())
            .setCombineHint(ReduceOperatorBase.CombineHint.HASH)
                .setParallelism(parallelism)
                .name("Sum");

        // single value: sum of squared authority
        DataSet<DoubleValue> authoritySumSquared = authority
            .map(new Square<K>())
                .setParallelism(parallelism)
                .name("Square")
            .reduce(new Sum())
                .setParallelism(parallelism)
                .name("Sum");

        // (vertex ID, normalized hubbiness, normalized authority)
        DataSet<Tuple3<K, DoubleValue, DoubleValue>> scores = hubbiness
            .fullOuterJoin(authority, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .with(new JoinAndNormalizeHubAndAuthority<K>())
            .withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED)
            .withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED)
                .setParallelism(parallelism)
                .name("Join scores");

        DataSet<Tuple3<K, DoubleValue, DoubleValue>> passThrough;

        if (convergenceThreshold < Double.MAX_VALUE) {
            // Route the new scores through a join with the previous scores so the
            // change between supersteps can be aggregated and tested for convergence.
            passThrough = iterative
                .fullOuterJoin(scores, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
                .where(0)
                .equalTo(0)
                .with(new ChangeInScores<K>())
                    .setParallelism(parallelism)
                    .name("Change in scores");

            iterative.registerAggregationConvergenceCriterion(
                CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
        } else {
            passThrough = scores;
        }

        return iterative
            .closeWith(passThrough)
            .map(new TranslateResult<K>())
                .setParallelism(parallelism)
                .name("Map result");
    }

    /**
     * Per-vertex output of {@link HITS}: a vertex ID (held by the superclass)
     * together with its hub score and authority score.
     *
     * @param <T> vertex ID type
     */
    public static class Result<T>
    extends UnaryResultBase<T>
    implements PrintableResult {
        private DoubleValue hubScore;
        private DoubleValue authorityScore;

        /** Seed passed to the {@link MurmurHash} used by {@link #hashCode()}. */
        public static final int HASH_SEED = 0x4010af29;

        /** Created on first use in {@link #hashCode()}; transient, so not serialized. */
        private transient MurmurHash hasher;

        /**
         * @return the hub score of the vertex
         */
        public DoubleValue getHubScore() {
            return hubScore;
        }

        /**
         * @param hubScore the hub score to store; the reference is kept, not copied
         */
        public void setHubScore(DoubleValue hubScore) {
            this.hubScore = hubScore;
        }

        /**
         * @return the authority score of the vertex
         */
        public DoubleValue getAuthorityScore() {
            return authorityScore;
        }

        /**
         * @param authorityScore the authority score to store; the reference is kept, not copied
         */
        public void setAuthorityScore(DoubleValue authorityScore) {
            this.authorityScore = authorityScore;
        }

        /**
         * Formats the result as {@code (vertexId,hubScore,authorityScore)}.
         */
        @Override
        public String toString() {
            return "(" + getVertexId0()
                + "," + hubScore
                + "," + authorityScore
                + ")";
        }

        /**
         * Formats the result as a labelled, human-readable line.
         */
        @Override
        public String toPrintableString() {
            return "Vertex ID: " + getVertexId0()
                + ", hub score: " + hubScore
                + ", authority score: " + authorityScore;
        }

        /**
         * Hashes the vertex ID's hash code and both score values with a
         * {@link MurmurHash} seeded by {@link #HASH_SEED}.
         */
        @Override
        public int hashCode() {
            if (hasher == null) {
                hasher = new MurmurHash(HASH_SEED);
            }

            return hasher.reset()
                .hash(getVertexId0().hashCode())
                .hash(hubScore.getValue())
                .hash(authorityScore.getValue())
                .hash();
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0->vertexId0; 1->hubScore; 2->authorityScore"})
    private static class TranslateResult<T>
    implements MapFunction<Tuple3<T, DoubleValue, DoubleValue>, Result<T>> {
        private Result<T> output = new Result();

        private TranslateResult() {
        }

        public Result<T> map(Tuple3<T, DoubleValue, DoubleValue> value) throws Exception {
            this.output.setVertexId0(value.f0);
            this.output.setHubScore((DoubleValue)value.f1);
            this.output.setAuthorityScore((DoubleValue)value.f2);
            return this.output;
        }
    }

    private static class ScoreConvergence
    implements ConvergenceCriterion<DoubleValue> {
        private double convergenceThreshold;

        public ScoreConvergence(double convergenceThreshold) {
            this.convergenceThreshold = convergenceThreshold;
        }

        public boolean isConverged(int iteration, DoubleValue value) {
            double val = value.getValue();
            return 0.0 <= val && val <= this.convergenceThreshold;
        }
    }

    @FunctionAnnotation.ForwardedFieldsFirst(value={"0"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"*"})
    private static class ChangeInScores<T>
    extends RichJoinFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
        private boolean isInitialSuperstep;
        private double changeInScores;

        private ChangeInScores() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.isInitialSuperstep = this.getIterationRuntimeContext().getSuperstepNumber() == 1;
            this.changeInScores = this.isInitialSuperstep ? -1.0 : 0.0;
        }

        public void close() throws Exception {
            super.close();
            DoubleSumAggregator agg = (DoubleSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(HITS.CHANGE_IN_SCORES);
            agg.aggregate(this.changeInScores);
        }

        public Tuple3<T, DoubleValue, DoubleValue> join(Tuple3<T, DoubleValue, DoubleValue> first, Tuple3<T, DoubleValue, DoubleValue> second) throws Exception {
            if (!this.isInitialSuperstep) {
                this.changeInScores += Math.abs(((DoubleValue)second.f1).getValue() - ((DoubleValue)first.f1).getValue());
                this.changeInScores += Math.abs(((DoubleValue)second.f2).getValue() - ((DoubleValue)first.f2).getValue());
            }
            return second;
        }
    }

    /**
     * Joins the per-vertex hubbiness and authority values into one tuple and
     * normalizes each by the square root of the broadcast sum of squares.
     *
     * <p>A side that is absent from the join ({@code null}) yields a score of
     * {@code 0.0}. One output tuple is reused across calls.
     *
     * @param <T> vertex ID type
     */
    @FunctionAnnotation.ForwardedFieldsFirst("0")
    @FunctionAnnotation.ForwardedFieldsSecond("0")
    private static class JoinAndNormalizeHubAndAuthority<T>
    extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
        private Tuple3<T, DoubleValue, DoubleValue> output =
            new Tuple3<T, DoubleValue, DoubleValue>(null, new DoubleValue(), new DoubleValue());
        private double hubbinessRootSumSquared;
        private double authorityRootSumSquared;

        private JoinAndNormalizeHubAndAuthority() {
        }

        /**
         * Reads both broadcast sums of squares and stores their square roots.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            hubbinessRootSumSquared = rootOfBroadcastSum(HUBBINESS_SUM_SQUARED);
            authorityRootSumSquared = rootOfBroadcastSum(AUTHORITY_SUM_SQUARED);
        }

        /**
         * Returns the square root of the first element of the named broadcast
         * variable, or {@code NaN} when the variable is empty.
         */
        private double rootOfBroadcastSum(String broadcastName) {
            List<DoubleValue> values = getRuntimeContext().getBroadcastVariable(broadcastName);
            Iterator<DoubleValue> cursor = values.iterator();
            if (!cursor.hasNext()) {
                return Double.NaN;
            }
            return Math.sqrt(cursor.next().getValue());
        }

        /**
         * @param hubbiness (vertex ID, hubbiness), or {@code null} if the vertex has none
         * @param authority (vertex ID, authority), or {@code null} if the vertex has none
         * @return (vertex ID, normalized hubbiness, normalized authority)
         */
        @Override
        public Tuple3<T, DoubleValue, DoubleValue> join(Tuple2<T, DoubleValue> hubbiness, Tuple2<T, DoubleValue> authority) throws Exception {
            // Take the ID from whichever side is present, preferring the authority side.
            output.f0 = (authority != null) ? authority.f0 : hubbiness.f0;

            double normalizedHub = 0.0;
            if (hubbiness != null) {
                normalizedHub = hubbiness.f1.getValue() / hubbinessRootSumSquared;
            }
            output.f1.setValue(normalizedHub);

            double normalizedAuthority = 0.0;
            if (authority != null) {
                normalizedAuthority = authority.f1.getValue() / authorityRootSumSquared;
            }
            output.f2.setValue(normalizedAuthority);

            return output;
        }
    }

    private static class Sum
    implements ReduceFunction<DoubleValue> {
        private Sum() {
        }

        public DoubleValue reduce(DoubleValue first, DoubleValue second) throws Exception {
            first.setValue(first.getValue() + second.getValue());
            return first;
        }
    }

    private static class Square<T>
    implements MapFunction<Tuple2<T, DoubleValue>, DoubleValue> {
        private DoubleValue output = new DoubleValue();

        private Square() {
        }

        public DoubleValue map(Tuple2<T, DoubleValue> value) throws Exception {
            double val = ((DoubleValue)value.f1).getValue();
            this.output.setValue(val * val);
            return this.output;
        }
    }

    @FunctionAnnotation.ForwardedFieldsFirst(value={"1"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"1->0"})
    private static class Authority<T>
    implements CoGroupFunction<Tuple2<T, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
        private Tuple2<T, DoubleValue> output = new Tuple2();

        private Authority() {
        }

        public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out) throws Exception {
            this.output.f1 = vertex.iterator().next().f1;
            for (Tuple2<T, T> edge : edges) {
                this.output.f0 = edge.f1;
                out.collect(this.output);
            }
        }
    }

    @FunctionAnnotation.ForwardedFieldsFirst(value={"2->1"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"0"})
    private static class Hubbiness<T>
    implements CoGroupFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
        private Tuple2<T, DoubleValue> output = new Tuple2();

        private Hubbiness() {
        }

        public void coGroup(Iterable<Tuple3<T, DoubleValue, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out) throws Exception {
            this.output.f1 = vertex.iterator().next().f2;
            for (Tuple2<T, T> edge : edges) {
                this.output.f0 = edge.f0;
                out.collect(this.output);
            }
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    private static class SumScores<T>
    implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
        private SumScores() {
        }

        public Tuple3<T, DoubleValue, DoubleValue> reduce(Tuple3<T, DoubleValue, DoubleValue> left, Tuple3<T, DoubleValue, DoubleValue> right) throws Exception {
            ((DoubleValue)left.f1).setValue(((DoubleValue)left.f1).getValue() + ((DoubleValue)right.f1).getValue());
            ((DoubleValue)left.f2).setValue(((DoubleValue)left.f2).getValue() + ((DoubleValue)right.f2).getValue());
            return left;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"1->0"})
    private static class InitializeScores<T>
    implements MapFunction<Tuple2<T, T>, Tuple3<T, DoubleValue, DoubleValue>> {
        private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3(null, (Object)new DoubleValue(0.0), (Object)new DoubleValue(1.0));

        private InitializeScores() {
        }

        public Tuple3<T, DoubleValue, DoubleValue> map(Tuple2<T, T> value) throws Exception {
            this.output.f0 = value.f1;
            return this.output;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0; 1"})
    private static class ExtractEdgeIDs<T, ET>
    implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
        private Tuple2<T, T> output = new Tuple2();

        private ExtractEdgeIDs() {
        }

        public Tuple2<T, T> map(Edge<T, ET> value) throws Exception {
            this.output.f0 = value.f0;
            this.output.f1 = value.f1;
            return this.output;
        }
    }
}

