package org.apache.giraph.job;

import com.facebook.swift.codec.ThriftCodec;
import com.facebook.swift.codec.ThriftCodecManager;
import com.facebook.swift.service.ThriftServer;
import com.facebook.swift.service.ThriftServerConfig;
import com.facebook.swift.service.ThriftServiceProcessor;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.ipc.trace.SpanStorage;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/job/JobProgressTrackerService.class */
/**
 * {@link JobProgressTracker} implementation that collects progress reports
 * from workers and periodically logs a combined summary.
 *
 * <p>Constructing an instance immediately starts a background "writer" thread
 * that logs the combined progress every {@link #UPDATE_MILLISECONDS}
 * milliseconds once all expected mappers have reported in. Instances are
 * normally created through
 * {@link #createJobProgressServer(GiraphConfiguration)}, which additionally
 * starts a Thrift server backed by the instance. Call {@link #stop(boolean)}
 * to release the threads and the server.
 */
public class JobProgressTrackerService implements JobProgressTracker {
    private static final Logger LOG = Logger.getLogger(JobProgressTrackerService.class);
    /** Interval, in milliseconds, between two progress log lines. */
    private static final int UPDATE_MILLISECONDS = 10000;
    private final GiraphConfiguration conf;
    /** Thread that periodically logs the combined worker progress. */
    private Thread writerThread;
    /** Thrift server backed by this object; null until created by the factory. */
    private ThriftServer server;
    /**
     * Number of mappers that reported start. Incremented under this object's
     * monitor but read by the writer thread without it, hence volatile.
     */
    private volatile int mappersStarted;
    private long lastTimeMappersStartedLogged;
    /** Job to kill when the max allowed time elapses; set from another thread. */
    private volatile Job job;
    private volatile boolean finished = false;
    /** Thread enforcing the max allowed job time; null if none was started. */
    private volatile Thread maxJobTimeThread;
    /** Latest progress report per task id. */
    private final Map<Integer, WorkerProgress> workerProgresses =
        new ConcurrentHashMap<Integer, WorkerProgress>();

    /**
     * Creates the tracker and starts the progress-logging thread.
     *
     * @param giraphConfiguration configuration read for the worker count and
     *                            the max allowed job time
     */
    public JobProgressTrackerService(GiraphConfiguration giraphConfiguration) {
        this.conf = giraphConfiguration;
        if (LOG.isInfoEnabled()) {
            LOG.info("Waiting for job to start... (this may take a minute)");
        }
        startWriterThread();
    }

