package org.apache.giraph.graph;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.function.primitive.PrimitiveRefs;
import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/graph/ComputeCallable.class */
/**
 * Worker-thread task that repeatedly takes the next partition from the
 * worker's {@link PartitionStore}, runs the configured {@link Computation}
 * over it and hands the partition back, until the store reports that no
 * partition is left.
 *
 * <p>Each invocation of {@link #call()} creates its own request processor,
 * aggregator usage and {@link Computation} instance, and returns one
 * {@link PartitionStats} per processed partition.
 *
 * @param <I> vertex id type
 * @param <V> second type argument of {@link Vertex} and {@link Partition}
 * @param <E> third type argument of {@link Vertex} and {@link Partition}
 * @param <M1> type of the messages read from the {@link MessageStore} and
 *     delivered to the computation
 * @param <M2> fifth type argument of {@link Computation} (presumably the
 *     outgoing message type; confirm against {@code Computation})
 */
public class ComputeCallable<I extends WritableComparable, V extends Writable, E extends Writable, M1 extends Writable, M2 extends Writable> implements Callable<Collection<PartitionStats>> {
    private static final Logger LOG = Logger.getLogger(ComputeCallable.class);
    private static final Time TIME = SystemTime.get();
    /** Number of computed vertices after which progress is reported to {@link WorkerProgress}. */
    private final long verticesToUpdateProgress;
    private final Mapper<?, ?, ?, ?>.Context context;
    private final GraphState graphState;
    /** Source of the incoming messages for every vertex. */
    private final MessageStore<I, M1> messageStore;
    private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
    private final CentralizedServiceWorker<I, V, E> serviceWorker;
    /** Obtained at the start of {@link #call()} and returned to the superstep output at its end. */
    private SimpleVertexWriter<I, V, E> vertexWriter;
    private final Counter messagesSentCounter;
    private final Counter messageBytesSentCounter;
    private final Histogram histogramComputePerPartition;
    private final Histogram histogramGCTimePerThread;
    private final Histogram histogramWaitTimePerThread;
    private final Histogram histogramProcessingTimePerThread;
    private final TimedLogger timedLogger = new TimedLogger(30000, LOG);
    /** Timestamp taken at construction; used for the "Computation took" log line. */
    private final long startNanos = TIME.getNanoseconds();

