package org.apache.giraph.job;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.ipc.trace.SpanStorage;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/job/DefaultJobProgressTrackerService.class */
public class DefaultJobProgressTrackerService implements JobProgressTrackerService {
    private static final Logger LOG = Logger.getLogger(JobProgressTrackerService.class);
    private static final int UPDATE_MILLISECONDS = 10000;
    private GiraphConfiguration conf;
    private GiraphJobObserver jobObserver;
    private Thread writerThread;
    private int mappersStarted;
    private long lastTimeMappersStartedLogged;
    private Job job;
    private volatile boolean finished = false;
    private final Map<Integer, WorkerProgress> workerProgresses = new ConcurrentHashMap();

    @Override // org.apache.giraph.job.JobProgressTrackerService
    public void init(GiraphConfiguration giraphConfiguration, GiraphJobObserver giraphJobObserver) {
        this.conf = giraphConfiguration;
        this.jobObserver = giraphJobObserver;
        if (LOG.isInfoEnabled()) {
            LOG.info("Waiting for job to start... (this may take a minute)");
        }
        startWriterThread();
    }

    private void startWriterThread() {
        this.writerThread = new Thread(new Runnable() { // from class: org.apache.giraph.job.DefaultJobProgressTrackerService.1
            @Override // java.lang.Runnable
            public void run() {
                while (!DefaultJobProgressTrackerService.this.finished) {
                    if (DefaultJobProgressTrackerService.this.mappersStarted == DefaultJobProgressTrackerService.this.conf.getMaxWorkers() + 1 && !DefaultJobProgressTrackerService.this.workerProgresses.isEmpty()) {
                        CombinedWorkerProgress combinedWorkerProgress = new CombinedWorkerProgress(DefaultJobProgressTrackerService.this.workerProgresses.values(), DefaultJobProgressTrackerService.this.conf);
                        if (DefaultJobProgressTrackerService.LOG.isInfoEnabled()) {
                            DefaultJobProgressTrackerService.LOG.info(combinedWorkerProgress.toString());
                        }
                        if (combinedWorkerProgress.isDone(DefaultJobProgressTrackerService.this.conf.getMaxWorkers())) {
                            return;
                        }
                    }
                    try {
                        Thread.sleep(SpanStorage.DEFAULT_MAX_SPANS);
                    } catch (InterruptedException e) {
                        if (DefaultJobProgressTrackerService.LOG.isInfoEnabled()) {
                            DefaultJobProgressTrackerService.LOG.info("Progress thread interrupted");
                            return;
                        }
                        return;
                    }
                }
            }
        });
        this.writerThread.setDaemon(true);
        this.writerThread.start();
    }

    @Override // org.apache.giraph.job.JobProgressTrackerService
    public void setJob(Job job) {
        this.job = job;
    }

    private void jobGotAllMappers() {
        this.jobObserver.jobGotAllMappers(this.job);
        final long j = GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(this.conf);
        if (j > 0) {
            Thread thread = new Thread(new Runnable() { // from class: org.apache.giraph.job.DefaultJobProgressTrackerService.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Thread.sleep(j);
                        try {
                            DefaultJobProgressTrackerService.LOG.warn("Killing job because it took longer than " + j + " milliseconds");
                            DefaultJobProgressTrackerService.this.job.killJob();
                        } catch (IOException e) {
                            DefaultJobProgressTrackerService.LOG.warn("Failed to kill job", e);
                        }
                    } catch (InterruptedException e2) {
                        if (DefaultJobProgressTrackerService.LOG.isDebugEnabled()) {
                            DefaultJobProgressTrackerService.LOG.debug("Thread checking for jobs max allowed time interrupted");
                        }
                    }
                }
            });
            thread.setDaemon(true);
            thread.start();
        }
    }

    @Override // org.apache.giraph.job.JobProgressTracker
    public synchronized void mapperStarted() {
        this.mappersStarted++;
        if (LOG.isInfoEnabled()) {
            if (this.mappersStarted == this.conf.getMaxWorkers() + 1) {
                LOG.info("Got all " + this.mappersStarted + " mappers");
                jobGotAllMappers();
            } else if (System.currentTimeMillis() - this.lastTimeMappersStartedLogged > SpanStorage.DEFAULT_MAX_SPANS) {
                this.lastTimeMappersStartedLogged = System.currentTimeMillis();
                LOG.info("Got " + this.mappersStarted + " but needs " + (this.conf.getMaxWorkers() + 1) + " mappers");
            }
        }
    }

    @Override // org.apache.giraph.job.JobProgressTracker
    public void logInfo(String str) {
        if (LOG.isInfoEnabled()) {
            LOG.info(str);
        }
    }

    @Override // org.apache.giraph.job.JobProgressTracker
    public void logError(String str) {
        LOG.error(str);
    }

    @Override // org.apache.giraph.job.JobProgressTracker
    public void logFailure(String str) {
        LOG.fatal(str);
        this.finished = true;
        this.writerThread.interrupt();
    }

    @Override // org.apache.giraph.job.JobProgressTracker
    public void updateProgress(WorkerProgress workerProgress) {
        this.workerProgresses.put(Integer.valueOf(workerProgress.getTaskId()), workerProgress);
    }

    @Override // org.apache.giraph.job.JobProgressTrackerService
    public void stop(boolean z) {
        this.finished = true;
        this.writerThread.interrupt();
        if (LOG.isInfoEnabled()) {
            LOG.info("Job " + (z ? "finished successfully" : "failed") + ", cleaning up...");
        }
    }

    public static JobProgressTrackerService createJobProgressTrackerService(GiraphConfiguration giraphConfiguration, GiraphJobObserver giraphJobObserver) {
        if (!giraphConfiguration.trackJobProgressOnClient()) {
            return null;
        }
        JobProgressTrackerService newInstance = GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(giraphConfiguration);
        newInstance.init(giraphConfiguration, giraphJobObserver);
        return newInstance;
    }
}
