package org.apache.flink.graph.examples;

import java.util.Iterator;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.data.IncrementalSSSPData;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;

/* loaded from: input_file:org/apache/flink/graph/examples/IncrementalSSSP.class */
public class IncrementalSSSP implements ProgramDescription {
    private static boolean fileOutput = false;
    private static String verticesInputPath = null;
    private static String edgesInputPath = null;
    private static String edgesInSSSPInputPath = null;
    private static Long srcEdgeToBeRemoved = null;
    private static Long trgEdgeToBeRemoved = null;
    private static Double valEdgeToBeRemoved = null;
    private static String outputPath = null;
    private static int maxIterations = 5;

    /* loaded from: input_file:org/apache/flink/graph/examples/IncrementalSSSP$InvalidateMessenger.class */
    public static final class InvalidateMessenger extends ScatterFunction<Long, Double, Double, Double> {
        private Edge<Long, Double> edgeToBeRemoved;

        public InvalidateMessenger(Edge<Long, Double> edge) {
            this.edgeToBeRemoved = edge;
        }

        public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
            if (getSuperstepNumber() == 1 && ((Long) vertex.getId()).equals(this.edgeToBeRemoved.getSource())) {
                sendMessageTo(this.edgeToBeRemoved.getSource(), Double.valueOf(Double.MAX_VALUE));
            }
            if (getSuperstepNumber() > 1) {
                Iterator it = getEdges().iterator();
                while (it.hasNext()) {
                    sendMessageTo(((Edge) it.next()).getSource(), Double.valueOf(Double.MAX_VALUE));
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/graph/examples/IncrementalSSSP$VertexDistanceUpdater.class */
    public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> messageIterator) throws Exception {
            if (!messageIterator.hasNext() || Long.valueOf(getOutDegree() - 1).longValue() > 0) {
                return;
            }
            setNewVertexValue(Double.valueOf(Double.MAX_VALUE));
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (parseParameters(strArr)) {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
            Graph<Long, Double, Double> graph = getGraph(executionEnvironment);
            Graph<Long, Double, Double> sSSPGraph = getSSSPGraph(executionEnvironment);
            graph.removeEdge(edgeToBeRemoved);
            ScatterGatherConfiguration scatterGatherConfiguration = new ScatterGatherConfiguration();
            if (!isInSSSP(edgeToBeRemoved, sSSPGraph.getEdges())) {
                if (!fileOutput) {
                    graph.getVertices().print();
                    return;
                } else {
                    graph.getVertices().writeAsCsv(outputPath, "\n", ",");
                    executionEnvironment.execute("Incremental SSSP Example");
                    return;
                }
            }
            scatterGatherConfiguration.setDirection(EdgeDirection.IN);
            scatterGatherConfiguration.setOptDegrees(true);
            DataSet vertices = sSSPGraph.runScatterGatherIteration(new InvalidateMessenger(edgeToBeRemoved), new VertexDistanceUpdater(), maxIterations, scatterGatherConfiguration).getVertices();
            if (!fileOutput) {
                vertices.print();
            } else {
                vertices.writeAsCsv(outputPath, "\n", ",");
                executionEnvironment.execute("Incremental SSSP Example");
            }
        }
    }

    public String getDescription() {
        return "Incremental Single Sink Shortest Paths Example";
    }

    public static boolean isInSSSP(final Edge<Long, Double> edge, DataSet<Edge<Long, Double>> dataSet) throws Exception {
        return dataSet.filter(new FilterFunction<Edge<Long, Double>>() { // from class: org.apache.flink.graph.examples.IncrementalSSSP.1
            public boolean filter(Edge<Long, Double> edge2) throws Exception {
                return edge2.equals(edge);
            }
        }).count() > 0;
    }

    private static boolean parseParameters(String[] strArr) {
        if (strArr.length <= 0) {
            return true;
        }
        if (strArr.length != 8) {
            System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
            System.out.println("Provide parameters to read input data from files.");
            System.out.println("See the documentation for the correct format of input files.");
            System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <src id edge to be removed> <trg id edge to be removed> <val edge to be removed> <output path> <max iterations>");
            return false;
        }
        fileOutput = true;
        verticesInputPath = strArr[0];
        edgesInputPath = strArr[1];
        edgesInSSSPInputPath = strArr[2];
        srcEdgeToBeRemoved = Long.valueOf(Long.parseLong(strArr[3]));
        trgEdgeToBeRemoved = Long.valueOf(Long.parseLong(strArr[4]));
        valEdgeToBeRemoved = Double.valueOf(Double.parseDouble(strArr[5]));
        outputPath = strArr[6];
        maxIterations = Integer.parseInt(strArr[7]);
        return true;
    }

    private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment executionEnvironment) {
        return fileOutput ? Graph.fromCsvReader(verticesInputPath, edgesInputPath, executionEnvironment).lineDelimiterEdges("\n").types(Long.class, Double.class, Double.class) : Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(executionEnvironment), IncrementalSSSPData.getDefaultEdgeDataSet(executionEnvironment), executionEnvironment);
    }

    private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment executionEnvironment) {
        return fileOutput ? Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, executionEnvironment).lineDelimiterEdges("\n").types(Long.class, Double.class, Double.class) : Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(executionEnvironment), IncrementalSSSPData.getDefaultEdgesInSSSP(executionEnvironment), executionEnvironment);
    }

    private static Edge<Long, Double> getEdgeToBeRemoved() {
        return fileOutput ? new Edge<>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved) : IncrementalSSSPData.getDefaultEdgeToBeRemoved();
    }
}
