package org.apache.flink.spargel.java.record;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.DeltaIteration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.ReflectionUtil;

/* loaded from: input_file:org/apache/flink/spargel/java/record/SpargelIteration.class */
public class SpargelIteration {
    /** Name given to the iteration when the two-argument constructor is used. */
    private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
    /** Underlying delta iteration; this is the operator returned by {@link #getOutput()}. */
    private final DeltaIteration iteration;
    /** Vertex key type, taken from the first type parameter of the messaging function's class. */
    private final Class<? extends Key<?>> vertexKey;
    /** Vertex value type, taken from the second type parameter of the messaging function's class. */
    private final Class<? extends Value> vertexValue;
    /** Message type, taken from the third type parameter of the messaging function's class. */
    private final Class<? extends Value> messageType;
    /** Edge value type, taken from the fourth type parameter of the messaging function's class. */
    private final Class<? extends Value> edgeValue;
    /** Co-group named "Vertex Updater"; joins the messager's output with the iteration's solution set. */
    private final CoGroupOperator vertexUpdater;
    /** Co-group named "Message Sender"; its second input is the iteration's workset. */
    private final CoGroupOperator messager;

    /* loaded from: input_file:org/apache/flink/spargel/java/record/SpargelIteration$MessagingDriver.class */
    public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
        private static final long serialVersionUID = 1;
        private static final String UDF_PARAM = "spargel.udf";
        private static final String KEY_PARAM = "spargel.key-type";
        private static final String VALUE_PARAM = "spargel.value-type";
        private static final String MESSAGE_PARAM = "spargel.message-type";
        private static final String EDGE_PARAM = "spargel.edge-value";
        private MessagingFunction<K, V, M, E> messagingFunction;
        private K vertexKey;
        private V vertexValue;

        public void coGroup(Iterator<Record> it, Iterator<Record> it2, Collector<Record> collector) throws Exception {
            if (it2.hasNext()) {
                Record next = it2.next();
                next.getFieldInto(0, this.vertexKey);
                next.getFieldInto(1, this.vertexValue);
                this.messagingFunction.set(it, collector);
                this.messagingFunction.sendMessages(this.vertexKey, this.vertexValue);
            }
        }

        public void open(Configuration configuration) throws Exception {
            if (this.messagingFunction == null) {
                ClassLoader userCodeClassLoader = getRuntimeContext().getUserCodeClassLoader();
                Class cls = configuration.getClass(KEY_PARAM, (Class) null, userCodeClassLoader);
                Class cls2 = configuration.getClass(VALUE_PARAM, (Class) null, userCodeClassLoader);
                Class cls3 = configuration.getClass(EDGE_PARAM, (Class) null, userCodeClassLoader);
                this.vertexKey = (K) InstantiationUtil.instantiate(cls, Key.class);
                this.vertexValue = (V) InstantiationUtil.instantiate(cls2, Value.class);
                Key key = (Key) InstantiationUtil.instantiate(cls, Key.class);
                Value value = (Value) InstantiationUtil.instantiate(cls3, Value.class);
                try {
                    this.messagingFunction = (MessagingFunction) InstantiationUtil.readObjectFromConfig(configuration, UDF_PARAM, getRuntimeContext().getUserCodeClassLoader());
                    this.messagingFunction.init(getIterationRuntimeContext(), key, value);
                    this.messagingFunction.setup(configuration);
                } catch (Exception e) {
                    throw new Exception("Could not instantiate MessagingFunction" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
                }
            }
            this.messagingFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.messagingFunction.postSuperstep();
        }
    }

    @FunctionAnnotation.ConstantFieldsFirst({0})
    /* loaded from: input_file:org/apache/flink/spargel/java/record/SpargelIteration$VertexUpdateDriver.class */
    public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
        private static final long serialVersionUID = 1;
        private static final String UDF_PARAM = "spargel.udf";
        private static final String KEY_PARAM = "spargel.key-type";
        private static final String VALUE_PARAM = "spargel.value-type";
        private static final String MESSAGE_PARAM = "spargel.message-type";
        private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
        private K vertexKey;
        private V vertexValue;
        private MessageIterator<M> messageIter;

