/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.EdgesFunction;
import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.NeighborsFunction;
import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
import org.apache.flink.graph.ReduceEdgesFunction;
import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.VertexJoinFunction;
import org.apache.flink.graph.asm.translate.TranslateEdgeValues;
import org.apache.flink.graph.asm.translate.TranslateFunction;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.asm.translate.TranslateVertexValues;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.VertexCentricConfiguration;
import org.apache.flink.graph.pregel.VertexCentricIteration;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.ScatterGatherIteration;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
import org.apache.flink.graph.utils.Tuple2ToEdgeMap;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.graph.validation.GraphValidator;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;

public class Graph<K, VV, EV> {
    /** Execution environment handed to the constructor; used below to turn collections into data sets. */
    private final ExecutionEnvironment context;
    /** Data set holding the vertices of this graph. */
    private final DataSet<Vertex<K, VV>> vertices;
    /** Data set holding the edges of this graph. */
    private final DataSet<Edge<K, EV>> edges;

    /**
     * Stores the given vertex data set, edge data set and execution environment.
     * The constructor is private; the static factory methods of this class call it.
     *
     * @param vertices the vertex data set
     * @param edges the edge data set
     * @param context the execution environment
     */
    private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
        this.context = context;
        this.edges = edges;
        this.vertices = vertices;
    }

    /**
     * Creates a graph from a collection of vertices and a collection of edges.
     *
     * @param vertices the vertices of the graph
     * @param edges the edges of the graph
     * @param context the execution environment used to turn both collections into data sets
     * @return the resulting graph
     */
    public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices, Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
        DataSet<Vertex<K, VV>> vertexDataSet = context.fromCollection(vertices);
        DataSet<Edge<K, EV>> edgeDataSet = context.fromCollection(edges);
        return fromDataSet(vertexDataSet, edgeDataSet, context);
    }

    /**
     * Creates a graph from a collection of edges only; the vertex set is
     * built by {@link #fromDataSet(DataSet, ExecutionEnvironment)}.
     *
     * @param edges the edges of the graph
     * @param context the execution environment used to turn the collection into a data set
     * @return the resulting graph, with {@link NullValue} vertex values
     */
    public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
        DataSet<Edge<K, EV>> edgeDataSet = context.fromCollection(edges);
        return fromDataSet(edgeDataSet, context);
    }

    /**
     * Creates a graph from a collection of edges; vertex values are produced
     * by applying the given initializer to each vertex ID.
     *
     * @param edges the edges of the graph
     * @param vertexValueInitializer maps a vertex ID to its initial value
     * @param context the execution environment used to turn the collection into a data set
     * @return the resulting graph
     */
    public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges, MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
        DataSet<Edge<K, EV>> edgeDataSet = context.fromCollection(edges);
        return fromDataSet(edgeDataSet, vertexValueInitializer, context);
    }

    /**
     * Creates a graph that wraps the given vertex and edge data sets as they are.
     *
     * @param vertices the vertex data set
     * @param edges the edge data set
     * @param context the execution environment
     * @return the resulting graph
     */
    public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
        Graph<K, VV, EV> graph = new Graph<K, VV, EV>(vertices, edges, context);
        return graph;
    }

    /**
     * Creates a graph from an edge data set only. The vertex data set is
     * produced by flat-mapping every edge with {@code EmitSrcAndTarget}
     * ("Source and target IDs") and removing duplicates ("IDs").
     *
     * @param edges the edge data set
     * @param context the execution environment
     * @return the resulting graph, with {@link NullValue} vertex values
     */
    public static <K, EV> Graph<K, NullValue, EV> fromDataSet(DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
        // The operator chain below is raw, so the unchecked conversion is confined to this one local.
        @SuppressWarnings({"unchecked", "rawtypes"})
        DataSet<Vertex<K, NullValue>> vertices = ((FlatMapOperator)edges.flatMap(new EmitSrcAndTarget()).name("Source and target IDs")).distinct().name("IDs");
        // Parameterized instantiation, consistent with the other factory methods (previously a raw `new Graph`).
        return new Graph<K, NullValue, EV>(vertices, edges, context);
    }

    /**
     * Creates a graph from an edge data set. Vertex IDs are taken from the
     * edges ("Source and target IDs", then distinct), and each ID is mapped to
     * a vertex whose value comes from the given initializer.
     *
     * @param edges the edge data set
     * @param vertexValueInitializer maps a vertex ID to its initial value
     * @param context the execution environment
     * @return the resulting graph
     */
    public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
        // Build the Vertex type information from the edge key type and the initializer's result type.
        TypeInformation idType = ((TupleTypeInfo)edges.getType()).getTypeAt(0);
        TypeInformation initialValueType = TypeExtractor.createTypeInfo(MapFunction.class, vertexValueInitializer.getClass(), 1, idType, null);
        TupleTypeInfo vertexType = new TupleTypeInfo(Vertex.class, new TypeInformation[]{idType, initialValueType});

        Operator distinctIds = ((FlatMapOperator)edges.flatMap(new EmitSrcAndTargetAsTuple1()).name("Source and target IDs"))
            .distinct()
            .name("IDs");

        MapFunction<Tuple1<K>, Vertex<K, VV>> toInitializedVertex = new MapFunction<Tuple1<K>, Vertex<K, VV>>(){
            // A single output instance is reused for every record.
            private Vertex<K, VV> output = new Vertex();

            public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
                this.output.f0 = value.f0;
                this.output.f1 = vertexValueInitializer.map(value.f0);
                return this.output;
            }
        };

        MapOperator typedMap = (MapOperator)((DistinctOperator)distinctIds).map(toInitializedVertex).returns((TypeInformation)vertexType);
        Operator vertices = ((MapOperator)typedMap.withForwardedFields("f0")).name("Initialize vertex values");
        return new Graph<K, VV, EV>(vertices, edges, context);
    }

    /**
     * Creates a graph from a data set of {@link Tuple2} vertices and a data
     * set of {@link Tuple3} edges, converting both with
     * {@code Tuple2ToVertexMap} and {@code Tuple3ToEdgeMap}.
     *
     * @param vertices the vertices as tuples
     * @param edges the edges as tuples
     * @param context the execution environment
     * @return the resulting graph
     */
    public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
        Operator convertedVertices = vertices
            .map(new Tuple2ToVertexMap())
            .name("Type conversion");
        Operator convertedEdges = edges
            .map(new Tuple3ToEdgeMap())
            .name("Type conversion");
        return fromDataSet(convertedVertices, convertedEdges, context);
    }

    /**
     * Creates a graph from a data set of {@link Tuple3} edges, converted with
     * {@code Tuple3ToEdgeMap}; the vertex set is built by
     * {@link #fromDataSet(DataSet, ExecutionEnvironment)}.
     *
     * @param edges the edges as tuples
     * @param context the execution environment
     * @return the resulting graph, with {@link NullValue} vertex values
     */
    public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
        return fromDataSet(
            edges.map(new Tuple3ToEdgeMap()).name("Type conversion"),
            context);
    }

    /**
     * Creates a graph from a data set of {@link Tuple3} edges, converted with
     * {@code Tuple3ToEdgeMap}; vertex values are produced by the given
     * initializer.
     *
     * @param edges the edges as tuples
     * @param vertexValueInitializer maps a vertex ID to its initial value
     * @param context the execution environment
     * @return the resulting graph
     */
    public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges, MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
        return fromDataSet(
            edges.map(new Tuple3ToEdgeMap()).name("Type conversion"),
            vertexValueInitializer,
            context);
    }

    /**
     * Creates a graph from a data set of {@link Tuple2} ID pairs, converted
     * to edges with {@code Tuple2ToEdgeMap}.
     *
     * @param edges the edges as ID pairs
     * @param context the execution environment
     * @return the resulting graph, with {@link NullValue} vertex and edge values
     */
    public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges, ExecutionEnvironment context) {
        return fromDataSet(
            edges.map(new Tuple2ToEdgeMap()).name("To Edge"),
            context);
    }

    /**
     * Creates a graph from a data set of {@link Tuple2} ID pairs, converted
     * to edges with {@code Tuple2ToEdgeMap}; vertex values are produced by
     * the given initializer.
     *
     * @param edges the edges as ID pairs
     * @param vertexValueInitializer maps a vertex ID to its initial value
     * @param context the execution environment
     * @return the resulting graph, with {@link NullValue} edge values
     */
    public static <K, VV> Graph<K, VV, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges, MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
        return fromDataSet(
            edges.map(new Tuple2ToEdgeMap()).name("To Edge"),
            vertexValueInitializer,
            context);
    }

    /**
     * Creates a {@link GraphCsvReader} for a vertices file and an edges file.
     *
     * @param verticesPath path of the vertices file
     * @param edgesPath path of the edges file
     * @param context the execution environment
     * @return the new reader
     */
    public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
        GraphCsvReader reader = new GraphCsvReader(verticesPath, edgesPath, context);
        return reader;
    }

    /**
     * Creates a {@link GraphCsvReader} for an edges file only.
     *
     * @param edgesPath path of the edges file
     * @param context the execution environment
     * @return the new reader
     */
    public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
        GraphCsvReader reader = new GraphCsvReader(edgesPath, context);
        return reader;
    }

    /**
     * Creates a {@link GraphCsvReader} for an edges file together with a
     * vertex value initializer.
     *
     * @param edgesPath path of the edges file
     * @param vertexValueInitializer maps a vertex ID to its initial value
     * @param context the execution environment
     * @return the new reader
     */
    public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath, MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
        GraphCsvReader reader = new GraphCsvReader(edgesPath, vertexValueInitializer, context);
        return reader;
    }

    /**
     * @return the execution environment this graph was created with
     */
    public ExecutionEnvironment getContext() {
        return context;
    }

    /**
     * Checks this graph by delegating to the given validator.
     *
     * @param validator the validator to apply to this graph
     * @return whatever {@code validator.validate(this)} returns
     * @throws Exception if the validator throws
     */
    public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
        Boolean verdict = validator.validate(this);
        return verdict;
    }

    /**
     * @return the vertex data set of this graph
     */
    public DataSet<Vertex<K, VV>> getVertices() {
        return vertices;
    }

    /**
     * @return the edge data set of this graph
     */
    public DataSet<Edge<K, EV>> getEdges() {
        return edges;
    }

    /**
     * @return the vertices mapped through {@code VertexToTuple2Map}
     */
    public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
        DataSet verticesAsTuples = vertices.map(new VertexToTuple2Map());
        return verticesAsTuples;
    }

    /**
     * @return the edges mapped through {@code EdgeToTuple3Map}
     */
    public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
        DataSet edgesAsTuples = edges.map(new EdgeToTuple3Map());
        return edgesAsTuples;
    }

    /**
     * Builds the triplets of this graph with two joins: vertices are first
     * joined with the edges on the edge's field 0 (source ID), and the result
     * is then joined with the vertices on field 1 (target ID).
     *
     * @return the triplet data set
     */
    public DataSet<Triplet<K, VV, EV>> getTriplets() {
        Operator edgesWithSourceValue = getVertices()
            .join(getEdges())
            .where(0).equalTo(0)
            .with(new ProjectEdgeWithSrcValue())
            .name("Project edge with source value");
        return ((JoinOperator)edgesWithSourceValue)
            .join(getVertices())
            .where(1).equalTo(0)
            .with(new ProjectEdgeWithVertexValues())
            .name("Project edge with vertex values");
    }

    /**
     * Applies a function to the value of every vertex. The result type is
     * derived from the mapper class, then the work is delegated to
     * {@link #mapVertices(MapFunction, TypeInformation)}.
     *
     * @param mapper computes the new value of a vertex
     * @return a graph with the mapped vertices and the edges of this graph
     */
    public <NV> Graph<K, NV, EV> mapVertices(MapFunction<Vertex<K, VV>, NV> mapper) {
        TypeInformation idType = ((TupleTypeInfo)vertices.getType()).getTypeAt(0);
        TypeInformation newValueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, (TypeInformation)vertices.getType(), null);
        TupleTypeInfo mappedVertexType = new TupleTypeInfo(Vertex.class, new TypeInformation[]{idType, newValueType});
        return mapVertices(mapper, (TypeInformation<Vertex<K, NV>>)mappedVertexType);
    }

    /**
     * Applies a function to the value of every vertex, using the supplied
     * type information for the mapped vertices. The vertex ID (field
     * {@code f0}) is forwarded unchanged.
     *
     * @param mapper computes the new value of a vertex
     * @param returnType type information of the mapped vertices
     * @return a graph with the mapped vertices and the edges of this graph
     */
    public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
        DataSet<Vertex<K, NV>> mappedVertices = vertices
            .map(new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
                // A single output instance is reused for every record.
                private Vertex<K, NV> output = new Vertex<K, NV>();

                @Override
                public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
                    output.f0 = value.f0;
                    output.f1 = mapper.map(value);
                    return output;
                }
            })
            .returns(returnType)
            .withForwardedFields("f0")
            .name("Map vertices");
        // The result carries the NEW vertex value type NV (previously instantiated as Graph<K, VV, EV>).
        return new Graph<K, NV, EV>(mappedVertices, edges, context);
    }

    /**
     * Applies a function to the value of every edge. The result type is
     * derived from the mapper class, then the work is delegated to
     * {@link #mapEdges(MapFunction, TypeInformation)}.
     *
     * @param mapper computes the new value of an edge
     * @return a graph with the vertices of this graph and the mapped edges
     */
    public <NV> Graph<K, VV, NV> mapEdges(MapFunction<Edge<K, EV>, NV> mapper) {
        TypeInformation idType = ((TupleTypeInfo)edges.getType()).getTypeAt(0);
        TypeInformation newValueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, (TypeInformation)edges.getType(), null);
        TupleTypeInfo mappedEdgeType = new TupleTypeInfo(Edge.class, new TypeInformation[]{idType, idType, newValueType});
        return mapEdges(mapper, (TypeInformation<Edge<K, NV>>)mappedEdgeType);
    }

    /**
     * Applies a function to the value of every edge, using the supplied type
     * information for the mapped edges. Source and target IDs (fields
     * {@code f0} and {@code f1}) are forwarded unchanged.
     *
     * @param mapper computes the new value of an edge
     * @param returnType type information of the mapped edges
     * @return a graph with the vertices of this graph and the mapped edges
     */
    public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K, NV>> returnType) {
        DataSet<Edge<K, NV>> mappedEdges = edges
            .map(new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
                // A single output instance is reused for every record.
                private Edge<K, NV> output = new Edge<K, NV>();

                @Override
                public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
                    output.f0 = value.f0;
                    output.f1 = value.f1;
                    output.f2 = mapper.map(value);
                    return output;
                }
            })
            .returns(returnType)
            .withForwardedFields("f0; f1")
            .name("Map edges");
        // The result carries the NEW edge value type NV (previously instantiated as Graph<K, VV, EV>).
        return new Graph<K, VV, NV>(vertices, mappedEdges, context);
    }

    /**
     * Runs {@link TranslateGraphIds} with the given translator on this graph.
     *
     * @param translator converts an ID of type {@code K} to type {@code NEW}
     * @return the graph returned by the algorithm
     * @throws Exception if running the algorithm throws
     */
    public <NEW> Graph<NEW, VV, EV> translateGraphIds(TranslateFunction<K, NEW> translator) throws Exception {
        Object translated = run(new TranslateGraphIds(translator));
        return (Graph)translated;
    }

    /**
     * Runs {@link TranslateVertexValues} with the given translator on this graph.
     *
     * @param translator converts a vertex value of type {@code VV} to type {@code NEW}
     * @return the graph returned by the algorithm
     * @throws Exception if running the algorithm throws
     */
    public <NEW> Graph<K, NEW, EV> translateVertexValues(TranslateFunction<VV, NEW> translator) throws Exception {
        Object translated = run(new TranslateVertexValues(translator));
        return (Graph)translated;
    }

    /**
     * Runs {@link TranslateEdgeValues} with the given translator on this graph.
     *
     * @param translator converts an edge value of type {@code EV} to type {@code NEW}
     * @return the graph returned by the algorithm
     * @throws Exception if running the algorithm throws
     */
    public <NEW> Graph<K, VV, NEW> translateEdgeValues(TranslateFunction<EV, NEW> translator) throws Exception {
        Object translated = run(new TranslateEdgeValues(translator));
        return (Graph)translated;
    }

    /**
     * Co-groups the vertices with the input data set on the vertex ID
     * (field 0 on both sides) and applies the join function through
     * {@code ApplyCoGroupToVertexValues}.
     *
     * @param inputDataSet the data set to join with, keyed by its first field
     * @param vertexJoinFunction combines a vertex value with an input value
     * @return a graph with the resulting vertices and the edges of this graph
     */
    public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, VertexJoinFunction<VV, T> vertexJoinFunction) {
        Operator joinedVertices = getVertices()
            .coGroup(inputDataSet)
            .where(0).equalTo(0)
            .with(new ApplyCoGroupToVertexValues(vertexJoinFunction))
            .name("Join with vertices");
        return new Graph<K, VV, EV>(joinedVertices, edges, context);
    }

    /**
     * Co-groups the edges with the input data set on the composite key of
     * fields 0 and 1 (source and target ID) and applies the join function
     * through {@code ApplyCoGroupToEdgeValues}.
     *
     * @param inputDataSet the data set to join with, keyed by its first two fields
     * @param edgeJoinFunction combines an edge value with an input value
     * @return a graph with the vertices of this graph and the resulting edges
     */
    public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, EdgeJoinFunction<EV, T> edgeJoinFunction) {
        Operator joinedEdges = getEdges()
            .coGroup(inputDataSet)
            .where(0, 1).equalTo(0, 1)
            .with(new ApplyCoGroupToEdgeValues(edgeJoinFunction))
            .name("Join with edges");
        return new Graph<K, VV, EV>(vertices, joinedEdges, context);
    }

    /**
     * Co-groups the edges with the input data set on the edge's field 0
     * (source ID) and applies the join function through
     * {@code ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget}.
     *
     * @param inputDataSet the data set to join with, keyed by its first field
     * @param edgeJoinFunction combines an edge value with an input value
     * @return a graph with the vertices of this graph and the resulting edges
     */
    public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet, EdgeJoinFunction<EV, T> edgeJoinFunction) {
        Operator joinedEdges = getEdges()
            .coGroup(inputDataSet)
            .where(0).equalTo(0)
            .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(edgeJoinFunction))
            .name("Join with edges on source");
        return new Graph<K, VV, EV>(vertices, joinedEdges, context);
    }

    /**
     * Co-groups the edges with the input data set on the edge's field 1
     * (target ID) and applies the join function through
     * {@code ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget}.
     *
     * @param inputDataSet the data set to join with, keyed by its first field
     * @param edgeJoinFunction combines an edge value with an input value
     * @return a graph with the vertices of this graph and the resulting edges
     */
    public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet, EdgeJoinFunction<EV, T> edgeJoinFunction) {
        Operator joinedEdges = getEdges()
            .coGroup(inputDataSet)
            .where(1).equalTo(0)
            .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(edgeJoinFunction))
            .name("Join with edges on target");
        return new Graph<K, VV, EV>(vertices, joinedEdges, context);
    }

    /**
     * Filters both vertices and edges. Edges are first joined with the
     * filtered vertices on source ID and on target ID, and the edge filter is
     * then applied to what remains.
     *
     * @param vertexFilter predicate applied to the vertices
     * @param edgeFilter predicate applied to the remaining edges
     * @return a graph made of the filtered vertices and edges
     */
    public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
        FilterOperator keptVertices = vertices.filter(vertexFilter);
        Operator edgesBetweenKeptVertices = edges
            .join((DataSet)keptVertices).where(0).equalTo(0).with(new ProjectEdge())
            .join((DataSet)keptVertices).where(1).equalTo(0).with(new ProjectEdge())
            .name("Subgraph");
        FilterOperator keptEdges = edgesBetweenKeptVertices.filter(edgeFilter);
        return new Graph<K, VV, EV>(keptVertices, keptEdges, context);
    }

    /**
     * Filters the vertices and keeps only the edges that join with a
     * filtered vertex on both source ID and target ID.
     *
     * @param vertexFilter predicate applied to the vertices
     * @return a graph made of the filtered vertices and the remaining edges
     */
    public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) {
        FilterOperator keptVertices = vertices.filter(vertexFilter);
        Operator edgesBetweenKeptVertices = edges
            .join((DataSet)keptVertices).where(0).equalTo(0).with(new ProjectEdge())
            .join((DataSet)keptVertices).where(1).equalTo(0).with(new ProjectEdge())
            .name("Filter on vertices");
        return new Graph<K, VV, EV>(keptVertices, edgesBetweenKeptVertices, context);
    }

    /**
     * Filters the edges; the vertex data set is left untouched.
     *
     * @param edgeFilter predicate applied to the edges
     * @return a graph with the vertices of this graph and the filtered edges
     */
    public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
        Operator keptEdges = edges
            .filter(edgeFilter)
            .name("Filter on edges");
        return new Graph<K, VV, EV>(vertices, keptEdges, context);
    }

    /**
     * Co-groups the vertices with the edges on the edge's field 0 (source
     * ID) and applies {@code CountNeighborsCoGroup}.
     *
     * @return a data set of (vertex ID, out-degree) pairs
     */
    public DataSet<Tuple2<K, LongValue>> outDegrees() {
        return vertices
            .coGroup(edges)
            .where(0).equalTo(0)
            .with(new CountNeighborsCoGroup())
            .name("Out-degree");
    }

    /**
     * Co-groups the vertices with the edges on the edge's field 1 (target
     * ID) and applies {@code CountNeighborsCoGroup}.
     *
     * @return a data set of (vertex ID, in-degree) pairs
     */
    public DataSet<Tuple2<K, LongValue>> inDegrees() {
        return vertices
            .coGroup(edges)
            .where(0).equalTo(1)
            .with(new CountNeighborsCoGroup())
            .name("In-degree");
    }

    /**
     * Unions the out-degrees with the in-degrees, groups by vertex ID and
     * sums field 1.
     *
     * @return a data set of (vertex ID, degree) pairs
     */
    public DataSet<Tuple2<K, LongValue>> getDegrees() {
        DataSet<Tuple2<K, LongValue>> outgoing = outDegrees();
        DataSet<Tuple2<K, LongValue>> incoming = inDegrees();
        UnionOperator bothDirections = (UnionOperator)outgoing.union(incoming).name("In- and out-degree");
        return bothDirections
            .groupBy(0)
            .sum(1)
            .name("Sum");
    }

    /**
     * Flat-maps the edges with {@code RegularAndReversedEdgesMap}; the
     * vertex data set is left untouched.
     *
     * @return a graph with the vertices of this graph and the flat-mapped edges
     */
    public Graph<K, VV, EV> getUndirected() {
        Operator bidirectionalEdges = edges
            .flatMap(new RegularAndReversedEdgesMap())
            .name("To undirected graph");
        return new Graph<K, VV, EV>(vertices, bidirectionalEdges, context);
    }

    /**
     * Applies the given function to each vertex together with its edges in
     * the requested direction. Vertices are co-grouped with the edges on the
     * target ID (IN), the source ID (OUT), or with the output of
     * {@code EmitOneEdgePerNode} (ALL).
     *
     * @param edgesFunction the function applied per vertex
     * @param direction which edges to consider: IN, OUT or ALL
     * @return the data set produced by the function
     * @throws IllegalArgumentException if the direction is none of the above
     */
    public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException {
        switch (direction) {
            case IN:
                return vertices
                    .coGroup(edges)
                    .where(0).equalTo(1)
                    .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction))
                    .name("GroupReduce on in-edges");
            case OUT:
                return vertices
                    .coGroup(edges)
                    .where(0).equalTo(0)
                    .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction))
                    .name("GroupReduce on out-edges");
            case ALL:
                return vertices
                    .coGroup((DataSet)edges.flatMap(new EmitOneEdgePerNode()).name("Emit edge"))
                    .where(0).equalTo(0)
                    .with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction))
                    .name("GroupReduce on in- and out-edges");
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }
    }

    /**
     * Applies the given function to each vertex together with its edges in
     * the requested direction, with explicit result type information.
     *
     * @param edgesFunction the function applied per vertex
     * @param direction which edges to consider: IN, OUT or ALL
     * @param typeInfo type information of the function's output
     * @return the data set produced by the function
     * @throws IllegalArgumentException if the direction is none of the above
     */
    public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction, EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
        switch (direction) {
            case IN: {
                CoGroupOperator onInEdges = (CoGroupOperator)vertices
                    .coGroup(edges)
                    .where(0).equalTo(1)
                    .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction))
                    .name("GroupReduce on in-edges");
                return onInEdges.returns(typeInfo);
            }
            case OUT: {
                CoGroupOperator onOutEdges = (CoGroupOperator)vertices
                    .coGroup(edges)
                    .where(0).equalTo(0)
                    .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction))
                    .name("GroupReduce on out-edges");
                return onOutEdges.returns(typeInfo);
            }
            case ALL: {
                CoGroupOperator onAllEdges = (CoGroupOperator)vertices
                    .coGroup((DataSet)edges.flatMap(new EmitOneEdgePerNode()).name("Emit edge"))
                    .where(0).equalTo(0)
                    .with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction))
                    .name("GroupReduce on in- and out-edges");
                return onAllEdges.returns(typeInfo);
            }
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }
    }

    /**
     * Applies the given function to the edges of each vertex in the requested
     * direction. The result type is derived from the function class, then the
     * work is delegated to the overload taking explicit type information.
     *
     * @param edgesFunction the function applied per vertex ID
     * @param direction which edges to consider: IN, OUT or ALL
     * @return the data set produced by the function
     * @throws IllegalArgumentException if the direction is none of the above
     */
    public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException {
        TypeInformation idType = ((TupleTypeInfo)vertices.getType()).getTypeAt(0);
        TypeInformation edgeValueType = ((TupleTypeInfo)edges.getType()).getTypeAt(2);
        TypeInformation resultType = TypeExtractor.createTypeInfo(EdgesFunction.class, edgesFunction.getClass(), 2, idType, edgeValueType);
        return groupReduceOnEdges(edgesFunction, direction, resultType);
    }

    /**
     * Applies the given function to the edges of each vertex in the requested
     * direction, with explicit result type information. Edges are mapped with
     * {@code ProjectVertexIdMap} (IN: field 1, OUT: field 0) or flat-mapped
     * with {@code EmitOneEdgePerNode} (ALL), grouped by field 0 and reduced.
     *
     * @param edgesFunction the function applied per vertex ID
     * @param direction which edges to consider: IN, OUT or ALL
     * @param typeInfo type information of the function's output
     * @return the data set produced by the function
     * @throws IllegalArgumentException if the direction is none of the above
     */
    public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
        switch (direction) {
            case IN: {
                MapOperator keyedByTarget = (MapOperator)((MapOperator)edges.map(new ProjectVertexIdMap(1)).name("Vertex ID")).withForwardedFields("f1->f0");
                GroupReduceOperator reduced = (GroupReduceOperator)keyedByTarget
                    .groupBy(0)
                    .reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction))
                    .name("GroupReduce on in-edges");
                return reduced.returns(typeInfo);
            }
            case OUT: {
                MapOperator keyedBySource = (MapOperator)((MapOperator)edges.map(new ProjectVertexIdMap(0)).name("Vertex ID")).withForwardedFields("f0");
                GroupReduceOperator reduced = (GroupReduceOperator)keyedBySource
                    .groupBy(0)
                    .reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction))
                    .name("GroupReduce on out-edges");
                return reduced.returns(typeInfo);
            }
            case ALL: {
                FlatMapOperator onePerEndpoint = (FlatMapOperator)edges.flatMap(new EmitOneEdgePerNode()).name("Emit edge");
                GroupReduceOperator reduced = (GroupReduceOperator)onePerEndpoint
                    .groupBy(0)
                    .reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction))
                    .name("GroupReduce on in- and out-edges");
                return reduced.returns(typeInfo);
            }
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }
    }

    /**
     * Maps every edge with {@code ReverseEdgesMap}; the vertex data set is
     * left untouched.
     *
     * @return a graph with the vertices of this graph and the mapped edges
     * @throws UnsupportedOperationException declared by the signature; not thrown directly in this method
     */
    public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
        Operator flippedEdges = edges
            .map(new ReverseEdgesMap())
            .name("Reverse edges");
        return new Graph<K, VV, EV>(vertices, flippedEdges, context);
    }

    /**
     * @return the result of {@code count()} on the vertex data set
     * @throws Exception if counting throws
     */
    public long numberOfVertices() throws Exception {
        long vertexCount = vertices.count();
        return vertexCount;
    }

    /**
     * @return the result of {@code count()} on the edge data set
     * @throws Exception if counting throws
     */
    public long numberOfEdges() throws Exception {
        long edgeCount = edges.count();
        return edgeCount;
    }

    /**
     * @return the vertices mapped through {@code ExtractVertexIDMapper}
     */
    public DataSet<K> getVertexIds() {
        return vertices
            .map(new ExtractVertexIDMapper())
            .name("Vertex IDs");
    }

    /**
     * @return the edges mapped through {@code ExtractEdgeIDsMapper}
     */
    public DataSet<Tuple2<K, K>> getEdgeIds() {
        return edges
            .map(new ExtractEdgeIDsMapper())
            .name("Edge IDs");
    }

    /**
     * Adds a single vertex by wrapping it in a one-element list and
     * delegating to {@link #addVertices(List)}.
     *
     * @param vertex the vertex to add
     * @return the graph returned by {@code addVertices}
     */
    public Graph<K, VV, EV> addVertex(Vertex<K, VV> vertex) {
        List<Vertex<K, VV>> single = new ArrayList<Vertex<K, VV>>();
        single.add(vertex);
        return addVertices(single);
    }

    /**
     * Adds the given vertices by co-grouping the existing vertices with the
     * new ones on the vertex ID and applying {@code VerticesUnionCoGroup}.
     *
     * @param verticesToAdd the vertices to add
     * @return a graph with the resulting vertices and the edges of this graph
     */
    public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
        DataSet candidates = context.fromCollection(verticesToAdd);
        Operator mergedVertices = vertices
            .coGroup(candidates)
            .where(0).equalTo(0)
            .with(new VerticesUnionCoGroup())
            .name("Add vertices");
        return new Graph<K, VV, EV>(mergedVertices, edges, context);
    }

    /**
     * Adds an edge between the two given vertices. A two-vertex, one-edge
     * graph is built from the arguments and unioned with this graph, so both
     * endpoints take part in the vertex union as well.
     *
     * @param source the source vertex of the edge
     * @param target the target vertex of the edge
     * @param edgeValue the value of the new edge
     * @return the union of this graph with the partial graph
     */
    public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
        // The edge key type must be K (not Object) to match Collection<Edge<K, EV>> in fromCollection.
        Edge<K, EV> newEdge = new Edge<K, EV>(source.f0, target.f0, edgeValue);
        Graph<K, VV, EV> partialGraph = Graph.fromCollection(Arrays.asList(source, target), Collections.singletonList(newEdge), this.context);
        return this.union(partialGraph);
    }

    /**
     * Adds the given edges. The new edges are first joined with the vertices
     * on source ID and then on target ID, and the join result is unioned with
     * the existing edges.
     *
     * @param newEdges the edges to add
     * @return a graph with the vertices of this graph and the unioned edges
     */
    public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
        DataSource candidateEdges = context.fromCollection(newEdges);
        JoinOperator sourceMatched = (JoinOperator)getVertices()
            .join((DataSet)candidateEdges)
            .where(0).equalTo(0)
            .with(new JoinVerticesWithEdgesOnSrc())
            .name("Join with source");
        Operator acceptedEdges = sourceMatched
            .join(getVertices())
            .where(1).equalTo(0)
            .with(new JoinWithVerticesOnTrg())
            .name("Join with target");
        return fromDataSet(vertices, edges.union((DataSet)acceptedEdges), context);
    }

    /**
     * Removes a single vertex by wrapping it in a one-element list and
     * delegating to {@link #removeVertices(List)}.
     *
     * @param vertex the vertex to remove
     * @return the graph returned by {@code removeVertices}
     */
    public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
        List<Vertex<K, VV>> single = new ArrayList<Vertex<K, VV>>();
        single.add(vertex);
        return removeVertices(single);
    }

    /**
     * Removes the given vertices by turning the list into a data set and
     * delegating to the data-set based overload.
     *
     * @param verticesToBeRemoved the vertices to remove
     * @return the graph returned by the data-set based overload
     */
    public Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved) {
        DataSet<Vertex<K, VV>> removalSet = context.fromCollection(verticesToBeRemoved);
        return removeVertices(removalSet);
    }

    /**
     * Removes the given vertices. The vertex set is computed with
     * {@code VerticesRemovalCoGroup}; the edges are then joined with the
     * resulting vertices on source ID and on target ID.
     *
     * @param verticesToBeRemoved data set of the vertices to remove
     * @return a graph made of the resulting vertices and edges
     */
    private Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
        Operator keptVertices = getVertices()
            .coGroup(verticesToBeRemoved)
            .where(0).equalTo(0)
            .with(new VerticesRemovalCoGroup())
            .name("Remove vertices");
        JoinOperator edgesWithKeptSource = (JoinOperator)keptVertices
            .join(getEdges())
            .where(0).equalTo(0)
            .with(new ProjectEdgeToBeRemoved())
            .name("Edges to be removed");
        Operator keptEdges = edgesWithKeptSource
            .join((DataSet)keptVertices)
            .where(1).equalTo(0)
            .with(new ProjectEdge())
            .name("Remove edges");
        return new Graph<K, VV, EV>(keptVertices, keptEdges, context);
    }

    /**
     * Filters the edges with an {@code EdgeRemovalEdgeFilter} built from the
     * given edge; the vertex data set is left untouched.
     *
     * @param edge the edge to remove
     * @return a graph with the vertices of this graph and the filtered edges
     */
    public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
        Operator remainingEdges = getEdges()
            .filter(new EdgeRemovalEdgeFilter<K, EV>(edge))
            .name("Remove edge");
        return new Graph<K, VV, EV>(vertices, remainingEdges, context);
    }

    /**
     * Removes the given edges by co-grouping the existing edges with them on
     * fields 0 and 1 (source and target ID) and applying
     * {@code EdgeRemovalCoGroup}.
     *
     * @param edgesToBeRemoved the edges to remove
     * @return a graph with the vertices of this graph and the resulting edges
     */
    public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
        DataSet removalSet = context.fromCollection(edgesToBeRemoved);
        Operator remainingEdges = getEdges()
            .coGroup(removalSet)
            .where(0, 1).equalTo(0, 1)
            .with(new EdgeRemovalCoGroup())
            .name("Remove edges");
        return new Graph<K, VV, EV>(vertices, remainingEdges, context);
    }

    /**
     * Unions this graph with another one. The vertex union is passed through
     * {@code distinct()}; the edge union is not.
     *
     * @param graph the graph to union with
     * @return a graph made of the unioned vertices and edges
     */
    public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
        UnionOperator allVertices = (UnionOperator)graph.getVertices().union(getVertices()).name("Vertices");
        Operator distinctVertices = allVertices
            .distinct()
            .name("Vertices");
        Operator allEdges = graph.getEdges()
            .union(getEdges())
            .name("Edges");
        return new Graph<K, VV, EV>(distinctVertices, allEdges, context);
    }

    /**
     * Removes the vertices of the given graph from this graph by delegating
     * to the data-set based {@code removeVertices}.
     *
     * @param graph the graph whose vertices are removed
     * @return the graph returned by {@code removeVertices}
     */
    public Graph<K, VV, EV> difference(Graph<K, VV, EV> graph) {
        return removeVertices(graph.getVertices());
    }

    /**
     * Intersects the edges of this graph with those of the given graph and
     * builds a graph from the result.
     *
     * @param graph the graph to intersect with
     * @param distinctEdges {@code true} to use the distinct edge intersection,
     *        {@code false} to use the pairwise one
     * @return a graph built from the intersected edges, with {@link NullValue} vertex values
     */
    public Graph<K, NullValue, EV> intersect(Graph<K, VV, EV> graph, boolean distinctEdges) {
        DataSet<Edge<K, EV>> commonEdges;
        if (distinctEdges) {
            commonEdges = getDistinctEdgeIntersection(graph.getEdges());
        } else {
            commonEdges = getPairwiseEdgeIntersection(graph.getEdges());
        }
        return fromDataSet(commonEdges, getContext());
    }

    /**
     * Joins the edges of this graph with the given edges on all three fields,
     * keeps the first side of every match and removes duplicates.
     *
     * @param edges the edges to intersect with
     * @return the distinct matching edges
     */
    private DataSet<Edge<K, EV>> getDistinctEdgeIntersection(DataSet<Edge<K, EV>> edges) {
        JoinOperator matches = (JoinOperator)getEdges()
            .join(edges)
            .where(0, 1, 2).equalTo(0, 1, 2)
            .with(new JoinFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>>(){

                public Edge<K, EV> join(Edge<K, EV> first, Edge<K, EV> second) throws Exception {
                    return first;
                }
            })
            .withForwardedFieldsFirst("*");
        JoinOperator namedMatches = (JoinOperator)matches.name("Intersect edges");
        return namedMatches
            .distinct()
            .name("Edges");
    }

    /**
     * Co-groups the edges of this graph with the given edges on all three
     * fields and applies {@code MatchingEdgeReducer}.
     *
     * @param edges the edges to intersect with
     * @return the edges emitted by the reducer
     */
    private DataSet<Edge<K, EV>> getPairwiseEdgeIntersection(DataSet<Edge<K, EV>> edges) {
        return getEdges()
            .coGroup(edges)
            .where(0, 1, 2).equalTo(0, 1, 2)
            .with(new MatchingEdgeReducer())
            .name("Intersect edges");
    }

    /**
     * Runs a scatter-gather iteration, passing {@code null} as configuration
     * to the four-argument overload.
     *
     * @param scatterFunction the scatter function
     * @param gatherFunction the gather function
     * @param maximumNumberOfIterations maximum number of iterations
     * @return the graph returned by the four-argument overload
     */
    public <M> Graph<K, VV, EV> runScatterGatherIteration(ScatterFunction<K, VV, M, EV> scatterFunction, org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction, int maximumNumberOfIterations) {
        final ScatterGatherConfiguration noConfiguration = null;
        return runScatterGatherIteration(scatterFunction, gatherFunction, maximumNumberOfIterations, noConfiguration);
    }

    /**
     * Runs a scatter-gather iteration over the edges of this graph and
     * applies it to the vertex data set.
     *
     * @param scatterFunction the scatter function
     * @param gatherFunction the gather function
     * @param maximumNumberOfIterations maximum number of iterations
     * @param parameters configuration handed to {@code configure}; the three-argument overload passes {@code null}
     * @return a graph with the resulting vertices and the edges of this graph
     */
    public <M> Graph<K, VV, EV> runScatterGatherIteration(ScatterFunction<K, VV, M, EV> scatterFunction, org.apache.flink.graph.spargel.GatherFunction<K, VV, M> gatherFunction, int maximumNumberOfIterations, ScatterGatherConfiguration parameters) {
        ScatterGatherIteration<K, VV, M, EV> scatterGather =
            ScatterGatherIteration.withEdges(edges, scatterFunction, gatherFunction, maximumNumberOfIterations);
        scatterGather.configure(parameters);

        DataSet updatedVertices = getVertices().runOperation(scatterGather);
        return new Graph<K, VV, EV>(updatedVertices, edges, context);
    }

    /**
     * Runs a gather-sum-apply iteration, passing {@code null} as
     * configuration to the five-argument overload.
     *
     * @param gatherFunction the gather function
     * @param sumFunction the sum function
     * @param applyFunction the apply function
     * @param maximumNumberOfIterations maximum number of iterations
     * @return the graph returned by the five-argument overload
     */
    public <M> Graph<K, VV, EV> runGatherSumApplyIteration(GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction, ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
        final GSAConfiguration noConfiguration = null;
        return runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations, noConfiguration);
    }

    /**
     * Runs a gather-sum-apply iteration over the edges of this graph and
     * applies it to the vertex data set.
     *
     * @param gatherFunction the gather function
     * @param sumFunction the sum function
     * @param applyFunction the apply function
     * @param maximumNumberOfIterations maximum number of iterations
     * @param parameters configuration handed to {@code configure}; the four-argument overload passes {@code null}
     * @return a graph with the resulting vertices and the edges of this graph
     */
    public <M> Graph<K, VV, EV> runGatherSumApplyIteration(GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction, ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations, GSAConfiguration parameters) {
        GatherSumApplyIteration<K, VV, EV, M> gatherSumApply =
            GatherSumApplyIteration.withEdges(edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
        gatherSumApply.configure(parameters);

        DataSet updatedVertices = vertices.runOperation(gatherSumApply);
        return new Graph<K, VV, EV>(updatedVertices, edges, context);
    }

    /**
     * Runs a vertex-centric iteration, passing {@code null} as configuration
     * to the four-argument overload.
     *
     * @param computeFunction the compute function
     * @param combiner the message combiner
     * @param maximumNumberOfIterations maximum number of iterations
     * @return the graph returned by the four-argument overload
     */
    public <M> Graph<K, VV, EV> runVertexCentricIteration(ComputeFunction<K, VV, EV, M> computeFunction, MessageCombiner<K, M> combiner, int maximumNumberOfIterations) {
        final VertexCentricConfiguration noConfiguration = null;
        return runVertexCentricIteration(computeFunction, combiner, maximumNumberOfIterations, noConfiguration);
    }

    /**
     * Runs a vertex-centric iteration over the edges of this graph and
     * applies it to the vertex data set.
     *
     * @param computeFunction the compute function
     * @param combiner the message combiner
     * @param maximumNumberOfIterations maximum number of iterations
     * @param parameters configuration handed to {@code configure}; the three-argument overload passes {@code null}
     * @return a graph with the resulting vertices and the edges of this graph
     */
    public <M> Graph<K, VV, EV> runVertexCentricIteration(ComputeFunction<K, VV, EV, M> computeFunction, MessageCombiner<K, M> combiner, int maximumNumberOfIterations, VertexCentricConfiguration parameters) {
        VertexCentricIteration<K, VV, EV, M> vertexCentric =
            VertexCentricIteration.withEdges(edges, computeFunction, combiner, maximumNumberOfIterations);
        vertexCentric.configure(parameters);

        DataSet updatedVertices = getVertices().runOperation(vertexCentric);
        return new Graph<K, VV, EV>(updatedVertices, edges, context);
    }

    /**
     * Runs the given algorithm on this graph.
     *
     * @param algorithm the algorithm to run
     * @return whatever {@code algorithm.run(this)} returns
     * @throws Exception if the algorithm throws
     */
    public <T> T run(GraphAlgorithm<K, VV, EV, T> algorithm) throws Exception {
        T outcome = algorithm.run(this);
        return outcome;
    }

    /**
     * Runs the given analytic on this graph and hands the same analytic
     * instance back to the caller; the value returned by
     * {@code analytic.run(this)} is discarded.
     *
     * @param analytic the analytic to run
     * @return the {@code analytic} argument
     * @throws Exception if the analytic throws
     */
    public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
        analytic.run(this);

        return analytic;
    }

    /**
     * Applies the given function to each vertex together with its neighbors
     * in the requested direction. Edges are first joined with the vertex on
     * their other end, and the vertices are then co-grouped with that result.
     *
     * @param neighborsFunction the function applied per vertex
     * @param direction which neighbors to consider: IN, OUT or ALL
     * @return the data set produced by the function
     * @throws IllegalArgumentException if the direction is none of the above
     */
    public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction, EdgeDirection direction) throws IllegalArgumentException {
        switch (direction) {
            case IN: {
                Operator edgesWithSource = edges
                    .join(vertices)
                    .where(0).equalTo(0)
                    .name("Edge with source vertex");
                // Key on the target ID of the edge nested in the join result.
                return vertices
                    .coGroup((DataSet)edgesWithSource)
                    .where(0).equalTo("f0.f1")
                    .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
                    .name("Neighbors function");
            }
            case OUT: {
                Operator edgesWithTarget = edges
                    .join(vertices)
                    .where(1).equalTo(0)
                    .name("Edge with target vertex");
                // Key on the source ID of the edge nested in the join result.
                return vertices
                    .coGroup((DataSet)edgesWithTarget)
                    .where(0).equalTo("f0.f0")
                    .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
                    .name("Neighbors function");
            }
            case ALL: {
                FlatMapOperator bothDirections = (FlatMapOperator)edges.flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges");
                Operator edgesWithNeighbor = bothDirections
                    .join(vertices)
                    .where(1).equalTo(0)
                    .with(new ProjectEdgeWithNeighbor())
                    .name("Edge with vertex");
                return vertices
                    .coGroup((DataSet)edgesWithNeighbor)
                    .where(0).equalTo(0)
                    .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction))
                    .name("Neighbors function");
            }
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }
    }

    /**
     * Applies the given function to each vertex together with its neighbors
     * in the requested direction, with explicit result type information.
     *
     * @param neighborsFunction the function applied per vertex
     * @param direction which neighbors to consider: IN, OUT or ALL
     * @param typeInfo type information of the function's output
     * @return the data set produced by the function
     * @throws IllegalArgumentException if the direction is none of the above
     */
    public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction, EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
        switch (direction) {
            case IN: {
                Operator edgesWithSource = edges
                    .join(vertices)
                    .where(0).equalTo(0)
                    .name("Edge with source vertex");
                // Key on the target ID of the edge nested in the join result.
                CoGroupOperator applied = (CoGroupOperator)vertices
                    .coGroup((DataSet)edgesWithSource)
                    .where(0).equalTo("f0.f1")
                    .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
                    .name("Neighbors function");
                return applied.returns(typeInfo);
            }
            case OUT: {
                Operator edgesWithTarget = edges
                    .join(vertices)
                    .where(1).equalTo(0)
                    .name("Edge with target vertex");
                // Key on the source ID of the edge nested in the join result.
                CoGroupOperator applied = (CoGroupOperator)vertices
                    .coGroup((DataSet)edgesWithTarget)
                    .where(0).equalTo("f0.f0")
                    .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction))
                    .name("Neighbors function");
                return applied.returns(typeInfo);
            }
            case ALL: {
                FlatMapOperator bothDirections = (FlatMapOperator)edges.flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges");
                Operator edgesWithNeighbor = bothDirections
                    .join(vertices)
                    .where(1).equalTo(0)
                    .with(new ProjectEdgeWithNeighbor())
                    .name("Edge with vertex");
                CoGroupOperator applied = (CoGroupOperator)vertices
                    .coGroup((DataSet)edgesWithNeighbor)
                    .where(0).equalTo(0)
                    .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction))
                    .name("Neighbors function");
                return applied.returns(typeInfo);
            }
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }
    }

    /**
     * Runs a user function over the (key vertex id, edge, neighbor vertex) triples of each
     * vertex, grouped by the key vertex id. The vertex value itself is not passed along.
     *
     * @param neighborsFunction user function wrapped into the final group-reduce
     * @param direction edge direction to follow: {@code IN}, {@code OUT} or {@code ALL}
     * @param <T> element type of the resulting data set
     * @return the output of the group-reduce operator named "Neighbors function"
     * @throws IllegalArgumentException if {@code direction} is none of the handled constants
     */
    public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction, EdgeDirection direction) throws IllegalArgumentException {
        // Only the construction of the keyed triples differs per direction; the grouping and
        // reduction below are shared by all three branches.
        Operator keyedNeighbors;
        switch (direction) {
            case IN: {
                // Join with the source vertex, keyed by the edge target (edge field 1).
                keyedNeighbors = ((JoinOperator)this.edges
                        .join(this.vertices)
                        .where(0)
                        .equalTo(0)
                        .with(new ProjectVertexIdJoin(1))
                        .withForwardedFieldsFirst("f1->f0"))
                        .name("Edge with source vertex ID");
                break;
            }
            case OUT: {
                // Join with the target vertex, keyed by the edge source (edge field 0).
                keyedNeighbors = ((JoinOperator)this.edges
                        .join(this.vertices)
                        .where(1)
                        .equalTo(0)
                        .with(new ProjectVertexIdJoin(0))
                        .withForwardedFieldsFirst("f0"))
                        .name("Edge with target vertex ID");
                break;
            }
            case ALL: {
                // Emit every edge once per endpoint and attach the vertex at the other end.
                keyedNeighbors = ((FlatMapOperator)this.edges
                        .flatMap(new EmitOneEdgeWithNeighborPerNode())
                        .name("Forward and reverse edges"))
                        .join(this.vertices)
                        .where(1)
                        .equalTo(0)
                        .with(new ProjectEdgeWithNeighbor())
                        .name("Edge with vertex ID");
                break;
            }
            default: {
                throw new IllegalArgumentException("Illegal edge direction");
            }
        }
        return keyedNeighbors
                .groupBy(0)
                .reduceGroup(new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction))
                .name("Neighbors function");
    }

    /**
     * Variant of the neighborhood reduction without vertex value that sets the result type
     * explicitly by calling {@code returns(typeInfo)} on the final group-reduce operator.
     *
     * @param neighborsFunction user function wrapped into the final group-reduce
     * @param direction edge direction to follow: {@code IN}, {@code OUT} or {@code ALL}
     * @param typeInfo type information passed to {@code returns} on the result operator
     * @param <T> element type of the resulting data set
     * @return the output of the group-reduce operator named "Neighbors function"
     * @throws IllegalArgumentException if {@code direction} is none of the handled constants
     */
    public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction, EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
        switch (direction) {
            case IN: {
                // Join with the source vertex, keyed by the edge target (edge field 1).
                Operator edgesWithSources = ((JoinOperator)this.edges.join(this.vertices).where(new int[]{0}).equalTo(new int[]{0}).with(new ProjectVertexIdJoin(1)).withForwardedFieldsFirst(new String[]{"f1->f0"})).name("Edge with source vertex ID");
                return ((GroupReduceOperator)edgesWithSources.groupBy(new int[]{0}).reduceGroup(new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).name("Neighbors function")).returns(typeInfo);
            }
            case OUT: {
                // Join with the target vertex, keyed by the edge source (edge field 0).
                Operator edgesWithTargets = ((JoinOperator)this.edges.join(this.vertices).where(new int[]{1}).equalTo(new int[]{0}).with(new ProjectVertexIdJoin(0)).withForwardedFieldsFirst(new String[]{"f0"})).name("Edge with target vertex ID");
                return ((GroupReduceOperator)edgesWithTargets.groupBy(new int[]{0}).reduceGroup(new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).name("Neighbors function")).returns(typeInfo);
            }
            case ALL: {
                // Name the flatMap like the other groupReduceOnNeighbors overloads do, so the
                // operator is identifiable in the plan instead of carrying a generated name.
                Operator edgesWithNeighbors = ((FlatMapOperator)this.edges.flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges")).join(this.vertices).where(new int[]{1}).equalTo(new int[]{0}).with(new ProjectEdgeWithNeighbor()).name("Edge with vertex ID");
                return ((GroupReduceOperator)edgesWithNeighbors.groupBy(new int[]{0}).reduceGroup(new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).name("Neighbors function")).returns(typeInfo);
            }
        }
        throw new IllegalArgumentException("Illegal edge direction");
    }

    /**
     * Combines the values of each vertex's neighbors pairwise with the given reduce function.
     *
     * <p>Each branch first builds (vertex id, neighbor value) pairs, then groups them by vertex
     * id (field 0) and reduces each group.
     *
     * @param reduceNeighborsFunction function used to combine two neighbor values
     * @param direction edge direction to follow: {@code IN}, {@code OUT} or {@code ALL}
     * @return the output of the reduce operator named "Neighbors function"
     * @throws IllegalArgumentException if {@code direction} is none of the handled constants
     */
    public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<VV> reduceNeighborsFunction, EdgeDirection direction) throws IllegalArgumentException {
        switch (direction) {
            case IN: {
                // Join with the source vertex; key the source's value by the edge target (field 1).
                Operator verticesWithSourceNeighborValues = ((JoinOperator)this.edges.join(this.vertices).where(new int[]{0}).equalTo(new int[]{0}).with(new ProjectVertexWithNeighborValueJoin(1)).withForwardedFieldsFirst(new String[]{"f1->f0"})).name("Vertex with in-neighbor value");
                return verticesWithSourceNeighborValues.groupBy(new int[]{0}).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)).name("Neighbors function");
            }
            case OUT: {
                // Join with the target vertex; key the target's value by the edge source (field 0).
                Operator verticesWithTargetNeighborValues = ((JoinOperator)this.edges.join(this.vertices).where(new int[]{1}).equalTo(new int[]{0}).with(new ProjectVertexWithNeighborValueJoin(0)).withForwardedFieldsFirst(new String[]{"f0"})).name("Vertex with out-neighbor value");
                return verticesWithTargetNeighborValues.groupBy(new int[]{0}).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)).name("Neighbors function");
            }
            case ALL: {
                // Name the flatMap the same way the groupReduceOnNeighbors methods do for this
                // function, so the operator is identifiable in the plan.
                Operator verticesWithNeighborValues = ((FlatMapOperator)this.edges.flatMap(new EmitOneEdgeWithNeighborPerNode()).name("Forward and reverse edges")).join(this.vertices).where(new int[]{1}).equalTo(new int[]{0}).with(new ProjectNeighborValue()).name("Vertex with neighbor value");
                return verticesWithNeighborValues.groupBy(new int[]{0}).reduce(new ApplyNeighborReduceFunction(reduceNeighborsFunction)).name("Neighbors function");
            }
        }
        throw new IllegalArgumentException("Illegal edge direction");
    }

    /**
     * Combines the values of each vertex's edges pairwise with the given reduce function.
     *
     * @param reduceEdgesFunction function used to combine two edge values
     * @param direction which edges count for a vertex: {@code IN}, {@code OUT} or {@code ALL}
     * @return the output of the reduce operator named "Reduce on edges"
     * @throws IllegalArgumentException if {@code direction} is none of the handled constants
     */
    public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction, EdgeDirection direction) throws IllegalArgumentException {
        // (vertex id, edge value) pairs; only their construction depends on the direction.
        Operator keyedEdgeValues;
        switch (direction) {
            case IN: {
                // Key each edge value by the edge target (field 1).
                MapOperator projected = (MapOperator)this.edges
                        .map(new ProjectVertexWithEdgeValueMap(1))
                        .withForwardedFields("f1->f0");
                keyedEdgeValues = (MapOperator)projected.name("Vertex with in-edges");
                break;
            }
            case OUT: {
                // Key each edge value by the edge source (field 0).
                MapOperator projected = (MapOperator)this.edges
                        .map(new ProjectVertexWithEdgeValueMap(0))
                        .withForwardedFields("f0->f0");
                keyedEdgeValues = (MapOperator)projected.name("Vertex with out-edges");
                break;
            }
            case ALL: {
                // Emit each edge value once for the source and once for the target.
                FlatMapOperator expanded = (FlatMapOperator)this.edges
                        .flatMap(new EmitOneVertexWithEdgeValuePerNode())
                        .withForwardedFields("f2->f1");
                keyedEdgeValues = (FlatMapOperator)expanded.name("Vertex with all edges");
                break;
            }
            default: {
                throw new IllegalArgumentException("Illegal edge direction");
            }
        }
        return keyedEdgeValues
                .groupBy(0)
                .reduce(new ApplyReduceFunction(reduceEdgesFunction))
                .name("Reduce on edges");
    }

    /**
     * Reduce adapter over (vertex id, edge value) pairs that delegates the combination of the
     * two values to a {@code ReduceEdgesFunction}. Field f0 is declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFields(value={"f0"})
    private static final class ApplyReduceFunction<K, EV>
            implements ReduceFunction<Tuple2<K, EV>> {
        private ReduceEdgesFunction<EV> function;

        /** @param fun function used to combine two edge values */
        public ApplyReduceFunction(ReduceEdgesFunction<EV> fun) {
            this.function = fun;
        }

        /**
         * Combines the values of both pairs and stores the result in {@code first}.
         *
         * @return {@code first}, mutated in place to carry the combined value
         */
        public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
            EV combined = this.function.reduceEdges(first.f1, second.f1);
            first.f1 = combined;
            return first;
        }
    }

    /**
     * Reduce adapter over (vertex id, neighbor value) pairs that delegates the combination of
     * the two values to a {@code ReduceNeighborsFunction}. Field f0 is declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFields(value={"f0"})
    private static final class ApplyNeighborReduceFunction<K, VV>
            implements ReduceFunction<Tuple2<K, VV>> {
        private ReduceNeighborsFunction<VV> function;

        /** @param fun function used to combine two neighbor values */
        public ApplyNeighborReduceFunction(ReduceNeighborsFunction<VV> fun) {
            this.function = fun;
        }

        /**
         * Combines the values of both pairs and stores the result in {@code first}.
         *
         * @return {@code first}, mutated in place to carry the combined value
         */
        public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Exception {
            VV combined = this.function.reduceNeighbors(first.f1, second.f1);
            first.f1 = combined;
            return first;
        }
    }

    /**
     * Co-group adapter for keyed (key, edge, neighbor) triples: strips the key from each triple
     * and hands the vertex plus the resulting (edge, neighbor) pairs to the wrapped function.
     */
    private static final class ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>
            implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>,
            ResultTypeQueryable<T> {
        private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;

        /** @param fun user function to invoke per co-group */
        public ApplyCoGroupFunctionOnAllNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
            this.function = fun;
        }

        /**
         * Invokes the wrapped function with the first vertex of the group and a view of the
         * triples without their key.
         *
         * @throws NoSuchElementException if the vertex side of the co-group is empty
         */
        public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, Collector<T> out) throws Exception {
            // Lazy view over the triples that exposes only (edge, neighbor).
            final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> strippedIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>(){
                private final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keyed = keysWithNeighbors.iterator();

                @Override
                public boolean hasNext() {
                    return this.keyed.hasNext();
                }

                @Override
                public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() {
                    Tuple3<K, Edge<K, EV>, Vertex<K, VV>> triple = this.keyed.next();
                    return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(triple.f1, triple.f2);
                }

                @Override
                public void remove() {
                    this.keyed.remove();
                }
            };
            // Every iterator() call returns the one iterator above, so the view is single-pass.
            Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> strippedIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>(){

                @Override
                public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() {
                    return strippedIterator;
                }
            };
            Iterator<Vertex<K, VV>> vertices = vertex.iterator();
            if (vertices.hasNext()) {
                this.function.iterateNeighbors(vertices.next(), strippedIterable, out);
                return;
            }
            throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
        }

        /**
         * Derives the produced type from the wrapped function's class, using type parameter
         * position 3 of {@code NeighborsFunctionWithVertexValue}.
         */
        public TypeInformation<T> getProducedType() {
            Class<?> userFunctionClass = this.function.getClass();
            return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, userFunctionClass, 3, null, null);
        }
    }

    /**
     * Co-group adapter that hands a vertex and its (edge, neighbor) pairs to the wrapped
     * {@code NeighborsFunctionWithVertexValue}.
     */
    private static final class ApplyNeighborCoGroupFunction<K, VV, EV, T>
    implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>,
    ResultTypeQueryable<T> {
        private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;

        /** @param fun user function to invoke per co-group */
        public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
            this.function = fun;
        }

        /**
         * Invokes the wrapped function with the first vertex of the group and its neighbors.
         *
         * @param vertex vertex side of the co-group; its first element is used
         * @param neighbors (edge, neighbor vertex) pairs passed through unchanged
         * @param out collector handed to the wrapped function
         * @throws NoSuchElementException if the vertex side of the co-group is empty
         */
        public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<T> out) throws Exception {
            Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
            // Fail with the same descriptive message the other co-group adapters in this class
            // use, instead of whatever the underlying iterator reports on an empty group.
            if (!vertexIterator.hasNext()) {
                throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
            }
            this.function.iterateNeighbors(vertexIterator.next(), neighbors, out);
        }

        /**
         * Derives the produced type from the wrapped function's class, using type parameter
         * position 3 of {@code NeighborsFunctionWithVertexValue}.
         */
        public TypeInformation<T> getProducedType() {
            return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, this.function.getClass(), (int)3, null, null);
        }
    }

    /**
     * Join function that turns a keyed edge (f0, f1, edge) and the joined vertex into
     * (f0, edge, vertex), dropping field f1 of the first input.
     */
    @FunctionAnnotation.ForwardedFieldsFirst(value={"f0; f2->f1"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"*->f2"})
    private static final class ProjectEdgeWithNeighbor<K, VV, EV>
    implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
        private ProjectEdgeWithNeighbor() {
        }

        /**
         * @param keysWithEdge keyed edge; only f0 and the edge in f2 are read
         * @param neighbor vertex joined to the keyed edge
         * @param out receives exactly one (f0, edge, neighbor) tuple
         */
        public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
            // Construct the tuple with its real type arguments; the collector is typed and does
            // not accept a value upcast to Object.
            out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
        }
    }

    /**
     * Join function that pairs the key of a keyed edge (its f0) with the value of the joined
     * vertex.
     */
    @FunctionAnnotation.ForwardedFieldsFirst(value={"f0"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f1"})
    private static final class ProjectNeighborValue<K, VV, EV>
    implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
        private ProjectNeighborValue() {
        }

        /**
         * @param keysWithEdge keyed edge; only f0 is read
         * @param neighbor vertex whose value is emitted
         * @param out receives exactly one (f0, neighbor value) pair
         */
        public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, Collector<Tuple2<K, VV>> out) {
            // Construct the pair with its real type arguments; the collector is typed and does
            // not accept a value upcast to Object.
            out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
        }
    }

    /**
     * Join function that emits (key, edge, joined vertex), where the key is the edge field at a
     * configurable position.
     */
    private static final class ProjectVertexIdJoin<K, VV, EV>
    implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
        private int fieldPosition;

        /** @param position index of the edge field to use as key (callers in this file pass 0 or 1) */
        public ProjectVertexIdJoin(int position) {
            this.fieldPosition = position;
        }

        /**
         * @param edge edge whose field at {@code fieldPosition} becomes the key
         * @param otherVertex vertex joined to the edge
         * @param out receives exactly one (key, edge, otherVertex) tuple
         */
        public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
            // getField is generic in its return type; pin it to K so the tuple can be built
            // with its real type arguments instead of being upcast to Object.
            K key = edge.<K>getField(this.fieldPosition);
            out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(key, edge, otherVertex));
        }
    }

    /**
     * Join function that emits (key, value of the joined vertex), where the key is the edge
     * field at a configurable position.
     */
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f1"})
    private static final class ProjectVertexWithNeighborValueJoin<K, VV, EV>
    implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
        private int fieldPosition;

        /** @param position index of the edge field to use as key (callers in this file pass 0 or 1) */
        public ProjectVertexWithNeighborValueJoin(int position) {
            this.fieldPosition = position;
        }

        /**
         * @param edge edge whose field at {@code fieldPosition} becomes the key
         * @param otherVertex vertex whose value is emitted
         * @param out receives exactly one (key, vertex value) pair
         */
        public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, Collector<Tuple2<K, VV>> out) {
            // getField is generic in its return type; pin it to K so the pair can be built
            // with its real type arguments instead of being upcast to Object.
            K key = edge.<K>getField(this.fieldPosition);
            out.collect(new Tuple2<K, VV>(key, otherVertex.getValue()));
        }
    }

    /**
     * Group-reduce adapter that passes a group of (key, edge, neighbor) triples straight to the
     * wrapped {@code NeighborsFunction}.
     */
    private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
            implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>,
            ResultTypeQueryable<T> {
        private NeighborsFunction<K, VV, EV, T> function;

        /** @param fun user function to invoke per group */
        public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, VV, EV, T> fun) {
            this.function = fun;
        }

        /** Delegates the whole group and the collector to the wrapped function. */
        public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
            this.function.iterateNeighbors(edges, out);
        }

        /**
         * Derives the produced type from the wrapped function's class, using type parameter
         * position 3 of {@code NeighborsFunction}.
         */
        public TypeInformation<T> getProducedType() {
            Class<?> userFunctionClass = this.function.getClass();
            return TypeExtractor.createTypeInfo(NeighborsFunction.class, userFunctionClass, 3, null, null);
        }
    }

    /**
     * Co-group function that emits edges in left/right pairs for as long as both sides still
     * have elements; surplus elements on either side are not emitted.
     */
    private static final class MatchingEdgeReducer<K, EV>
            implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
        private MatchingEdgeReducer() {
        }

        /** Emits one left edge followed by one right edge per matched pair. */
        public void coGroup(Iterable<Edge<K, EV>> edgesLeft, Iterable<Edge<K, EV>> edgesRight, Collector<Edge<K, EV>> out) throws Exception {
            Iterator<Edge<K, EV>> left = edgesLeft.iterator();
            Iterator<Edge<K, EV>> right = edgesRight.iterator();
            for (;;) {
                // Stop as soon as either side is exhausted.
                if (!left.hasNext() || !right.hasNext()) {
                    return;
                }
                out.collect(left.next());
                out.collect(right.next());
            }
        }
    }

    /**
     * Co-group function that forwards the edges of a group only when the group has no matching
     * entry on the removal side.
     */
    private static final class EdgeRemovalCoGroup<K, EV>
            implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
        private EdgeRemovalCoGroup() {
        }

        /** Emits every element of {@code edge} unless {@code edgeToBeRemoved} is non-empty. */
        public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved, Collector<Edge<K, EV>> out) throws Exception {
            if (edgeToBeRemoved.iterator().hasNext()) {
                // The group is marked for removal: emit nothing.
                return;
            }
            Iterator<Edge<K, EV>> kept = edge.iterator();
            while (kept.hasNext()) {
                out.collect(kept.next());
            }
        }
    }

    /**
     * Filter that rejects edges whose f0 and f1 both equal those of a given edge. The edge value
     * is not compared.
     */
    private static final class EdgeRemovalEdgeFilter<K, EV>
            implements FilterFunction<Edge<K, EV>> {
        private Edge<K, EV> edgeToRemove;

        /** @param edge edge whose f0/f1 pair identifies the edges to drop */
        public EdgeRemovalEdgeFilter(Edge<K, EV> edge) {
            this.edgeToRemove = edge;
        }

        /** @return {@code true} to keep the edge, {@code false} when both f0 and f1 match */
        public boolean filter(Edge<K, EV> edge) {
            if (!edge.f0.equals(this.edgeToRemove.f0)) {
                return true;
            }
            return !edge.f1.equals(this.edgeToRemove.f1);
        }
    }

    /**
     * Join function over (vertex, edge) pairs that keeps only the edge. Fields f0, f1 and f2 of
     * the second input are declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f0; f1; f2"})
    private static final class ProjectEdgeToBeRemoved<K, VV, EV>
    implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
        private ProjectEdgeToBeRemoved() {
        }

        /**
         * @param vertex joined vertex; not used
         * @param edge joined edge
         * @return {@code edge} unchanged
         */
        public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
            return edge;
        }
    }

    /**
     * Co-group function that forwards the first vertex of a group only when the group has no
     * matching entry on the removal side.
     */
    private static final class VerticesRemovalCoGroup<K, VV>
            implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
        private VerticesRemovalCoGroup() {
        }

        /** Emits at most one vertex: the first of {@code vertex}, if nothing marks it for removal. */
        public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vertexToBeRemoved, Collector<Vertex<K, VV>> out) throws Exception {
            Iterator<Vertex<K, VV>> candidates = vertex.iterator();
            Iterator<Vertex<K, VV>> removals = vertexToBeRemoved.iterator();
            if (!candidates.hasNext() || removals.hasNext()) {
                return;
            }
            out.collect(candidates.next());
        }
    }

    /**
     * Join function over (edge, vertex) pairs that keeps only the edge. Fields f0, f1 and f2 of
     * the first input are declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFieldsFirst(value={"f0; f1; f2"})
    private static final class JoinWithVerticesOnTrg<K, VV, EV>
    implements JoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
        private JoinWithVerticesOnTrg() {
        }

        /**
         * @param edge joined edge
         * @param vertex joined vertex; not used
         * @return {@code edge} unchanged
         */
        public Edge<K, EV> join(Edge<K, EV> edge, Vertex<K, VV> vertex) throws Exception {
            return edge;
        }
    }

    /**
     * Join function over (vertex, edge) pairs that keeps only the edge. Fields f0, f1 and f2 of
     * the second input are declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f0; f1; f2"})
    private static final class JoinVerticesWithEdgesOnSrc<K, VV, EV>
    implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
        private JoinVerticesWithEdgesOnSrc() {
        }

        /**
         * @param vertex joined vertex; not used
         * @param edge joined edge
         * @return {@code edge} unchanged
         */
        public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
            return edge;
        }
    }

    /**
     * Co-group function that emits exactly one vertex per group, preferring the first element
     * of the old side over the first element of the new side.
     */
    private static final class VerticesUnionCoGroup<K, VV>
            implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
        private VerticesUnionCoGroup() {
        }

        /** Emits the first old vertex if there is one, otherwise the first new vertex. */
        public void coGroup(Iterable<Vertex<K, VV>> oldVertices, Iterable<Vertex<K, VV>> newVertices, Collector<Vertex<K, VV>> out) throws Exception {
            Iterator<Vertex<K, VV>> existing = oldVertices.iterator();
            Iterator<Vertex<K, VV>> added = newVertices.iterator();
            Iterator<Vertex<K, VV>> chosen = existing.hasNext() ? existing : added;
            out.collect(chosen.next());
        }
    }

    /**
     * Map function that reduces an edge to the pair of its first two fields (f0, f1). Both
     * fields are declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFields(value={"f0; f1"})
    private static final class ExtractEdgeIDsMapper<K, EV>
            implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
        private ExtractEdgeIDsMapper() {
        }

        /** @return a new (edge.f0, edge.f1) pair */
        public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
            Tuple2<K, K> ids = new Tuple2<K, K>(edge.f0, edge.f1);
            return ids;
        }
    }

    /** Map function that reduces a vertex to its field f0. */
    private static final class ExtractVertexIDMapper<K, VV>
            implements MapFunction<Vertex<K, VV>, K> {
        private ExtractVertexIDMapper() {
        }

        /** @return {@code vertex.f0} */
        public K map(Vertex<K, VV> vertex) {
            K id = vertex.f0;
            return id;
        }
    }

    /**
     * Flat-map function that emits each edge as-is, followed by a copy with f0 and f1 swapped.
     * The swapped copy is a single reused {@code output} instance.
     */
    private static final class RegularAndReversedEdgesMap<K, EV>
            implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
        public Edge<K, EV> output = new Edge<K, EV>();

        private RegularAndReversedEdgesMap() {
        }

        /** Emits {@code edge}, then the reused output edge set to (f1, f0, f2). */
        public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
            out.collect(edge);
            Edge<K, EV> reversed = this.output;
            reversed.setFields(edge.f1, edge.f0, edge.f2);
            out.collect(reversed);
        }
    }

    /**
     * Map function that swaps f0 and f1 of an edge and keeps f2. The result is written into a
     * single reused {@code output} instance.
     */
    @FunctionAnnotation.ForwardedFields(value={"f0->f1; f1->f0; f2"})
    private static final class ReverseEdgesMap<K, EV>
            implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
        public Edge<K, EV> output = new Edge<K, EV>();

        private ReverseEdgesMap() {
        }

        /** @return the reused output edge, set to (edge.f1, edge.f0, edge.f2) */
        public Edge<K, EV> map(Edge<K, EV> edge) {
            Edge<K, EV> reversed = this.output;
            reversed.setFields(edge.f1, edge.f0, edge.f2);
            return reversed;
        }
    }

    /**
     * Co-group adapter for keyed (key, edge) pairs: strips the key from each pair and hands the
     * vertex plus the resulting edges to the wrapped function.
     */
    private static final class ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>
            implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>,
            ResultTypeQueryable<T> {
        private EdgesFunctionWithVertexValue<K, VV, EV, T> function;

        /** @param fun user function to invoke per co-group */
        public ApplyCoGroupFunctionOnAllEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
            this.function = fun;
        }

        /**
         * Invokes the wrapped function with the first vertex of the group and a view of the
         * pairs without their key.
         *
         * @throws NoSuchElementException if the vertex side of the co-group is empty
         */
        public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, Collector<T> out) throws Exception {
            // Lazy view over the pairs that exposes only the edge (f1).
            final Iterator<Edge<K, EV>> edgesOnly = new Iterator<Edge<K, EV>>(){
                private final Iterator<Tuple2<K, Edge<K, EV>>> keyed = keysWithEdges.iterator();

                @Override
                public boolean hasNext() {
                    return this.keyed.hasNext();
                }

                @Override
                public Edge<K, EV> next() {
                    return this.keyed.next().f1;
                }

                @Override
                public void remove() {
                    this.keyed.remove();
                }
            };
            // Every iterator() call returns the one iterator above, so the view is single-pass.
            Iterable<Edge<K, EV>> edgesView = new Iterable<Edge<K, EV>>(){

                @Override
                public Iterator<Edge<K, EV>> iterator() {
                    return edgesOnly;
                }
            };
            Iterator<Vertex<K, VV>> vertices = vertex.iterator();
            if (vertices.hasNext()) {
                this.function.iterateEdges(vertices.next(), edgesView, out);
                return;
            }
            throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
        }

        /**
         * Derives the produced type from the wrapped function's class, using type parameter
         * position 3 of {@code EdgesFunctionWithVertexValue}.
         */
        public TypeInformation<T> getProducedType() {
            Class<?> userFunctionClass = this.function.getClass();
            return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, userFunctionClass, 3, null, null);
        }
    }

    /**
     * Co-group adapter that hands a vertex and its edges to the wrapped
     * {@code EdgesFunctionWithVertexValue}.
     */
    private static final class ApplyCoGroupFunction<K, VV, EV, T>
            implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>,
            ResultTypeQueryable<T> {
        private EdgesFunctionWithVertexValue<K, VV, EV, T> function;

        /** @param fun user function to invoke per co-group */
        public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
            this.function = fun;
        }

        /**
         * Invokes the wrapped function with the first vertex of the group and its edges.
         *
         * @throws NoSuchElementException if the vertex side of the co-group is empty
         */
        public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
            Iterator<Vertex<K, VV>> vertices = vertex.iterator();
            if (vertices.hasNext()) {
                this.function.iterateEdges(vertices.next(), edges, out);
                return;
            }
            throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
        }

        /**
         * Derives the produced type from the wrapped function's class, using type parameter
         * position 3 of {@code EdgesFunctionWithVertexValue}.
         */
        public TypeInformation<T> getProducedType() {
            Class<?> userFunctionClass = this.function.getClass();
            return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, userFunctionClass, 3, null, null);
        }
    }

    /**
     * Flat-map function that emits every edge twice: once keyed as (source, target, edge) and
     * once as (target, source, edge).
     */
    private static final class EmitOneEdgeWithNeighborPerNode<K, EV>
    implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
        private EmitOneEdgeWithNeighborPerNode() {
        }

        /**
         * @param edge edge to expand
         * @param out receives two tuples, the source-keyed one first
         */
        public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
            // Construct the tuples with their real type arguments; the collector is typed and
            // does not accept a value upcast to Object.
            out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
            out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
        }
    }

    /**
     * Flat-map function that emits the value of every edge twice: once keyed by the edge source
     * and once keyed by the edge target.
     */
    private static final class EmitOneVertexWithEdgeValuePerNode<K, EV>
    implements FlatMapFunction<Edge<K, EV>, Tuple2<K, EV>> {
        private EmitOneVertexWithEdgeValuePerNode() {
        }

        /**
         * @param edge edge to expand
         * @param out receives (source, value) followed by (target, value)
         */
        public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
            // Construct the pairs with their real type arguments; the collector is typed and
            // does not accept a value upcast to Object.
            out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
            out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
        }
    }

    /**
     * Flat-map function that emits every edge twice: once keyed by its source and once keyed by
     * its target.
     */
    private static final class EmitOneEdgePerNode<K, EV>
    implements FlatMapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
        private EmitOneEdgePerNode() {
        }

        /**
         * @param edge edge to expand
         * @param out receives (source, edge) followed by (target, edge)
         */
        public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
            // Construct the pairs with their real type arguments; the collector is typed and
            // does not accept a value upcast to Object.
            out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
            out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
        }
    }

    /**
     * Group-reduce adapter that passes a group of (key, edge) pairs straight to the wrapped
     * {@code EdgesFunction}.
     */
    private static final class ApplyGroupReduceFunction<K, EV, T>
    implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T> {
        private EdgesFunction<K, EV, T> function;

        /** @param fun user function to invoke per group */
        public ApplyGroupReduceFunction(EdgesFunction<K, EV, T> fun) {
            this.function = fun;
        }

        /** Delegates the whole group and the collector to the wrapped function. */
        public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
            this.function.iterateEdges(edges, out);
        }
    }

    /**
     * Map function that turns an edge into (key, edge value), where the key is the edge field
     * at a configurable position.
     */
    private static final class ProjectVertexWithEdgeValueMap<K, EV>
            implements MapFunction<Edge<K, EV>, Tuple2<K, EV>> {
        private int fieldPosition;

        /** @param position index of the edge field to use as key (callers in this file pass 0 or 1) */
        public ProjectVertexWithEdgeValueMap(int position) {
            this.fieldPosition = position;
        }

        /** @return a new (edge field at {@code fieldPosition}, edge value) pair */
        public Tuple2<K, EV> map(Edge<K, EV> edge) {
            K key = edge.<K>getField(this.fieldPosition);
            return new Tuple2<K, EV>(key, edge.getValue());
        }
    }

    /**
     * Map function that turns an edge into (key, edge), where the key is the edge field at a
     * configurable position.
     */
    private static final class ProjectVertexIdMap<K, EV>
            implements MapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
        private int fieldPosition;

        /** @param position index of the edge field to use as key */
        public ProjectVertexIdMap(int position) {
            this.fieldPosition = position;
        }

        /** @return a new (edge field at {@code fieldPosition}, edge) pair */
        public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
            K key = edge.<K>getField(this.fieldPosition);
            return new Tuple2<K, Edge<K, EV>>(key, edge);
        }
    }

    /**
     * Co-group function that emits (vertex f0, number of edges in the group). The emitted tuple
     * and its counter are reused across invocations.
     */
    private static final class CountNeighborsCoGroup<K, VV, EV>
            implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, LongValue>> {
        private LongValue degree = new LongValue();
        // Holds a reference to the counter above, so updating the counter updates the tuple.
        private Tuple2<K, LongValue> vertexDegree = new Tuple2<K, LongValue>(null, this.degree);

        private CountNeighborsCoGroup() {
        }

        /**
         * Counts the edges of the group and emits the count keyed by the first vertex's f0.
         *
         * @throws NoSuchElementException if the vertex side of the co-group is empty
         */
        public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges, Collector<Tuple2<K, LongValue>> out) {
            long count = 0L;
            Iterator<Edge<K, EV>> edgeIterator = outEdges.iterator();
            while (edgeIterator.hasNext()) {
                edgeIterator.next();
                ++count;
            }
            this.degree.setValue(count);
            Iterator<Vertex<K, VV>> vertices = vertex.iterator();
            if (vertices.hasNext()) {
                this.vertexDegree.f0 = vertices.next().f0;
                out.collect(this.vertexDegree);
                return;
            }
            throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
        }
    }

    /**
     * Join function over (edge, vertex) pairs that emits only the edge. Fields f0, f1 and f2 of
     * the first input are declared as forwarded.
     */
    @FunctionAnnotation.ForwardedFieldsFirst(value={"f0; f1; f2"})
    private static final class ProjectEdge<K, VV, EV>
    implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
        private ProjectEdge() {
        }

        /**
         * @param first joined edge, emitted unchanged
         * @param second joined vertex; not used
         * @param out receives exactly one element, {@code first}
         */
        public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) {
            out.collect(first);
        }
    }

    /**
     * Co-group function that rewrites the value of every edge in a group using the first input
     * tuple of the group. Groups without input are forwarded unchanged.
     */
    private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
            implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
        private Edge<K, EV> output = new Edge<K, EV>();
        private EdgeJoinFunction<EV, T> edgeJoinFunction;

        /** @param mapper function that combines an edge value with an input value */
        public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(EdgeJoinFunction<EV, T> mapper) {
            this.edgeJoinFunction = mapper;
        }

        /**
         * Emits one edge per edge of the group. Only the first element of {@code input} is
         * consulted; the rewritten edges are emitted through the reused {@code output} instance.
         */
        public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
            Iterator<Edge<K, EV>> remainingEdges = edges.iterator();
            Iterator<Tuple2<K, T>> inputs = input.iterator();
            if (!inputs.hasNext()) {
                // Nothing to join with: pass the edges through untouched.
                while (remainingEdges.hasNext()) {
                    collector.collect(remainingEdges.next());
                }
                return;
            }
            Tuple2<K, T> joinPartner = inputs.next();
            while (remainingEdges.hasNext()) {
                Edge<K, EV> current = remainingEdges.next();
                this.output.f0 = current.f0;
                this.output.f1 = current.f1;
                this.output.f2 = this.edgeJoinFunction.edgeJoin(current.f2, joinPartner.f1);
                collector.collect(this.output);
            }
        }
    }

    /**
     * Co-group function that rewrites the value of the first edge of a group using the first
     * input tuple of the group. A group without input forwards its first edge unchanged.
     */
    private static final class ApplyCoGroupToEdgeValues<K, EV, T>
            implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
        private Edge<K, EV> output = new Edge<K, EV>();
        private EdgeJoinFunction<EV, T> edgeJoinFunction;

        /** @param mapper function that combines an edge value with an input value */
        public ApplyCoGroupToEdgeValues(EdgeJoinFunction<EV, T> mapper) {
            this.edgeJoinFunction = mapper;
        }

        /**
         * Emits at most one edge per group. The rewritten edge takes f0 and f1 from the input
         * tuple and is emitted through the reused {@code output} instance.
         */
        public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
            Iterator<Edge<K, EV>> edgeCandidates = edges.iterator();
            Iterator<Tuple3<K, K, T>> inputs = input.iterator();
            if (!edgeCandidates.hasNext()) {
                return;
            }
            if (!inputs.hasNext()) {
                // Nothing to join with: pass the edge through untouched.
                collector.collect(edgeCandidates.next());
                return;
            }
            Tuple3<K, K, T> joinPartner = inputs.next();
            this.output.f0 = joinPartner.f0;
            this.output.f1 = joinPartner.f1;
            this.output.f2 = this.edgeJoinFunction.edgeJoin(edgeCandidates.next().f2, joinPartner.f2);
            collector.collect(this.output);
        }
    }

    /**
     * Co-group function that rewrites the value of the first vertex of a group using the first
     * input tuple of the group. A group without input forwards its first vertex unchanged.
     */
    private static final class ApplyCoGroupToVertexValues<K, VV, T>
    implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
        private VertexJoinFunction<VV, T> vertexJoinFunction;

        /** @param mapper function that combines a vertex value with an input value */
        public ApplyCoGroupToVertexValues(VertexJoinFunction<VV, T> mapper) {
            this.vertexJoinFunction = mapper;
        }

        /**
         * Emits at most one vertex per group.
         *
         * @param vertices vertex side of the group; only its first element is used
         * @param input input side of the group; only its first element is used
         * @param collector receives the joined vertex, or the original one when there is no input
         */
        public void coGroup(Iterable<Vertex<K, VV>> vertices, Iterable<Tuple2<K, T>> input, Collector<Vertex<K, VV>> collector) throws Exception {
            Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator();
            Iterator<Tuple2<K, T>> inputIterator = input.iterator();
            if (vertexIterator.hasNext()) {
                if (inputIterator.hasNext()) {
                    Tuple2<K, T> inputNext = inputIterator.next();
                    // The new vertex takes its id from the input tuple. It is built as
                    // Vertex<K, VV> so that it matches the collector's element type.
                    collector.collect(new Vertex<K, VV>(inputNext.f0, this.vertexJoinFunction.vertexJoin(vertexIterator.next().f1, inputNext.f1)));
                } else {
                    collector.collect(vertexIterator.next());
                }
            }
        }
    }

    /**
     * Join function that completes a (f0, f1, f2, f3) tuple with the value of the joined vertex
     * and emits the result as a {@code Triplet}.
     */
    @FunctionAnnotation.ForwardedFieldsFirst(value={"f0; f1; f2; f3->f4"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f1->f3"})
    private static final class ProjectEdgeWithVertexValues<K, VV, EV>
    implements FlatJoinFunction<Tuple4<K, K, VV, EV>, Vertex<K, VV>, Triplet<K, VV, EV>> {
        private ProjectEdgeWithVertexValues() {
        }

        /**
         * @param tripletWithSrcValSet tuple supplying constructor arguments 1-3 (f0, f1, f2) and 5 (f3)
         * @param vertex vertex whose value becomes the fourth constructor argument
         * @param collector receives exactly one triplet
         */
        public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet, Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
            // Build the triplet as Triplet<K, VV, EV> so that it matches the collector's
            // element type.
            collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1, tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
        }
    }

    /**
     * Join function that pairs an edge with a vertex value, producing the
     * 4-tuple (edge source, edge target, vertex value, edge value).
     *
     * @param <K> the vertex ID type
     * @param <VV> the vertex value type
     * @param <EV> the edge value type
     */
    @FunctionAnnotation.ForwardedFieldsFirst(value={"f1->f2"})
    @FunctionAnnotation.ForwardedFieldsSecond(value={"f0; f1; f2->f3"})
    private static final class ProjectEdgeWithSrcValue<K, VV, EV>
    implements FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple4<K, K, VV, EV>> {
        private ProjectEdgeWithSrcValue() {
        }

        /**
         * Emits exactly one 4-tuple per joined pair.
         *
         * @param vertex the vertex supplying the value stored in field {@code f2}
         * @param edge the edge supplying source, target and edge value
         * @param collector receives the resulting tuple
         * @throws Exception declared by the join interface; not thrown directly here
         */
        public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector) throws Exception {
            // Fully parameterized and passed without a cast so the argument
            // matches the collector's declared element type.
            Tuple4<K, K, VV, EV> projected = new Tuple4<K, K, VV, EV>(
                    edge.getSource(),
                    edge.getTarget(),
                    vertex.getValue(),
                    edge.getValue());
            collector.collect(projected);
        }
    }

    /**
     * Flat-map function that emits, for every edge, two single-field tuples:
     * first one holding the edge's {@code f0} and then one holding its
     * {@code f1}.
     *
     * @param <K> the vertex ID type
     * @param <EV> the edge value type
     */
    private static final class EmitSrcAndTargetAsTuple1<K, EV>
    implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
        /**
         * Single output record that is mutated and re-emitted for every ID.
         * NOTE(review): this relies on the collector not retaining a reference
         * to the emitted instance between calls; confirm against the runtime's
         * object-reuse contract.
         */
        private final Tuple1<K> output = new Tuple1<K>();

        private EmitSrcAndTargetAsTuple1() {
        }

        /**
         * @param edge the edge whose two endpoint IDs are emitted
         * @param out receives two records per edge, {@code f0} first and then {@code f1}
         */
        public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
            this.output.f0 = edge.f0;
            out.collect(this.output);
            this.output.f0 = edge.f1;
            out.collect(this.output);
        }
    }

    /**
     * Flat-map function that emits, for every edge, two vertices carrying a
     * {@link NullValue} value: first one whose ID is the edge's {@code f0} and
     * then one whose ID is its {@code f1}.
     *
     * @param <K> the vertex ID type
     * @param <EV> the edge value type
     */
    private static final class EmitSrcAndTarget<K, EV>
    implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
        /**
         * Single output vertex that is mutated and re-emitted for every ID;
         * its ID starts out as {@code null} and is overwritten before each emit.
         * NOTE(review): this relies on the collector not retaining a reference
         * to the emitted instance between calls; confirm against the runtime's
         * object-reuse contract.
         */
        private final Vertex<K, NullValue> output = new Vertex<K, NullValue>(null, NullValue.getInstance());

        private EmitSrcAndTarget() {
        }

        /**
         * @param edge the edge whose two endpoint IDs are emitted as vertices
         * @param out receives two vertices per edge, {@code f0} first and then {@code f1}
         */
        public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
            this.output.f0 = edge.f0;
            out.collect(this.output);
            this.output.f0 = edge.f1;
            out.collect(this.output);
        }
    }
}

