package org.apache.flink.spargel.java;

import java.lang.Comparable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/spargel/java/VertexCentricIteration.class */
public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> {
    /** User-defined vertex update function; wrapped in a {@code VertexUpdateUdf} when the result is created. */
    private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;

    /** User-defined messaging function; wrapped in one of the messaging UDFs when the result is created. */
    private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;

    /** Edges as (source id, target id) pairs; {@code null} when the iteration was built with valued edges. */
    private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;

    /** Edges as (source id, target id, edge value) triples; {@code null} when built with plain edges. */
    private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;

    /** Aggregators to register on the delta iteration, keyed by the name they were registered under. */
    private final Map<String, Aggregator<?>> aggregators;

    /** Iteration limit passed to {@code iterateDelta}; the constructors validate that it is positive. */
    private final int maximumNumberOfIterations;

    /** Type information of the messages, extracted from the concrete messaging function class. */
    private final TypeInformation<Message> messageType;

    /** Initial vertices; stays {@code null} until {@link #setInput(DataSet)} has been called. */
    private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;

    /** Optional name of the iteration; when {@code null}, a name is derived from the two user functions. */
    private String name;

    /** Flag forwarded to {@code DeltaIteration.setSolutionSetUnManaged(boolean)}. */
    private boolean unmanagedSolutionSet;

    /** Broadcast sets, as (name, data set) pairs, attached to the vertex update operator. */
    private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<>(4);

    /** Broadcast sets, as (name, data set) pairs, attached to the messaging operator. */
    private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<>(4);

    /** Parallelism of the iteration; {@code -1} means "use default". */
    private int parallelism = -1;

    /* loaded from: input_file:org/apache/flink/spargel/java/VertexCentricIteration$MessagingUdfNoEdgeValues.class */
    /**
     * CoGroup function that drives the user's {@link MessagingFunction} when the edges are
     * plain (source, target) pairs without an edge value.
     *
     * <p>The first co-group input holds the edges of one key, the second input holds the
     * vertex state for the same key. Messages are only produced when a vertex state is present.
     */
    private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
            extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
            implements ResultTypeQueryable<Tuple2<VertexKey, Message>> {

        private static final long serialVersionUID = 1;

        /** The user function that actually emits the messages. */
        private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;

        /**
         * Type of the produced (target key, message) tuples. Declared transient, so Java
         * serialization does not carry it along with the function instance.
         */
        private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;

        /**
         * @param messagingFunction the user-defined messaging function to wrap
         * @param typeInformation type information reported by {@link #getProducedType()}
         */
        private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction, TypeInformation<Tuple2<VertexKey, Message>> typeInformation) {
            this.messagingFunction = messagingFunction;
            this.resultType = typeInformation;
        }

        /**
         * Hands the edges and the collector to the messaging function and lets it send
         * messages for the first vertex state of the group. Does nothing when the group
         * contains no vertex state.
         *
         * @param edges the edges of the current key
         * @param state the vertex state(s) of the current key; only the first element is used
         * @param out collector receiving the produced messages
         */
        @Override
        public void coGroup(Iterable<Tuple2<VertexKey, VertexKey>> edges, Iterable<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) throws Exception {
            final Iterator<Tuple2<VertexKey, VertexValue>> stateIter = state.iterator();
            if (!stateIter.hasNext()) {
                return;
            }

            final Tuple2<VertexKey, VertexValue> vertexState = stateIter.next();
            messagingFunction.set(edges.iterator(), out);
            // f0 is already statically typed as VertexKey; no raw Comparable cast is needed.
            messagingFunction.sendMessages(vertexState.f0, vertexState.f1);
        }

        /**
         * Initializes the messaging function in the first superstep and then runs its
         * pre-superstep hook.
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                // NOTE(review): the 'false' flag presumably marks the edges as carrying no value - confirm in MessagingFunction.init.
                messagingFunction.init(getIterationRuntimeContext(), false);
            }
            messagingFunction.preSuperstep();
        }

        /** Runs the messaging function's post-superstep hook. */
        @Override
        public void close() throws Exception {
            messagingFunction.postSuperstep();
        }

        /** @return the type information passed to the constructor */
        @Override
        public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
            return resultType;
        }
    }

    /* loaded from: input_file:org/apache/flink/spargel/java/VertexCentricIteration$MessagingUdfWithEdgeValues.class */
    private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> implements ResultTypeQueryable<Tuple2<VertexKey, Message>> {
        private static final long serialVersionUID = 1;
        private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
        private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;

        private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, TypeInformation<Tuple2<VertexKey, Message>> typeInformation) {
            this.messagingFunction = messagingFunction;
            this.resultType = typeInformation;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void coGroup(Iterable<Tuple3<VertexKey, VertexKey, EdgeValue>> iterable, Iterable<Tuple2<VertexKey, VertexValue>> iterable2, Collector<Tuple2<VertexKey, Message>> collector) throws Exception {
            Iterator<Tuple2<VertexKey, VertexValue>> it = iterable2.iterator();
            if (it.hasNext()) {
                Tuple2<VertexKey, VertexValue> next = it.next();
                this.messagingFunction.set(iterable.iterator(), collector);
                this.messagingFunction.sendMessages((Comparable) next.f0, next.f1);
            }
        }

        public void open(Configuration configuration) throws Exception {
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.messagingFunction.init(getIterationRuntimeContext(), true);
            }
            this.messagingFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.messagingFunction.postSuperstep();
        }

        public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
            return this.resultType;
        }
    }

    /* loaded from: input_file:org/apache/flink/spargel/java/VertexCentricIteration$VertexUpdateUdf.class */
    /**
     * CoGroup function that drives the user's {@link VertexUpdateFunction}.
     *
     * <p>The first co-group input holds the messages addressed to one key, the second input
     * holds the vertex stored under that key. A group without a vertex is treated as an error.
     */
    private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
            extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
            implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>> {

        private static final long serialVersionUID = 1;

        /** The user function that computes the vertex update. */
        private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;

        /** Reusable iterator handed to the update function; re-pointed at each group's messages. */
        private final MessageIterator<Message> messageIter = new MessageIterator<>();

        /**
         * Type of the produced (key, value) tuples. Declared transient, so Java
         * serialization does not carry it along with the function instance.
         */
        private transient TypeInformation<Tuple2<VertexKey, VertexValue>> resultType;

        /**
         * @param vertexUpdateFunction the user-defined update function to wrap
         * @param typeInformation type information reported by {@link #getProducedType()}
         */
        private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Tuple2<VertexKey, VertexValue>> typeInformation) {
            this.vertexUpdateFunction = vertexUpdateFunction;
            this.resultType = typeInformation;
        }

        /**
         * Invokes the update function for the vertex of the current key with all messages
         * addressed to it.
         *
         * @param messages the messages addressed to the current key
         * @param vertex the vertex stored under the current key; only the first element is used
         * @param out collector the update function writes its output to
         * @throws Exception if the group contains no vertex; the message names the target key
         *         whenever it can be read from the first message
         */
        @Override
        public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Tuple2<VertexKey, VertexValue>> vertex, Collector<Tuple2<VertexKey, VertexValue>> out) throws Exception {
            final Iterator<Tuple2<VertexKey, VertexValue>> vertexIter = vertex.iterator();
            if (vertexIter.hasNext()) {
                final Tuple2<VertexKey, VertexValue> vertexWithValue = vertexIter.next();
                messageIter.setSource(messages.iterator());
                vertexUpdateFunction.setOutput(vertexWithValue, out);
                // f0 is already statically typed as VertexKey; no raw Comparable cast is needed.
                vertexUpdateFunction.updateVertex(vertexWithValue.f0, vertexWithValue.f1, messageIter);
                return;
            }

            // No vertex for this key: report the problem with as much context as is available.
            final Iterator<Tuple2<VertexKey, Message>> pendingMessages = messages.iterator();
            if (!pendingMessages.hasNext()) {
                // Previously thrown without any message, which made the failure impossible to diagnose.
                throw new Exception("Vertex update was invoked for a group that contains neither a vertex nor any messages.");
            }

            String description = "Target vertex does not exist!.";
            try {
                description = "Target vertex '" + pendingMessages.next().f0 + "' does not exist!.";
            } catch (RuntimeException ignored) {
                // Best effort only: if the key cannot be read or rendered, keep the generic message.
                // JVM Errors are deliberately no longer swallowed here.
            }
            throw new Exception(description);
        }

        /**
         * Initializes the update function in the first superstep and then runs its
         * pre-superstep hook.
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                vertexUpdateFunction.init(getIterationRuntimeContext());
            }
            vertexUpdateFunction.preSuperstep();
        }

        /** Runs the update function's post-superstep hook. */
        @Override
        public void close() throws Exception {
            vertexUpdateFunction.postSuperstep();
        }

        /** @return the type information passed to the constructor */
        @Override
        public TypeInformation<Tuple2<VertexKey, VertexValue>> getProducedType() {
            return resultType;
        }
    }

    /**
     * Creates an iteration over edges that carry no value.
     *
     * @param vertexUpdateFunction the user-defined vertex update function; must not be null
     * @param messagingFunction the user-defined messaging function; must not be null
     * @param dataSet the edges as (source id, target id) 2-tuples; must not be null
     * @param i the maximum number of iterations; must be at least one
     * @throws NullPointerException if any of the object arguments is null
     * @throws IllegalArgumentException if the iteration count is not positive, the edges are
     *         not 2-tuples, or the two key fields differ in type or are not {@link Comparable}
     */
    private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, DataSet<Tuple2<VertexKey, VertexKey>> dataSet, int i) {
        Validate.notNull(vertexUpdateFunction);
        Validate.notNull(messagingFunction);
        Validate.notNull(dataSet);
        Validate.isTrue(i > 0, "The maximum number of iterations must be at least one.");

        // Check the general type first and narrow to a tuple type only afterwards, so that a
        // non-tuple input yields the validation message below instead of a failed cast.
        final TypeInformation<Tuple2<VertexKey, VertexKey>> edgeType = dataSet.getType();
        Validate.isTrue(edgeType.isTupleType() && edgeType.getArity() == 2, "The edges data set (for edges without edge values) must consist of 2-tuples.");

        final TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgeType;
        Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1)) && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()), "Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");

        this.updateFunction = vertexUpdateFunction;
        this.messagingFunction = messagingFunction;
        this.edgesWithoutValue = dataSet;
        this.edgesWithValue = null;
        this.maximumNumberOfIterations = i;
        this.aggregators = new HashMap<>();
        this.messageType = getMessageType(messagingFunction);
    }

    /**
     * Creates an iteration over edges that carry a value.
     *
     * @param vertexUpdateFunction the user-defined vertex update function; must not be null
     * @param messagingFunction the user-defined messaging function; must not be null
     * @param dataSet the edges as (source id, target id, edge value) 3-tuples; must not be null
     * @param i the maximum number of iterations; must be at least one
     * @param z never read; it only gives this constructor a signature that still differs
     *        from the plain-edges constructor after type erasure
     * @throws NullPointerException if any of the object arguments is null
     * @throws IllegalArgumentException if the iteration count is not positive, the edges are
     *         not 3-tuples, or the two key fields differ in type or are not {@link Comparable}
     */
    private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> dataSet, int i, boolean z) {
        Validate.notNull(vertexUpdateFunction);
        Validate.notNull(messagingFunction);
        Validate.notNull(dataSet);
        // Checked once; the original repeated this identical validation further down.
        Validate.isTrue(i > 0, "The maximum number of iterations must be at least one.");

        // Check the general type first and narrow to a tuple type only afterwards, so that a
        // non-tuple input yields the validation message below instead of a failed cast.
        final TypeInformation<Tuple3<VertexKey, VertexKey, EdgeValue>> edgeType = dataSet.getType();
        Validate.isTrue(edgeType.isTupleType() && edgeType.getArity() == 3, "The edges data set (for edges with edge values) must consist of 3-tuples.");

        final TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgeType;
        Validate.isTrue(tupleInfo.getTypeAt(0).equals(tupleInfo.getTypeAt(1)) && Comparable.class.isAssignableFrom(tupleInfo.getTypeAt(0).getTypeClass()), "The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");

        this.updateFunction = vertexUpdateFunction;
        this.messagingFunction = messagingFunction;
        this.edgesWithoutValue = null;
        this.edgesWithValue = dataSet;
        this.maximumNumberOfIterations = i;
        this.aggregators = new HashMap<>();
        this.messageType = getMessageType(messagingFunction);
    }

    /**
     * Extracts the message type information from the concrete class of the given
     * messaging function.
     *
     * <p>Index 2 is the position of {@code Message} in the type parameter list of
     * {@link MessagingFunction} as it is used in this class.
     *
     * @param messagingFunction the function whose class is inspected
     * @return the type information produced by the type extractor
     */
    private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction) {
        final Class<?> concreteFunctionClass = messagingFunction.getClass();
        return TypeExtractor.createTypeInfo(
                MessagingFunction.class,
                concreteFunctionClass,
                2,
                (TypeInformation) null,
                (TypeInformation) null);
    }

    /**
     * Remembers an aggregator under the given name; all remembered aggregators are
     * registered on the delta iteration when the result is created. A later registration
     * under the same name replaces the earlier one.
     *
     * @param str the name of the aggregator
     * @param aggregator the aggregator to register
     */
    public void registerAggregator(String str, Aggregator<?> aggregator) {
        aggregators.put(str, aggregator);
    }

    /**
     * Adds a data set that is attached as a broadcast set to the messaging operator
     * when the result is created.
     *
     * @param str the name under which the broadcast set is attached
     * @param dataSet the data set to broadcast
     */
    public void addBroadcastSetForMessagingFunction(String str, DataSet<?> dataSet) {
        final Tuple2<String, DataSet<?>> namedSet = new Tuple2<String, DataSet<?>>(str, dataSet);
        bcVarsMessaging.add(namedSet);
    }

    /**
     * Adds a data set that is attached as a broadcast set to the vertex update operator
     * when the result is created.
     *
     * @param str the name under which the broadcast set is attached
     * @param dataSet the data set to broadcast
     */
    public void addBroadcastSetForUpdateFunction(String str, DataSet<?> dataSet) {
        final Tuple2<String, DataSet<?>> namedSet = new Tuple2<String, DataSet<?>>(str, dataSet);
        bcVarsUpdate.add(namedSet);
    }

    /**
     * Sets the name of the iteration. Passing {@code null} makes the result creation fall
     * back to a name derived from the update and messaging functions.
     *
     * @param str the name to use, or {@code null}
     */
    public void setName(String str) {
        name = str;
    }

    /**
     * Returns the explicitly configured name of the iteration.
     *
     * @return the configured name, or {@code null} if none has been set
     */
    public String getName() {
        return name;
    }

    /**
     * Sets the degree of parallelism for the iteration.
     *
     * @param i a positive degree of parallelism, or {@code -1} to use the default
     * @throws IllegalArgumentException if the value is neither positive nor {@code -1}
     */
    public void setParallelism(int i) {
        final boolean acceptable = i > 0 || i == -1;
        Validate.isTrue(acceptable, "The degree of parallelism must be positive, or -1 (use default).");
        parallelism = i;
    }

    /**
     * Returns the configured degree of parallelism.
     *
     * @return the degree of parallelism; {@code -1} means that the default is used
     */
    public int getParallelism() {
        return parallelism;
    }

    /**
     * Sets the flag that is forwarded to the delta iteration's
     * {@code setSolutionSetUnManaged(boolean)} when the result is created.
     *
     * @param z the flag value to forward
     */
    public void setSolutionSetUnmanagedMemory(boolean z) {
        unmanagedSolutionSet = z;
    }

    /**
     * Returns the flag set via {@link #setSolutionSetUnmanagedMemory(boolean)}.
     *
     * @return the current flag value; {@code false} unless it has been set
     */
    public boolean isSolutionSetUnmanagedMemory() {
        return unmanagedSolutionSet;
    }

    /**
     * Sets the initial vertices of the iteration.
     *
     * <p>The data set must consist of 2-tuples whose first field (the vertex id) has the same
     * type as the first field of the edge data set this iteration was created with.
     *
     * @param dataSet the initial vertices; must not be null
     * @throws NullPointerException if {@code dataSet} is null
     * @throws IllegalArgumentException if the data set does not consist of 2-tuples or its
     *         key type differs from the key type of the edges
     */
    @Override
    public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> dataSet) {
        // Fail with a validation error up front instead of an anonymous NPE on getType().
        Validate.notNull(dataSet);

        // Check the general type first and narrow to a tuple type only afterwards, so that a
        // non-tuple input yields the validation message below instead of a failed cast.
        final TypeInformation<Tuple2<VertexKey, VertexValue>> inputType = dataSet.getType();
        Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
        final TypeInformation<?> vertexKeyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);

        // Exactly one of the two edge data sets is non-null (see the constructors).
        final TypeInformation<?> edgeType;
        if (this.edgesWithoutValue != null) {
            edgeType = this.edgesWithoutValue.getType();
        } else {
            edgeType = this.edgesWithValue.getType();
        }
        final TypeInformation<?> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);

        Validate.isTrue(vertexKeyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) must be the same data type as the first fields of the edge data set (the source vertex id). Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", vertexKeyType, edgeKeyType);

        this.initialVertices = dataSet;
    }

    /**
     * Assembles the vertex-centric iteration as a delta iteration over the initial vertices.
     *
     * <p>Each superstep consists of two co-groups: "Messaging" joins the edges with the
     * workset on field 0 and runs the messaging function; "Vertex State Updates" joins the
     * produced messages with the solution set on field 0 and runs the update function. The
     * output of the update co-group closes the iteration as both the solution set delta and
     * the next workset.
     *
     * @return the data set produced by closing the delta iteration
     * @throws IllegalStateException if no input has been set via {@link #setInput(DataSet)}
     */
    @Override
    public DataSet<Tuple2<VertexKey, VertexValue>> createResult() {
        if (this.initialVertices == null) {
            throw new IllegalStateException("The input data set has not been set.");
        }

        // Type information handed to the UDF wrappers. getTypeAt is only available on the
        // tuple type info, hence the explicit narrowing cast.
        final TypeInformation<Tuple2<VertexKey, VertexValue>> vertexTypes = this.initialVertices.getType();
        final TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) vertexTypes).getTypeAt(0);
        final TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo =
                new TupleTypeInfo<Tuple2<VertexKey, Message>>(keyType, this.messageType);

        // Set up the delta iteration; the initial vertices serve as solution set and workset.
        final String iterationName = this.name != null
                ? this.name
                : "Vertex-centric iteration (" + this.updateFunction + " | " + this.messagingFunction + ")";

        final DeltaIteration<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> iteration =
                this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, new int[]{0});
        iteration.name(iterationName);
        iteration.parallelism(this.parallelism);
        iteration.setSolutionSetUnManaged(this.unmanagedSolutionSet);

        for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
            iteration.registerAggregator(entry.getKey(), entry.getValue());
        }

        // Messaging step: co-group the edges with the workset.
        CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
        if (this.edgesWithoutValue != null) {
            final MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger =
                    new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(this.messagingFunction, messageTypeInfo);
            messages = this.edgesWithoutValue.coGroup(iteration.getWorkset())
                    .where(new int[]{0}).equalTo(new int[]{0}).with(messenger);
        } else {
            final MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger =
                    new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(this.messagingFunction, messageTypeInfo);
            messages = this.edgesWithValue.coGroup(iteration.getWorkset())
                    .where(new int[]{0}).equalTo(new int[]{0}).with(messenger);
        }
        messages = messages.name("Messaging");
        for (Tuple2<String, DataSet<?>> broadcast : this.bcVarsMessaging) {
            messages = messages.withBroadcastSet(broadcast.f1, broadcast.f0);
        }

        // Update step: co-group the messages with the solution set.
        final VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf =
                new VertexUpdateUdf<VertexKey, VertexValue, Message>(this.updateFunction, vertexTypes);
        CoGroupOperator<?, ?, Tuple2<VertexKey, VertexValue>> updates =
                messages.coGroup(iteration.getSolutionSet())
                        .where(new int[]{0}).equalTo(new int[]{0}).with(updateUdf);
        updates = updates.name("Vertex State Updates");
        for (Tuple2<String, DataSet<?>> broadcast : this.bcVarsUpdate) {
            updates = updates.withBroadcastSet(broadcast.f1, broadcast.f0);
        }

        // Declare field 0 as constant on both inputs of the update operator.
        updates.withConstantSetFirst(new String[]{"0"}).withConstantSetSecond(new String[]{"0"});

        return iteration.closeWith(updates, updates);
    }

    /**
     * Creates a vertex-centric iteration for edges that carry no value.
     *
     * @param dataSet the edges as (source id, target id) 2-tuples
     * @param vertexUpdateFunction the user-defined vertex update function
     * @param messagingFunction the user-defined messaging function
     * @param i the maximum number of iterations
     * @param <VertexKey> the type of the vertex ids
     * @param <VertexValue> the type of the vertex values
     * @param <Message> the type of the messages
     * @return the newly created iteration
     */
    public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message> VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(DataSet<Tuple2<VertexKey, VertexKey>> dataSet, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction, int i) {
        // The edge value type is irrelevant for plain edges, so it is pinned to Object.
        // The cast is erased at runtime and therefore cannot fail.
        @SuppressWarnings("unchecked")
        final MessagingFunction<VertexKey, VertexValue, Message, Object> typedMessagingFunction =
                (MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;

        return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(
                vertexUpdateFunction, typedMessagingFunction, dataSet, i);
    }

    /**
     * Creates a vertex-centric iteration for edges that carry a value.
     *
     * @param dataSet the edges as (source id, target id, edge value) 3-tuples
     * @param vertexUpdateFunction the user-defined vertex update function
     * @param messagingFunction the user-defined messaging function
     * @param i the maximum number of iterations
     * @param <VertexKey> the type of the vertex ids
     * @param <VertexValue> the type of the vertex values
     * @param <Message> the type of the messages
     * @param <EdgeValue> the type of the edge values
     * @return the newly created iteration
     */
    public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> dataSet, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, int i) {
        // The trailing 'true' only selects the valued-edges constructor overload.
        final VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> iteration =
                new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(
                        vertexUpdateFunction, messagingFunction, dataSet, i, true);
        return iteration;
    }
}