        public void coGroup(Iterator<Record> it, Iterator<Record> it2, Collector<Record> collector) throws Exception {
            if (!it2.hasNext()) {
                if (!it.hasNext()) {
                    throw new Exception();
                }
                String str = "Target vertex does not exist!.";
                try {
                    it.next().getFieldInto(0, this.vertexKey);
                    str = "Target vertex '" + this.vertexKey + "' does not exist!.";
                } catch (Throwable th) {
                }
                throw new Exception(str);
            }
            Record next = it2.next();
            next.getFieldInto(0, this.vertexKey);
            next.getFieldInto(1, this.vertexValue);
            this.messageIter.setSource(it);
            this.vertexUpdateFunction.setOutput(next, collector);
            this.vertexUpdateFunction.updateVertex(this.vertexKey, this.vertexValue, this.messageIter);
        }

        public void open(Configuration configuration) throws Exception {
            if (this.vertexUpdateFunction == null) {
                ClassLoader userCodeClassLoader = getRuntimeContext().getUserCodeClassLoader();
                Class cls = configuration.getClass(KEY_PARAM, (Class) null, userCodeClassLoader);
                Class cls2 = configuration.getClass(VALUE_PARAM, (Class) null, userCodeClassLoader);
                Class cls3 = configuration.getClass(MESSAGE_PARAM, (Class) null, userCodeClassLoader);
                this.vertexKey = (K) InstantiationUtil.instantiate(cls, Key.class);
                this.vertexValue = (V) InstantiationUtil.instantiate(cls2, Value.class);
                this.messageIter = new MessageIterator<>((Value) InstantiationUtil.instantiate(cls3, Value.class));
                try {
                    this.vertexUpdateFunction = (VertexUpdateFunction) InstantiationUtil.readObjectFromConfig(configuration, UDF_PARAM, getRuntimeContext().getUserCodeClassLoader());
                    this.vertexUpdateFunction.init(getIterationRuntimeContext());
                    this.vertexUpdateFunction.setup(configuration);
                } catch (Exception e) {
                    throw new Exception("Could not instantiate VertexUpdateFunction" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
                }
            }
            this.vertexUpdateFunction.preSuperstep();
        }

        public void close() throws Exception {
            this.vertexUpdateFunction.postSuperstep();
        }
    }

