package org.apache.hama.examples.graph;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeerProtocol;
import org.apache.hama.bsp.BooleanMessage;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.IntegerMessage;
import org.apache.hama.examples.RandBench;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:org/apache/hama/examples/graph/ShortestPaths.class */
public class ShortestPaths extends ShortestPathsBase {
    public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
    private Configuration conf;
    private Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap();
    private Map<String, ShortestPathVertex> vertexLookupMap = new HashMap();
    private String[] peerNames;

    @Override // org.apache.hama.bsp.BSPInterface
    public void bsp(BSPPeerProtocol bSPPeerProtocol) throws IOException, KeeperException, InterruptedException {
        LOG.info("Mapping graph into ram...");
        mapAdjacencyList(this.conf, bSPPeerProtocol, this.adjacencyList, this.vertexLookupMap);
        LOG.info("Finished! Starting graph initialization...");
        parsePeerNames(this.conf);
        String str = this.conf.get(ShortestPathsBase.MASTER_TASK);
        ShortestPathVertex shortestPathVertex = this.vertexLookupMap.get(this.conf.get(ShortestPathsBase.SHORTEST_PATHS_START_VERTEX_ID));
        if (shortestPathVertex != null) {
            shortestPathVertex.setCost(0);
            sendMessageToNeighbors(bSPPeerProtocol, shortestPathVertex);
        }
        LOG.info("Finished! Starting main loop...");
        boolean z = true;
        while (z) {
            int i = 0;
            bSPPeerProtocol.sync();
            LinkedList linkedList = new LinkedList();
            while (true) {
                IntegerMessage integerMessage = (IntegerMessage) bSPPeerProtocol.getCurrentMessage();
                if (integerMessage == null) {
                    break;
                }
                ShortestPathVertex shortestPathVertex2 = this.vertexLookupMap.get(integerMessage.getTag());
                if (shortestPathVertex2.getCost().intValue() > integerMessage.getData().intValue()) {
                    i++;
                    linkedList.add(shortestPathVertex2);
                    shortestPathVertex2.setCost(integerMessage.getData());
                }
            }
            z = broadcastUpdatesMade(bSPPeerProtocol, str, i);
            Iterator it = linkedList.iterator();
            while (it.hasNext()) {
                sendMessageToNeighbors(bSPPeerProtocol, (ShortestPathVertex) it.next());
            }
        }
        LOG.info("Finished!");
        saveVertexMap(this.conf, bSPPeerProtocol, this.adjacencyList);
    }

    private void parsePeerNames(Configuration configuration) {
        this.peerNames = configuration.get(ShortestPathsBase.BSP_PEERS).split(";");
    }

    private boolean broadcastUpdatesMade(BSPPeerProtocol bSPPeerProtocol, String str, int i) throws IOException, KeeperException, InterruptedException {
        bSPPeerProtocol.send(str, new IntegerMessage(bSPPeerProtocol.getPeerName(), i));
        bSPPeerProtocol.sync();
        if (bSPPeerProtocol.getPeerName().equals(str)) {
            int i2 = 0;
            while (true) {
                IntegerMessage integerMessage = (IntegerMessage) bSPPeerProtocol.getCurrentMessage();
                if (integerMessage == null) {
                    break;
                }
                i2 += integerMessage.getData().intValue();
                LOG.info("Received " + integerMessage.getData() + " updates from " + integerMessage.getTag() + " in SuperStep " + bSPPeerProtocol.getSuperstepCount());
            }
            for (String str2 : bSPPeerProtocol.getAllPeerNames()) {
                bSPPeerProtocol.send(str2, new BooleanMessage("", i2 > 0));
            }
        }
        bSPPeerProtocol.sync();
        return ((BooleanMessage) bSPPeerProtocol.getCurrentMessage()).getData().booleanValue();
    }

    private void sendMessageToNeighbors(BSPPeerProtocol bSPPeerProtocol, ShortestPathVertex shortestPathVertex) throws IOException {
        for (ShortestPathVertex shortestPathVertex2 : this.adjacencyList.get(shortestPathVertex)) {
            bSPPeerProtocol.send(this.peerNames[Math.abs(shortestPathVertex2.getId() % bSPPeerProtocol.getAllPeerNames().length)], new IntegerMessage(shortestPathVertex2.getName(), shortestPathVertex.getCost().intValue() == Integer.MAX_VALUE ? shortestPathVertex.getCost().intValue() : shortestPathVertex.getCost().intValue() + shortestPathVertex2.getWeight()));
        }
    }

    @Override // org.apache.hadoop.conf.Configurable
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    @Override // org.apache.hadoop.conf.Configurable
    public Configuration getConf() {
        return this.conf;
    }

    public static void printUsage() {
        System.out.println("Single Source Shortest Path Example:");
        System.out.println("<Startvertex name> <optional: output path> <optional: path to own adjacency list sequencefile>");
    }

    public static void main(String[] strArr) throws IOException, InterruptedException, ClassNotFoundException {
        printUsage();
        HamaConfiguration hamaConfiguration = new HamaConfiguration();
        hamaConfiguration.set(ShortestPathsBase.SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
        System.out.println("Setting default start vertex to \"Frankfurt\"!");
        hamaConfiguration.set(ShortestPathsBase.OUT_PATH, "sssp/output");
        boolean z = false;
        if (strArr.length > 0) {
            hamaConfiguration.set(ShortestPathsBase.SHORTEST_PATHS_START_VERTEX_ID, strArr[0]);
            System.out.println("Setting start vertex to " + strArr[0] + "!");
            if (strArr.length > 1) {
                hamaConfiguration.set(ShortestPathsBase.OUT_PATH, strArr[1]);
                System.out.println("Using new output folder: " + strArr[1]);
            }
            r10 = strArr.length > 2 ? new Path(strArr[2]) : null;
            if (strArr.length > 3) {
                z = Boolean.valueOf(strArr[3]).booleanValue();
            }
        }
        Map<ShortestPathVertex, List<ShortestPathVertex>> loadGraph = r10 == null ? ShortestPathsGraphLoader.loadGraph() : null;
        BSPJob bSPJob = new BSPJob(hamaConfiguration, (Class<?>) RandBench.class);
        bSPJob.setJobName("Single Source Shortest Path");
        bSPJob.setBspClass(ShortestPaths.class);
        ClusterStatus clusterStatus = new BSPJobClient(hamaConfiguration).getClusterStatus(true);
        StringBuilder sb = new StringBuilder();
        for (String str : clusterStatus.getActiveGroomNames().values()) {
            hamaConfiguration.set(ShortestPathsBase.MASTER_TASK, str);
            sb.append(str);
            sb.append(";");
        }
        LOG.info("Master is: " + hamaConfiguration.get(ShortestPathsBase.MASTER_TASK));
        hamaConfiguration.set(ShortestPathsBase.BSP_PEERS, sb.toString());
        LOG.info("Starting data partitioning...");
        HamaConfiguration partition = loadGraph == null ? partition(r10, hamaConfiguration, z) : partition(loadGraph, hamaConfiguration);
        LOG.info("Finished!");
        bSPJob.setNumBspTask(clusterStatus.getGroomServers());
        long currentTimeMillis = System.currentTimeMillis();
        if (bSPJob.waitForCompletion(true)) {
            System.out.println("Job Finished in " + ((System.currentTimeMillis() - currentTimeMillis) / 1000.0d) + " seconds");
            printOutput(FileSystem.get(partition), partition);
        }
    }
}
