package org.apache.giraph.ooc;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/ooc/OutOfCoreIOCallableFactory.class */
/**
 * Creates and manages the pool of threads that run {@link OutOfCoreIOCallable}
 * tasks for an {@link OutOfCoreEngine}.
 *
 * <p>Typical usage is to call {@link #createCallable()} once to start the IO
 * tasks and later {@link #shutdown()} to wait for them and surface any failure.
 * The executor is only created in {@link #createCallable()}, so
 * {@link #shutdown()} must not be called before it.
 */
public class OutOfCoreIOCallableFactory {
    private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallableFactory.class);
    /** Engine handed to every IO callable created by this factory. */
    private final OutOfCoreEngine oocEngine;
    /** Futures of the submitted IO callables, in submission order. */
    private final List<Future<?>> results;
    /** Number of IO threads (and IO callables) to create. */
    private final int numIOThreads;
    /** Handler passed along when submitting each IO callable. */
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    /** Executor running the IO callables; assigned in {@link #createCallable()}. */
    private ExecutorService outOfCoreIOExecutor;

    /**
     * Constructor.
     *
     * @param outOfCoreEngine engine passed to each IO callable
     * @param i number of IO threads to use
     * @param uncaughtExceptionHandler handler passed to
     *        {@code ThreadUtils.submitToExecutor} for each submitted callable
     */
    public OutOfCoreIOCallableFactory(OutOfCoreEngine outOfCoreEngine, int i, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.oocEngine = outOfCoreEngine;
        this.numIOThreads = i;
        this.results = new ArrayList<Future<?>>(i);
        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
    }

    /**
     * Creates a fixed-size thread pool with one thread per IO callable, submits
     * one {@link OutOfCoreIOCallable} per thread index, and then stops the
     * executor from accepting further tasks.
     */
    public void createCallable() {
        CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
            @Override
            public Callable<Void> newCallable(int callableId) {
                return new OutOfCoreIOCallable(OutOfCoreIOCallableFactory.this.oocEngine, callableId);
            }
        };
        this.outOfCoreIOExecutor = new ThreadPoolExecutor(
            this.numIOThreads, this.numIOThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            ThreadUtils.createThreadFactory("ooc-io-%d"));
        for (int i = 0; i < this.numIOThreads; i++) {
            Future<?> future = ThreadUtils.submitToExecutor(
                this.outOfCoreIOExecutor, callableFactory.newCallable(i),
                this.uncaughtExceptionHandler);
            this.results.add(future);
        }
        // Orderly shutdown: already-submitted callables keep running, but no
        // new tasks are accepted, so awaitTermination() in shutdown() can
        // eventually return true.
        this.outOfCoreIOExecutor.shutdown();
    }

    /**
     * Blocks until all IO threads have terminated, then checks the result of
     * every IO callable so that a failure in any of them is propagated.
     *
     * @throws IllegalStateException if the calling thread is interrupted while
     *         waiting, or if any IO callable terminated with an exception (the
     *         original exception is attached as the cause)
     */
    public void shutdown() {
        boolean terminated = false;
        while (!terminated) {
            if (LOG.isInfoEnabled()) {
                LOG.info("shutdown: waiting for IO threads to finish!");
            }
            try {
                terminated = this.outOfCoreIOExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for code further up the stack and
                // keep the original exception as the cause.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("shutdown: caught InterruptedException while waiting for IO threads to finish", e);
            }
        }
        for (int i = 0; i < this.numIOThreads; i++) {
            try {
                // The executor has terminated, so get() only retrieves the
                // outcome (or the failure) of the i-th callable.
                this.results.get(i).get();
            } catch (InterruptedException e) {
                LOG.error("shutdown: IO thread " + i + " was interrupted during its execution");
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                LOG.error("shutdown: IO thread " + i + " threw an exception during its execution");
                throw new IllegalStateException(e);
            }
        }
    }
}