    /**
     * Stores the collaborators and looks up the per-superstep metrics this
     * task reports to.
     *
     * @param context Hadoop mapper context, used to report progress
     * @param graphState graph state handed to the computation
     * @param messageStore store holding the messages to deliver
     * @param immutableClassesGiraphConfiguration job configuration
     * @param centralizedServiceWorker worker service that owns the partitions
     */
    public ComputeCallable(Mapper<?, ?, ?, ?>.Context context, GraphState graphState, MessageStore<I, M1> messageStore, ImmutableClassesGiraphConfiguration<I, V, E> immutableClassesGiraphConfiguration, CentralizedServiceWorker<I, V, E> centralizedServiceWorker) {
        this.context = context;
        this.configuration = immutableClassesGiraphConfiguration;
        this.messageStore = messageStore;
        this.serviceWorker = centralizedServiceWorker;
        this.graphState = graphState;
        SuperstepMetricsRegistry perSuperstep = GiraphMetrics.get().perSuperstep();
        this.messagesSentCounter = perSuperstep.getCounter(MetricNames.MESSAGES_SENT);
        this.messageBytesSentCounter = perSuperstep.getCounter(MetricNames.MESSAGE_BYTES_SENT);
        this.histogramComputePerPartition = perSuperstep.getUniformHistogram(MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
        this.histogramGCTimePerThread = perSuperstep.getUniformHistogram("gc-per-thread-ms");
        this.histogramWaitTimePerThread = perSuperstep.getUniformHistogram("wait-per-thread-ms");
        this.histogramProcessingTimePerThread = perSuperstep.getUniformHistogram("processing-per-thread-ms");
        this.verticesToUpdateProgress = GiraphConstants.VERTICES_TO_UPDATE_PROGRESS.get(immutableClassesGiraphConfiguration);
    }

    /**
     * Processes partitions until the partition store returns {@code null},
     * then flushes the outgoing requests.
     *
     * <p>Wall-clock time is split into three per-thread buckets (waiting for
     * a partition, processing a partition, garbage collection) which are
     * written to the corresponding histograms and to the summary log line.
     *
     * @return statistics for every partition processed by this thread, in
     *     processing order
     * @throws IllegalStateException if computing a partition raises an
     *     {@link IOException} or {@link InterruptedException}, or if the
     *     final flush raises an {@link IOException}
     */
    @Override
    public Collection<PartitionStats> call() {
        // NOTE(review): kept as a raw type; the generic signature of
        // NettyWorkerClientRequestProcessor is not visible from this file.
        NettyWorkerClientRequestProcessor requestProcessor = new NettyWorkerClientRequestProcessor(this.context, this.configuration, this.serviceWorker, this.configuration.getOutgoingMessageEncodeAndStoreType().useOneMessageToManyIdsEncoding());
        WorkerThreadGlobalCommUsage aggregatorUsage = this.serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
        this.vertexWriter = this.serviceWorker.getSuperstepOutput().getVertexWriter();
        Computation<I, V, E, M1, M2> computation = this.configuration.createComputation();
        computation.initialize(this.graphState, requestProcessor, this.serviceWorker, aggregatorUsage);
        computation.preSuperstep();

        ArrayList<PartitionStats> partitionStatsList = Lists.newArrayList();
        PartitionStore<I, V, E> partitionStore = this.serviceWorker.getPartitionStore();
        OutOfCoreEngine oocEngine = this.serviceWorker.getServerData().getOocEngine();
        GraphTaskManager<I, V, E> taskManager = this.serviceWorker.getGraphTaskManager();
        if (oocEngine != null) {
            oocEngine.processingThreadStart();
        }

        long waitMillis = 0;
        long processingMillis = 0;
        long gcMillis = 0;
        while (true) {
            // Fetch the next partition; GC time observed meanwhile is
            // accounted as GC, not as waiting.
            long iterationStartMillis = System.currentTimeMillis();
            long gcBeforeWait = taskManager.getSuperstepGCTime();
            Partition<I, V, E> partition = partitionStore.getNextPartition();
            long gcWhileWaiting = taskManager.getSuperstepGCTime() - gcBeforeWait;
            gcMillis += gcWhileWaiting;
            waitMillis += (System.currentTimeMillis() - iterationStartMillis) - gcWhileWaiting;
            if (partition == null) {
                break;
            }

            long processingStartMillis = System.currentTimeMillis();
            long gcBeforeProcessing = taskManager.getSuperstepGCTime();
            try {
                this.serviceWorker.getServerData().resolvePartitionMutation(partition);
                PartitionStats partitionStats = computePartition(computation, partition, oocEngine, this.serviceWorker.getConfiguration().getIncomingMessageClasses().ignoreExistingVertices());
                partitionStatsList.add(partitionStats);

                long sentMessages = requestProcessor.resetMessageCount();
                partitionStats.addMessagesSentCount(sentMessages);
                this.messagesSentCounter.inc(sentMessages);
                long sentBytes = requestProcessor.resetMessageBytesCount();
                partitionStats.addMessageBytesSentCount(sentBytes);
                this.messageBytesSentCounter.inc(sentBytes);

                this.timedLogger.info("call: Completed " + partitionStatsList.size() + " partitions, " + partitionStore.getNumPartitions() + " remaining " + MemoryUtils.getRuntimeMemoryStats());

                long gcWhileProcessing = taskManager.getSuperstepGCTime() - gcBeforeProcessing;
                gcMillis += gcWhileProcessing;
                long partitionProcessingMillis = (System.currentTimeMillis() - processingStartMillis) - gcWhileProcessing;
                processingMillis += partitionProcessingMillis;
                partitionStats.setComputeMs(partitionProcessingMillis);
            } catch (IOException e) {
                throw new IllegalStateException("call: Caught unexpected IOException, failing.", e);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack
                // can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("call: Caught unexpected InterruptedException, failing.", e);
            } finally {
                // Hand the partition back exactly once, on success and on failure.
                partitionStore.putPartition(partition);
            }
            // Measured from the start of the iteration, so the value includes
            // the time spent waiting for the partition.
            this.histogramComputePerPartition.update(System.currentTimeMillis() - iterationStartMillis);
        }

        this.histogramGCTimePerThread.update(gcMillis);
        this.histogramWaitTimePerThread.update(waitMillis);
        this.histogramProcessingTimePerThread.update(processingMillis);
        computation.postSuperstep();
        this.serviceWorker.getSuperstepOutput().returnVertexWriter(this.vertexWriter);
        if (LOG.isInfoEnabled()) {
            LOG.info("call: Computation took " + (((float) Times.getNanosSince(TIME, this.startNanos)) / 1.0E9f) + " secs for " + partitionStatsList.size() + " partitions on superstep " + this.graphState.getSuperstep() + ".  Flushing started (time waiting on partitions was " + formatSeconds(waitMillis) + ", time processing partitions was " + formatSeconds(processingMillis) + ", time spent on gc was " + formatSeconds(gcMillis) + ")");
        }

        try {
            requestProcessor.flush();
            if (!partitionStatsList.isEmpty()) {
                // Bytes counted during the flush are attributed to the last
                // partition this thread processed.
                long flushedBytes = requestProcessor.resetMessageBytesCount();
                partitionStatsList.get(partitionStatsList.size() - 1).addMessageBytesSentCount(flushedBytes);
                this.messageBytesSentCounter.inc(flushedBytes);
            }
            aggregatorUsage.finishThreadComputation();
            if (oocEngine != null) {
                oocEngine.processingThreadFinish();
            }
            return partitionStatsList;
        } catch (IOException e) {
            throw new IllegalStateException("call: Flushing failed.", e);
        }
    }

    /** Renders a millisecond duration as seconds with two decimals, e.g. {@code "1.50 s"}. */
    private static String formatSeconds(long millis) {
        return String.format("%.2f s", millis / 1000.0d);
    }

    /**
     * Runs the computation over one partition while holding the partition's
     * monitor, then clears the partition's messages from the message store.
     *
     * @param computation computation to invoke
     * @param partition partition to process
     * @param outOfCoreEngine out-of-core engine, or {@code null} if none is in use
     * @param ignoreExistingVertices if {@code true}, compute only on the ids
     *     the message store lists as destinations for this partition;
     *     otherwise iterate over the partition's vertices
     * @return statistics gathered while processing the partition
     * @throws IOException propagated from the computation or the vertex writer
     * @throws InterruptedException propagated from the computation or the vertex writer
     */
    private PartitionStats computePartition(Computation<I, V, E, M1, M2> computation, Partition<I, V, E> partition, OutOfCoreEngine outOfCoreEngine, boolean ignoreExistingVertices) throws IOException, InterruptedException {
        PartitionStats partitionStats = new PartitionStats(partition.getId(), 0L, 0L, 0L, 0L, 0L, this.serviceWorker.getWorkerInfo().getHostnameId());
        final PrimitiveRefs.LongRef pendingVertices = new PrimitiveRefs.LongRef(0L);
        // Batches progress reporting: WorkerProgress is only updated once
        // every verticesToUpdateProgress vertices.
        Progressable vertexProgress = new Progressable() {
            @Override
            public void progress() {
                pendingVertices.value++;
                if (pendingVertices.value == verticesToUpdateProgress) {
                    WorkerProgress.get().addVerticesComputed(pendingVertices.value);
                    pendingVertices.value = 0L;
                }
            }
        };
        synchronized (partition) {
            if (ignoreExistingVertices) {
                computeMessageDestinations(computation, partition, partitionStats, vertexProgress);
            } else {
                computePartitionVertices(computation, partition, outOfCoreEngine, partitionStats, vertexProgress);
            }
            this.messageStore.clearPartition(partition.getId());
        }
        // Report whatever is left over from the last incomplete batch.
        WorkerProgress.get().addVerticesComputed(pendingVertices.value);
        WorkerProgress.get().incrementPartitionsComputed();
        return partitionStats;
    }

    /**
     * Invokes the computation once per message destination id of the
     * partition, using a single reusable id-only vertex; only the vertex
     * count of {@code partitionStats} is updated.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void computeMessageDestinations(Computation<I, V, E, M1, M2> computation, Partition<I, V, E> partition, PartitionStats partitionStats, Progressable vertexProgress) throws IOException, InterruptedException {
        Iterable<I> destinations = this.messageStore.getPartitionDestinationVertices(partition.getId());
        if (Iterables.isEmpty(destinations)) {
            return;
        }
        // NOTE(review): kept as a raw type; the generic signature of
        // OnlyIdVertex is not visible from this file.
        OnlyIdVertex idOnlyVertex = new OnlyIdVertex();
        for (I vertexId : destinations) {
            Iterable<M1> messages = this.messageStore.getVertexMessages(vertexId);
            // Every listed destination is expected to have at least one message.
            Preconditions.checkState(!Iterables.isEmpty(messages));
            idOnlyVertex.setId(vertexId);
            computation.compute(idOnlyVertex, messages);
            this.messageStore.clearVertexMessages(vertexId);
            partitionStats.incrVertexCount();
            vertexProgress.progress();
        }
    }

    /**
     * Iterates over every vertex of the partition, wakes halted vertices
     * that received messages, computes the non-halted ones and updates the
     * vertex, finished-vertex and edge counts of {@code partitionStats}.
     */
    private void computePartitionVertices(Computation<I, V, E, M1, M2> computation, Partition<I, V, E> partition, OutOfCoreEngine outOfCoreEngine, PartitionStats partitionStats, Progressable vertexProgress) throws IOException, InterruptedException {
        int visited = 0;
        Iterator<Vertex<I, V, E>> vertices = partition.iterator();
        while (vertices.hasNext()) {
            Vertex<I, V, E> vertex = vertices.next();
            if (outOfCoreEngine != null) {
                visited++;
                // CHECK_IN_INTERVAL is applied as a bit mask: the thread
                // checks in whenever the masked bits of the counter are zero.
                if ((visited & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
                    outOfCoreEngine.activeThreadCheckIn();
                }
            }
            Iterable<M1> messages = this.messageStore.getVertexMessages(vertex.getId());
            if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
                vertex.wakeUp();
            }
            if (!vertex.isHalted()) {
                this.context.progress();
                computation.compute(vertex, messages);
                vertex.unwrapMutableEdges();
                if (vertex instanceof Trimmable) {
                    ((Trimmable) vertex).trim();
                }
                this.vertexWriter.writeVertex(vertex);
                partition.saveVertex(vertex);
            }
            // Checked again after compute, which may have halted the vertex.
            if (vertex.isHalted()) {
                partitionStats.incrFinishedVertexCount();
            }
            this.messageStore.clearVertexMessages(vertex.getId());
            partitionStats.incrVertexCount();
            partitionStats.addEdgeCount(vertex.getNumEdges());
            vertexProgress.progress();
        }
    }
}
