package org.apache.hama.graph;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.util.UnsafeByteArrayInputStream;
import org.apache.hama.util.WritableUtils;

/* loaded from: input_file:org/apache/hama/graph/GraphJobRunner.class */
public final class GraphJobRunner<V extends WritableComparable, E extends Writable, M extends Writable> extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
    public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
    public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
    public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
    public static final String DEFAULT_THREAD_POOL_SIZE = "hama.graph.thread.pool.size";
    private HamaConfiguration conf;
    private Partitioner<V, M> partitioner;
    public static Class<?> VERTEX_CLASS;
    public static Class<? extends WritableComparable> VERTEX_ID_CLASS;
    public static Class<? extends Writable> VERTEX_VALUE_CLASS;
    public static Class<? extends Writable> EDGE_VALUE_CLASS;
    public static Class<Vertex<?, ?, ?>> vertexClass;
    private VerticesInfo<V, E, M> vertices;
    private AggregationRunner<V, E, M> aggregationRunner;
    private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
    private Combiner<Writable> combiner;
    private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
    private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
    public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
    public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
    public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
    public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE);
    public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
    public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE);
    public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
    public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER);
    public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
    public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES);
    private boolean updated = true;
    private int globalUpdateCounts = 0;
    private int changedVertexCnt = 0;
    private long numberVertices = 0;
    private int maxIteration = -1;
    private long iteration = 0;
    private AtomicInteger errorCount = new AtomicInteger(0);
    private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
    private final ConcurrentHashMap<Integer, GraphJobMessage> partitionMessages = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<V, GraphJobMessage> vertexMessages = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hama.graph.GraphJobRunner$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$1.class */
    public class AnonymousClass1 implements Iterable<Writable> {
        DataInputStream dis;
        final /* synthetic */ byte[] val$valuesBytes;
        final /* synthetic */ int val$numOfValues;

        AnonymousClass1(byte[] bArr, int i) {
            this.val$valuesBytes = bArr;
            this.val$numOfValues = i;
        }

        @Override // java.lang.Iterable
        public Iterator<Writable> iterator() {
            if (GraphJobRunner.this.conf.getBoolean("hama.use.unsafeserialization", false)) {
                this.dis = new DataInputStream(new UnsafeByteArrayInputStream(this.val$valuesBytes));
            } else {
                this.dis = new DataInputStream(new ByteArrayInputStream(this.val$valuesBytes));
            }
            return new Iterator<Writable>() { // from class: org.apache.hama.graph.GraphJobRunner.1.1
                int index = 0;

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.index < AnonymousClass1.this.val$numOfValues;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public Writable next() {
                    Writable createVertexValue = GraphJobRunner.createVertexValue();
                    try {
                        createVertexValue.readFields(AnonymousClass1.this.dis);
                        this.index++;
                        return createVertexValue;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override // java.util.Iterator
                public void remove() {
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$AddVertex.class */
    public class AddVertex implements Runnable {
        GraphJobMessage msg;

        public AddVertex(GraphJobMessage graphJobMessage) {
            this.msg = graphJobMessage;
        }

        @Override // java.lang.Runnable
        public void run() {
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(this.msg.getValuesBytes()));
            for (int i = 0; i < this.msg.getNumOfValues(); i++) {
                try {
                    Vertex newVertexInstance = GraphJobRunner.newVertexInstance(GraphJobRunner.VERTEX_CLASS);
                    newVertexInstance.readFields(dataInputStream);
                    GraphJobRunner.this.addVertex(newVertexInstance);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$ComputeRunnable.class */
    public class ComputeRunnable implements Runnable {
        Vertex<V, E, M> vertex;
        Iterable<M> msgs;

        public ComputeRunnable(GraphJobMessage graphJobMessage) throws IOException {
            this.vertex = GraphJobRunner.this.vertices.get(graphJobMessage.getVertexId());
            this.msgs = (Iterable<M>) GraphJobRunner.this.getIterableMessages(graphJobMessage.getValuesBytes(), graphJobMessage.getNumOfValues());
        }

        public ComputeRunnable(V v) throws IOException {
            this.vertex = GraphJobRunner.this.vertices.get(v);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (GraphJobRunner.this.iteration == 0) {
                    this.vertex.setup(GraphJobRunner.this.conf);
                    this.msgs = Collections.singleton(this.vertex.getValue());
                }
                this.vertex.compute(this.msgs);
                GraphJobRunner.this.vertices.finishVertexComputation(this.vertex);
            } catch (IOException e) {
                GraphJobRunner.this.incrementErrorCount();
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$GraphJobCounter.class */
    public enum GraphJobCounter {
        MULTISTEP_PARTITIONING,
        ITERATIONS,
        INPUT_VERTICES,
        AGGREGATE_VERTICES
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$Parser.class */
    public class Parser implements Runnable {
        Vertex<V, E, M> vertex;

        public Parser(Vertex<V, E, M> vertex) {
            this.vertex = vertex;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                int partitionID = GraphJobRunner.this.getPartitionID(this.vertex.getVertexID());
                if (GraphJobRunner.this.peer.getPeerIndex() == partitionID) {
                    GraphJobRunner.this.addVertex(this.vertex);
                } else {
                    ((GraphJobMessage) GraphJobRunner.this.partitionMessages.get(Integer.valueOf(partitionID))).add(WritableUtils.serialize(this.vertex));
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/graph/GraphJobRunner$RetryRejectedExecutionHandler.class */
    class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
        RetryRejectedExecutionHandler() {
        }

        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                GraphJobRunner.LOG.error(e);
            }
            threadPoolExecutor.execute(runnable);
        }
    }

    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        setupFields(bSPPeer);
        long currentTimeMillis = System.currentTimeMillis();
        loadVertices(bSPPeer);
        LOG.info("Total time spent for loading vertices: " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        long currentTimeMillis2 = System.currentTimeMillis();
        countGlobalVertexCount(bSPPeer);
        LOG.info("Total time spent for broadcasting global vertex count: " + (System.currentTimeMillis() - currentTimeMillis2) + " ms");
        long currentTimeMillis3 = System.currentTimeMillis();
        doInitialSuperstep(bSPPeer);
        LOG.info("Total time spent for initial superstep: " + (System.currentTimeMillis() - currentTimeMillis3) + " ms");
    }

    public final void bsp(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        while (this.updated) {
            if (this.maxIteration > 0 && this.iteration > this.maxIteration) {
                return;
            }
            this.globalUpdateCounts = 0;
            bSPPeer.sync();
            GraphJobMessage parseMessages = parseMessages(bSPPeer);
            long currentTimeMillis = System.currentTimeMillis();
            doAggregationUpdates(bSPPeer);
            LOG.info("Total time spent for broadcasting aggregation values: " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            if (!this.updated) {
                return;
            } else {
                doSuperstep(parseMessages, bSPPeer);
            }
        }
    }

    public final void cleanup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        this.vertexOutputWriter.setup(this.conf);
        Iterator<Vertex<V, E, M>> it = this.vertices.iterator();
        while (it.hasNext()) {
            this.vertexOutputWriter.write(it.next(), bSPPeer);
        }
        this.vertices.clear();
    }

    private void setupFields(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        this.peer = bSPPeer;
        this.conf = bSPPeer.getConfiguration();
        this.maxIteration = bSPPeer.getConfiguration().getInt("hama.graph.max.iteration", -1);
        initClasses(this.conf);
        this.partitioner = (Partitioner) ReflectionUtils.newInstance(this.conf.getClass("bsp.input.partitioner.class", HashPartitioner.class), this.conf);
        this.vertexOutputWriter = (VertexOutputWriter) org.apache.hama.util.ReflectionUtils.newInstance(this.conf.getClass(GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class));
        setAggregationRunner(new AggregationRunner<>());
        getAggregationRunner().setupAggregators(bSPPeer);
        this.vertices = (VerticesInfo) org.apache.hama.util.ReflectionUtils.newInstance(this.conf.getClass("hama.graph.vertices.info", MapVerticesInfo.class, VerticesInfo.class));
        this.vertices.init(this, this.conf, bSPPeer.getTaskId());
        String str = this.conf.get("bsp.combiner.class");
        if (str != null) {
            try {
                this.combiner = (Combiner) org.apache.hama.util.ReflectionUtils.newInstance(str);
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
    }

    private void doSuperstep(GraphJobMessage graphJobMessage, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        this.errorCount.set(0);
        long currentTimeMillis = System.currentTimeMillis();
        this.changedVertexCnt = 0;
        this.vertices.startSuperstep();
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        threadPoolExecutor.setMaximumPoolSize(this.conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
        threadPoolExecutor.setRejectedExecutionHandler(this.retryHandler);
        long currentTimeMillis2 = System.currentTimeMillis();
        while (graphJobMessage != null) {
            threadPoolExecutor.execute(new ComputeRunnable(graphJobMessage));
            graphJobMessage = (GraphJobMessage) bSPPeer.getCurrentMessage();
        }
        LOG.info("Total time spent for superstep-" + bSPPeer.getSuperstepCount() + " looping: " + (System.currentTimeMillis() - currentTimeMillis2) + " ms");
        threadPoolExecutor.shutdown();
        try {
            threadPoolExecutor.awaitTermination(60L, TimeUnit.SECONDS);
            if (this.errorCount.get() > 0) {
                throw new IOException("there were " + this.errorCount + " exceptions during compute vertices.");
            }
            for (Vertex<V, E, M> vertex : this.vertices) {
                if (!vertex.isHalted() && !vertex.isComputed()) {
                    vertex.compute(Collections.emptyList());
                    this.vertices.finishVertexComputation(vertex);
                }
            }
            getAggregationRunner().sendAggregatorValues(bSPPeer, this.vertices.getActiveVerticesNum(), this.changedVertexCnt);
            this.iteration++;
            LOG.info("Total time spent for superstep-" + bSPPeer.getSuperstepCount() + " computing vertices: " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            long currentTimeMillis3 = System.currentTimeMillis();
            finishSuperstep();
            LOG.info("Total time spent for superstep-" + bSPPeer.getSuperstepCount() + " synchronizing: " + (System.currentTimeMillis() - currentTimeMillis3) + " ms");
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    private void doInitialSuperstep(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException {
        this.changedVertexCnt = 0;
        this.errorCount.set(0);
        this.vertices.startSuperstep();
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        threadPoolExecutor.setMaximumPoolSize(this.conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
        threadPoolExecutor.setRejectedExecutionHandler(this.retryHandler);
        Iterator<V> it = this.vertices.keySet().iterator();
        while (it.hasNext()) {
            threadPoolExecutor.execute(new ComputeRunnable(it.next()));
        }
        threadPoolExecutor.shutdown();
        try {
            threadPoolExecutor.awaitTermination(60L, TimeUnit.SECONDS);
            if (this.errorCount.get() > 0) {
                throw new IOException("there were " + this.errorCount + " exceptions during compute vertices.");
            }
            getAggregationRunner().sendAggregatorValues(bSPPeer, 1, this.changedVertexCnt);
            this.iteration++;
            finishSuperstep();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public void incrementErrorCount() {
        this.errorCount.incrementAndGet();
    }

    private void doAggregationUpdates(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        if (isMasterTask(bSPPeer)) {
            MapWritable mapWritable = new MapWritable();
            mapWritable.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable(bSPPeer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()));
            if (this.globalUpdateCounts == 0) {
                mapWritable.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
            } else {
                getAggregationRunner().doMasterAggregation(mapWritable);
            }
            for (String str : bSPPeer.getAllPeerNames()) {
                bSPPeer.send(str, new GraphJobMessage(mapWritable));
            }
        }
        if (getAggregationRunner().isEnabled()) {
            bSPPeer.sync();
            this.updated = getAggregationRunner().receiveAggregatedValues(bSPPeer.getCurrentMessage().getMap(), this.iteration);
        }
    }

    public static <V extends WritableComparable<? super V>, E extends Writable, M extends Writable> void initClasses(Configuration configuration) {
        Class<? extends WritableComparable> cls = configuration.getClass(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class);
        Class<? extends Writable> cls2 = configuration.getClass(GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class);
        Class<? extends Writable> cls3 = configuration.getClass(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, Writable.class);
        vertexClass = configuration.getClass("hama.graph.vertex.class", Vertex.class);
        VERTEX_ID_CLASS = cls;
        VERTEX_VALUE_CLASS = cls2;
        VERTEX_CLASS = vertexClass;
        EDGE_VALUE_CLASS = cls3;
    }

    private void loadVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        for (int i = 0; i < bSPPeer.getNumPeers(); i++) {
            this.partitionMessages.put(Integer.valueOf(i), new GraphJobMessage());
        }
        VertexInputReader vertexInputReader = (VertexInputReader) org.apache.hama.util.ReflectionUtils.newInstance(this.conf.getClass("bsp.runtime.partition.recordconverter", VertexInputReader.class));
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        threadPoolExecutor.setMaximumPoolSize(this.conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
        threadPoolExecutor.setRejectedExecutionHandler(this.retryHandler);
        while (true) {
            KeyValuePair readNext = bSPPeer.readNext();
            if (readNext == null) {
                break;
            }
            Vertex<V, E, M> newVertexInstance = newVertexInstance(VERTEX_CLASS);
            try {
                if (vertexInputReader.parseVertex((Writable) readNext.getKey(), (Writable) readNext.getValue(), newVertexInstance)) {
                    threadPoolExecutor.execute(new Parser(newVertexInstance));
                }
            } catch (Exception e) {
                throw new IOException("Parse exception occured: " + e);
            }
        }
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(60L, TimeUnit.SECONDS);
        Iterator<Map.Entry<Integer, GraphJobMessage>> it = this.partitionMessages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, GraphJobMessage> next = it.next();
            it.remove();
            GraphJobMessage value = next.getValue();
            value.setFlag(8);
            bSPPeer.send(getHostName(next.getKey().intValue()), value);
        }
        bSPPeer.sync();
        ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        threadPoolExecutor2.setMaximumPoolSize(this.conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
        threadPoolExecutor2.setRejectedExecutionHandler(this.retryHandler);
        while (true) {
            GraphJobMessage currentMessage = bSPPeer.getCurrentMessage();
            if (currentMessage == null) {
                threadPoolExecutor2.shutdown();
                threadPoolExecutor2.awaitTermination(60L, TimeUnit.SECONDS);
                LOG.info(this.vertices.size() + " vertices are loaded into " + bSPPeer.getPeerName());
                return;
            }
            threadPoolExecutor2.execute(new AddVertex(currentMessage));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addVertex(Vertex<V, E, M> vertex) throws IOException {
        if (this.conf.getBoolean("hama.graph.self.ref", false)) {
            vertex.addEdge(new Edge<>(vertex.getVertexID(), null));
        }
        this.vertices.put(vertex);
    }

    private void removeVertex(V v) {
        this.vertices.remove(v);
        LOG.debug("Removed VertexID: " + v + " in peer " + this.peer.getPeerName());
    }

    private void countGlobalVertexCount(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) throws IOException, SyncException, InterruptedException {
        for (String str : bSPPeer.getAllPeerNames()) {
            bSPPeer.send(str, new GraphJobMessage(new IntWritable(this.vertices.size())));
        }
        bSPPeer.sync();
        while (true) {
            GraphJobMessage currentMessage = bSPPeer.getCurrentMessage();
            if (currentMessage == null) {
                break;
            } else if (currentMessage.isVerticesSizeMessage()) {
                this.numberVertices += currentMessage.getVerticesSize().get();
            }
        }
        if (isMasterTask(bSPPeer)) {
            bSPPeer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(this.numberVertices);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:78:0x01b5, code lost:
    
        if (r8 == false) goto L52;
     */
    /* JADX WARN: Code restructure failed: missing block: B:79:0x01b8, code lost:
    
        finishAdditions();
     */
    /* JADX WARN: Code restructure failed: missing block: B:81:0x01be, code lost:
    
        if (r9 == false) goto L55;
     */
    /* JADX WARN: Code restructure failed: missing block: B:82:0x01c1, code lost:
    
        finishRemovals();
     */
    /* JADX WARN: Code restructure failed: missing block: B:84:0x01c6, code lost:
    
        return r0;
     */
    /* JADX WARN: Multi-variable type inference failed */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.apache.hama.graph.GraphJobMessage parseMessages(org.apache.hama.bsp.BSPPeer<org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable, org.apache.hama.graph.GraphJobMessage> r6) throws java.io.IOException, org.apache.hama.bsp.sync.SyncException, java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 455
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hama.graph.GraphJobRunner.parseMessages(org.apache.hama.bsp.BSPPeer):org.apache.hama.graph.GraphJobMessage");
    }

    private void finishRemovals() throws IOException {
        this.vertices.finishRemovals();
    }

    private void finishAdditions() throws IOException {
        this.vertices.finishAdditions();
    }

    public void sendMessage(V v, M m) throws IOException {
        if (!this.vertexMessages.containsKey(v)) {
            this.vertexMessages.putIfAbsent(v, new GraphJobMessage());
        }
        if (this.conf.getBoolean("hama.use.unsafeserialization", false)) {
            this.vertexMessages.get(v).add(WritableUtils.unsafeSerialize(m));
        } else {
            this.vertexMessages.get(v).add(WritableUtils.serialize(m));
        }
    }

    public void sendMessage(List<Edge<V, E>> list, M m) throws IOException {
        byte[] serialize = !this.conf.getBoolean("hama.use.unsafeserialization", false) ? WritableUtils.serialize(m) : WritableUtils.unsafeSerialize(m);
        for (Edge<V, E> edge : list) {
            if (!this.vertexMessages.containsKey(edge.getDestinationVertexID())) {
                this.vertexMessages.putIfAbsent(edge.getDestinationVertexID(), new GraphJobMessage());
            }
            this.vertexMessages.get(edge.getDestinationVertexID()).add(serialize);
        }
    }

    public void finishSuperstep() throws IOException {
        this.vertices.finishSuperstep();
        Iterator<Map.Entry<V, GraphJobMessage>> it = this.vertexMessages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<V, GraphJobMessage> next = it.next();
            it.remove();
            if (this.combiner == null || next.getValue().getNumOfValues() <= 1) {
                next.getValue().setVertexId(next.getKey());
                next.getValue().setFlag(2);
                this.peer.send(getHostName((GraphJobRunner<V, E, M>) next.getKey()), next.getValue());
            } else {
                GraphJobMessage graphJobMessage = new GraphJobMessage(next.getKey(), WritableUtils.serialize(this.combiner.combine(getIterableMessages(next.getValue().getValuesBytes(), next.getValue().getNumOfValues()))));
                graphJobMessage.setFlag(2);
                this.peer.send(getHostName((GraphJobRunner<V, E, M>) next.getKey()), graphJobMessage);
            }
        }
        if (isMasterTask(this.peer)) {
            this.peer.getCounter(GraphJobCounter.ITERATIONS).increment(1L);
        }
    }

    public Iterable<Writable> getIterableMessages(byte[] bArr, int i) {
        return new AnonymousClass1(bArr, i);
    }

    public String getHostName(V v) {
        return this.peer.getPeerName(this.partitioner.getPartition(v, (Object) null, this.peer.getNumPeers()));
    }

    public int getPartitionID(V v) {
        return this.partitioner.getPartition(v, (Object) null, this.peer.getNumPeers());
    }

    public String getHostName(int i) {
        return this.peer.getPeerName(i);
    }

    public final long getNumberVertices() {
        return this.numberVertices;
    }

    public final long getNumberIterations() {
        return this.iteration;
    }

    public final int getMaxIteration() {
        return this.maxIteration;
    }

    public final Writable getLastAggregatedValue(int i) {
        return getAggregationRunner().getLastAggregatedValue(i);
    }

    public final IntWritable getNumLastAggregatedVertices(int i) {
        return getAggregationRunner().getNumLastAggregatedVertices(i);
    }

    public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
        return this.peer;
    }

    public static boolean isMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) {
        return bSPPeer.getPeerName().equals(getMasterTask(bSPPeer));
    }

    public static String getMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> bSPPeer) {
        return bSPPeer.getPeerName(0);
    }

    public static <V extends WritableComparable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(Class<?> cls) {
        return (Vertex) org.apache.hama.util.ReflectionUtils.newInstance(cls);
    }

    public static <X extends Writable> X createVertexIDObject() {
        return (X) org.apache.hama.util.ReflectionUtils.newInstance(VERTEX_ID_CLASS);
    }

    public static <X extends Writable> X createVertexValue() {
        return (X) org.apache.hama.util.ReflectionUtils.newInstance(VERTEX_VALUE_CLASS);
    }

    public static <X extends Writable> X createEdgeCostObject() {
        return (X) org.apache.hama.util.ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
    }

    public int getChangedVertexCnt() {
        return this.changedVertexCnt;
    }

    public void setChangedVertexCnt(int i) {
        this.changedVertexCnt = i;
    }

    public AggregationRunner<V, E, M> getAggregationRunner() {
        return this.aggregationRunner;
    }

    void setAggregationRunner(AggregationRunner<V, E, M> aggregationRunner) {
        this.aggregationRunner = aggregationRunner;
    }
}
