/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.gsa;

import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class GatherSumApplyIteration<K, VV, EV, M>
implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
    private DataSet<Vertex<K, VV>> vertexDataSet;
    private DataSet<Edge<K, EV>> edgeDataSet;
    private final GatherFunction<VV, EV, M> gather;
    private final SumFunction<VV, EV, M> sum;
    private final ApplyFunction<K, VV, M> apply;
    private final int maximumNumberOfIterations;
    private EdgeDirection direction = EdgeDirection.OUT;
    private GSAConfiguration configuration;

    /**
     * Creates the iteration from its three user functions, the edge data set and
     * the iteration bound. Instances are obtained through
     * {@link #withEdges(DataSet, GatherFunction, SumFunction, ApplyFunction, int)}.
     *
     * <p>Every reference argument is passed through
     * {@code Preconditions.checkNotNull}, and the iteration bound must be
     * strictly positive.
     *
     * @param gather the gather function
     * @param sum the sum function
     * @param apply the apply function
     * @param edges the edge data set
     * @param maximumNumberOfIterations upper bound on the number of iterations; must be &gt; 0
     */
    private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
        // Validation order (gather, sum, apply, edges, bound) is kept so the first
        // offending argument is the one reported.
        this.gather = Preconditions.checkNotNull(gather);
        this.sum = Preconditions.checkNotNull(sum);
        this.apply = Preconditions.checkNotNull(apply);
        this.edgeDataSet = Preconditions.checkNotNull(edges);
        Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
        this.maximumNumberOfIterations = maximumNumberOfIterations;
    }

    /**
     * Stores the vertex data set that {@link #createResult()} iterates over.
     *
     * @param dataSet the vertex data set; no validation happens here, a missing
     *     input is only detected by {@link #createResult()}
     */
    @Override
    public void setInput(DataSet<Vertex<K, VV>> dataSet) {
        vertexDataSet = dataSet;
    }

    /**
     * Assembles the gather-sum-apply data flow as a delta iteration over the
     * vertex data set and returns the closed iteration.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>derive type information for the {@code (key, message)} tuples and the output vertices;</li>
     *   <li>if the configuration requests it, count the vertices so the count can be broadcast;</li>
     *   <li>open a delta iteration keyed on field 0 and apply name, parallelism,
     *       solution-set memory mode and aggregators from the configuration (if any);</li>
     *   <li>join the workset with the edges according to the edge direction;</li>
     *   <li>map with the gather UDF, group on field 0 and reduce with the sum UDF,
     *       then join with the solution set using the apply UDF;</li>
     *   <li>close the iteration using the apply result as both solution-set delta and next workset.</li>
     * </ol>
     *
     * <p>Side effect: when a configuration is present, its direction overwrites
     * the {@code direction} field of this object.
     *
     * @return the data set produced by closing the delta iteration
     * @throws IllegalStateException if no input was set, or if counting the
     *     vertices fails with a checked exception
     */
    @Override
    public DataSet<Vertex<K, VV>> createResult() {
        if (this.vertexDataSet == null) {
            throw new IllegalStateException("The input data set has not been set.");
        }

        // Type information: key type comes from field 0 of the vertex tuple type,
        // message type from type parameter index 2 of the GatherFunction.
        TypeInformation<K> keyType = ((TupleTypeInfo<?>) this.vertexDataSet.getType()).getTypeAt(0);
        TypeInformation<M> messageType =
                TypeExtractor.createTypeInfo(this.gather, GatherFunction.class, this.gather.getClass(), 2);
        TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<>(keyType, messageType);
        TypeInformation<Vertex<K, VV>> outputType = this.vertexDataSet.getType();

        // Optional vertex count. It stays null unless the option is enabled, and the
        // broadcast registrations below key off that.
        DataSet<LongValue> numberOfVertices = null;
        if (this.configuration != null && this.configuration.isOptNumVertices()) {
            try {
                numberOfVertices = GraphUtils.count(this.vertexDataSet);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                // Previously the stack trace was printed and a null data set was
                // registered as a broadcast set further down; fail here with the cause instead.
                throw new IllegalStateException("Could not compute the number of vertices.", e);
            }
        }

        GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<>(this.gather, innerType);
        SumUdf<K, VV, EV, M> sumUdf = new SumUdf<>(this.sum, innerType);
        ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<>(this.apply, outputType);

        final int[] zeroKeyPos = new int[] {0};
        final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
                this.vertexDataSet.iterateDelta(this.vertexDataSet, this.maximumNumberOfIterations, zeroKeyPos);

        final String defaultName =
                "Gather-sum-apply iteration (" + this.gather + " | " + this.sum + " | " + this.apply + ")";
        if (this.configuration != null) {
            iteration.name(this.configuration.getName(defaultName));
            iteration.parallelism(this.configuration.getParallelism());
            iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
            for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
                iteration.registerAggregator(entry.getKey(), entry.getValue());
            }
            // The configured direction replaces whatever the field held before.
            this.direction = this.configuration.getDirection();
        } else {
            iteration.name(defaultName);
        }

        DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = joinWithEdges(iteration.getWorkset(), this.direction);

        // Gather
        MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator =
                neighbors.map(gatherUdf).name("Gather");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
                gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
            }
        }
        if (numberOfVertices != null) {
            gatherMapOperator = gatherMapOperator.withBroadcastSet(numberOfVertices, "number of vertices");
        }

        // Sum
        ReduceOperator<Tuple2<K, M>> sumReduceOperator =
                gatherMapOperator.groupBy(0).reduce(sumUdf).name("Sum");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
                sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
            }
        }
        if (numberOfVertices != null) {
            sumReduceOperator = sumReduceOperator.withBroadcastSet(numberOfVertices, "number of vertices");
        }

        // Apply
        JoinOperator<?, ?, Vertex<K, VV>> appliedSet = sumReduceOperator
                .join(iteration.getSolutionSet())
                .where(0)
                .equalTo(0)
                .with(applyUdf);
        appliedSet = appliedSet.name("Apply");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
                appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
            }
        }
        if (numberOfVertices != null) {
            appliedSet = appliedSet.withBroadcastSet(numberOfVertices, "number of vertices");
        }

        // Declare that field 0 of both join inputs is forwarded unchanged.
        appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");

        return iteration.closeWith(appliedSet, appliedSet);
    }

    /**
     * Joins the workset vertices with the edges for the given direction and
     * projects each match to a {@code (key, Neighbor)} tuple.
     *
     * <p>{@code IN} joins vertex field 0 with edge field 1, {@code ALL} is the
     * union of the {@code OUT} and {@code IN} joins, and {@code OUT} as well as
     * any other value joins vertex field 0 with edge field 0.
     */
    private DataSet<Tuple2<K, Neighbor<VV, EV>>> joinWithEdges(DataSet<Vertex<K, VV>> workset, EdgeDirection edgeDirection) {
        switch (edgeDirection) {
            case IN:
                return workset.join(this.edgeDataSet)
                        .where(0)
                        .equalTo(1)
                        .with(new ProjectKeyWithNeighborIN<K, VV, EV>());
            case ALL:
                return joinWithEdges(workset, EdgeDirection.OUT)
                        .union(joinWithEdges(workset, EdgeDirection.IN));
            case OUT:
            default:
                return workset.join(this.edgeDataSet)
                        .where(0)
                        .equalTo(0)
                        .with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
        }
    }

    /**
     * Static factory for a gather-sum-apply iteration; delegates to the private
     * constructor, which validates all arguments.
     *
     * @param edges the edge data set
     * @param gather the gather function
     * @param sum the sum function
     * @param apply the apply function
     * @param maximumNumberOfIterations upper bound on the number of iterations; must be &gt; 0
     * @param <K> vertex key type
     * @param <VV> vertex value type
     * @param <EV> edge value type
     * @param <M> type of the value produced by gather and combined by sum
     * @return a new, not yet configured iteration
     */
    public static <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
        // Note the argument order differs between this factory and the constructor.
        return new GatherSumApplyIteration<>(gather, sum, apply, edges, maximumNumberOfIterations);
    }

    /**
     * Sets the configuration consulted by {@link #createResult()}. Passing
     * {@code null} is accepted; {@code createResult()} then uses its
     * no-configuration path.
     *
     * @param parameters the configuration to use, or {@code null}
     */
    public void configure(GSAConfiguration parameters) {
        configuration = parameters;
    }

    /**
     * Returns the configuration previously passed to
     * {@link #configure(GSAConfiguration)}.
     *
     * @return the current configuration, or {@code null} if none was set
     */
    public GSAConfiguration getIterationConfiguration() {
        return configuration;
    }

    @FunctionAnnotation.ForwardedFieldsSecond(value={"f0"})
    private static final class ProjectKeyWithNeighborIN<K, VV, EV>
    implements FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
        private ProjectKeyWithNeighborIN() {
        }

        public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
            out.collect((Object)new Tuple2(edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
        }
    }

    @FunctionAnnotation.ForwardedFieldsSecond(value={"f1->f0"})
    private static final class ProjectKeyWithNeighborOUT<K, VV, EV>
    implements FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
        private ProjectKeyWithNeighborOUT() {
        }

        public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
            out.collect((Object)new Tuple2(edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
        }
    }

    private static final class ApplyUdf<K, VV, EV, M>
    extends RichFlatJoinFunction<Tuple2<K, M>, Vertex<K, VV>, Vertex<K, VV>>
    implements ResultTypeQueryable<Vertex<K, VV>> {
        private final ApplyFunction<K, VV, M> applyFunction;
        private transient TypeInformation<Vertex<K, VV>> resultType;

        private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
            this.applyFunction = applyFunction;
            this.resultType = resultType;
        }

        public void join(Tuple2<K, M> newValue, Vertex<K, VV> currentValue, Collector<Vertex<K, VV>> out) throws Exception {
            this.applyFunction.setOutput(currentValue, out);
            this.applyFunction.apply(newValue.f1, currentValue.getValue());
        }

        public void open(Configuration parameters) throws Exception {
            if (this.getRuntimeContext().hasBroadcastVariable("number of vertices")) {
                List numberOfVertices = this.getRuntimeContext().getBroadcastVariable("number of vertices");
                this.applyFunction.setNumberOfVertices(((LongValue)numberOfVertices.iterator().next()).getValue());
            }
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.applyFunction.init(this.getIterationRuntimeContext());
            }
            this.applyFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.applyFunction.postSuperstep();
        }

        public TypeInformation<Vertex<K, VV>> getProducedType() {
            return this.resultType;
        }
    }

    private static final class SumUdf<K, VV, EV, M>
    extends RichReduceFunction<Tuple2<K, M>>
    implements ResultTypeQueryable<Tuple2<K, M>> {
        private final SumFunction<VV, EV, M> sumFunction;
        private transient TypeInformation<Tuple2<K, M>> resultType;

        private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
            this.sumFunction = sumFunction;
            this.resultType = resultType;
        }

        public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
            Object result = this.sumFunction.sum(arg0.f1, arg1.f1);
            if (result == arg1.f1) {
                Object tmp = arg1.f1;
                arg1.f1 = arg0.f1;
                arg0.f1 = tmp;
            } else {
                arg0.f1 = result;
            }
            return arg0;
        }

        public void open(Configuration parameters) throws Exception {
            if (this.getRuntimeContext().hasBroadcastVariable("number of vertices")) {
                List numberOfVertices = this.getRuntimeContext().getBroadcastVariable("number of vertices");
                this.sumFunction.setNumberOfVertices(((LongValue)numberOfVertices.iterator().next()).getValue());
            }
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.sumFunction.init(this.getIterationRuntimeContext());
            }
            this.sumFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.sumFunction.postSuperstep();
        }

        public TypeInformation<Tuple2<K, M>> getProducedType() {
            return this.resultType;
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"f0"})
    private static final class GatherUdf<K, VV, EV, M>
    extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>>
    implements ResultTypeQueryable<Tuple2<K, M>> {
        private final GatherFunction<VV, EV, M> gatherFunction;
        private transient TypeInformation<Tuple2<K, M>> resultType;

        private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
            this.gatherFunction = gatherFunction;
            this.resultType = resultType;
        }

        public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
            M result = this.gatherFunction.gather((Neighbor)((Object)neighborTuple.f1));
            return new Tuple2(neighborTuple.f0, result);
        }

        public void open(Configuration parameters) throws Exception {
            if (this.getRuntimeContext().hasBroadcastVariable("number of vertices")) {
                List numberOfVertices = this.getRuntimeContext().getBroadcastVariable("number of vertices");
                this.gatherFunction.setNumberOfVertices(((LongValue)numberOfVertices.iterator().next()).getValue());
            }
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.gatherFunction.init(this.getIterationRuntimeContext());
            }
            this.gatherFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.gatherFunction.postSuperstep();
        }

        public TypeInformation<Tuple2<K, M>> getProducedType() {
            return this.resultType;
        }
    }
}

