/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.spargel;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Scatter-gather iteration expressed as a {@link CustomUnaryOperation} that maps a data set of
 * vertices to a data set of vertices.
 *
 * <p>{@link #createResult()} builds a delta iteration over the input vertices. In each superstep
 * the scatter function runs inside a co-group of the edges with the iteration workset and emits
 * {@code (key, message)} tuples; the gather function runs inside a co-group of those messages with
 * the solution set and emits updated vertices, which are used both as solution set delta and as
 * the next workset.
 *
 * <p>Instances are created through {@link #withEdges}; the constructor is private.
 *
 * @param <K> type of the vertex key
 * @param <VV> type of the vertex value
 * @param <Message> type of the messages produced by the scatter function
 * @param <EV> type of the edge value
 */
public class ScatterGatherIteration<K, VV, Message, EV>
implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
    /** User function that produces the messages; also carries the messaging direction. */
    private final ScatterFunction<K, VV, Message, EV> scatterFunction;
    /** User function that updates a vertex from the messages addressed to it. */
    private final GatherFunction<K, VV, Message> gatherFunction;
    /** Edges that are co-grouped with the workset to produce messages. */
    private final DataSet<Edge<K, EV>> edgesWithValue;
    /** Upper bound on the number of supersteps; checked to be positive in the constructor. */
    private final int maximumNumberOfIterations;
    /** Type information of the message type, extracted from the scatter function. */
    private final TypeInformation<Message> messageType;
    /** Input vertices; stays {@code null} until {@link #setInput} is called. */
    private DataSet<Vertex<K, VV>> initialVertices;
    /** Optional iteration configuration; {@code null} means defaults are used. */
    private ScatterGatherConfiguration configuration;

    private ScatterGatherIteration(ScatterFunction<K, VV, Message, EV> sf, GatherFunction<K, VV, Message> gf, DataSet<Edge<K, EV>> edgesWithValue, int maximumNumberOfIterations) {
        Preconditions.checkNotNull(sf);
        Preconditions.checkNotNull(gf);
        Preconditions.checkNotNull(edgesWithValue);
        Preconditions.checkArgument((maximumNumberOfIterations > 0 ? 1 : 0) != 0, (Object)"The maximum number of iterations must be at least one.");
        this.scatterFunction = sf;
        this.gatherFunction = gf;
        this.edgesWithValue = edgesWithValue;
        this.maximumNumberOfIterations = maximumNumberOfIterations;
        this.messageType = this.getMessageType(sf);
    }

    /**
     * Extracts the type information of the message type from the given scatter function.
     *
     * @param mf scatter function whose concrete class is inspected
     * @return type information extracted for type parameter position 2 of {@link ScatterFunction}
     */
    private TypeInformation<Message> getMessageType(ScatterFunction<K, VV, Message, EV> mf) {
        // Position 2 is where this class places Message in ScatterFunction<K, VV, Message, EV>.
        final int messageParameterPosition = 2;
        Class<?> concreteFunctionClass = mf.getClass();
        return TypeExtractor.createTypeInfo(mf, ScatterFunction.class, concreteFunctionClass, messageParameterPosition);
    }

    /**
     * Sets the vertex data set this iteration operates on.
     *
     * @param inputData the initial vertices
     */
    @Override
    public void setInput(DataSet<Vertex<K, VV>> inputData) {
        initialVertices = inputData;
    }

    /**
     * Builds the scatter-gather iteration and returns the data set of resulting vertices.
     *
     * <p>The messaging direction is taken from the configuration when one is set and defaults to
     * {@link EdgeDirection#OUT} otherwise. When the configuration enables the degrees option the
     * vertices are first enriched with their in- and out-degrees; otherwise the plain vertices
     * are iterated on.
     *
     * @return the vertices after the iteration
     * @throws IllegalStateException if no input has been set, or if the vertex count data set
     *     requested by the configuration could not be created
     */
    @Override
    public DataSet<Vertex<K, VV>> createResult() {
        if (this.initialVertices == null) {
            throw new IllegalStateException("The input data set has not been set.");
        }

        // Messages are shipped as (target key, message) tuples.
        TypeInformation<K> keyType = ((TupleTypeInfo<?>) this.initialVertices.getType()).getTypeAt(0);
        TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<>(keyType, this.messageType);

        Graph<K, VV, EV> graph = Graph.fromDataSet(this.initialVertices, this.edgesWithValue, this.initialVertices.getExecutionEnvironment());

        DataSet<LongValue> numberOfVertices = null;
        if (this.configuration != null && this.configuration.isOptNumVertices()) {
            try {
                numberOfVertices = GraphUtils.count(this.initialVertices);
            } catch (Exception e) {
                // Previously the failure was only printed and a null data set was broadcast
                // later on; fail here instead so the real cause is reported.
                throw new IllegalStateException("Could not create the vertex count data set for the iteration.", e);
            }
        }

        EdgeDirection configuredDirection = this.configuration != null
                ? this.configuration.getDirection()
                : EdgeDirection.OUT;
        this.scatterFunction.setDirection(configuredDirection);
        EdgeDirection messagingDirection = this.scatterFunction.getDirection();

        if (this.configuration != null && this.configuration.isOptDegrees()) {
            return this.createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo, numberOfVertices);
        }
        return this.createResultSimpleVertex(messagingDirection, messageTypeInfo, numberOfVertices);
    }

    /**
     * Creates a scatter-gather iteration over the given edges.
     *
     * @param edgesWithValue the edge data set
     * @param sf the scatter function
     * @param gf the gather function
     * @param maximumNumberOfIterations the maximum number of supersteps; must be greater than zero
     * @param <K> type of the vertex key
     * @param <VV> type of the vertex value
     * @param <Message> type of the messages
     * @param <EV> type of the edge value
     * @return a new iteration instance; its input still has to be set via {@link #setInput}
     */
    public static <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf, GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations) {
        // Note the argument order: the constructor takes the functions first, then the edges.
        return new ScatterGatherIteration<>(sf, gf, edgesWithValue, maximumNumberOfIterations);
    }

    /**
     * Sets the configuration used when the iteration is built.
     *
     * @param parameters the configuration; {@code null} leaves the iteration on its defaults
     */
    public void configure(ScatterGatherConfiguration parameters) {
        configuration = parameters;
    }

    /**
     * Returns the configuration previously passed to {@link #configure}.
     *
     * @return the configuration, or {@code null} if none has been set
     */
    public ScatterGatherConfiguration getIterationConfiguration() {
        return configuration;
    }

    /**
     * Builds the "Messaging" co-group of the edges with the iteration workset for plain vertices.
     *
     * <p>When a configuration is present, its scatter broadcast sets are attached, and the vertex
     * count is broadcast under the name {@code "number of vertices"} if that option is enabled.
     *
     * @param iteration the delta iteration whose workset is co-grouped with the edges
     * @param messageTypeInfo type information of the produced {@code (key, message)} tuples
     * @param whereArg key field position used on the edge data set
     * @param equalToArg key field position used on the workset
     * @param numberOfVertices vertex count data set; only used when the option is enabled
     * @return the configured messaging operator
     */
    private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunction(DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration, TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg, DataSet<LongValue> numberOfVertices) {
        ScatterUdfWithEVsSimpleVV scatterUdf = new ScatterUdfWithEVsSimpleVV(this.scatterFunction, messageTypeInfo);
        DataSet workset = (DataSet)iteration.getWorkset();

        CoGroupOperator messaging = this.edgesWithValue.coGroup(workset).where(whereArg).equalTo(equalToArg).with(scatterUdf);
        messaging = (CoGroupOperator)messaging.name("Messaging");

        // Without a configuration there is nothing to broadcast.
        if (this.configuration == null) {
            return messaging;
        }

        for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getScatterBcastVars()) {
            messaging = (CoGroupOperator)messaging.withBroadcastSet((DataSet)broadcast.f1, (String)broadcast.f0);
        }
        if (this.configuration.isOptNumVertices()) {
            messaging = (CoGroupOperator)messaging.withBroadcastSet(numberOfVertices, "number of vertices");
        }
        return messaging;
    }

    /**
     * Builds the "Messaging" co-group of the edges with the iteration workset for vertices whose
     * value is a {@code (value, LongValue, LongValue)} tuple carrying the degrees.
     *
     * <p>When a configuration is present, its scatter broadcast sets are attached, and the vertex
     * count is broadcast under the name {@code "number of vertices"} if that option is enabled.
     *
     * @param iteration the delta iteration whose workset is co-grouped with the edges
     * @param messageTypeInfo type information of the produced {@code (key, message)} tuples
     * @param whereArg key field position used on the edge data set
     * @param equalToArg key field position used on the workset
     * @param numberOfVertices vertex count data set; only used when the option is enabled
     * @return the configured messaging operator
     */
    private CoGroupOperator<?, ?, Tuple2<K, Message>> buildScatterFunctionVerticesWithDegrees(DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration, TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg, DataSet<LongValue> numberOfVertices) {
        ScatterUdfWithEVsVVWithDegrees degreeAwareUdf = new ScatterUdfWithEVsVVWithDegrees(this.scatterFunction, messageTypeInfo);
        DataSet workset = (DataSet)iteration.getWorkset();

        CoGroupOperator messaging = this.edgesWithValue.coGroup(workset).where(whereArg).equalTo(equalToArg).with(degreeAwareUdf);
        messaging = (CoGroupOperator)messaging.name("Messaging");

        final boolean configured = this.configuration != null;
        if (configured) {
            // User-supplied broadcast sets come first, then the optional vertex count.
            for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getScatterBcastVars()) {
                messaging = (CoGroupOperator)messaging.withBroadcastSet((DataSet)broadcast.f1, (String)broadcast.f0);
            }
        }
        if (configured && this.configuration.isOptNumVertices()) {
            messaging = (CoGroupOperator)messaging.withBroadcastSet(numberOfVertices, "number of vertices");
        }
        return messaging;
    }

    /**
     * Applies naming and, when a configuration is present, parallelism, solution set memory mode
     * and aggregators to the given delta iteration.
     *
     * @param iteration the delta iteration to set up
     */
    private void setUpIteration(DeltaIteration<?, ?> iteration) {
        // Default name; the configuration may substitute its own through getName(default).
        String defaultName = "Scatter-gather iteration (" + this.gatherFunction + " | " + this.scatterFunction + ")";

        if (this.configuration == null) {
            iteration.name(defaultName);
            return;
        }

        iteration.name(this.configuration.getName(defaultName));
        iteration.parallelism(this.configuration.getParallelism());
        iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
        for (Map.Entry<String, Aggregator<?>> aggregator : this.configuration.getAggregators().entrySet()) {
            iteration.registerAggregator(aggregator.getKey(), aggregator.getValue());
        }
    }

    /**
     * Builds the delta iteration for plain vertices (no degree information attached).
     *
     * <p>The initial vertices serve as both the solution set and the initial workset, keyed on
     * field 0. Messages are produced by co-grouping the edges with the workset; the edge key
     * field depends on the messaging direction (field 1 for {@code IN}, field 0 for {@code OUT},
     * the union of both for {@code ALL}). The gather co-group result is used as solution set
     * delta and as the next workset.
     *
     * @param messagingDirection direction in which messages are sent
     * @param messageTypeInfo type information of the {@code (key, message)} tuples
     * @param numberOfVertices vertex count data set; only broadcast when the option is enabled
     * @return the vertices after the iteration
     * @throws IllegalArgumentException if the direction is none of {@code IN}, {@code OUT}, {@code ALL}
     */
    private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection, TypeInformation<Tuple2<K, Message>> messageTypeInfo, DataSet<LongValue> numberOfVertices) {
        TypeInformation<Vertex<K, VV>> vertexTypes = this.initialVertices.getType();

        final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
                this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, 0);
        this.setUpIteration(iteration);

        // Declared as DataSet: a single co-group and a union of two co-groups are both valid here.
        final DataSet<Tuple2<K, Message>> messages;
        switch (messagingDirection) {
            case IN: {
                messages = this.buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
                break;
            }
            case OUT: {
                messages = this.buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
                break;
            }
            case ALL: {
                messages = this.buildScatterFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
                        .union(this.buildScatterFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices));
                break;
            }
            default: {
                throw new IllegalArgumentException("Illegal edge direction");
            }
        }

        GatherUdf<K, VV, Message> updateUdf = new GatherUdfSimpleVV<>(this.gatherFunction, vertexTypes);

        // Gather step: messages joined with the current vertex state from the solution set.
        CoGroupOperator<?, ?, Vertex<K, VV>> updates =
                messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
        if (this.configuration != null && this.configuration.isOptNumVertices()) {
            updates = updates.withBroadcastSet(numberOfVertices, "number of vertices");
        }
        this.configureUpdateFunction(updates);

        return iteration.closeWith(updates, updates);
    }

    /**
     * Builds the delta iteration for vertices enriched with their in- and out-degrees.
     *
     * <p>The in- and out-degree data sets of the graph are joined into
     * {@code (key, inDegree, outDegree)} tuples, which are joined with the initial vertices so
     * that every vertex value becomes {@code (value, inDegree, outDegree)}. The iteration runs on
     * these enriched vertices; after it is closed the degrees are stripped again.
     *
     * <p>Requires a non-null configuration; the caller only takes this path when the degrees
     * option is enabled on it.
     *
     * @param graph graph from which the degrees are computed
     * @param messagingDirection direction in which messages are sent
     * @param messageTypeInfo type information of the {@code (key, message)} tuples
     * @param numberOfVertices vertex count data set; only broadcast when the option is enabled
     * @return the vertices after the iteration, with their original value type
     * @throws IllegalArgumentException if the direction is none of {@code IN}, {@code OUT}, {@code ALL}
     */
    private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection, TypeInformation<Tuple2<K, Message>> messageTypeInfo, DataSet<LongValue> numberOfVertices) {
        this.gatherFunction.setOptDegrees(this.configuration.isOptDegrees());

        DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
        DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();

        // (key, inDegree) x (key, outDegree) -> (key, inDegree, outDegree).
        // The second input's f1 ends up in output field f2, so the forwarded-field annotation
        // has to say "f1->f2"; a plain "f1" would wrongly claim it lands in output field f1.
        DataSet<Tuple3<K, LongValue, LongValue>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple2<K, LongValue>, Tuple2<K, LongValue>, Tuple3<K, LongValue, LongValue>>() {

                    @Override
                    public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) {
                        out.collect(new Tuple3<>(first.f0, first.f1, second.f1));
                    }
                })
                .withForwardedFieldsFirst("f0;f1")
                .withForwardedFieldsSecond("f1->f2");

        // Attach the degrees to every vertex value.
        DataSet<Vertex<K, Tuple3<VV, LongValue, LongValue>>> verticesWithDegrees = this.initialVertices.join(degrees).where(0).equalTo(0)
                .with(new FlatJoinFunction<Vertex<K, VV>, Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() {

                    @Override
                    public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> vertexDegrees, Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
                        out.collect(new Vertex<>(vertex.getId(), new Tuple3<>(vertex.getValue(), vertexDegrees.f1, vertexDegrees.f2)));
                    }
                })
                .withForwardedFieldsFirst("f0");

        TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexTypes = verticesWithDegrees.getType();

        final DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration =
                verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
        this.setUpIteration(iteration);

        // Declared as DataSet: a single co-group and a union of two co-groups are both valid here.
        final DataSet<Tuple2<K, Message>> messages;
        switch (messagingDirection) {
            case IN: {
                messages = this.buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
                break;
            }
            case OUT: {
                messages = this.buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
                break;
            }
            case ALL: {
                messages = this.buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
                        .union(this.buildScatterFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices));
                break;
            }
            default: {
                throw new IllegalArgumentException("Illegal edge direction");
            }
        }

        // The user gather function is typed on the plain vertex value, while the UDF operates on
        // the degree-enriched value, so the wrapper has to be created through its raw type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf = new GatherUdfVVWithDegrees(this.gatherFunction, vertexTypes);

        // Gather step: messages joined with the current vertex state from the solution set.
        CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates =
                messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
        if (this.configuration != null && this.configuration.isOptNumVertices()) {
            updates = updates.withBroadcastSet(numberOfVertices, "number of vertices");
        }
        this.configureUpdateFunction(updates);

        // Strip the degrees again so callers get the original vertex value type back.
        return iteration.closeWith(updates, updates).map(new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() {

            @Override
            public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
                return new Vertex<>(vertex.getId(), vertex.getValue().f0);
            }
        });
    }

    /**
     * Names the gather co-group "Vertex State Updates", attaches the gather broadcast sets from
     * the configuration (if any) and declares field 0 of both inputs as forwarded.
     *
     * @param updates the gather co-group operator to configure
     * @param <VVWithDegree> vertex value type produced by the operator
     */
    private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
        // Work on a local handle instead of reassigning the parameter; every call is still made
        // on the operator returned by the previous call.
        CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> operator = (CoGroupOperator)updates.name("Vertex State Updates");

        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getGatherBcastVars()) {
                operator = (CoGroupOperator)operator.withBroadcastSet((DataSet)broadcast.f1, (String)broadcast.f0);
            }
        }

        CoGroupOperator withFirstForwarded = (CoGroupOperator)operator.withForwardedFieldsFirst(new String[]{"0"});
        withFirstForwarded.withForwardedFieldsSecond(new String[]{"0"});
    }

    /**
     * Gather co-group function for vertices whose value is a
     * {@code (value, inDegree, outDegree)} tuple.
     *
     * <p>For every key it hands the in- and out-degree to the gather function and then lets it
     * update the vertex from the messages of that key.
     */
    private static final class GatherUdfVVWithDegrees<K, VV, Message>
    extends GatherUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
        private GatherUdfVVWithDegrees(GatherFunction<K, Tuple3<VV, LongValue, LongValue>, Message> gatherFunction, TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) {
            super(gatherFunction, resultType);
        }

        /**
         * Updates the vertex of one key from the messages sent to that key.
         *
         * @param messages messages addressed to the key
         * @param vertex vertex state for the key; expected to contain one element
         * @param out collector for updated vertices
         * @throws Exception if there is no vertex for the key, or if the gather function fails
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex, Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
            Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator();
            if (!vertexIter.hasNext()) {
                throw this.missingTargetVertex(messages);
            }

            Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next();

            // The message iterator is fed through a raw iterator, as its element type differs
            // from the co-group's input type only in generics.
            Iterator downcastIter = messages.iterator();
            this.messageIter.setSource(downcastIter);

            // f1 of the vertex is the (value, inDegree, outDegree) tuple.
            this.gatherFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
            this.gatherFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
            this.gatherFunction.setOutputWithDegrees(vertexWithDegrees, out);
            this.gatherFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, this.messageIter);
        }

        /**
         * Creates the exception reported when a key has no vertex in the solution set.
         *
         * @param messages messages of the key; the first one is used to name the key if possible
         * @return the exception to throw; it always carries a message
         */
        private Exception missingTargetVertex(Iterable<Tuple2<K, Message>> messages) {
            Iterator<Tuple2<K, Message>> pendingMessages = messages.iterator();
            if (!pendingMessages.hasNext()) {
                // Neither a vertex nor a message: previously reported without any text.
                return new Exception("Gather co-group was invoked without a vertex and without any messages.");
            }
            String description = "Target vertex does not exist!.";
            try {
                Tuple2<K, Message> next = pendingMessages.next();
                description = "Target vertex '" + next.f0 + "' does not exist!.";
            } catch (Throwable ignored) {
                // Best effort only: if the key cannot be read, keep the generic description.
            }
            return new Exception(description);
        }
    }

    private static final class GatherUdfSimpleVV<K, VV, Message>
    extends GatherUdf<K, VV, Message> {
        private GatherUdfSimpleVV(GatherFunction<K, VV, Message> gatherFunction, TypeInformation<Vertex<K, VV>> resultType) {
            super(gatherFunction, resultType);
        }

        public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, VV>> vertex, Collector<Vertex<K, VV>> out) throws Exception {
            Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
            if (!vertexIter.hasNext()) {
                Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
                if (messageIter.hasNext()) {
                    String message = "Target vertex does not exist!.";
                    try {
                        Tuple2<K, Message> next = messageIter.next();
                        message = "Target vertex '" + next.f0 + "' does not exist!.";
                    }
                    catch (Throwable throwable) {
                        // empty catch block
                    }
                    throw new Exception(message);
                }
                throw new Exception();
            }
            Vertex<K, VV> vertexState = vertexIter.next();
            Iterator downcastIter = messages.iterator();
            this.messageIter.setSource(downcastIter);
            this.gatherFunction.setOutput(vertexState, out);
            this.gatherFunction.updateVertex(vertexState, this.messageIter);
        }
    }

    private static abstract class GatherUdf<K, VVWithDegrees, Message>
    extends RichCoGroupFunction<Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
    implements ResultTypeQueryable<Vertex<K, VVWithDegrees>> {
        private static final long serialVersionUID = 1L;
        final GatherFunction<K, VVWithDegrees, Message> gatherFunction;
        final MessageIterator<Message> messageIter = new MessageIterator();
        private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;

        private GatherUdf(GatherFunction<K, VVWithDegrees, Message> gatherFunction, TypeInformation<Vertex<K, VVWithDegrees>> resultType) {
            this.gatherFunction = gatherFunction;
            this.resultType = resultType;
        }

        public void open(Configuration parameters) throws Exception {
            if (this.getRuntimeContext().hasBroadcastVariable("number of vertices")) {
                List numberOfVertices = this.getRuntimeContext().getBroadcastVariable("number of vertices");
                this.gatherFunction.setNumberOfVertices(((LongValue)numberOfVertices.iterator().next()).getValue());
            }
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.gatherFunction.init(this.getIterationRuntimeContext());
            }
            this.gatherFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.gatherFunction.postSuperstep();
        }

        public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
            return this.resultType;
        }
    }

    /**
     * Scatter co-group function for vertices whose value is a
     * {@code (value, inDegree, outDegree)} tuple.
     *
     * <p>For every key with a vertex in the workset it hands the degrees to the scatter function
     * and lets it send messages for a vertex that carries only the plain value.
     */
    private static final class ScatterUdfWithEVsVVWithDegrees<K, VV, Message, EV>
    extends ScatterUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
        /** Reused vertex object holding the id and the plain value of the current vertex. */
        private Vertex<K, VV> nextVertex = new Vertex<>();

        private ScatterUdfWithEVsVVWithDegrees(ScatterFunction<K, VV, Message, EV> scatterFunction, TypeInformation<Tuple2<K, Message>> resultType) {
            super(scatterFunction, resultType);
        }

        /**
         * Sends the messages of one vertex; does nothing when the key has no vertex in the
         * workset.
         *
         * @param edges edges of the key
         * @param state vertex state for the key
         * @param out collector for the produced {@code (key, message)} tuples
         * @throws Exception if the scatter function fails
         */
        @Override
        public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state, Collector<Tuple2<K, Message>> out) throws Exception {
            Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
            if (!stateIter.hasNext()) {
                return;
            }

            Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
            // f1 of the vertex is the (value, inDegree, outDegree) tuple.
            Tuple3<VV, LongValue, LongValue> valueWithDegrees = vertexWithDegrees.f1;

            // Copy id and plain value into the reused vertex that the user function sees.
            this.nextVertex.f0 = vertexWithDegrees.f0;
            this.nextVertex.f1 = valueWithDegrees.f0;

            this.scatterFunction.setInDegree(valueWithDegrees.f1.getValue());
            this.scatterFunction.setOutDegree(valueWithDegrees.f2.getValue());
            this.scatterFunction.set(edges.iterator(), out, vertexWithDegrees.getId());
            this.scatterFunction.sendMessages(this.nextVertex);
        }
    }

    private static final class ScatterUdfWithEVsSimpleVV<K, VV, Message, EV>
    extends ScatterUdfWithEdgeValues<K, VV, VV, Message, EV> {
        private ScatterUdfWithEVsSimpleVV(ScatterFunction<K, VV, Message, EV> scatterFunction, TypeInformation<Tuple2<K, Message>> resultType) {
            super(scatterFunction, resultType);
        }

        public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, VV>> state, Collector<Tuple2<K, Message>> out) throws Exception {
            Iterator<Vertex<K, VV>> stateIter = state.iterator();
            if (stateIter.hasNext()) {
                Vertex<K, VV> newVertexState = stateIter.next();
                this.scatterFunction.set(edges.iterator(), out, newVertexState.getId());
                this.scatterFunction.sendMessages(newVertexState);
            }
        }
    }

    private static abstract class ScatterUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
    extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
    implements ResultTypeQueryable<Tuple2<K, Message>> {
        private static final long serialVersionUID = 1L;
        final ScatterFunction<K, VV, Message, EV> scatterFunction;
        private transient TypeInformation<Tuple2<K, Message>> resultType;

        private ScatterUdfWithEdgeValues(ScatterFunction<K, VV, Message, EV> scatterFunction, TypeInformation<Tuple2<K, Message>> resultType) {
            this.scatterFunction = scatterFunction;
            this.resultType = resultType;
        }

        public void open(Configuration parameters) throws Exception {
            if (this.getRuntimeContext().hasBroadcastVariable("number of vertices")) {
                List numberOfVertices = this.getRuntimeContext().getBroadcastVariable("number of vertices");
                this.scatterFunction.setNumberOfVertices(((LongValue)numberOfVertices.iterator().next()).getValue());
            }
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.scatterFunction.init(this.getIterationRuntimeContext());
            }
            this.scatterFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.scatterFunction.postSuperstep();
        }

        public TypeInformation<Tuple2<K, Message>> getProducedType() {
            return this.resultType;
        }
    }
}

