package org.apache.giraph.graph;

import com.sun.management.GarbageCollectionNotificationInfo;
import com.yammer.metrics.core.Counter;
import java.io.IOException;
import java.lang.Thread;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.checkpoints.CheckpointStatus;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ClassConfOption;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.master.BspServiceMaster;
import org.apache.giraph.master.MasterThread;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.scripting.ScriptLoader;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.GcObserver;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.InputSplitsCallable;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.zookeeper.server.PrepRequestProcessor;

/* loaded from: input_file:org/apache/giraph/graph/GraphTaskManager.class */
public class GraphTaskManager<I extends WritableComparable, V extends Writable, E extends Writable> implements ResetSuperstepMetricsObserver {
    // Names under which the per-superstep timers/counters are registered in newSuperstep().
    public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
    public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
    public static final String TIMER_TIME_TO_FIRST_MSG = "time-to-first-message-ms";
    public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
    public static final String TIMER_SUPERSTEP_GC_TIME = "superstep-gc-time-ms";
    // Worker-side service; only assigned in instantiateBspService() when this task acts as a worker.
    private CentralizedServiceWorker<I, V, E> serviceWorker;
    // Master-side service; only assigned in instantiateBspService() when this task acts as a master.
    private CentralizedServiceMaster<I, V, E> serviceMaster;
    // Only assigned in startZooKeeperManager(), i.e. when the configured ZooKeeper list is empty.
    private ZooKeeperManager zkManager;
    // Built from the mapper context's configuration at the start of setup().
    private ImmutableClassesGiraphConfiguration<I, V, E> conf;
    // Assigned in initializeJobProgressTracker(); a no-op client when client-side tracking is disabled.
    private JobProgressTrackerClient jobProgressTracker;
    // Per-job timers, created in initJobMetrics().
    private GiraphTimer wcPreAppTimer;
    private GiraphTimer wcPostAppTimer;
    // Per-superstep metrics below are re-created on every call to newSuperstep().
    private GiraphTimer superstepTimer;
    private GiraphTimer computeAll;
    private GiraphTimer timeToFirstMessage;
    // Non-null only between the start of processGraphPartitions() and the first notifySentMessages().
    private GiraphTimerContext timeToFirstMessageTimerContext;
    private GiraphTimer communicationTimer;
    // Non-null only between notifySentMessages() and notifyFinishedCommunication().
    private GiraphTimerContext communicationTimerContext;
    private GiraphTimer wcPreSuperstepTimer;
    // Assigned only in newSuperstep(); null until that has been called at least once.
    private Counter gcTimeMetric;
    // Mapper context handed to the constructor; used for status, progress and counters.
    private final Mapper<?, ?, ?, ?>.Context context;
    // Created and set up in setupMapperObservers().
    private MapperObserver[] mapperObservers;
    // Configuration option selecting the policy consulted by OverrideExceptionHandler.
    public static final ClassConfOption<CheckerIfWorkerShouldFailAfterException> CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS = ClassConfOption.create("giraph.checkerIfWorkerShouldFailAfterExceptionClass", FailWithEveryExceptionOccurred.class, CheckerIfWorkerShouldFailAfterException.class, "Class which checks if an exception on some thread should cause worker to fail, by default all exceptions cause failure");
    private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
    // Started in instantiateBspService() and joined in sendWorkerCountersAndFinishCleanup().
    private Thread masterThread = null;
    // Guards against execute() doing its work more than once (see checkTaskState()).
    private boolean alreadyRun = false;
    // Set in startZooKeeperManager() when the ZooKeeper manager reports the computation is already done.
    private boolean done = false;
    // Role of this task; replaced in setup() by the result of determineGraphFunctions().
    private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
    // Stats of the most recently finished superstep; starts out as an "empty" placeholder.
    private FinishedSuperstepStats finishedSuperstepStats = new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE);
    // Plain flag exposed through setIsMaster()/isMaster(); not written anywhere else in this class.
    private boolean isMaster = false;

    /* loaded from: input_file:org/apache/giraph/graph/GraphTaskManager$CheckerIfWorkerShouldFailAfterException.class */
    /**
     * Policy consulted by {@link OverrideExceptionHandler} to decide whether an
     * uncaught exception on some thread should bring the whole worker down.
     */
    public interface CheckerIfWorkerShouldFailAfterException {
        /**
         * @param thread thread on which the exception went uncaught
         * @param th the uncaught exception
         * @return true if the worker should fail (the handler then cleans up and exits the JVM),
         *         false if the exception should only be logged
         */
        boolean checkIfWorkerShouldFail(Thread thread, Throwable th);
    }

    /* loaded from: input_file:org/apache/giraph/graph/GraphTaskManager$FailWithEveryExceptionOccurred.class */
    /**
     * Default policy: every uncaught exception, on any thread, fails the worker.
     */
    public static class FailWithEveryExceptionOccurred implements CheckerIfWorkerShouldFailAfterException {
        /**
         * Ignores both arguments and always answers "fail".
         *
         * @return always true
         */
        @Override
        public boolean checkIfWorkerShouldFail(Thread thread, Throwable th) {
            // No filtering at all: any exception is considered fatal.
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/giraph/graph/GraphTaskManager$OverrideExceptionHandler.class */
    /**
     * Uncaught-exception handler that either just logs the exception or, when the
     * configured checker says so, reports it, runs failure cleanup and exits the JVM.
     */
    public class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
        /** Decides whether an uncaught exception must fail the worker. */
        private final CheckerIfWorkerShouldFailAfterException checker;
        /** Receives the error report before the process exits. */
        private final JobProgressTracker jobProgressTracker;

        /**
         * @param checkerIfWorkerShouldFailAfterException policy deciding whether to fail
         * @param jobProgressTracker tracker the fatal error is reported to
         */
        public OverrideExceptionHandler(CheckerIfWorkerShouldFailAfterException checkerIfWorkerShouldFailAfterException, JobProgressTracker jobProgressTracker) {
            this.checker = checkerIfWorkerShouldFailAfterException;
            this.jobProgressTracker = jobProgressTracker;
        }

        /**
         * Logs the exception; if the checker deems it fatal, additionally reports it
         * to the job progress tracker, runs ZooKeeper and worker failure cleanup and
         * terminates the JVM with exit status 1.
         *
         * @param thread thread on which the exception went uncaught
         * @param th the uncaught exception
         */
        @Override
        public void uncaughtException(Thread thread, Throwable th) {
            if (!checker.checkIfWorkerShouldFail(thread, th)) {
                GraphTaskManager.LOG.error("uncaughtException: OverrideExceptionHandler on thread " + thread.getName() + ", msg = " + th.getMessage(), th);
                return;
            }
            try {
                GraphTaskManager.LOG.fatal("uncaughtException: OverrideExceptionHandler on thread " + thread.getName() + ", msg = " + th.getMessage() + ", exiting...", th);
                jobProgressTracker.logError(ExceptionUtils.getStackTrace(th), KryoWritableWrapper.convertToByteArray(th));
                GraphTaskManager.this.zooKeeperCleanup();
                GraphTaskManager.this.workerFailureCleanup();
            } finally {
                // Exit no matter whether reporting/cleanup succeeded or threw.
                System.exit(1);
            }
        }
    }

    /**
     * Creates a task manager bound to the given mapper context. Nothing else is
     * initialized here; the rest happens in {@link #setup(Path[])}.
     *
     * @param context mapper context this task runs in
     */
    public GraphTaskManager(Mapper<?, ?, ?, ?>.Context context) {
        // Parameter shadows the field, hence the explicit qualifier.
        this.context = context;
    }

    /**
     * Asks each configured input format (edge first, then vertex) to validate its
     * input specification against the configuration.
     */
    private void checkInput() {
        boolean hasEdgeInput = conf.hasEdgeInputFormat();
        if (hasEdgeInput) {
            conf.createWrappedEdgeInputFormat().checkInputSpecs(conf);
        }
        boolean hasVertexInput = conf.hasVertexInputFormat();
        if (hasVertexInput) {
            conf.createWrappedVertexInputFormat().checkInputSpecs(conf);
        }
    }

    /**
     * Looks up (and thereby creates) a counter named after the given string in the
     * ZooKeeper server/port counter group. The returned counter is not used.
     *
     * @param str counter name; callers pass the ZooKeeper server:port list
     */
    private void createZooKeeperCounter(String str) {
        final String counterGroup = GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP;
        context.getCounter(counterGroup, str);
    }

    /**
     * Prepares this task for {@link #execute()}: builds the configuration, installs
     * the default uncaught-exception handler, sets up observers, logging and
     * metrics, validates input formats, connects to (or starts) ZooKeeper, decides
     * which graph functions this task performs and instantiates the BSP services.
     *
     * <p>Returns early, leaving the BSP services uninstantiated, when the ZooKeeper
     * manager reports that the computation is already done.
     *
     * @param pathArr not referenced by this method
     * @throws IOException propagated from ZooKeeper manager setup
     * @throws InterruptedException propagated from ZooKeeper manager setup or service instantiation
     * @throws RuntimeException if instantiating the BSP services fails with an IOException
     */
    public void setup(Path[] pathArr) throws IOException, InterruptedException {
        context.setStatus("setup: Beginning worker setup.");
        conf = new ImmutableClassesGiraphConfiguration<>(context.getConfiguration());
        initializeJobProgressTracker();
        Thread.setDefaultUncaughtExceptionHandler(createUncaughtExceptionHandler());
        setupMapperObservers();
        conf.getGiraphTypes().writeIfUnset(conf);
        initializeAndConfigureLogging();
        setupAndInitializeGiraphMetrics();
        checkInput();
        ScriptLoader.loadScripts(conf);
        conf.createComputationFactory().initialize(conf);

        context.setStatus("setup: Initializing Zookeeper services.");
        String serverPortList = conf.getZookeeperList();
        if (serverPortList.isEmpty()) {
            if (startZooKeeperManager()) {
                // Computation already finished; nothing more to set up.
                return;
            }
            // startZooKeeperManager() stored the freshly started server list in conf;
            // re-read it so the status message below does not report an empty list.
            serverPortList = conf.getZookeeperList();
        } else {
            createZooKeeperCounter(serverPortList);
        }
        if (zkManager != null && zkManager.runsZooKeeper() && LOG.isInfoEnabled()) {
            LOG.info("setup: Chosen to run ZooKeeper...");
        }
        context.setStatus("setup: Connected to Zookeeper service " + serverPortList);

        graphFunctions = determineGraphFunctions(conf, zkManager);
        if (zkManager != null && graphFunctions.isMaster()) {
            zkManager.cleanupOnExit();
        }
        try {
            instantiateBspService();
            context.setStatus(getGraphFunctions().toString() + " starting...");
        } catch (IOException e) {
            LOG.error("setup: Caught exception just before end of setup", e);
            if (zkManager != null) {
                zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FAILED);
            }
            throw new RuntimeException("setup: Offlining servers due to exception...", e);
        }
    }

    /**
     * Creates the job progress tracker client (a real one when client-side
     * tracking is enabled in the configuration, a no-op one otherwise) and
     * notifies it that this mapper has started.
     *
     * @throws RuntimeException if the real client fails to initialize
     */
    private void initializeJobProgressTracker() {
        if (!conf.trackJobProgressOnClient()) {
            jobProgressTracker = new JobProgressTrackerClientNoOp();
        } else {
            jobProgressTracker = GiraphConstants.JOB_PROGRESS_TRACKER_CLIENT_CLASS.newInstance(conf);
            try {
                jobProgressTracker.init(conf);
            } catch (Exception initFailure) {
                throw new RuntimeException("Failed to initialize JobProgressTrackerClient", initFailure);
            }
        }
        jobProgressTracker.mapperStarted();
    }

    /**
     * Runs the worker side of the computation: performs the input superstep via
     * the worker service's setup, then loops over supersteps until the finished
     * superstep stats report that all vertices have halted (or a checkpoint
     * requests a halt), and finally runs the post-application hooks.
     *
     * <p>Does nothing when {@link #checkTaskState()} says this task has no work,
     * and stops right after the input superstep when
     * {@link #collectInputSuperstepStats(FinishedSuperstepStats)} says so.
     *
     * @throws IOException propagated from the worker service
     * @throws InterruptedException propagated from the worker service
     */
    public void execute() throws IOException, InterruptedException {
        if (checkTaskState()) {
            return;
        }
        preLoadOnWorkerObservers();
        GiraphTimerContext inputSuperstepTimer = superstepTimer.time();
        finishedSuperstepStats = serviceWorker.setup();
        inputSuperstepTimer.stop();
        if (collectInputSuperstepStats(finishedSuperstepStats)) {
            return;
        }
        prepareGraphStateAndWorkerContext();
        // Reused across supersteps; cleared before each compute phase.
        List<PartitionStats> partitionStatsList = new ArrayList<>();
        int numComputeThreads = conf.getNumComputeThreads();

        while (!finishedSuperstepStats.allVerticesHalted()) {
            long superstep = serviceWorker.getSuperstep();
            GiraphTimerContext superstepTimerContext = getTimerForThisSuperstep(superstep);
            GraphState graphState = new GraphState(superstep, finishedSuperstepStats.getVertexCount(), finishedSuperstepStats.getEdgeCount(), context);
            Collection<? extends PartitionOwner> masterAssignedPartitionOwners = serviceWorker.startSuperstep();
            if (LOG.isDebugEnabled()) {
                LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
            }
            context.progress();
            serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
            context.progress();

            boolean hasBeenRestarted = checkSuperstepRestarted(superstep);
            GlobalStats globalStats = serviceWorker.getGlobalStats();
            if (hasBeenRestarted) {
                // checkSuperstepRestarted() replaced finishedSuperstepStats with the
                // checkpointed counts, so the graph state has to be rebuilt from them.
                graphState = new GraphState(superstep, finishedSuperstepStats.getVertexCount(), finishedSuperstepStats.getEdgeCount(), context);
            } else if (storeCheckpoint(globalStats.getCheckpointStatus())) {
                // Checkpoint-and-halt was requested: leave the superstep loop.
                break;
            }

            serviceWorker.getServerData().prepareResolveMutations();
            context.progress();
            prepareForSuperstep(graphState);
            context.progress();

            MessageStore<I, Writable> messageStore = serviceWorker.getServerData().getCurrentMessageStore();
            int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
            // Never start more compute threads than there are partitions.
            int numThreads = Math.min(numComputeThreads, numPartitions);
            if (LOG.isInfoEnabled()) {
                LOG.info("execute: " + numPartitions + " partitions to process with " + numThreads + " compute thread(s), originally " + numComputeThreads + " thread(s) on superstep " + superstep);
            }
            partitionStatsList.clear();
            if (numPartitions > 0) {
                processGraphPartitions(context, partitionStatsList, graphState, messageStore, numThreads);
            }
            finishedSuperstepStats = completeSuperstepAndCollectStats(partitionStatsList, superstepTimerContext);
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("execute: BSP application done (global vertices marked done)");
        }
        updateSuperstepGraphState();
        postApplication();
    }

    /**
     * Runs the post-application hooks: the worker context and superstep output
     * first (timed by the post-app timer), then every worker observer, reporting
     * progress to the mapper context in between.
     *
     * @throws IOException declared for the hooks invoked here
     * @throws InterruptedException declared for the hooks invoked here
     */
    private void postApplication() throws IOException, InterruptedException {
        GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
        serviceWorker.getWorkerContext().postApplication();
        serviceWorker.getSuperstepOutput().postApplication();
        postAppTimerContext.stop();
        context.progress();

        for (WorkerObserver observer : serviceWorker.getWorkerObservers()) {
            observer.postApplication();
            context.progress();
        }
    }

    /**
     * Records whether this task is the master.
     *
     * @param z new value of the master flag
     */
    public void setIsMaster(boolean z) {
        isMaster = z;
    }

    /**
     * @return the master flag last stored through {@link #setIsMaster(boolean)}
     *         (false if it was never set)
     */
    public boolean isMaster() {
        return isMaster;
    }

    /**
     * Resets the per-superstep metrics for the given superstep and starts the
     * superstep timer.
     *
     * @param j superstep number the metrics are reset for
     * @return running context of the superstep timer
     */
    private GiraphTimerContext getTimerForThisSuperstep(long j) {
        // The reset must come first: this class re-creates superstepTimer in newSuperstep().
        GiraphMetrics.get().resetSuperstepMetrics(j);
        GiraphTimerContext runningTimer = superstepTimer.time();
        return runningTimer;
    }

    /**
     * Initializes the metrics subsystem from the configuration, registers this
     * object as a superstep-reset observer and creates the job-level metrics of
     * this class, {@link MemoryUtils} and {@link InputSplitsCallable}.
     */
    private void setupAndInitializeGiraphMetrics() {
        GiraphMetrics.init(conf);
        GiraphMetrics metrics = GiraphMetrics.get();
        metrics.addSuperstepResetObserver(this);
        initJobMetrics();
        MemoryUtils.initMetrics();
        InputSplitsCallable.initMetrics();
    }

    /**
     * Creates and sets up the ZooKeeper manager. Unless the manager reports that
     * the computation is already done, brings the ZooKeeper server online, stores
     * its server:port string in the configuration and creates the matching counter.
     *
     * @return true if the computation is already done (the {@code done} flag is
     *         set as well), false if ZooKeeper was brought online
     * @throws IOException propagated from the ZooKeeper manager
     * @throws InterruptedException propagated from the ZooKeeper manager
     */
    private boolean startZooKeeperManager() throws IOException, InterruptedException {
        zkManager = new ZooKeeperManager(context, conf);
        context.setStatus("setup: Setting up Zookeeper manager.");
        zkManager.setup();

        boolean computationDone = zkManager.computationDone();
        if (!computationDone) {
            zkManager.onlineZooKeeperServer();
            String serverPortList = zkManager.getZooKeeperServerPortString();
            conf.setZookeeperList(serverPortList);
            createZooKeeperCounter(serverPortList);
            return false;
        }
        done = true;
        return true;
    }

    /**
     * Hands the worker context a fresh graph state built from the current
     * superstep number and the vertex/edge counts of the last finished superstep.
     */
    private void updateSuperstepGraphState() {
        WorkerContext workerContext = serviceWorker.getWorkerContext();
        long superstep = serviceWorker.getSuperstep();
        long vertexCount = finishedSuperstepStats.getVertexCount();
        long edgeCount = finishedSuperstepStats.getEdgeCount();
        workerContext.setGraphState(new GraphState(superstep, vertexCount, edgeCount, context));
    }

    /**
     * Finishes the current superstep on the worker service, stores the resulting
     * stats in this object and, when metrics are enabled, prints the
     * per-superstep metrics summary to standard error.
     *
     * @param list per-partition stats gathered during the compute phase
     * @param giraphTimerContext running superstep timer, passed on to the worker service
     * @return the stats of the superstep just finished
     */
    private FinishedSuperstepStats completeSuperstepAndCollectStats(List<PartitionStats> list, GiraphTimerContext giraphTimerContext) {
        FinishedSuperstepStats latestStats = serviceWorker.finishSuperstep(list, giraphTimerContext);
        finishedSuperstepStats = latestStats;
        boolean metricsEnabled = conf.metricsEnabled();
        if (metricsEnabled) {
            GiraphMetrics.get().perSuperstep().printSummary(System.err);
        }
        return finishedSuperstepStats;
    }

    /**
     * Gets the worker ready for a superstep: prepares the worker service, gives
     * the worker context the new graph state, runs its (timed) pre-superstep hook
     * and then the pre-superstep hook of every worker observer.
     *
     * @param graphState graph state describing the superstep about to run
     */
    private void prepareForSuperstep(GraphState graphState) {
        serviceWorker.prepareSuperstep();
        serviceWorker.getWorkerContext().setGraphState(graphState);
        serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);

        // Only the worker context's own hook is covered by the timer.
        GiraphTimerContext preSuperstepTimerContext = wcPreSuperstepTimer.time();
        serviceWorker.getWorkerContext().preSuperstep();
        preSuperstepTimerContext.stop();
        context.progress();

        for (WorkerObserver observer : serviceWorker.getWorkerObservers()) {
            observer.preSuperstep(graphState.getSuperstep());
            context.progress();
        }
    }

    /**
     * Publishes the current graph state to the worker context and then runs the
     * pre-application hooks.
     */
    private void prepareGraphStateAndWorkerContext() {
        // Order matters: the hooks run after the graph state has been set.
        updateSuperstepGraphState();
        workerContextPreApp();
    }

    /**
     * @return the role of this task; {@code GraphFunctions.UNKNOWN} until
     *         {@link #setup(Path[])} has determined it
     */
    public GraphFunctions getGraphFunctions() {
        return graphFunctions;
    }

    /**
     * @return the worker context owned by the worker service
     */
    public final WorkerContext getWorkerContext() {
        WorkerContext workerContext = serviceWorker.getWorkerContext();
        return workerContext;
    }

    /**
     * @return the job progress tracker client created during setup
     */
    public JobProgressTracker getJobProgressTracker() {
        return jobProgressTracker;
    }

    /**
     * Works out which roles this task performs from the split-master-worker
     * setting, the task partition, whether ZooKeeper is external and whether the
     * given manager runs ZooKeeper in this task.
     *
     * @param immutableClassesGiraphConfiguration job configuration
     * @param zooKeeperManager ZooKeeper manager, or null if none was started
     * @return the graph functions for this task
     */
    private static GraphFunctions determineGraphFunctions(ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration, ZooKeeperManager zooKeeperManager) {
        boolean splitMasterWorker = immutableClassesGiraphConfiguration.getSplitMasterWorker();
        int taskPartition = immutableClassesGiraphConfiguration.getTaskPartition();
        boolean zkExternal = immutableClassesGiraphConfiguration.isZookeeperExternal();

        if (!splitMasterWorker) {
            // Master and worker share tasks; only the ZooKeeper role varies.
            if (zooKeeperManager != null && zooKeeperManager.runsZooKeeper()) {
                return GraphFunctions.ALL;
            }
            return GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
        }
        if (zkExternal) {
            // With external ZooKeeper, task partition 0 is the dedicated master.
            if (taskPartition == 0) {
                return GraphFunctions.MASTER_ONLY;
            }
            return GraphFunctions.WORKER_ONLY;
        }
        // Internal ZooKeeper: the task hosting it is also the master.
        if (zooKeeperManager != null && zooKeeperManager.runsZooKeeper()) {
            return GraphFunctions.MASTER_ZOOKEEPER_ONLY;
        }
        return GraphFunctions.WORKER_ONLY;
    }

    /**
     * Creates the BSP services matching this task's graph functions: for a master,
     * the master service plus a started master thread with its own uncaught
     * exception handler; for a worker, the worker service plus GC monitoring.
     *
     * @throws IOException declared for service construction
     * @throws InterruptedException declared for service construction
     */
    private void instantiateBspService() throws IOException, InterruptedException {
        final boolean infoEnabled = LOG.isInfoEnabled();

        if (graphFunctions.isMaster()) {
            if (infoEnabled) {
                LOG.info("setup: Starting up BspServiceMaster (master thread)...");
            }
            serviceMaster = new BspServiceMaster(context, this);
            masterThread = new MasterThread(serviceMaster, context);
            masterThread.setUncaughtExceptionHandler(createUncaughtExceptionHandler());
            masterThread.start();
        }

        if (!graphFunctions.isWorker()) {
            return;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("setup: Starting up BspServiceWorker...");
        }
        serviceWorker = new BspServiceWorker(context, this);
        installGCMonitoring();
        if (LOG.isInfoEnabled()) {
            LOG.info("setup: Registering health of this worker...");
        }
    }

    /**
     * Registers a JMX notification listener on every garbage collector MXBean.
     * On each GC notification the listener logs the collection, adds its duration
     * to the per-superstep GC counter, informs the metrics GC tracker, every
     * configured {@link GcObserver} and, if present, the out-of-core engine.
     */
    private void installGCMonitoring() {
        final GcObserver[] gcObservers = conf.createGcObservers(context);
        final OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();

        final NotificationListener listener = new NotificationListener() {
            @Override
            public void handleNotification(Notification notification, Object handback) {
                if (!notification.getType().equals("com.sun.management.gc.notification")) {
                    return;
                }
                GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from((CompositeData) notification.getUserData());
                if (GraphTaskManager.LOG.isInfoEnabled()) {
                    GraphTaskManager.LOG.info("installGCMonitoring: name = " + info.getGcName() + ", action = " + info.getGcAction() + ", cause = " + info.getGcCause() + ", duration = " + info.getGcInfo().getDuration() + "ms");
                }
                // The counter is only assigned in newSuperstep(); a collection that
                // happens before the first metrics reset must not kill the listener.
                Counter gcCounter = GraphTaskManager.this.gcTimeMetric;
                if (gcCounter != null) {
                    gcCounter.inc(info.getGcInfo().getDuration());
                }
                GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
                for (GcObserver gcObserver : gcObservers) {
                    gcObserver.gcOccurred(info);
                }
                if (oocEngine != null) {
                    oocEngine.gcCompleted(info);
                }
            }
        };

        for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            // GarbageCollectorMXBean itself declares no listener methods; the
            // notification API lives on NotificationEmitter.
            NotificationEmitter emitter = (NotificationEmitter) gcBean;
            emitter.addNotificationListener(listener, (NotificationFilter) null, (Object) null);
        }
    }

    /**
     * Applies the logging settings from the configuration: sets the root logger
     * level if it differs from the configured one, optionally switches every root
     * appender to a layout that includes the thread name, and in local test mode
     * lowers the verbosity of ZooKeeper's {@code PrepRequestProcessor} logger.
     */
    private void initializeAndConfigureLogging() {
        String localLevel = conf.getLocalLevel();
        Logger rootLogger = Logger.getRootLogger();
        Level wantedLevel = Level.toLevel(localLevel);

        boolean levelChanged = !rootLogger.getLevel().equals(wantedLevel);
        if (levelChanged) {
            rootLogger.setLevel(wantedLevel);
        }
        if (LOG.isInfoEnabled()) {
            if (levelChanged) {
                LOG.info("setup: Set log level to " + localLevel);
            } else {
                LOG.info("setup: Log level remains at " + localLevel);
            }
        }

        if (conf.useLogThreadLayout()) {
            PatternLayout threadLayout = new PatternLayout("%-7p %d [%t] %c %x - %m%n");
            for (Enumeration<?> appenders = Logger.getRootLogger().getAllAppenders(); appenders.hasMoreElements();) {
                Appender appender = (Appender) appenders.nextElement();
                appender.setLayout(threadLayout);
            }
        }

        if (conf.getLocalTestMode()) {
            String noisyLoggerName = PrepRequestProcessor.class.getName();
            LogManager.getLogger(noisyLoggerName).setLevel(Level.ERROR);
        }
    }

    /**
     * Creates the two job-level timers (worker context pre- and post-application)
     * in the optional per-job metrics registry.
     */
    private void initJobMetrics() {
        GiraphMetricsRegistry jobRegistry = GiraphMetrics.get().perJobOptional();
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        wcPreAppTimer = new GiraphTimer(jobRegistry, "worker-context-pre-app", unit);
        wcPostAppTimer = new GiraphTimer(jobRegistry, "worker-context-post-app", unit);
    }

    /**
     * Re-creates all per-superstep timers and the GC time counter of this class
     * in the given registry. Note that the time-to-first-message timer is the
     * only one measured in microseconds; the others use milliseconds.
     *
     * @param superstepMetricsRegistry registry of the superstep that is starting
     */
    @Override // org.apache.giraph.metrics.ResetSuperstepMetricsObserver
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        final SuperstepMetricsRegistry registry = superstepMetricsRegistry;
        final TimeUnit millis = TimeUnit.MILLISECONDS;

        superstepTimer = new GiraphTimer(registry, TIMER_SUPERSTEP_TIME, millis);
        computeAll = new GiraphTimer(registry, TIMER_COMPUTE_ALL, millis);
        timeToFirstMessage = new GiraphTimer(registry, TIMER_TIME_TO_FIRST_MSG, TimeUnit.MICROSECONDS);
        communicationTimer = new GiraphTimer(registry, TIMER_COMMUNICATION_TIME, millis);
        gcTimeMetric = registry.getCounter(TIMER_SUPERSTEP_GC_TIME);
        wcPreSuperstepTimer = new GiraphTimer(registry, "worker-context-pre-superstep", millis);
    }

    /**
     * Marks the moment the first message of the superstep was sent: stops the
     * time-to-first-message timer context (if one is running) and starts the
     * communication timer. Calls made while no time-to-first-message context is
     * set are no-ops.
     */
    public void notifySentMessages() {
        // Unsynchronized pre-check so that the common "already stopped" case skips the lock.
        // NOTE(review): the context field is not volatile and the monitor object
        // (timeToFirstMessage) is replaced on every newSuperstep() - confirm this
        // check-lock-check sequence is safe for the threads that call it.
        if (this.timeToFirstMessageTimerContext != null) {
            synchronized (this.timeToFirstMessage) {
                // Re-check under the lock: another caller may have stopped the context meanwhile.
                if (this.timeToFirstMessageTimerContext != null) {
                    this.timeToFirstMessageTimerContext.stop();
                    this.timeToFirstMessageTimerContext = null;
                    // Communication time starts counting from the first sent message.
                    this.communicationTimerContext = this.communicationTimer.time();
                }
            }
        }
    }

    /**
     * Stops the communication timer context started by
     * {@link #notifySentMessages()}, if one is running; otherwise does nothing.
     */
    public void notifyFinishedCommunication() {
        // Unsynchronized pre-check so that calls with no running context skip the lock.
        // NOTE(review): the context field is not volatile and the monitor object
        // (communicationTimer) is replaced on every newSuperstep() - confirm this
        // check-lock-check sequence is safe for the threads that call it.
        if (this.communicationTimerContext != null) {
            synchronized (this.communicationTimer) {
                // Re-check under the lock so the context is stopped at most once.
                if (this.communicationTimerContext != null) {
                    this.communicationTimerContext.stop();
                    this.communicationTimerContext = null;
                }
            }
        }
    }

    /**
     * Runs the compute phase of one superstep: reports the amount of work to
     * {@link WorkerProgress}, starts a partition-store iteration, runs the
     * requested number of {@code ComputeCallable}s and appends all partition
     * stats they return to {@code list}. The whole phase is timed by the
     * compute-all timer, and the time-to-first-message timer is started here.
     *
     * @param context mapper context, passed to the callables and used for progress reporting
     * @param list output list the resulting partition stats are added to
     * @param graphState graph state of the superstep being computed
     * @param messageStore message store passed to every callable
     * @param i number of callables to run
     */
    private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context, List<PartitionStats> list, final GraphState graphState, final MessageStore<I, Writable> messageStore, int i) {
        PartitionStore<I, V, E> partitionStore = this.serviceWorker.getPartitionStore();
        long verticesToCompute = 0;
        for (Integer partitionId : partitionStore.getPartitionIds()) {
            verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
        }
        WorkerProgress.get().startSuperstep(this.serviceWorker.getSuperstep(), verticesToCompute, this.serviceWorker.getPartitionStore().getNumPartitions());
        partitionStore.startIteration();

        GiraphTimerContext computeAllTimerContext = this.computeAll.time();
        // Stopped again by notifySentMessages() once the first message goes out.
        this.timeToFirstMessageTimerContext = this.timeToFirstMessage.time();

        CallableFactory<Collection<PartitionStats>> callableFactory = new CallableFactory<Collection<PartitionStats>>() {
            @Override // org.apache.giraph.utils.CallableFactory
            public Callable<Collection<PartitionStats>> newCallable(int callableId) {
                return new ComputeCallable(context, graphState, messageStore, GraphTaskManager.this.conf, GraphTaskManager.this.serviceWorker);
            }
        };
        for (Collection<PartitionStats> result : ProgressableUtils.getResultsWithNCallables(callableFactory, i, "compute-%d", context)) {
            list.addAll(result);
        }
        computeAllTimerContext.stop();
    }

    /**
     * If the given superstep is the one the worker service was restarted at,
     * loads its checkpoint and replaces the finished-superstep stats with the
     * vertex and edge counts read from that checkpoint.
     *
     * @param j superstep about to be executed
     * @return true if a checkpoint was loaded, false otherwise
     * @throws IOException propagated from loading the checkpoint
     */
    private boolean checkSuperstepRestarted(long j) throws IOException {
        boolean restartedHere = serviceWorker.getRestartedSuperstep() == j;
        if (restartedHere) {
            if (LOG.isInfoEnabled()) {
                LOG.info("execute: Loading from checkpoint " + j);
            }
            long restartedSuperstep = serviceWorker.getRestartedSuperstep();
            VertexEdgeCount checkpointCounts = serviceWorker.loadCheckpoint(restartedSuperstep);
            finishedSuperstepStats = new FinishedSuperstepStats(0L, false, checkpointCounts.getVertexCount(), checkpointCounts.getEdgeCount(), false, CheckpointStatus.NONE);
        }
        return restartedHere;
    }

    /**
     * Stores a checkpoint unless the status is {@code NONE}.
     *
     * @param checkpointStatus checkpoint decision for the current superstep
     * @return true if the status asks to halt after checkpointing
     *         ({@code CHECKPOINT_AND_HALT}), false otherwise
     * @throws IOException propagated from storing the checkpoint
     */
    private boolean storeCheckpoint(CheckpointStatus checkpointStatus) throws IOException {
        boolean checkpointRequested = checkpointStatus != CheckpointStatus.NONE;
        if (checkpointRequested) {
            serviceWorker.storeCheckpoint();
        }
        boolean haltRequested = checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT;
        return haltRequested;
    }

    /**
     * Inspects the stats of the input superstep. An empty graph that does not
     * need to be loaded from a checkpoint ends the computation; otherwise the
     * per-superstep metrics summary is printed to standard error when metrics
     * are enabled.
     *
     * @param finishedSuperstepStats stats returned by the input superstep
     * @return true if there is nothing to compute, false to continue
     */
    private boolean collectInputSuperstepStats(FinishedSuperstepStats finishedSuperstepStats) {
        boolean emptyGraph = finishedSuperstepStats.getVertexCount() == 0 && !finishedSuperstepStats.mustLoadCheckpoint();
        if (emptyGraph) {
            LOG.warn("map: No vertices in the graph, exiting.");
            return true;
        }
        if (conf.metricsEnabled()) {
            GiraphMetrics.get().perSuperstep().printSummary(System.err);
        }
        return false;
    }

    /**
     * Decides whether {@link #execute()} has anything to do. Also resets the
     * per-superstep metrics for superstep -1 and marks the task as run.
     *
     * @return true if execution should be skipped (computation already done, or
     *         this task is not a worker), false if it should proceed
     * @throws RuntimeException if the task has already been run once
     */
    private boolean checkTaskState() {
        if (done) {
            return true;
        }
        GiraphMetrics.get().resetSuperstepMetrics(-1L);

        if (graphFunctions.isNotAWorker()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("map: No need to do anything when not a worker");
            }
            return true;
        }

        if (alreadyRun) {
            throw new RuntimeException("map: In BSP, map should have only been run exactly once, (already run)");
        }
        alreadyRun = true;
        return false;
    }

    /**
     * Runs the pre-application hook of the worker context (timed by the pre-app
     * timer) followed by the pre-application hook of every worker observer.
     *
     * @throws RuntimeException wrapping an {@link IllegalAccessException} or
     *         {@link InstantiationException} raised by the hooks
     */
    private void workerContextPreApp() {
        GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
        try {
            serviceWorker.getWorkerContext().preApplication();
            preAppTimerContext.stop();
            context.progress();

            for (WorkerObserver observer : serviceWorker.getWorkerObservers()) {
                observer.preApplication();
                context.progress();
            }
        } catch (IllegalAccessException accessFailure) {
            LOG.fatal("execute: preApplication failed in access", accessFailure);
            throw new RuntimeException("execute: preApplication failed in access", accessFailure);
        } catch (InstantiationException instantiationFailure) {
            LOG.fatal("execute: preApplication failed in instantiation", instantiationFailure);
            throw new RuntimeException("execute: preApplication failed in instantiation", instantiationFailure);
        }
    }

    /**
     * Creates the mapper observers declared in the configuration and calls
     * {@code setup()} on each of them.
     */
    public void setupMapperObservers() {
        mapperObservers = conf.createMapperObservers(context);
        for (int idx = 0; idx < mapperObservers.length; idx++) {
            mapperObservers[idx].setup();
        }
    }

    /**
     * Calls {@code preLoad()} on every worker observer, reporting progress to the
     * mapper context after each one.
     */
    private void preLoadOnWorkerObservers() {
        WorkerObserver[] observers = serviceWorker.getWorkerObservers();
        for (WorkerObserver observer : observers) {
            observer.preLoad();
            context.progress();
        }
    }

    /**
     * Calls {@code postSave()} on every worker observer, reporting progress to
     * the mapper context after each one.
     */
    private void postSaveOnWorkerObservers() {
        WorkerObserver[] observers = serviceWorker.getWorkerObservers();
        for (WorkerObserver observer : observers) {
            observer.postSave();
            context.progress();
        }
    }

    /**
     * Cleans up the job progress tracker and, when the computation was not
     * already done at setup and a worker service exists, lets the worker service
     * clean up with the stats of the last finished superstep.
     *
     * @throws IOException propagated from the cleanup calls
     * @throws InterruptedException propagated from the cleanup calls
     */
    public void cleanup() throws IOException, InterruptedException {
        if (LOG.isInfoEnabled()) {
            LOG.info("cleanup: Starting for " + getGraphFunctions());
        }
        jobProgressTracker.cleanup();

        boolean workerNeedsCleanup = !done && serviceWorker != null;
        if (workerNeedsCleanup) {
            serviceWorker.cleanup(finishedSuperstepStats);
        }
    }

    /**
     * Final stage of task shutdown: runs the post-save observer hooks and stores
     * the worker counters in ZooKeeper (if a worker service exists), waits for
     * the master thread (if any), takes the ZooKeeper servers offline (if this
     * task has a ZooKeeper manager) and shuts the metrics down.
     *
     * <p>If the calling thread is interrupted while waiting for the master
     * thread, the remaining cleanup still runs and the interrupt status is
     * restored before returning.
     */
    public void sendWorkerCountersAndFinishCleanup() {
        if (serviceWorker != null) {
            postSaveOnWorkerObservers();
            serviceWorker.storeCountersInZooKeeper(true);
            serviceWorker.closeZooKeeper();
        }

        boolean interrupted = false;
        try {
            if (masterThread != null) {
                masterThread.join();
                LOG.info("cleanup: Joined with master thread");
            }
        } catch (InterruptedException e) {
            // join() cleared the interrupt status; remember it so it can be restored
            // once the rest of the cleanup has run undisturbed.
            interrupted = true;
            LOG.error("cleanup: Master thread couldn't join", e);
        }

        try {
            if (zkManager != null) {
                LOG.info("cleanup: Offlining ZooKeeper servers");
                try {
                    zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
                } catch (Throwable th) {
                    // Best effort: a failure here must not prevent the metrics shutdown.
                    LOG.error("cleanup: Error offlining zookeeper", th);
                }
            }
            GiraphMetrics.get().shutdown();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Lets the ZooKeeper manager clean up, but only when this task's graph
     * functions include ZooKeeper and a manager exists.
     */
    public void zooKeeperCleanup() {
        boolean hasZooKeeperToClean = graphFunctions.isZooKeeper() && zkManager != null;
        if (hasZooKeeperToClean) {
            zkManager.cleanup();
        }
    }

    /**
     * Best-effort cleanup after a fatal failure: runs the worker service's
     * failure cleanup (when this task is a worker and the service exists) and
     * shuts the metrics down. Runtime exceptions raised along the way are logged
     * and swallowed so that they do not mask the original failure.
     */
    public void workerFailureCleanup() {
        try {
            // serviceWorker is assigned only inside instantiateBspService(), so a
            // failure before that point leaves it null. Guarding here keeps the
            // metrics shutdown from being skipped by a NullPointerException.
            if (graphFunctions.isWorker() && serviceWorker != null) {
                serviceWorker.failureCleanup();
            }
            GiraphMetrics.get().shutdown();
        } catch (RuntimeException e) {
            LOG.error("run: Worker failure failed on another RuntimeException, original expection will be rethrown", e);
        }
    }

    /**
     * Creates an uncaught-exception handler whose failure policy is instantiated
     * from the configured checker class.
     *
     * @return a new handler bound to this task manager
     */
    public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
        CheckerIfWorkerShouldFailAfterException configuredChecker = CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(getConf());
        JobProgressTracker tracker = getJobProgressTracker();
        return new OverrideExceptionHandler(configuredChecker, tracker);
    }

    /**
     * Creates an uncaught-exception handler that uses the given failure policy.
     *
     * @param checkerIfWorkerShouldFailAfterException policy deciding whether an exception is fatal
     * @return a new handler bound to this task manager
     */
    public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(CheckerIfWorkerShouldFailAfterException checkerIfWorkerShouldFailAfterException) {
        JobProgressTracker tracker = getJobProgressTracker();
        return new OverrideExceptionHandler(checkerIfWorkerShouldFailAfterException, tracker);
    }

    /**
     * @return the configuration built in {@link #setup(Path[])}; null before setup has run
     */
    public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
        return conf;
    }

    /**
     * @return the current value of the per-superstep GC time counter, or 0 when
     *         the counter has not been created yet
     */
    public long getSuperstepGCTime() {
        return gcTimeMetric == null ? 0L : gcTimeMetric.count();
    }

    /**
     * @return the ZooKeeper server:port string of the ZooKeeper manager when one
     *         exists, otherwise the list stored in the configuration
     */
    public String getZookeeperList() {
        if (zkManager == null) {
            return conf.getZookeeperList();
        }
        return zkManager.getZooKeeperServerPortString();
    }

    /**
     * Tells whether the given throwable looks like a "connection reset by peer"
     * error, judged purely by the start of its message.
     *
     * @param th throwable to inspect; may be null
     * @return true if the message starts with "Connection reset by peer"; false
     *         otherwise, including when the throwable or its message is null
     */
    public static boolean isConnectionResetByPeer(Throwable th) {
        if (th == null) {
            return false;
        }
        // Many exceptions carry no message at all; treat that as "not a reset".
        String message = th.getMessage();
        return message != null && message.startsWith("Connection reset by peer");
    }
}
