package org.apache.giraph.ooc;

import com.google.common.base.Preconditions;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.yammer.metrics.core.Gauge;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.data.MetaPartitionManager;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
import org.apache.giraph.ooc.policy.OutOfCoreOracle;
import org.apache.giraph.utils.AdjustableSemaphore;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/ooc/OutOfCoreEngine.class */
public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
    public static final int CHECK_IN_INTERVAL = 1023;
    public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
    private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
    private static final long MSEC_TO_WAIT = 10000;
    private final CentralizedServiceWorker<?, ?, ?> service;
    private FlowControl flowControl;
    private final OutOfCoreIOScheduler ioScheduler;
    private final MetaPartitionManager metaPartitionManager;
    private final OutOfCoreOracle oracle;
    private final OutOfCoreIOStatistics statistics;
    private final OutOfCoreDataAccessor dataAccessor;
    private final OutOfCoreIOCallableFactory oocIOCallableFactory;
    private int numComputeThreads;
    private volatile int numProcessingThreads;
    private final AdjustableSemaphore activeThreadsPermit;
    private long superstep;
    private boolean resetDone;
    private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
    private final Object partitionAvailable = new Object();

    public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> immutableClassesGiraphConfiguration, CentralizedServiceWorker<?, ?, ?> centralizedServiceWorker) {
        this.service = centralizedServiceWorker;
        try {
            this.dataAccessor = GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(immutableClassesGiraphConfiguration).getConstructor(ImmutableClassesGiraphConfiguration.class).newInstance(immutableClassesGiraphConfiguration);
            int numAccessorThreads = this.dataAccessor.getNumAccessorThreads();
            this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(this, numAccessorThreads, centralizedServiceWorker.getGraphTaskManager().createUncaughtExceptionHandler());
            this.ioScheduler = new OutOfCoreIOScheduler(immutableClassesGiraphConfiguration, this, numAccessorThreads);
            this.metaPartitionManager = new MetaPartitionManager(numAccessorThreads, this);
            this.statistics = new OutOfCoreIOStatistics(immutableClassesGiraphConfiguration, numAccessorThreads);
            int i = GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(immutableClassesGiraphConfiguration);
            Class<? extends OutOfCoreOracle> cls = GiraphConstants.OUT_OF_CORE_ORACLE.get(immutableClassesGiraphConfiguration);
            if (i != 0 && cls != FixedPartitionsOracle.class) {
                LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set but the out-of-core oracle used is not tailored for fixed out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
                cls = FixedPartitionsOracle.class;
            }
            try {
                this.oracle = cls.getConstructor(ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class).newInstance(immutableClassesGiraphConfiguration, this);
                this.numComputeThreads = immutableClassesGiraphConfiguration.getNumComputeThreads();
                this.numProcessingThreads = immutableClassesGiraphConfiguration.getNumInputSplitsThreads();
                this.activeThreadsPermit = new AdjustableSemaphore(this.numProcessingThreads);
                this.superstep = -1L;
                this.resetDone = false;
                GiraphMetrics.get().addSuperstepResetObserver(this);
            } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                throw new IllegalStateException("OutOfCoreEngine: caught exception while creating the oracle!", e);
            }
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e2) {
            throw new IllegalStateException("OutOfCoreEngine: caught exception while creating the data accessor instance!", e2);
        }
    }

    public void initialize() {
        this.dataAccessor.initialize();
        this.oocIOCallableFactory.createCallable();
    }

    public void shutdown() {
        if (LOG.isInfoEnabled()) {
            LOG.info("shutdown: out-of-core engine shutting down, signalling IO threads to shutdown");
        }
        this.ioScheduler.shutdown();
        this.oocIOCallableFactory.shutdown();
        this.dataAccessor.shutdown();
    }

    public ServerData getServerData() {
        return this.service.getServerData();
    }

    public CentralizedServiceWorker getServiceWorker() {
        return this.service;
    }

    public OutOfCoreIOScheduler getIOScheduler() {
        return this.ioScheduler;
    }

    public MetaPartitionManager getMetaPartitionManager() {
        return this.metaPartitionManager;
    }

    public ReadWriteLock getSuperstepLock() {
        return this.superstepLock;
    }

    public OutOfCoreIOStatistics getIOStatistics() {
        return this.statistics;
    }

    public OutOfCoreOracle getOracle() {
        return this.oracle;
    }

    public Integer getNextPartition() {
        Integer num;
        synchronized (this.partitionAvailable) {
            while (true) {
                Integer nextPartition = this.metaPartitionManager.getNextPartition();
                num = nextPartition;
                if (nextPartition != null) {
                    break;
                }
                try {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("getNextPartition: waiting until a partition becomes available!");
                    }
                    this.partitionAvailable.wait(10000L);
                } catch (InterruptedException e) {
                    throw new IllegalStateException("getNextPartition: caught InterruptedException while waiting to retrieve a partition to process");
                }
            }
            if (num.intValue() == -1) {
                this.partitionAvailable.notifyAll();
                num = null;
            }
        }
        return num;
    }

    public void doneProcessingPartition(int i) {
        this.metaPartitionManager.setPartitionIsProcessed(i);
        if (LOG.isInfoEnabled()) {
            LOG.info("doneProcessingPartition: processing partition " + i + " is done!");
        }
    }

    @SuppressWarnings({"UL_UNRELEASED_LOCK_EXCEPTION_PATH"})
    public void startIteration() {
        if (!this.resetDone) {
            this.superstepLock.writeLock().lock();
            this.metaPartitionManager.resetPartitions();
            this.superstepLock.writeLock().unlock();
        }
        if (this.superstep != -1 && this.numProcessingThreads != this.numComputeThreads) {
            this.activeThreadsPermit.setMaxPermits((this.activeThreadsPermit.availablePermits() * this.numComputeThreads) / this.numProcessingThreads);
            this.numProcessingThreads = this.numComputeThreads;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("startIteration: with " + this.metaPartitionManager.getNumInMemoryPartitions() + " partitions in memory and " + this.activeThreadsPermit.availablePermits() + " active threads");
        }
        this.resetDone = false;
    }

    public void retrievePartition(int i) {
        if (this.metaPartitionManager.isPartitionOnDisk(i)) {
            this.ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, i, this.superstep));
            synchronized (this.partitionAvailable) {
                while (this.metaPartitionManager.isPartitionOnDisk(i)) {
                    try {
                        if (LOG.isInfoEnabled()) {
                            LOG.info("retrievePartition: waiting until partition " + i + " becomes available");
                        }
                        this.partitionAvailable.wait();
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("retrievePartition: caught InterruptedException while waiting to retrieve partition " + i);
                    }
                }
            }
        }
    }

    public void ioCommandCompleted(IOCommand iOCommand) {
        this.oracle.commandCompleted(iOCommand);
        if (iOCommand instanceof LoadPartitionIOCommand) {
            synchronized (this.partitionAvailable) {
                this.partitionAvailable.notifyAll();
            }
        }
    }

    public void updateActiveThreadsFraction(double d) {
        Preconditions.checkState(d >= 0.0d && d <= 1.0d);
        int i = (int) (this.numProcessingThreads * d);
        if (LOG.isInfoEnabled()) {
            LOG.info("updateActiveThreadsFraction: updating the number of active threads to " + i);
        }
        this.activeThreadsPermit.setMaxPermits(i);
    }

    public void activeThreadCheckIn() {
        this.activeThreadsPermit.release();
        try {
            this.activeThreadsPermit.acquire();
        } catch (InterruptedException e) {
            LOG.error("activeThreadCheckIn: exception while acquiring a permit to remain an active thread");
            throw new IllegalStateException(e);
        }
    }

    public void processingThreadStart() {
        try {
            this.activeThreadsPermit.acquire();
        } catch (InterruptedException e) {
            LOG.error("processingThreadStart: exception while acquiring a permit to start the processing thread!");
            throw new IllegalStateException(e);
        }
    }

    public void processingThreadFinish() {
        this.activeThreadsPermit.release();
    }

    public void reset() {
        this.metaPartitionManager.resetPartitions();
        this.metaPartitionManager.resetMessages();
        this.superstep = this.service.getSuperstep();
        this.resetDone = true;
    }

    public long getSuperstep() {
        return this.superstep;
    }

    public void gcCompleted(GarbageCollectionNotificationInfo garbageCollectionNotificationInfo) {
        this.oracle.gcCompleted(garbageCollectionNotificationInfo);
    }

    @Override // org.apache.giraph.metrics.ResetSuperstepMetricsObserver
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        superstepMetricsRegistry.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() { // from class: org.apache.giraph.ooc.OutOfCoreEngine.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.yammer.metrics.core.Gauge
            public Double value() {
                return Double.valueOf(OutOfCoreEngine.this.metaPartitionManager.getGraphFractionInMemory() * 100.0d);
            }
        });
    }

    public FlowControl getFlowControl() {
        return this.flowControl;
    }

    public void setFlowControl(FlowControl flowControl) {
        this.flowControl = flowControl;
    }

    public OutOfCoreDataAccessor getDataAccessor() {
        return this.dataAccessor;
    }
}
