package org.apache.hama.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;

/* loaded from: input_file:org/apache/hama/graph/GraphJobRunner.class */
public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable> extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
    public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
    public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
    public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
    public static final String GRAPH_REPAIR = "hama.graph.repair";
    private Configuration conf;
    private Combiner<M> combiner;
    private Partitioner<V, M> partitioner;
    private List<Vertex<V, E, M>> vertices = new ArrayList();
    private boolean updated = true;
    private int globalUpdateCounts = 0;
    private long numberVertices = 0;
    private int maxIteration = -1;
    private long iteration;
    private Class<V> vertexIdClass;
    private Class<M> vertexValueClass;
    private Class<E> edgeValueClass;
    private Class<Vertex<V, E, M>> vertexClass;
    private AggregationRunner<V, E, M> aggregationRunner;
    private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
    private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
    public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
    public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);

    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$GraphJobCounter.class */
    public enum GraphJobCounter {
        MULTISTEP_PARTITIONING,
        ITERATIONS,
        INPUT_VERTICES,
        AGGREGATE_VERTICES
    }

    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        setupFields(bSPPeer);
        loadVertices(bSPPeer);
        countGlobalVertexCount(bSPPeer);
        doInitialSuperstep(bSPPeer);
    }

    public final void bsp(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        while (this.updated) {
            if (this.maxIteration > 0 && this.iteration > this.maxIteration) {
                return;
            }
            this.globalUpdateCounts = 0;
            bSPPeer.sync();
            Map<V, List<M>> parseMessages = parseMessages(bSPPeer);
            doMasterUpdates(bSPPeer);
            if (!this.aggregationRunner.receiveAggregatedValues(bSPPeer, this.iteration)) {
                return;
            }
            doSuperstep(parseMessages, bSPPeer);
            if (isMasterTask(bSPPeer)) {
                bSPPeer.getCounter(GraphJobCounter.ITERATIONS).increment(1L);
            }
        }
    }

    public final void cleanup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        for (Vertex<V, E, M> vertex : this.vertices) {
            bSPPeer.write(vertex.getVertexID(), vertex.getValue());
        }
    }

    private void doMasterUpdates(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        if (!isMasterTask(bSPPeer) || this.iteration <= 1) {
            return;
        }
        MapWritable mapWritable = new MapWritable();
        if (this.globalUpdateCounts == 0) {
            mapWritable.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
        } else {
            this.aggregationRunner.doMasterAggregation(mapWritable);
        }
        for (String str : bSPPeer.getAllPeerNames()) {
            bSPPeer.send(str, new GraphJobMessage(mapWritable));
        }
    }

    private void doSuperstep(Map<V, List<M>> map, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        int i = 0;
        for (Vertex<V, E, M> vertex : this.vertices) {
            List list = map.get(vertex.getVertexID());
            if (vertex.isHalted() && list != null) {
                vertex.setActive();
            }
            if (list == null) {
                list = Collections.emptyList();
            }
            if (!vertex.isHalted()) {
                if (this.combiner != null) {
                    Writable combine = this.combiner.combine(list);
                    list = new ArrayList();
                    list.add(combine);
                }
                M value = vertex.getValue();
                vertex.compute(list.iterator());
                this.aggregationRunner.aggregateVertex(value, vertex);
                if (!vertex.isHalted()) {
                    i++;
                }
            }
        }
        this.aggregationRunner.sendAggregatorValues(bSPPeer, i);
        this.iteration++;
    }

    private void doInitialSuperstep(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        for (Vertex<V, E, M> vertex : this.vertices) {
            List singletonList = Collections.singletonList(vertex.getValue());
            M value = vertex.getValue();
            vertex.compute(singletonList.iterator());
            this.aggregationRunner.aggregateVertex(value, vertex);
        }
        this.aggregationRunner.sendAggregatorValues(bSPPeer, 1);
        this.iteration++;
    }

    private void setupFields(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) {
        this.peer = bSPPeer;
        this.conf = bSPPeer.getConfiguration();
        this.maxIteration = bSPPeer.getConfiguration().getInt("hama.graph.max.iteration", -1);
        this.vertexIdClass = this.conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class);
        this.vertexValueClass = this.conf.getClass(GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class);
        this.edgeValueClass = this.conf.getClass(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, Writable.class);
        this.vertexClass = this.conf.getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class);
        GraphJobMessage.VERTEX_ID_CLASS = this.vertexIdClass;
        GraphJobMessage.VERTEX_VALUE_CLASS = this.vertexValueClass;
        GraphJobMessage.VERTEX_CLASS = this.vertexClass;
        GraphJobMessage.EDGE_VALUE_CLASS = this.edgeValueClass;
        this.partitioner = (Partitioner) ReflectionUtils.newInstance(this.conf.getClass("bsp.input.partitioner.class", HashPartitioner.class), this.conf);
        if (!this.conf.getClass("hama.vertex.message.combiner.class", Combiner.class).equals(Combiner.class)) {
            LOG.debug("Combiner class: " + this.conf.get("hama.vertex.message.combiner.class"));
            this.combiner = (Combiner) ReflectionUtils.newInstance(this.conf.getClass("hama.vertex.message.combiner.class", Combiner.class), this.conf);
        }
        this.aggregationRunner = new AggregationRunner<>();
        this.aggregationRunner.setupAggregators(bSPPeer);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void loadVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        VertexInputReader vertexInputReader = (VertexInputReader) ReflectionUtils.newInstance(this.conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER, VertexInputReader.class), this.conf);
        boolean z = this.conf.getBoolean(GRAPH_REPAIR, false);
        boolean z2 = this.conf.getBoolean(GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
        long splitSize = bSPPeer.getSplitSize();
        int partitionMultiSteps = partitionMultiSteps(bSPPeer, splitSize);
        long j = splitSize / partitionMultiSteps;
        boolean z3 = this.conf.getBoolean("hama.graph.self.ref", false);
        LOG.debug("vertex class: " + this.vertexClass);
        Vertex<V, E, M> newVertexInstance = newVertexInstance(this.vertexClass, this.conf);
        newVertexInstance.runner = this;
        long pos = bSPPeer.getPos();
        if (pos == 0) {
            pos = 1;
        }
        int i = 1;
        while (true) {
            KeyValuePair readNext = bSPPeer.readNext();
            if (readNext == null) {
                break;
            }
            try {
                if (vertexInputReader.parseVertex((Writable) readNext.getKey(), (Writable) readNext.getValue(), newVertexInstance)) {
                    if (newVertexInstance.getEdges() == null) {
                        if (z3) {
                            newVertexInstance.setEdges(Collections.singletonList(new Edge(newVertexInstance.getVertexID(), null)));
                        } else {
                            newVertexInstance.setEdges(Collections.EMPTY_LIST);
                        }
                    }
                    if (z3) {
                        newVertexInstance.addEdge(new Edge<>(newVertexInstance.getVertexID(), null));
                    }
                    if (z2) {
                        bSPPeer.send(bSPPeer.getPeerName(this.partitioner.getPartition(newVertexInstance.getVertexID(), newVertexInstance.getValue(), bSPPeer.getNumPeers())), new GraphJobMessage((Vertex<?, ?, ?>) newVertexInstance));
                    } else {
                        newVertexInstance.setup(this.conf);
                        this.vertices.add(newVertexInstance);
                    }
                    newVertexInstance = newVertexInstance(this.vertexClass, this.conf);
                    newVertexInstance.runner = this;
                    if (z2 && i < partitionMultiSteps && bSPPeer.getPos() - pos >= j) {
                        bSPPeer.sync();
                        i++;
                        while (true) {
                            GraphJobMessage graphJobMessage = (GraphJobMessage) bSPPeer.getCurrentMessage();
                            if (graphJobMessage == null) {
                                break;
                            }
                            Vertex<?, ?, ?> vertex = graphJobMessage.getVertex();
                            vertex.runner = this;
                            vertex.setup(this.conf);
                            this.vertices.add(vertex);
                        }
                        pos = bSPPeer.getPos();
                    }
                }
            } catch (Exception e) {
                throw new IOException("exception occured during parsing vertex!" + e.toString());
            }
        }
        if (z2) {
            bSPPeer.sync();
            while (true) {
                GraphJobMessage graphJobMessage2 = (GraphJobMessage) bSPPeer.getCurrentMessage();
                if (graphJobMessage2 == null) {
                    break;
                }
                Vertex<?, ?, ?> vertex2 = graphJobMessage2.getVertex();
                vertex2.runner = this;
                vertex2.setup(this.conf);
                this.vertices.add(vertex2);
            }
        }
        LOG.debug("Loading finished at " + bSPPeer.getSuperstepCount() + " steps.");
        if (z) {
            LOG.debug("Starting repair of this graph!");
            repair(bSPPeer, partitionMultiSteps, z3);
        }
        LOG.debug("Starting Vertex processing!");
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void repair(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer, int i, boolean z) throws IOException, SyncException, InterruptedException {
        int i2 = 0;
        MapWritable mapWritable = new MapWritable();
        mapWritable.put(new IntWritable(bSPPeer.getPeerIndex()), new IntWritable(this.vertices.size()));
        bSPPeer.send(getMasterTask(bSPPeer), new GraphJobMessage(mapWritable));
        bSPPeer.sync();
        if (isMasterTask(bSPPeer)) {
            int i3 = Integer.MAX_VALUE;
            while (true) {
                GraphJobMessage graphJobMessage = (GraphJobMessage) bSPPeer.getCurrentMessage();
                if (graphJobMessage == null) {
                    break;
                }
                Iterator it = graphJobMessage.getMap().entrySet().iterator();
                while (it.hasNext()) {
                    int i4 = ((IntWritable) ((Map.Entry) it.next()).getValue()).get();
                    if (i3 > i4) {
                        i3 = i4;
                    }
                }
            }
            i2 = i3 < i * 2 ? i3 : i * 2;
            for (String str : bSPPeer.getAllPeerNames()) {
                MapWritable mapWritable2 = new MapWritable();
                mapWritable2.put(new Text("steps"), new IntWritable(i2));
                bSPPeer.send(str, new GraphJobMessage(mapWritable2));
            }
        }
        bSPPeer.sync();
        Iterator it2 = ((GraphJobMessage) bSPPeer.getCurrentMessage()).getMap().entrySet().iterator();
        while (it2.hasNext()) {
            i2 = ((IntWritable) ((Map.Entry) it2.next()).getValue()).get();
        }
        HashMap hashMap = new HashMap();
        int i5 = 0;
        int i6 = 0;
        for (Vertex<V, E, M> vertex : this.vertices) {
            for (Edge<V, E> edge : vertex.getEdges()) {
                bSPPeer.send(vertex.getDestinationPeerName(edge), new GraphJobMessage(edge.getDestinationVertexID()));
            }
            if (i6 < i2 && i5 % (this.vertices.size() / i2) == 0) {
                bSPPeer.sync();
                i6++;
                while (true) {
                    GraphJobMessage graphJobMessage2 = (GraphJobMessage) bSPPeer.getCurrentMessage();
                    if (graphJobMessage2 != null) {
                        Writable vertexId = graphJobMessage2.getVertexId();
                        Vertex newVertexInstance = newVertexInstance(this.vertexClass, this.conf);
                        newVertexInstance.setVertexID(vertexId);
                        newVertexInstance.runner = this;
                        if (z) {
                            newVertexInstance.setEdges(Collections.singletonList(new Edge(newVertexInstance.getVertexID(), null)));
                        } else {
                            newVertexInstance.setEdges(new ArrayList(0));
                        }
                        newVertexInstance.setup(this.conf);
                        hashMap.put(vertexId, newVertexInstance);
                    }
                }
            }
            i5++;
        }
        bSPPeer.sync();
        while (true) {
            GraphJobMessage graphJobMessage3 = (GraphJobMessage) bSPPeer.getCurrentMessage();
            if (graphJobMessage3 == null) {
                break;
            }
            Writable vertexId2 = graphJobMessage3.getVertexId();
            Vertex newVertexInstance2 = newVertexInstance(this.vertexClass, this.conf);
            newVertexInstance2.setVertexID(vertexId2);
            newVertexInstance2.runner = this;
            if (z) {
                newVertexInstance2.setEdges(Collections.singletonList(new Edge(newVertexInstance2.getVertexID(), null)));
            } else {
                newVertexInstance2.setEdges(new ArrayList(0));
            }
            newVertexInstance2.setup(this.conf);
            hashMap.put(vertexId2, newVertexInstance2);
        }
        for (Vertex<V, E, M> vertex2 : this.vertices) {
            if (hashMap.containsKey(vertex2.getVertexID())) {
                hashMap.remove(vertex2.getVertexID());
            }
        }
        this.vertices.addAll(hashMap.values());
        hashMap.clear();
    }

    private int partitionMultiSteps(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer, long j) throws IOException, SyncException, InterruptedException {
        int i = 1;
        MapWritable mapWritable = new MapWritable();
        mapWritable.put(new IntWritable(bSPPeer.getPeerIndex()), new LongWritable(j));
        bSPPeer.send(getMasterTask(bSPPeer), new GraphJobMessage(mapWritable));
        bSPPeer.sync();
        if (isMasterTask(bSPPeer)) {
            long j2 = 0;
            while (true) {
                GraphJobMessage graphJobMessage = (GraphJobMessage) bSPPeer.getCurrentMessage();
                if (graphJobMessage == null) {
                    break;
                }
                Iterator it = graphJobMessage.getMap().entrySet().iterator();
                while (it.hasNext()) {
                    long j3 = ((LongWritable) ((Map.Entry) it.next()).getValue()).get();
                    if (j2 < j3) {
                        j2 = j3;
                    }
                }
            }
            int i2 = ((int) (j2 / this.conf.getLong("hama.graph.multi.step.partitioning.interval", 20000000L))) + 1;
            for (String str : bSPPeer.getAllPeerNames()) {
                MapWritable mapWritable2 = new MapWritable();
                mapWritable2.put(new Text("max"), new IntWritable(i2));
                bSPPeer.send(str, new GraphJobMessage(mapWritable2));
            }
        }
        bSPPeer.sync();
        Iterator it2 = ((GraphJobMessage) bSPPeer.getCurrentMessage()).getMap().entrySet().iterator();
        while (it2.hasNext()) {
            i = ((IntWritable) ((Map.Entry) it2.next()).getValue()).get();
        }
        if (isMasterTask(bSPPeer)) {
            bSPPeer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment(i);
        }
        return i;
    }

    private void countGlobalVertexCount(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        for (String str : bSPPeer.getAllPeerNames()) {
            bSPPeer.send(str, new GraphJobMessage(new IntWritable(this.vertices.size())));
        }
        bSPPeer.sync();
        while (true) {
            GraphJobMessage graphJobMessage = (GraphJobMessage) bSPPeer.getCurrentMessage();
            if (graphJobMessage == null) {
                break;
            } else if (graphJobMessage.isVerticesSizeMessage()) {
                this.numberVertices += graphJobMessage.getVerticesSize().get();
            }
        }
        if (isMasterTask(bSPPeer)) {
            bSPPeer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(this.numberVertices);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Map<V, List<M>> parseMessages(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        HashMap hashMap = new HashMap();
        while (true) {
            GraphJobMessage graphJobMessage = (GraphJobMessage) bSPPeer.getCurrentMessage();
            if (graphJobMessage == null) {
                return hashMap;
            }
            if (graphJobMessage.isVertexMessage()) {
                Writable vertexId = graphJobMessage.getVertexId();
                Writable vertexValue = graphJobMessage.getVertexValue();
                List list = (List) hashMap.get(vertexId);
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(vertexId, list);
                }
                list.add(vertexValue);
            } else {
                if (!graphJobMessage.isMapMessage()) {
                    throw new UnsupportedOperationException("Unknown message type: " + graphJobMessage);
                }
                for (Map.Entry entry : graphJobMessage.getMap().entrySet()) {
                    Text text = (Text) entry.getKey();
                    if (FLAG_MESSAGE_COUNTS.equals(text)) {
                        if (((IntWritable) entry.getValue()).get() == Integer.MIN_VALUE) {
                            this.updated = false;
                        } else {
                            this.globalUpdateCounts += ((IntWritable) entry.getValue()).get();
                        }
                    } else if (this.aggregationRunner.isEnabled() && text.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
                        this.aggregationRunner.masterReadAggregatedValue(text, (Writable) entry.getValue());
                    } else if (this.aggregationRunner.isEnabled() && text.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
                        this.aggregationRunner.masterReadAggregatedIncrementalValue(text, (Writable) entry.getValue());
                    }
                }
            }
        }
    }

    public final long getNumberVertices() {
        return this.numberVertices;
    }

    public final long getNumberIterations() {
        return this.iteration;
    }

    public final int getMaxIteration() {
        return this.maxIteration;
    }

    public final Partitioner<V, M> getPartitioner() {
        return this.partitioner;
    }

    public final Writable getLastAggregatedValue(int i) {
        return this.aggregationRunner.getLastAggregatedValue(i);
    }

    public final IntWritable getNumLastAggregatedVertices(int i) {
        return this.aggregationRunner.getNumLastAggregatedVertices(i);
    }

    public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
        return this.peer;
    }

    public static boolean isMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) {
        return bSPPeer.getPeerName().equals(getMasterTask(bSPPeer));
    }

    public static String getMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) {
        return bSPPeer.getPeerName(0);
    }

    public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(Class<?> cls, Configuration configuration) {
        return (Vertex) ReflectionUtils.newInstance(cls, configuration);
    }
}
