package org.apache.flink.graph.spargel;

import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration.class */
public class ScatterGatherIteration<K, VV, Message, EV> implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>> {
    private final VertexUpdateFunction<K, VV, Message> updateFunction;
    private final MessagingFunction<K, VV, Message, EV> messagingFunction;
    private final DataSet<Edge<K, EV>> edgesWithValue;
    private final int maximumNumberOfIterations;
    private final TypeInformation<Message> messageType;
    private DataSet<Vertex<K, VV>> initialVertices;
    private ScatterGatherConfiguration configuration;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration$MessagingUdfWithEVsSimpleVV.class */
    public static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV> extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
        private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction, TypeInformation<Tuple2<K, Message>> typeInformation) {
            super(messagingFunction, typeInformation);
        }

        /**
         * Lets the messaging function send messages for the first vertex of the group.
         *
         * <p>If the vertex side of the co-group is empty, nothing is emitted. Otherwise the
         * messaging function is handed the group's edge iterator, the output collector and the
         * vertex id, and is then asked to send its messages for that vertex.
         *
         * @param iterable  the edges of this co-group
         * @param iterable2 the vertices of this co-group; only the first element is used
         * @param collector receives the {@code (key, message)} tuples
         * @throws Exception propagated from the messaging function
         */
        public void coGroup(Iterable<Edge<K, EV>> iterable, Iterable<Vertex<K, VV>> iterable2, Collector<Tuple2<K, Message>> collector) throws Exception {
            final Iterator<Vertex<K, VV>> vertices = iterable2.iterator();
            if (!vertices.hasNext()) {
                // No vertex on this key: there is nobody to send messages on behalf of.
                return;
            }

            final Vertex<K, VV> vertex = vertices.next();
            messagingFunction.set(iterable.iterator(), collector, vertex.getId());
            messagingFunction.sendMessages(vertex);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration$MessagingUdfWithEVsVVWithDegrees.class */
    public static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV> extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
        /** Reused output object: the incoming vertex stripped of its degree fields. */
        private Vertex<K, VV> nextVertex;

        private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction, TypeInformation<Tuple2<K, Message>> typeInformation) {
            super(messagingFunction, typeInformation);
            this.nextVertex = new Vertex<>();
        }

        /**
         * Lets the messaging function send messages for the first vertex of the group, after
         * unpacking the degrees that travel inside the vertex value.
         *
         * <p>The vertex value is a {@code Tuple3} whose field 0 is handed on as the plain vertex
         * value, while fields 1 and 2 are passed to {@code setInDegree} and {@code setOutDegree}
         * respectively. If the vertex side of the co-group is empty, nothing is emitted.
         *
         * @param iterable  the edges of this co-group
         * @param iterable2 the degree-annotated vertices of this co-group; only the first is used
         * @param collector receives the {@code (key, message)} tuples
         * @throws Exception propagated from the messaging function
         */
        public void coGroup(Iterable<Edge<K, EV>> iterable, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> iterable2, Collector<Tuple2<K, Message>> collector) throws Exception {
            Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertices = iterable2.iterator();
            if (!vertices.hasNext()) {
                return;
            }

            Vertex<K, Tuple3<VV, Long, Long>> vertex = vertices.next();
            // Typed access instead of raw (Tuple3)/(Long) casts: the declared generics already
            // guarantee the layout (value, in-degree, out-degree).
            Tuple3<VV, Long, Long> valueWithDegrees = vertex.f1;

            this.nextVertex.setField(vertex.f0, 0);
            this.nextVertex.setField(valueWithDegrees.f0, 1);

            this.messagingFunction.setInDegree(valueWithDegrees.f1.longValue());
            this.messagingFunction.setOutDegree(valueWithDegrees.f2.longValue());

            this.messagingFunction.set(iterable.iterator(), collector, vertex.getId());
            this.messagingFunction.sendMessages(this.nextVertex);
        }
    }

    /* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration$MessagingUdfWithEdgeValues.class */
    private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV> extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>> implements ResultTypeQueryable<Tuple2<K, Message>> {
        private static final long serialVersionUID = 1;

        /** The user-supplied messaging function this co-group delegates to. */
        final MessagingFunction<K, VV, Message, EV> messagingFunction;

        /** Type of the emitted {@code (key, message)} tuples; declared transient, so it is not serialized with the function. */
        private transient TypeInformation<Tuple2<K, Message>> resultType;

        private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction, TypeInformation<Tuple2<K, Message>> typeInformation) {
            this.resultType = typeInformation;
            this.messagingFunction = messagingFunction;
        }

        /**
         * Initializes the messaging function in superstep 1 and runs its pre-superstep hook
         * on every call.
         */
        public void open(Configuration configuration) throws Exception {
            final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
            if (firstSuperstep) {
                messagingFunction.init(getIterationRuntimeContext());
            }
            messagingFunction.preSuperstep();
        }

        /** Runs the messaging function's post-superstep hook. */
        public void close() throws Exception {
            messagingFunction.postSuperstep();
        }

        /** Returns the type information passed to the constructor. */
        public TypeInformation<Tuple2<K, Message>> getProducedType() {
            return resultType;
        }
    }

    /* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration$VertexUpdateUdf.class */
    private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>> implements ResultTypeQueryable<Vertex<K, VVWithDegrees>> {
        private static final long serialVersionUID = 1;

        /** The user-supplied vertex update function this co-group delegates to. */
        final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;

        /** Reusable iterator wrapper that is re-pointed at each group's messages. */
        final MessageIterator<Message> messageIter = new MessageIterator<>();

        /** Type of the emitted vertices; declared transient, so it is not serialized with the function. */
        private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;

        private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VVWithDegrees>> typeInformation) {
            this.vertexUpdateFunction = vertexUpdateFunction;
            this.resultType = typeInformation;
        }

        /**
         * Initializes the update function in superstep 1 and runs its pre-superstep hook
         * on every call.
         */
        public void open(Configuration configuration) throws Exception {
            final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
            if (firstSuperstep) {
                vertexUpdateFunction.init(getIterationRuntimeContext());
            }
            vertexUpdateFunction.preSuperstep();
        }

        /** Runs the update function's post-superstep hook. */
        public void close() throws Exception {
            vertexUpdateFunction.postSuperstep();
        }

        /** Returns the type information passed to the constructor. */
        public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
            return resultType;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration$VertexUpdateUdfSimpleVV.class */
    public static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
        private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> typeInformation) {
            super(vertexUpdateFunction, typeInformation);
        }

        /**
         * Applies the vertex update function to the group's vertex and its incoming messages.
         *
         * @param iterable  the {@code (target key, message)} tuples of this co-group
         * @param iterable2 the vertices of this co-group; only the first element is used
         * @param collector receives the updated vertex state
         * @throws Exception if the group contains no vertex (the message names the target key
         *                   when one can be read from the messages), or propagated from the
         *                   update function
         */
        public void coGroup(Iterable<Tuple2<K, Message>> iterable, Iterable<Vertex<K, VV>> iterable2, Collector<Vertex<K, VV>> collector) throws Exception {
            Iterator<Vertex<K, VV>> vertices = iterable2.iterator();
            if (!vertices.hasNext()) {
                throw missingVertex(iterable.iterator());
            }

            Vertex<K, VV> vertex = vertices.next();
            this.messageIter.setSource(iterable.iterator());
            this.vertexUpdateFunction.setOutput(vertex, collector);
            this.vertexUpdateFunction.updateVertex(vertex, this.messageIter);
        }

        /** Builds the exception reported when messages arrive for a key that has no vertex. */
        private Exception missingVertex(Iterator<Tuple2<K, Message>> messages) {
            if (!messages.hasNext()) {
                // Previously a bare Exception with no message, which was undiagnosable in logs.
                return new Exception("Vertex update was invoked for a group that contains neither a vertex nor a message.");
            }
            String description = "Target vertex does not exist!.";
            try {
                description = "Target vertex '" + messages.next().f0 + "' does not exist!.";
            } catch (Throwable ignored) {
                // Best effort only: if the key cannot be read or rendered, keep the generic text
                // so the original "missing vertex" failure is still the one reported.
            }
            return new Exception(description);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/graph/spargel/ScatterGatherIteration$VertexUpdateUdfVVWithDegrees.class */
    public static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
        private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction, TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> typeInformation) {
            super(vertexUpdateFunction, typeInformation);
        }

        /**
         * Applies the vertex update function to the group's degree-annotated vertex and its
         * incoming messages.
         *
         * <p>Fields 1 and 2 of the vertex value are passed to {@code setInDegree} and
         * {@code setOutDegree} before the update function is invoked.
         *
         * @param iterable  the {@code (target key, message)} tuples of this co-group
         * @param iterable2 the degree-annotated vertices of this co-group; only the first is used
         * @param collector receives the updated vertex state
         * @throws Exception if the group contains no vertex (the message names the target key
         *                   when one can be read from the messages), or propagated from the
         *                   update function
         */
        public void coGroup(Iterable<Tuple2<K, Message>> iterable, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> iterable2, Collector<Vertex<K, Tuple3<VV, Long, Long>>> collector) throws Exception {
            Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertices = iterable2.iterator();
            if (!vertices.hasNext()) {
                throw missingVertex(iterable.iterator());
            }

            Vertex<K, Tuple3<VV, Long, Long>> vertex = vertices.next();
            // Typed access instead of raw (Tuple3)/(Long) casts.
            Tuple3<VV, Long, Long> valueWithDegrees = vertex.f1;

            this.messageIter.setSource(iterable.iterator());
            this.vertexUpdateFunction.setInDegree(valueWithDegrees.f1.longValue());
            this.vertexUpdateFunction.setOutDegree(valueWithDegrees.f2.longValue());
            this.vertexUpdateFunction.setOutputWithDegrees(vertex, collector);
            this.vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertex, this.messageIter);
        }

        /** Builds the exception reported when messages arrive for a key that has no vertex. */
        private Exception missingVertex(Iterator<Tuple2<K, Message>> messages) {
            if (!messages.hasNext()) {
                // Previously a bare Exception with no message, which was undiagnosable in logs.
                return new Exception("Vertex update was invoked for a group that contains neither a vertex nor a message.");
            }
            String description = "Target vertex does not exist!.";
            try {
                description = "Target vertex '" + messages.next().f0 + "' does not exist!.";
            } catch (Throwable ignored) {
                // Best effort only: if the key cannot be read or rendered, keep the generic text
                // so the original "missing vertex" failure is still the one reported.
            }
            return new Exception(description);
        }
    }

    /**
     * Creates the iteration; callers go through {@link #withEdges}.
     *
     * @param vertexUpdateFunction the function that updates vertex state; must not be null
     * @param messagingFunction    the function that produces messages; must not be null
     * @param dataSet              the edges of the graph; must not be null
     * @param i                    the maximum number of iterations; must be positive
     */
    private ScatterGatherIteration(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, MessagingFunction<K, VV, Message, EV> messagingFunction, DataSet<Edge<K, EV>> dataSet, int i) {
        // checkNotNull hands back its argument, so validation and assignment share a line.
        this.updateFunction = Preconditions.checkNotNull(vertexUpdateFunction);
        this.messagingFunction = Preconditions.checkNotNull(messagingFunction);
        this.edgesWithValue = Preconditions.checkNotNull(dataSet);

        Preconditions.checkArgument(i > 0, "The maximum number of iterations must be at least one.");
        this.maximumNumberOfIterations = i;

        this.messageType = getMessageType(messagingFunction);
    }

    /**
     * Asks the type extractor for the type bound to type parameter index 2 of
     * {@code MessagingFunction} on the concrete class of the given function.
     *
     * @param messagingFunction the user's messaging function
     * @return the type information produced by the extractor
     */
    private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> messagingFunction) {
        final Class<?> concreteClass = messagingFunction.getClass();
        final int messageTypeParameterIndex = 2;
        return TypeExtractor.createTypeInfo(messagingFunction, MessagingFunction.class, concreteClass, messageTypeParameterIndex);
    }

    /**
     * Stores the vertex data set the iteration starts from.
     *
     * @param dataSet the initial vertices; {@link #createResult()} rejects a null value
     */
    public void setInput(DataSet<Vertex<K, VV>> dataSet) {
        initialVertices = dataSet;
    }

    /**
     * Assembles the scatter-gather iteration and returns the data set of final vertex states.
     *
     * <p>Applies the optional configuration (vertex count, messaging direction, degrees) to the
     * user functions and then builds either the plain or the degree-annotated iteration.
     *
     * @return the vertices after the iteration
     * @throws IllegalStateException if no input was set, or if the vertex-count option is
     *                               enabled and the count cannot be determined
     */
    public DataSet<Vertex<K, VV>> createResult() {
        if (this.initialVertices == null) {
            throw new IllegalStateException("The input data set has not been set.");
        }

        // Messages travel as (target key, message); the key type is field 0 of the vertex tuple type.
        TypeInformation<K> keyType = ((TupleTypeInfo<?>) this.initialVertices.getType()).getTypeAt(0);
        TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K, Message>>(keyType, this.messageType);

        Graph<K, VV, EV> graph = Graph.fromDataSet(this.initialVertices, this.edgesWithValue, this.initialVertices.getExecutionEnvironment());

        if (this.configuration != null && this.configuration.isOptNumVertices()) {
            try {
                long numberOfVertices = graph.numberOfVertices();
                this.messagingFunction.setNumberOfVertices(numberOfVertices);
                this.updateFunction.setNumberOfVertices(numberOfVertices);
            } catch (Exception e) {
                // The caller explicitly asked for the vertex count. Continuing without it would
                // leave the user functions with an unset count, so fail loudly and keep the cause.
                throw new IllegalStateException("Could not determine the number of vertices requested by the iteration configuration.", e);
            }
        }

        if (this.configuration != null) {
            this.messagingFunction.setDirection(this.configuration.getDirection());
        } else {
            this.messagingFunction.setDirection(EdgeDirection.OUT);
        }
        EdgeDirection direction = this.messagingFunction.getDirection();

        if (this.configuration != null && this.configuration.isOptDegrees()) {
            return createResultVerticesWithDegrees(graph, direction, messageTypeInfo);
        }
        return createResultSimpleVertex(direction, messageTypeInfo);
    }

    /**
     * Static factory for a scatter-gather iteration over the given edges.
     *
     * @param dataSet              the edges of the graph
     * @param vertexUpdateFunction the function that updates vertex state
     * @param messagingFunction    the function that produces messages
     * @param i                    the maximum number of iterations
     * @return a new, not yet configured iteration
     */
    public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(DataSet<Edge<K, EV>> dataSet, VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, MessagingFunction<K, VV, Message, EV> messagingFunction, int i) {
        return new ScatterGatherIteration<K, VV, Message, EV>(vertexUpdateFunction, messagingFunction, dataSet, i);
    }

    /**
     * Sets the optional configuration of this iteration.
     *
     * @param scatterGatherConfiguration the configuration to use; null means "use defaults"
     */
    public void configure(ScatterGatherConfiguration scatterGatherConfiguration) {
        configuration = scatterGatherConfiguration;
    }

    /**
     * @return the configuration set through {@link #configure}, or null if none was set
     */
    public ScatterGatherConfiguration getIterationConfiguration() {
        return configuration;
    }

    /**
     * Builds the messaging co-group between the edges and the iteration workset.
     *
     * @param deltaIteration  the iteration whose workset supplies the vertices
     * @param typeInformation type of the emitted {@code (key, message)} tuples
     * @param i               key position on the edge side
     * @param i2              key position on the vertex side
     * @return the named co-group operator with the configured messaging broadcast sets attached
     */
    private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> deltaIteration, TypeInformation<Tuple2<K, Message>> typeInformation, int i, int i2) {
        // Fully parameterized UDF instead of a raw instantiation, so no unchecked conversion
        // (and no raw casts further down) is needed.
        MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
                new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(this.messagingFunction, typeInformation);

        CoGroupOperator<?, ?, Tuple2<K, Message>> messages =
                this.edgesWithValue.coGroup(deltaIteration.getWorkset()).where(i).equalTo(i2).with(messenger);

        messages = messages.name("Messaging");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getMessagingBcastVars()) {
                messages = messages.withBroadcastSet(broadcast.f1, broadcast.f0);
            }
        }
        return messages;
    }

    /**
     * Builds the messaging co-group between the edges and a workset of degree-annotated vertices.
     *
     * @param deltaIteration  the iteration whose workset supplies the degree-annotated vertices
     * @param typeInformation type of the emitted {@code (key, message)} tuples
     * @param i               key position on the edge side
     * @param i2              key position on the vertex side
     * @return the named co-group operator with the configured messaging broadcast sets attached
     */
    private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> deltaIteration, TypeInformation<Tuple2<K, Message>> typeInformation, int i, int i2) {
        // Fully parameterized UDF instead of a raw instantiation, so no unchecked conversion
        // (and no raw casts further down) is needed.
        MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
                new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(this.messagingFunction, typeInformation);

        CoGroupOperator<?, ?, Tuple2<K, Message>> messages =
                this.edgesWithValue.coGroup(deltaIteration.getWorkset()).where(i).equalTo(i2).with(messenger);

        messages = messages.name("Messaging");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getMessagingBcastVars()) {
                messages = messages.withBroadcastSet(broadcast.f1, broadcast.f0);
            }
        }
        return messages;
    }

    /**
     * Names the delta iteration and, when a configuration is present, applies its parallelism,
     * solution-set memory setting and aggregators.
     *
     * @param deltaIteration the iteration to configure
     */
    private void setUpIteration(DeltaIteration<?, ?> deltaIteration) {
        final String defaultName = "Scatter-gather iteration (" + this.updateFunction + " | " + this.messagingFunction + ")";
        final ScatterGatherConfiguration config = this.configuration;

        if (config != null) {
            // The configuration decides the final name, given the default as a fallback argument.
            deltaIteration.name(config.getName(defaultName));
            deltaIteration.parallelism(config.getParallelism());
            deltaIteration.setSolutionSetUnManaged(config.isSolutionSetUnmanagedMemory());
            for (Map.Entry<String, Aggregator<?>> registration : config.getAggregators().entrySet()) {
                deltaIteration.registerAggregator(registration.getKey(), registration.getValue());
            }
        } else {
            deltaIteration.name(defaultName);
        }
    }

    /**
     * Builds the delta iteration for vertices that carry only their plain value.
     *
     * <p>The initial vertices serve as both solution set and workset, keyed on field 0. Each
     * superstep co-groups edges with the workset to produce messages, then co-groups those
     * messages with the solution set to produce vertex updates; the updates close the iteration
     * as both the solution-set delta and the next workset.
     *
     * @param edgeDirection   which edge endpoint is matched against the vertex key
     * @param typeInformation type of the {@code (key, message)} tuples
     * @return the vertices after the iteration
     * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
     */
    private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection edgeDirection, TypeInformation<Tuple2<K, Message>> typeInformation) {
        TypeInformation<Vertex<K, VV>> vertexType = this.initialVertices.getType();

        // Parameterized (not wildcard) iteration type: the messaging builders and closeWith
        // need to know the element types.
        DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
                this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, 0);
        setUpIteration(iteration);

        // Declared as DataSet because the ALL case yields a union, not a co-group operator.
        DataSet<Tuple2<K, Message>> messages;
        switch (edgeDirection) {
            case IN:
                messages = buildMessagingFunction(iteration, typeInformation, 1, 0);
                break;
            case OUT:
                messages = buildMessagingFunction(iteration, typeInformation, 0, 0);
                break;
            case ALL:
                messages = buildMessagingFunction(iteration, typeInformation, 1, 0)
                        .union(buildMessagingFunction(iteration, typeInformation, 0, 0));
                break;
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }

        VertexUpdateUdf<K, VV, Message> updateUdf =
                new VertexUpdateUdfSimpleVV<K, VV, Message>(this.updateFunction, vertexType);

        CoGroupOperator<?, ?, Vertex<K, VV>> updates =
                messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
        configureUpdateFunction(updates);

        return iteration.closeWith(updates, updates);
    }

    /**
     * Builds the delta iteration for vertices whose value is extended with in- and out-degree.
     *
     * <p>The vertex values are first wrapped as {@code (value, inDegree, outDegree)} by joining
     * the initial vertices with the graph's degree data sets. The iteration then runs on the
     * wrapped vertices, and a final map strips the degrees off again.
     *
     * @param graph           the graph used to obtain in- and out-degrees
     * @param edgeDirection   which edge endpoint is matched against the vertex key
     * @param typeInformation type of the {@code (key, message)} tuples
     * @return the vertices after the iteration, with their plain values
     * @throws IllegalArgumentException if the direction is not IN, OUT or ALL
     */
    private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection edgeDirection, TypeInformation<Tuple2<K, Message>> typeInformation) {
        this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());

        DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
        DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();

        // (key, inDegree) x (key, outDegree) -> (key, inDegree, outDegree).
        // The out-degree moves from field 1 of the second input to field 2 of the result, so the
        // forwarded-field annotation must be "f1->f2"; a plain "f1" would claim it lands in f1.
        DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
                    public void join(Tuple2<K, Long> tuple2, Tuple2<K, Long> tuple22, Collector<Tuple3<K, Long, Long>> collector) {
                        collector.collect(new Tuple3<K, Long, Long>(tuple2.f0, tuple2.f1, tuple22.f1));
                    }
                }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1->f2");

        DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = this.initialVertices
                .join(degrees).where(0).equalTo(0)
                .with(new FlatJoinFunction<Vertex<K, VV>, Tuple3<K, Long, Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
                    public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> tuple3, Collector<Vertex<K, Tuple3<VV, Long, Long>>> collector) throws Exception {
                        collector.collect(new Vertex<K, Tuple3<VV, Long, Long>>(
                                vertex.getId(),
                                new Tuple3<VV, Long, Long>(vertex.getValue(), tuple3.f1, tuple3.f2)));
                    }
                }).withForwardedFieldsFirst("f0");

        TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexType = verticesWithDegrees.getType();

        // Parameterized (not wildcard) iteration type: the messaging builders and closeWith
        // need to know the element types.
        DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
                verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
        setUpIteration(iteration);

        // Declared as DataSet because the ALL case yields a union, not a co-group operator.
        DataSet<Tuple2<K, Message>> messages;
        switch (edgeDirection) {
            case IN:
                messages = buildMessagingFunctionVerticesWithDegrees(iteration, typeInformation, 1, 0);
                break;
            case OUT:
                messages = buildMessagingFunctionVerticesWithDegrees(iteration, typeInformation, 0, 0);
                break;
            case ALL:
                messages = buildMessagingFunctionVerticesWithDegrees(iteration, typeInformation, 1, 0)
                        .union(buildMessagingFunctionVerticesWithDegrees(iteration, typeInformation, 0, 0));
                break;
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }

        // The user's update function is declared over the plain value type VV, while the UDF
        // operates on the degree-extended value. This deliberate unchecked re-typing mirrors
        // what the raw-typed constructor call did before; the UDF only drives the function
        // through setOutputWithDegrees / updateVertexFromScatterGatherIteration.
        @SuppressWarnings("unchecked")
        VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> degreeAwareUpdateFunction =
                (VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message>) (VertexUpdateFunction<?, ?, ?>) this.updateFunction;
        VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
                new VertexUpdateUdfVVWithDegrees<K, VV, Message>(degreeAwareUpdateFunction, vertexType);

        CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
                messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
        configureUpdateFunction(updates);

        // Strip the degrees again so callers get vertices with their plain value type.
        return iteration.closeWith(updates, updates).map(new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
            public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
                return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
            }
        });
    }

    /**
     * Names the vertex-update co-group, attaches the configured update broadcast sets and
     * declares field 0 of both inputs as forwarded.
     *
     * @param coGroupOperator the co-group that applies the vertex update function
     */
    private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> coGroupOperator) {
        // Keep the operator parameterized; the raw local previously forced casts on every call.
        CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates = coGroupOperator.name("Vertex State Updates");
        if (this.configuration != null) {
            for (Tuple2<String, DataSet<?>> broadcast : this.configuration.getUpdateBcastVars()) {
                updates = updates.withBroadcastSet(broadcast.f1, broadcast.f0);
            }
        }
        // Field 0 of both the message tuple and the vertex passes through unchanged.
        updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
    }
}
