package org.apache.giraph.ooc;

import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Histogram;
import java.util.concurrent.Callable;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.command.WaitIOCommand;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/ooc/OutOfCoreIOCallable.class */
public class OutOfCoreIOCallable implements Callable<Void>, ResetSuperstepMetricsObserver {
    public static final String BYTES_LOAD_FROM_DISK = "ooc-bytes-load";
    public static final String BYTES_STORE_TO_DISK = "ooc-bytes-store";
    public static final String HISTOGRAM_LOAD_SIZE = "ooc-load-size-bytes";
    public static final String HISTOGRAM_STORE_SIZE = "ooc-store-size-bytes";
    private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class);
    private final OutOfCoreEngine oocEngine;
    private final int diskId;
    private Counter bytesReadPerSuperstep;
    private Counter bytesWrittenPerSuperstep;
    private Histogram histogramLoadSize;
    private Histogram histogramStoreSize;

    public OutOfCoreIOCallable(OutOfCoreEngine outOfCoreEngine, int i) {
        this.oocEngine = outOfCoreEngine;
        this.diskId = i;
        newSuperstep(GiraphMetrics.get().perSuperstep());
        GiraphMetrics.get().addSuperstepResetObserver(this);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Void call() throws Exception {
        while (true) {
            this.oocEngine.getSuperstepLock().readLock().lock();
            IOCommand nextIOCommand = this.oocEngine.getIOScheduler().getNextIOCommand(this.diskId);
            if (LOG.isDebugEnabled() && !(nextIOCommand instanceof WaitIOCommand)) {
                LOG.debug("call: thread " + this.diskId + "'s next IO command is: " + nextIOCommand);
            }
            if (nextIOCommand == null) {
                break;
            }
            if (nextIOCommand instanceof WaitIOCommand) {
                this.oocEngine.getSuperstepLock().readLock().unlock();
            }
            try {
                long superstepGCTime = this.oocEngine.getServiceWorker().getGraphTaskManager().getSuperstepGCTime();
                long currentTimeMillis = System.currentTimeMillis();
                boolean execute = nextIOCommand.execute();
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                long superstepGCTime2 = this.oocEngine.getServiceWorker().getGraphTaskManager().getSuperstepGCTime() - superstepGCTime;
                long bytesTransferred = nextIOCommand.bytesTransferred();
                if (LOG.isDebugEnabled() && !(nextIOCommand instanceof WaitIOCommand)) {
                    LOG.debug("call: thread " + this.diskId + "'s command " + nextIOCommand + " completed: bytes= " + bytesTransferred + ", duration=" + currentTimeMillis2 + ", bandwidth=" + String.format("%.2f", Double.valueOf((((bytesTransferred / currentTimeMillis2) * 1000.0d) / 1024.0d) / 1024.0d)) + (nextIOCommand instanceof WaitIOCommand ? "" : ", bandwidth (excluding GC time)=" + String.format("%.2f", Double.valueOf((((bytesTransferred / (currentTimeMillis2 - superstepGCTime2)) * 1000.0d) / 1024.0d) / 1024.0d))));
                }
                if (!(nextIOCommand instanceof WaitIOCommand)) {
                    this.oocEngine.getSuperstepLock().readLock().unlock();
                    if (bytesTransferred != 0) {
                        if (nextIOCommand instanceof LoadPartitionIOCommand) {
                            this.bytesReadPerSuperstep.inc(bytesTransferred);
                            this.histogramLoadSize.update(bytesTransferred);
                        } else {
                            this.bytesWrittenPerSuperstep.inc(bytesTransferred);
                            this.histogramStoreSize.update(bytesTransferred);
                        }
                    }
                }
                if (execute && currentTimeMillis2 > 0) {
                    this.oocEngine.getIOStatistics().update(nextIOCommand.getType(), nextIOCommand.bytesTransferred(), currentTimeMillis2);
                }
                this.oocEngine.getIOScheduler().ioCommandCompleted(nextIOCommand);
            } catch (Exception e) {
                throw new RuntimeException("call: execution of IO command " + nextIOCommand + " failed!", e);
            }
        }
        this.oocEngine.getSuperstepLock().readLock().unlock();
        if (!LOG.isInfoEnabled()) {
            return null;
        }
        LOG.info("call: out-of-core IO thread " + this.diskId + " terminating!");
        return null;
    }

    @Override // org.apache.giraph.metrics.ResetSuperstepMetricsObserver
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        this.bytesReadPerSuperstep = superstepMetricsRegistry.getCounter(BYTES_LOAD_FROM_DISK);
        this.bytesWrittenPerSuperstep = superstepMetricsRegistry.getCounter(BYTES_STORE_TO_DISK);
        this.histogramLoadSize = superstepMetricsRegistry.getUniformHistogram(HISTOGRAM_LOAD_SIZE);
        this.histogramStoreSize = superstepMetricsRegistry.getUniformHistogram(HISTOGRAM_STORE_SIZE);
    }
}
