package org.apache.flink.graph.library;

import java.util.Iterator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.NullValueEdgeMapper;
import org.apache.flink.types.NullValue;

/* loaded from: input_file:org/apache/flink/graph/library/ConnectedComponents.class */
public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
    private Integer maxIterations;

    /* loaded from: input_file:org/apache/flink/graph/library/ConnectedComponents$CCMessenger.class */
    public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
        @Override // org.apache.flink.graph.spargel.MessagingFunction
        public void sendMessages(Vertex<K, Long> vertex) throws Exception {
            sendMessageToAllNeighbors(vertex.getValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/graph/library/ConnectedComponents$CCUpdater.class */
    public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
        @Override // org.apache.flink.graph.spargel.VertexUpdateFunction
        public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messageIterator) throws Exception {
            long j = Long.MAX_VALUE;
            Iterator<Long> it = messageIterator.iterator();
            while (it.hasNext()) {
                j = Math.min(j, it.next().longValue());
            }
            if (j < vertex.getValue().longValue()) {
                setNewVertexValue(Long.valueOf(j));
            }
        }
    }

    public ConnectedComponents(Integer num) {
        this.maxIterations = num;
    }

    @Override // org.apache.flink.graph.GraphAlgorithm
    public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
        return graph.mapEdges(new NullValueEdgeMapper()).getUndirected().runVertexCentricIteration(new CCUpdater(), new CCMessenger(), this.maxIterations.intValue()).getVertices();
    }
}