    /**
     * Creates a vertex-centric iteration with the default name
     * {@code "<unnamed vertex-centric iteration>"}.
     *
     * @param messagingFunction user function that sends messages for a vertex
     * @param vertexUpdateFunction user function that updates a vertex from its messages
     */
    public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction) {
        this(messagingFunction, vertexUpdateFunction, DEFAULT_NAME);
    }

    /**
     * Creates a named vertex-centric iteration.
     *
     * <p>Builds a {@link DeltaIteration} whose step consists of two co-groups: a "Message Sender"
     * ({@link MessagingDriver}) reading the workset as its second input, and a "Vertex Updater"
     * ({@link VertexUpdateDriver}) joining the sender's output with the solution set. The updater's
     * output is used both as the next workset and as the solution set delta. Both user functions
     * are serialized into the respective operator's parameters together with the type classes,
     * which are resolved from the type parameters of the messaging function's class.
     *
     * @param messagingFunction user function that sends messages for a vertex; must not be null
     * @param vertexUpdateFunction user function that updates a vertex from its messages; must not be null
     * @param str name of the iteration
     * @throws NullPointerException if either function is {@code null}
     * @throws RuntimeException if a type cannot be determined from the messaging function's class,
     *         or if the functions cannot be serialized
     */
    public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, String str) {
        // Fail fast: a null update function would otherwise only surface when the job runs.
        if (messagingFunction == null) {
            throw new NullPointerException("The messaging function must not be null.");
        }
        if (vertexUpdateFunction == null) {
            throw new NullPointerException("The vertex update function must not be null.");
        }

        this.vertexKey = ReflectionUtil.getTemplateType1(messagingFunction.getClass());
        this.vertexValue = ReflectionUtil.getTemplateType2(messagingFunction.getClass());
        this.messageType = ReflectionUtil.getTemplateType3(messagingFunction.getClass());
        this.edgeValue = ReflectionUtil.getTemplateType4(messagingFunction.getClass());
        if (this.vertexKey == null || this.vertexValue == null || this.messageType == null || this.edgeValue == null) {
            throw new RuntimeException("Could not determine the vertex key, vertex value, message and edge value types from the messaging function class " + messagingFunction.getClass().getName() + ".");
        }

        this.iteration = new DeltaIteration(0, str);
        this.messager = CoGroupOperator.builder(MessagingDriver.class, this.vertexKey, 0, 0).input2(this.iteration.getWorkset()).name("Message Sender").build();
        this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, this.vertexKey, 0, 0).input1(this.messager).input2(this.iteration.getSolutionSet()).name("Vertex Updater").build();
        this.iteration.setNextWorkset(this.vertexUpdater);
        this.iteration.setSolutionSetDelta(this.vertexUpdater);

        try {
            // Use the drivers' own key constants so writer and reader cannot drift apart.
            Configuration updaterParameters = this.vertexUpdater.getParameters();
            InstantiationUtil.writeObjectToConfig(vertexUpdateFunction, updaterParameters, VertexUpdateDriver.UDF_PARAM);
            updaterParameters.setClass(VertexUpdateDriver.KEY_PARAM, this.vertexKey);
            updaterParameters.setClass(VertexUpdateDriver.VALUE_PARAM, this.vertexValue);
            updaterParameters.setClass(VertexUpdateDriver.MESSAGE_PARAM, this.messageType);

            Configuration messagerParameters = this.messager.getParameters();
            InstantiationUtil.writeObjectToConfig(messagingFunction, messagerParameters, MessagingDriver.UDF_PARAM);
            messagerParameters.setClass(MessagingDriver.KEY_PARAM, this.vertexKey);
            messagerParameters.setClass(MessagingDriver.VALUE_PARAM, this.vertexValue);
            messagerParameters.setClass(MessagingDriver.MESSAGE_PARAM, this.messageType);
            messagerParameters.setClass(MessagingDriver.EDGE_PARAM, this.edgeValue);
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize the UDFs for distribution" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
        }
    }

    /**
     * Sets the vertex input. The given operator is used as both the initial solution set and
     * the initial workset of the underlying delta iteration.
     *
     * @param operator operator producing the vertex records
     */
    public void setVertexInput(Operator<Record> operator) {
        this.iteration.setInitialSolutionSet(operator);
        this.iteration.setInitialWorkset(operator);
    }

    /**
     * Sets the edges input, which becomes the first input of the "Message Sender" co-group.
     *
     * @param operator operator producing the edge records
     */
    public void setEdgesInput(Operator<Record> operator) {
        this.messager.setFirstInput(operator);
    }

    /**
     * Returns the operator representing this iteration's result.
     *
     * @return the underlying delta iteration
     */
    public Operator<?> getOutput() {
        return this.iteration;
    }

    /**
     * Sets the degree of parallelism on the underlying delta iteration.
     *
     * @param i degree of parallelism, forwarded unchanged
     */
    public void setDegreeOfParallelism(int i) {
        this.iteration.setDegreeOfParallelism(i);
    }

    /**
     * Sets the maximum number of iterations on the underlying delta iteration.
     *
     * @param i maximum number of iterations, forwarded unchanged
     */
    public void setNumberOfIterations(int i) {
        this.iteration.setMaximumNumberOfIterations(i);
    }

    /**
     * Returns the aggregator registry of the underlying delta iteration.
     *
     * @return the iteration's {@link AggregatorRegistry}
     */
    public AggregatorRegistry getAggregators() {
        return this.iteration.getAggregators();
    }
}
