package gobblin.runtime.mapreduce;

import com.google.common.collect.Maps;
import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.metrics.event.EventSubmitter;
import gobblin.runtime.TaskContext;
import gobblin.runtime.task.BaseAbstractTask;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/mapreduce/MRTask.class */
public class MRTask extends BaseAbstractTask {
    private static final Logger log = LoggerFactory.getLogger(MRTask.class);
    private static final String JOB_CONFIGURATION_PREFIX = "MRTask.jobConfiguration.";
    private final TaskContext taskContext;
    private final EventSubmitter eventSubmitter;

    /* loaded from: input_file:gobblin/runtime/mapreduce/MRTask$Events.class */
    public static class Events {
        public static final String MR_JOB_STARTED_EVENT = "MRJobStarted";
        public static final String MR_JOB_SUCCESSFUL = "MRJobSuccessful";
        public static final String MR_JOB_FAILED = "MRJobFailed";
        public static final String JOB_URL = "jobTrackingUrl";
        public static final String FAILURE_CONTEXT = "failureContext";
    }

    public static void serializeJobToState(State state, Job job) {
        Iterator it = job.getConfiguration().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            state.setProp(JOB_CONFIGURATION_PREFIX + ((String) entry.getKey()), entry.getValue());
        }
    }

    public MRTask(TaskContext taskContext) {
        super(taskContext);
        this.taskContext = taskContext;
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.MRTask").addMetadata(additionalEventMetadata()).build();
    }

    @Override // gobblin.runtime.task.BaseAbstractTask, gobblin.runtime.task.TaskIFace, java.lang.Runnable
    public void run() {
        try {
            Job createJob = createJob();
            createJob.submit();
            this.eventSubmitter.submit(Events.MR_JOB_STARTED_EVENT, new String[]{Events.JOB_URL, createJob.getTrackingURL()});
            createJob.waitForCompletion(false);
            if (createJob.isSuccessful()) {
                this.eventSubmitter.submit(Events.MR_JOB_SUCCESSFUL, new String[]{Events.JOB_URL, createJob.getTrackingURL()});
                this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
            } else {
                this.eventSubmitter.submit(Events.MR_JOB_FAILED, new String[]{Events.JOB_URL, createJob.getTrackingURL()});
                this.workingState = WorkUnitState.WorkingState.FAILED;
            }
        } catch (Throwable th) {
            log.error("Failed to run MR job.", th);
            this.eventSubmitter.submit(Events.MR_JOB_FAILED, new String[]{Events.FAILURE_CONTEXT, th.getMessage()});
            this.workingState = WorkUnitState.WorkingState.FAILED;
        }
    }

    protected Map<String, String> additionalEventMetadata() {
        return Maps.newHashMap();
    }

    protected Job createJob() throws IOException {
        Job job = Job.getInstance(new Configuration());
        for (Map.Entry entry : this.taskContext.getTaskState().getProperties().entrySet()) {
            if ((entry.getKey() instanceof String) && ((String) entry.getKey()).startsWith(JOB_CONFIGURATION_PREFIX)) {
                job.getConfiguration().set(((String) entry.getKey()).substring(JOB_CONFIGURATION_PREFIX.length()), (String) entry.getValue());
            }
        }
        return job;
    }
}