    /**
     * Starts the thread that logs combined progress until the tracker is
     * finished, the combined progress reports done, or it is interrupted.
     */
    private void startWriterThread() {
        writerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!finished) {
                    // Only report once every expected mapper has started and
                    // at least one worker has sent a progress update.
                    if (mappersStarted == conf.getMaxWorkers() + 1 && !workerProgresses.isEmpty()) {
                        CombinedWorkerProgress combined =
                            new CombinedWorkerProgress(workerProgresses.values());
                        if (LOG.isInfoEnabled()) {
                            LOG.info(combined.toString());
                        }
                        if (combined.isDone(conf.getMaxWorkers())) {
                            return;
                        }
                    }
                    try {
                        Thread.sleep(UPDATE_MILLISECONDS);
                    } catch (InterruptedException e) {
                        // Interruption is the shutdown signal; just exit.
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Progress thread interrupted");
                        }
                        return;
                    }
                }
            }
        });
        writerThread.start();
    }

    /**
     * Sets the job that is killed if the max allowed job time is exceeded.
     *
     * @param job the submitted job
     */
    public void setJob(Job job) {
        this.job = job;
    }

    /**
     * Called once all expected mappers have started. If a positive max allowed
     * job time is configured, starts a watchdog thread that kills the job when
     * that time elapses. The watchdog is a daemon and is interrupted by
     * {@link #stop(boolean)}, so it neither outlives the tracker nor keeps the
     * JVM alive.
     */
    private void jobGotAllMappers() {
        final long maxAllowedJobTimeMs = GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
        if (maxAllowedJobTimeMs <= 0) {
            return;
        }
        Thread watchdog = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(maxAllowedJobTimeMs);
                } catch (InterruptedException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Thread checking for jobs max allowed time interrupted");
                    }
                    return;
                }
                Job runningJob = job;
                if (runningJob == null) {
                    // setJob was never called; nothing we can kill.
                    LOG.warn("Job took longer than " + maxAllowedJobTimeMs +
                        " milliseconds but no job was set, cannot kill it");
                    return;
                }
                try {
                    LOG.warn("Killing job because it took longer than " + maxAllowedJobTimeMs + " milliseconds");
                    runningJob.killJob();
                } catch (IOException e) {
                    LOG.warn("Failed to kill job", e);
                }
            }
        });
        watchdog.setDaemon(true);
        maxJobTimeThread = watchdog;
        watchdog.start();
    }

    /**
     * Records that one more mapper has started. When the count reaches
     * {@code getMaxWorkers() + 1} (presumably the workers plus the master
     * task) the max-job-time watchdog is armed; this happens regardless of
     * the configured log level. Intermediate counts are logged at most once
     * per {@link #UPDATE_MILLISECONDS}.
     */
    @Override
    public synchronized void mapperStarted() {
        mappersStarted++;
        int expectedMappers = conf.getMaxWorkers() + 1;
        if (mappersStarted == expectedMappers) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Got all " + mappersStarted + " mappers");
            }
            jobGotAllMappers();
        } else if (LOG.isInfoEnabled() &&
            System.currentTimeMillis() - lastTimeMappersStartedLogged > UPDATE_MILLISECONDS) {
            lastTimeMappersStartedLogged = System.currentTimeMillis();
            LOG.info("Got " + mappersStarted + " but needs " + expectedMappers + " mappers");
        }
    }

    /**
     * Logs the given message at INFO level.
     *
     * @param str message to log
     */
    @Override
    public void logInfo(String str) {
        if (LOG.isInfoEnabled()) {
            LOG.info(str);
        }
    }

    /**
     * Logs the given message at FATAL level and stops the progress-logging
     * thread.
     *
     * @param str failure description to log
     */
    @Override
    public void logFailure(String str) {
        LOG.fatal(str);
        finished = true;
        writerThread.interrupt();
    }

    /**
     * Stores the latest progress report of a worker, keyed by its task id.
     *
     * @param workerProgress progress report to store
     */
    @Override
    public void updateProgress(WorkerProgress workerProgress) {
        workerProgresses.put(Integer.valueOf(workerProgress.getTaskId()), workerProgress);
    }

    /**
     * Stops the background threads, closes the server if one was created, and
     * logs the outcome.
     *
     * @param z true if the job finished successfully, false if it failed
     */
    public void stop(boolean z) {
        shutdown();
        if (LOG.isInfoEnabled()) {
            LOG.info("Job " + (z ? "finished successfully" : "failed") + ", cleaning up...");
        }
    }

    /**
     * Releases everything this tracker owns: marks it finished, interrupts the
     * writer and watchdog threads, and closes the server when present.
     */
    private void shutdown() {
        finished = true;
        writerThread.interrupt();
        Thread watchdog = maxJobTimeThread;
        if (watchdog != null) {
            watchdog.interrupt();
        }
        if (server != null) {
            server.close();
        }
    }

    /**
     * Creates a tracker, starts a Thrift server backed by it, and stores the
     * local host name and the server port in the configuration.
     *
     * @param giraphConfiguration configuration to read from and to update with
     *                            the server host and port
     * @return the running service, or null when progress tracking on the
     *         client is disabled or the service could not be started
     */
    public static JobProgressTrackerService createJobProgressServer(GiraphConfiguration giraphConfiguration) {
        if (!giraphConfiguration.trackJobProgressOnClient()) {
            return null;
        }
        JobProgressTrackerService service = null;
        try {
            service = new JobProgressTrackerService(giraphConfiguration);
            service.server = new ThriftServer(new ThriftServiceProcessor(new ThriftCodecManager(new ThriftCodec[0]), new ArrayList(), service), new ThriftServerConfig());
            service.server.start();
            JOB_PROGRESS_SERVICE_HOST.set(giraphConfiguration, InetAddress.getLocalHost().getHostName());
            JOB_PROGRESS_SERVICE_PORT.set(giraphConfiguration, service.server.getPort().intValue());
            return service;
        } catch (Exception e) {
            LOG.warn("Exception occurred while trying to create JobProgressTrackerService - not using progress reporting", e);
            // The caller receives null and cannot call stop(), so release the
            // already-started writer thread and server here.
            if (service != null) {
                try {
                    service.shutdown();
                } catch (RuntimeException cleanupFailure) {
                    LOG.warn("Failed to clean up partially created JobProgressTrackerService", cleanupFailure);
                }
            }
            return null;
        }
    }
}
