package org.apache.gobblin.runtime.mapreduce;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.task.BaseAbstractTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/runtime/mapreduce/MRTask.class */
public class MRTask extends BaseAbstractTask {
    private static final Logger log = LoggerFactory.getLogger(MRTask.class);
    private static final String JOB_CONFIGURATION_PREFIX = "MRTask.jobConfiguration.";
    private final TaskContext taskContext;
    private final EventSubmitter eventSubmitter;
    protected Job mrJob;

    /* loaded from: input_file:org/apache/gobblin/runtime/mapreduce/MRTask$Events.class */
    public static class Events {
        public static final String MR_JOB_STARTED_EVENT = "MRJobStarted";
        public static final String MR_JOB_SUCCESSFUL = "MRJobSuccessful";
        public static final String MR_JOB_FAILED = "MRJobFailed";
        public static final String MR_JOB_SKIPPED = "MRJobSkipped";
        public static final String JOB_URL = "jobTrackingUrl";
        public static final String FAILURE_CONTEXT = "failureContext";
    }

    public static void serializeJobToState(State state, Job job) {
        Iterator it = job.getConfiguration().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            state.setProp(JOB_CONFIGURATION_PREFIX + ((String) entry.getKey()), entry.getValue());
        }
    }

    public MRTask(TaskContext taskContext) {
        super(taskContext);
        this.taskContext = taskContext;
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.MRTask").addMetadata(additionalEventMetadata()).build();
    }

    public void onMRTaskComplete(boolean z, Throwable th) throws RuntimeException {
        if (z) {
            this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
        } else if (th == null) {
            this.workingState = WorkUnitState.WorkingState.FAILED;
        } else {
            log.error("Failed to run MR job with exception {}", ExceptionUtils.getStackTrace(th));
            this.workingState = WorkUnitState.WorkingState.FAILED;
        }
    }

    @Override // org.apache.gobblin.runtime.task.BaseAbstractTask, org.apache.gobblin.runtime.task.TaskIFace
    public void commit() {
        log.debug("State is set to {} inside onMRTaskComplete.", this.workingState);
    }

    @Override // org.apache.gobblin.runtime.task.BaseAbstractTask, org.apache.gobblin.runtime.task.TaskIFace, java.lang.Runnable
    public void run() {
        try {
            Job createJob = createJob();
            if (createJob == null) {
                log.info("No MR job created. Skipping.");
                this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
                this.eventSubmitter.submit(Events.MR_JOB_SKIPPED);
                onSkippedMRJob();
                return;
            }
            createJob.submit();
            log.info("MR tracking URL {} for job {}", createJob.getTrackingURL(), createJob.getJobName());
            this.eventSubmitter.submit(Events.MR_JOB_STARTED_EVENT, new String[]{Events.JOB_URL, createJob.getTrackingURL()});
            createJob.waitForCompletion(false);
            this.mrJob = createJob;
            if (createJob.isSuccessful()) {
                this.eventSubmitter.submit(Events.MR_JOB_SUCCESSFUL, new String[]{Events.JOB_URL, createJob.getTrackingURL()});
                onMRTaskComplete(true, null);
            } else {
                this.eventSubmitter.submit(Events.MR_JOB_FAILED, new String[]{Events.JOB_URL, createJob.getTrackingURL()});
                onMRTaskComplete(false, new IOException(String.format("MR Job:%s is not successful", createJob.getTrackingURL())));
            }
        } catch (Throwable th) {
            log.error("Failed to run MR job.", th);
            this.eventSubmitter.submit(Events.MR_JOB_FAILED, new String[]{Events.FAILURE_CONTEXT, th.getMessage()});
            onMRTaskComplete(false, th);
        }
    }

    protected Map<String, String> additionalEventMetadata() {
        return Maps.newHashMap();
    }

    protected Job createJob() throws IOException {
        Job job = Job.getInstance(new Configuration());
        for (Map.Entry entry : this.taskContext.getTaskState().getProperties().entrySet()) {
            if ((entry.getKey() instanceof String) && ((String) entry.getKey()).startsWith(JOB_CONFIGURATION_PREFIX)) {
                job.getConfiguration().set(((String) entry.getKey()).substring(JOB_CONFIGURATION_PREFIX.length()), (String) entry.getValue());
            }
        }
        return job;
    }

    protected void onSkippedMRJob() {
    }
}
