package gobblin.runtime.mapreduce;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import gobblin.configuration.ConfigurationKeys;
import gobblin.metastore.FsStateStore;
import gobblin.metastore.StateStore;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.event.TimingEvent;
import gobblin.password.PasswordManager;
import gobblin.runtime.AbstractJobLauncher;
import gobblin.runtime.FileBasedJobLock;
import gobblin.runtime.JobLock;
import gobblin.runtime.JobState;
import gobblin.runtime.TaskExecutor;
import gobblin.runtime.TaskState;
import gobblin.runtime.TaskStateCollectorService;
import gobblin.runtime.TaskStateTracker;
import gobblin.runtime.util.JobMetrics;
import gobblin.runtime.util.MetricGroup;
import gobblin.runtime.util.TimingEventNames;
import gobblin.source.workunit.MultiWorkUnit;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.HadoopUtils;
import gobblin.util.JobConfigurationUtils;
import gobblin.util.JobLauncherUtils;
import gobblin.util.ParallelRunner;
import gobblin.util.SerializationUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/mapreduce/MRJobLauncher.class */
public class MRJobLauncher extends AbstractJobLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(MRJobLauncher.class);

    // Splits comma-separated lists of jars/files, dropping blank entries and surrounding whitespace.
    private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

    // The Hadoop job name is this prefix followed by the Gobblin job name.
    private static final String JOB_NAME_PREFIX = "Gobblin-";

    // Names of sub-directories / files inside the per-job MR working directory (mr.job.root.dir/<job name>).
    private static final String JARS_DIR_NAME = "_jars";
    private static final String FILES_DIR_NAME = "_files";
    static final String INPUT_DIR_NAME = "input";
    private static final String OUTPUT_DIR_NAME = "output";

    // Extension of the job input file, which lists one serialized work unit path per line.
    private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist";

    private final Configuration conf;
    private final FileSystem fs;
    private final Job job;

    // Per-job working directory; removed by cleanUpWorkingDirectory() after the run and on close().
    private final Path mrJobDir;
    private final Path jobInputPath;
    private final Path jobOutputPath;

    private final int parallelRunnerThreads;
    private final TaskStateCollectorService taskStateCollectorService;

    // Set once job.submit() returns. Read by close() and executeCancellation(); volatile presumably because
    // cancellation may run on a different thread (NOTE(review): confirm against AbstractJobLauncher).
    private volatile boolean hadoopJobSubmitted;

    /**
     * Hadoop mapper that runs Gobblin work units inside a single map task.
     *
     * <p>Each input value is the path of a serialized {@link WorkUnit}, or of a {@link MultiWorkUnit} when the path
     * ends with {@link AbstractJobLauncher#MULTI_WORK_UNIT_FILE_EXTENSION}. {@link #map} only deserializes and
     * collects the work units. {@link #run} executes all of them in one batch after the whole input split has been
     * consumed, with task states going to an {@link FsStateStore} rooted at the job output path.
     */
    public static class TaskRunner extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

        private FileSystem fs;
        private StateStore<TaskState> taskStateStore;
        private TaskExecutor taskExecutor;
        private TaskStateTracker taskStateTracker;
        private ServiceManager serviceManager;
        private Optional<JobMetrics> jobMetrics = Optional.absent();
        // Job-level state; loaded in setup() from the file named by the job.state.file.path configuration key.
        private final JobState jobState = new JobState();
        // Work units collected by map() and executed together at the end of run().
        private final List<WorkUnit> workUnits = Lists.newArrayList();

        /**
         * Initializes the mapper. It loads the job state, creates the task state store, and starts the
         * {@link TaskExecutor} and {@link MRTaskStateTracker} services. When {@code metrics.enabled} is true,
         * it also starts job metric reporting.
         *
         * @throws RuntimeException if any I/O fails or the services do not become healthy within 5 seconds
         */
        @Override
        protected void setup(Mapper<LongWritable, Text, NullWritable, NullWritable>.Context context) {
            Configuration configuration = context.getConfiguration();
            try {
                this.fs = FileSystem.get(configuration);
                this.taskStateStore = new FsStateStore<TaskState>(this.fs,
                    SequenceFileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
                SerializationUtils.deserializeState(this.fs, new Path(configuration.get("job.state.file.path")),
                    this.jobState);
                this.taskExecutor = new TaskExecutor(configuration);
                this.taskStateTracker = new MRTaskStateTracker(context);
                this.serviceManager =
                    new ServiceManager(Lists.newArrayList(new Service[] {this.taskExecutor, this.taskStateTracker}));
                this.serviceManager.startAsync().awaitHealthy(5L, TimeUnit.SECONDS);

                if (Boolean.parseBoolean(
                    configuration.get("metrics.enabled", ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
                    this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
                    this.jobMetrics.get().startMetricReportingWithFileSuffix(
                        HadoopUtils.getStateFromConf(configuration),
                        context.getTaskAttemptID().getTaskID().toString());
                }
            } catch (TimeoutException e) {
                MRJobLauncher.LOG.error("Timed out while waiting for the service manager to start up", e);
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException("Failed to setup the mapper task", e);
            }
        }

        /**
         * Deserializes every work unit listed in this mapper's input split, then runs them all in one batch
         * through {@code AbstractJobLauncher.runWorkUnits}. {@link #cleanup} runs exactly once at the end, even
         * if reading or running the work units fails.
         */
        @Override
        public void run(Mapper<LongWritable, Text, NullWritable, NullWritable>.Context context)
            throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
                AbstractJobLauncher.runWorkUnits(this.jobState.getJobId(), context.getTaskAttemptID().toString(),
                    this.workUnits, this.taskStateTracker, this.taskExecutor, this.taskStateStore,
                    MRJobLauncher.LOG);
            } finally {
                // cleanup() stops the services runWorkUnits executes on, so it must run only after the batch.
                cleanup(context);
            }
        }

        /**
         * Deserializes the work unit file whose path is {@code value} and queues its work units for execution.
         * A {@link MultiWorkUnit} is flattened into its individual work units. Every queued work unit inherits
         * any job state properties it does not already define.
         *
         * @param key the line offset (unused)
         * @param value the path of a serialized work unit or multi-work-unit file
         * @param context the mapper context (unused)
         * @throws IOException if the work unit file cannot be read
         */
        @Override
        public void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, NullWritable, NullWritable>.Context context)
            throws IOException, InterruptedException {
            String workUnitFilePath = value.toString();
            WorkUnit workUnit = workUnitFilePath.endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)
                ? MultiWorkUnit.createEmpty() : WorkUnit.createEmpty();
            SerializationUtils.deserializeState(this.fs, new Path(workUnitFilePath), workUnit);

            if (workUnit instanceof MultiWorkUnit) {
                List<WorkUnit> flattenedWorkUnits =
                    JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits());
                for (WorkUnit flattenedWorkUnit : flattenedWorkUnits) {
                    flattenedWorkUnit.addAllIfNotExist(this.jobState);
                }
                this.workUnits.addAll(flattenedWorkUnits);
            } else {
                workUnit.addAllIfNotExist(this.jobState);
                this.workUnits.add(workUnit);
            }
        }

        /**
         * Stops the task execution services, waiting up to 5 seconds, then stops job metric reporting if it
         * was started. A shutdown timeout is logged and does not prevent metric reporting from being stopped.
         */
        @Override
        protected void cleanup(Mapper<LongWritable, Text, NullWritable, NullWritable>.Context context)
            throws IOException, InterruptedException {
            try {
                this.serviceManager.stopAsync().awaitStopped(5L, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                MRJobLauncher.LOG.warn("Timed out while waiting for the service manager to shut down", e);
            } finally {
                if (this.jobMetrics.isPresent()) {
                    this.jobMetrics.get().stopMetricsReporting();
                }
            }
        }
    }

    /**
     * Creates a launcher that submits the job with a fresh Hadoop {@link Configuration}.
     *
     * @param properties the job configuration properties
     * @throws Exception if the launcher cannot be initialized
     */
    public MRJobLauncher(Properties properties) throws Exception {
        this(properties, new Configuration());
    }

    /**
     * Creates a launcher that submits the job with the given Hadoop configuration.
     *
     * <p>The constructor copies the job properties into {@code configuration} and recreates the working
     * directory {@code mr.job.root.dir/<job name>}, deleting any leftover copy. It then stages the configured
     * jars and files into the DistributedCache and builds the Hadoop {@link Job}. Finally, it creates the
     * task-state collector service and starts the cancellation executor.
     *
     * @param properties the job configuration properties
     * @param configuration the Hadoop configuration to populate and submit the job with
     * @throws Exception if the file system or the working directory cannot be set up
     */
    public MRJobLauncher(Properties properties, Configuration configuration) throws Exception {
        super(properties, ImmutableList.of());
        this.conf = configuration;
        this.hadoopJobSubmitted = false;

        JobConfigurationUtils.putPropertiesIntoConfiguration(this.jobProps, this.conf);
        // Tolerate failed map tasks at the Hadoop level (up to 100%); both the old and new key names are set.
        this.conf.set("mapred.max.map.failures.percent", "100");
        this.conf.set("mapreduce.map.failures.maxpercent", "100");
        this.conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);

        this.fs = buildFileSystem(properties, this.conf);

        String jobName = this.jobContext.getJobName();
        this.mrJobDir = new Path(this.jobProps.getProperty("mr.job.root.dir"), jobName);
        if (this.fs.exists(this.mrJobDir)) {
            LOG.warn("Job working directory already exists for job " + jobName);
            this.fs.delete(this.mrJobDir, true);
        }
        this.fs.mkdirs(this.mrJobDir);

        this.jobInputPath = new Path(this.mrJobDir, INPUT_DIR_NAME);
        this.jobOutputPath = new Path(this.mrJobDir, OUTPUT_DIR_NAME);

        // addDependencies() mutates this.conf, so it has to run before Job.getInstance() copies the configuration.
        addDependencies();
        this.job = Job.getInstance(this.conf, JOB_NAME_PREFIX + jobName);

        this.parallelRunnerThreads =
            Integer.parseInt(properties.getProperty("parallel.runner.threads", Integer.toString(10)));

        Path taskStateOutputDir = new Path(this.jobOutputPath, this.jobContext.getJobId());
        this.taskStateCollectorService = new TaskStateCollectorService(
            properties, this.jobContext.getJobState(), this.eventBus, this.fs, taskStateOutputDir);

        startCancellationExecutor();
    }

    /**
     * Releases the launcher in three steps:
     * <ol>
     *   <li>Kills the Hadoop MR job if it was submitted but is not complete.</li>
     *   <li>Deletes the MR working directory.</li>
     *   <li>Delegates to {@link AbstractJobLauncher#close()}.</li>
     * </ol>
     * Each step runs even if an earlier one throws.
     *
     * @throws IOException if querying or killing the Hadoop job fails, or if the superclass close fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.hadoopJobSubmitted && !this.job.isComplete()) {
                LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
                this.job.killJob();
            }
        } finally {
            try {
                cleanUpWorkingDirectory();
            } finally {
                // Release what the base launcher owns (e.g. the executor started by startCancellationExecutor()).
                super.close();
            }
        }
    }

    /**
     * Runs the given work units as one map-only Hadoop MR job and blocks until the job finishes.
     *
     * <p>The task-state collector service runs while the job is executing. It sets {@code job.tracking.url}
     * to the Hadoop tracking URL if it is not already set. If the job was cancelled and the cancellation has
     * been executed, this returns without converting counters. Otherwise, it copies the Hadoop counters into
     * the job's metrics. In all cases the collector service is stopped and the working directory is deleted.
     *
     * @param list the work units to run
     * @throws Exception if preparing, submitting or waiting for the Hadoop job fails
     */
    @Override
    protected void runWorkUnits(List<WorkUnit> list) throws Exception {
        String jobName = this.jobContext.getJobName();
        JobState jobState = this.jobContext.getJobState();
        try {
            prepareHadoopJob(list);

            // Collects the task states the mappers write under the job output directory.
            this.taskStateCollectorService.startAsync().awaitRunning();

            LOG.info("Launching Hadoop MR job " + this.job.getJobName());
            this.job.submit();
            this.hadoopJobSubmitted = true;

            if (!jobState.contains("job.tracking.url")) {
                jobState.setProp("job.tracking.url", this.job.getTrackingURL());
            }

            TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.MR_JOB_RUN);
            LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
            this.job.waitForCompletion(true);
            mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", this.job.getJobID().toString()));

            if (this.cancellationRequested) {
                synchronized (this.cancellationExecution) {
                    if (this.cancellationExecuted) {
                        return;
                    }
                }
            }

            countersToMetrics(JobMetrics.get(jobName, this.jobProps.getProperty("job.id")));
        } finally {
            // Single teardown point for success, cancellation and failure paths.
            this.taskStateCollectorService.stopAsync().awaitTerminated();
            cleanUpWorkingDirectory();
        }
    }

    /**
     * Returns a {@link FileBasedJobLock} for this job's name. The lock lives in the {@code job.lock.dir}
     * directory on the launcher's file system.
     *
     * @throws IOException if the lock cannot be created
     */
    @Override
    protected JobLock getJobLock() throws IOException {
        String lockDirectory = this.jobProps.getProperty("job.lock.dir");
        String jobName = this.jobContext.getJobName();
        return new FileBasedJobLock(this.fs, lockDirectory, jobName);
    }

    /**
     * Kills the Hadoop MR job if it has been submitted and is not yet complete.
     *
     * <p>Best effort: an {@link IOException} from querying or killing the job, or an
     * {@link IllegalStateException} because the job has not started, is logged with its cause and not thrown.
     */
    @Override
    protected void executeCancellation() {
        try {
            if (this.hadoopJobSubmitted && !this.job.isComplete()) {
                LOG.info("Killing the Hadoop MR job for job " + this.jobContext.getJobId());
                this.job.killJob();
            }
        } catch (IOException e) {
            LOG.error("Failed to kill the Hadoop MR job for job " + this.jobContext.getJobId(), e);
        } catch (IllegalStateException e) {
            LOG.error("The Hadoop MR job has not started for job " + this.jobContext.getJobId(), e);
        }
    }

    /**
     * Registers the job's dependencies with the DistributedCache on {@link #conf}.
     *
     * <ul>
     *   <li>Jars listed in {@code framework.jars} and {@code job.jars} are copied into the {@code _jars} working
     *       sub-directory and added to the classpath.</li>
     *   <li>Files listed in {@code job.local.files} are copied into {@code _files}.</li>
     *   <li>Files listed in {@code job.hdfs.files} are referenced in place.</li>
     * </ul>
     * The whole step is timed as {@code MR_DISTRIBUTED_CACHE_SETUP}.
     *
     * @throws IOException if copying or registering a dependency fails
     */
    private void addDependencies() throws IOException {
        TimingEvent distributedCacheSetupTimer =
            this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.MR_DISTRIBUTED_CACHE_SETUP);

        Path jarsDirectory = new Path(this.mrJobDir, JARS_DIR_NAME);
        for (String jarsKey : new String[] {"framework.jars", "job.jars"}) {
            if (this.jobProps.containsKey(jarsKey)) {
                addJars(jarsDirectory, this.jobProps.getProperty(jarsKey));
            }
        }

        if (this.jobProps.containsKey("job.local.files")) {
            Path filesDirectory = new Path(this.mrJobDir, FILES_DIR_NAME);
            addLocalFiles(filesDirectory, this.jobProps.getProperty("job.local.files"));
        }

        if (this.jobProps.containsKey("job.hdfs.files")) {
            addHDFSFiles(this.jobProps.getProperty("job.hdfs.files"));
        }

        distributedCacheSetupTimer.stop();
    }

    /**
     * Configures the Hadoop job before submission. The job is map-only, with {@link TaskRunner} as the mapper,
     * {@link NLineInputFormat} over the job input file written by {@link #prepareJobInput(List)}, and
     * {@link GobblinOutputFormat} writing under the job output path. The job state is serialized into the
     * working directory, and its path is handed to the mappers through {@code job.state.file.path}.
     *
     * <p>If {@code mr.job.max.mappers} is set and there are more work units than that limit, the number of lines
     * (work unit files) per input split is raised to {@code ceil(workUnits / maxMappers)}. This caps the number
     * of splits at the limit.
     *
     * @param list the work units to run
     * @throws IOException if writing the job input or the job state fails
     * @throws IllegalArgumentException if {@code mr.job.max.mappers} is set but is not a positive integer
     */
    private void prepareHadoopJob(List<WorkUnit> list) throws IOException {
        TimingEvent jobSetupTimer = this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.MR_JOB_SETUP);

        this.job.setJarByClass(MRJobLauncher.class);
        this.job.setMapperClass(TaskRunner.class);
        this.job.setNumReduceTasks(0);
        this.job.setInputFormatClass(NLineInputFormat.class);
        this.job.setOutputFormatClass(GobblinOutputFormat.class);
        this.job.setMapOutputKeyClass(NullWritable.class);
        this.job.setMapOutputValueClass(NullWritable.class);
        this.job.setSpeculativeExecution(false);

        NLineInputFormat.addInputPath(this.job, prepareJobInput(list));
        SequenceFileOutputFormat.setOutputPath(this.job, this.jobOutputPath);

        Path jobStateFilePath = new Path(this.mrJobDir, AbstractJobLauncher.JOB_STATE_FILE_NAME);
        SerializationUtils.serializeState(this.fs, jobStateFilePath, this.jobContext.getJobState());
        this.job.getConfiguration().set("job.state.file.path", jobStateFilePath.toString());

        if (this.jobProps.containsKey("mr.job.max.mappers")) {
            String maxMappersValue = this.jobProps.getProperty("mr.job.max.mappers");
            int maxMappers = Integer.parseInt(maxMappersValue);
            if (maxMappers <= 0) {
                throw new IllegalArgumentException(
                    "mr.job.max.mappers must be a positive integer but was " + maxMappersValue);
            }
            int numWorkUnits = list.size();
            if (numWorkUnits > maxMappers) {
                // Ceiling division without the overflow risk of (a + b - 1) / b.
                int linesPerSplit = numWorkUnits / maxMappers + (numWorkUnits % maxMappers == 0 ? 0 : 1);
                NLineInputFormat.setNumLinesPerSplit(this.job, linesPerSplit);
            }
        }

        jobSetupTimer.stop();
    }

    /**
     * Copies local jars into {@code path} on the launcher's file system and adds each copy to the job classpath
     * through the DistributedCache.
     *
     * @param path the destination directory for the jar copies
     * @param str a comma-separated list of local jar paths or glob patterns
     * @throws IOException if a listed non-glob path does not exist, or if copying or registering a jar fails
     */
    private void addJars(Path path, String str) throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal(this.conf);
        Path qualifiedJarDir = this.fs.makeQualified(path);
        for (String jarPattern : SPLITTER.split(str)) {
            FileStatus[] matches = localFs.globStatus(new Path(jarPattern));
            // globStatus returns null (not an empty array) for a non-glob path that does not exist.
            if (matches == null) {
                throw new IOException("Jar " + jarPattern + " does not exist and cannot be added to the classpath");
            }
            for (FileStatus jarStatus : matches) {
                Path jarCopy = new Path(qualifiedJarDir, jarStatus.getPath().getName());
                this.fs.copyFromLocalFile(jarStatus.getPath(), jarCopy);
                LOG.info(String.format("Adding %s to classpath", jarCopy));
                DistributedCache.addFileToClassPath(jarCopy, this.conf, this.fs);
            }
        }
    }

    /**
     * Copies each listed local file into {@code path} on the launcher's file system. Each copy is added to the
     * DistributedCache with a {@code #<file name>} fragment, and symlink creation is enabled on {@link #conf}.
     *
     * @param path the destination directory for the file copies
     * @param str a comma-separated list of local file paths
     * @throws IOException if copying a file fails
     */
    private void addLocalFiles(Path path, String str) throws IOException {
        DistributedCache.createSymlink(this.conf);
        for (String localFile : SPLITTER.split(str)) {
            Path source = new Path(localFile);
            Path destination = new Path(this.fs.makeQualified(path), source.getName());
            this.fs.copyFromLocalFile(source, destination);

            String cacheEntry = destination.toUri().getPath() + "#" + destination.getName();
            URI cacheUri = URI.create(cacheEntry);
            LOG.info(String.format("Adding %s to DistributedCache", cacheUri));
            DistributedCache.addCacheFile(cacheUri, this.conf);
        }
    }

    /**
     * Adds the listed files, already on the cluster file system, to the DistributedCache without copying them.
     * Each entry gets a {@code #<file name>} fragment, and symlink creation is enabled on {@link #conf}.
     *
     * @param str a comma-separated list of file paths. It is first passed through
     *     {@code PasswordManager.readPassword}, presumably so the value may be stored encrypted.
     * @throws IOException if registering a file fails
     */
    private void addHDFSFiles(String str) throws IOException {
        DistributedCache.createSymlink(this.conf);
        String fileList = PasswordManager.getInstance(this.jobProps).readPassword(str);
        for (String hdfsFile : SPLITTER.split(fileList)) {
            Path hdfsPath = new Path(hdfsFile);
            String cacheEntry = hdfsPath.toUri().getPath() + "#" + hdfsPath.getName();
            URI cacheUri = URI.create(cacheEntry);
            LOG.info(String.format("Adding %s to DistributedCache", cacheUri));
            DistributedCache.addCacheFile(cacheUri, this.conf);
        }
    }

    /**
     * Serializes each work unit into its own file under the job input directory. It also writes the job input
     * file ({@code <jobId>.wulist}), which lists the path of every serialized work unit, one per line.
     *
     * <p>A {@link MultiWorkUnit} file is named with a sequential multi-task ID plus
     * {@link AbstractJobLauncher#MULTI_WORK_UNIT_FILE_EXTENSION}. Any other work unit file is named after its
     * {@code task.id} property plus {@link AbstractJobLauncher#WORK_UNIT_FILE_EXTENSION}.
     *
     * @param list the work units to serialize
     * @return the path of the job input file
     * @throws IOException if any file cannot be written
     */
    private Path prepareJobInput(List<WorkUnit> list) throws IOException {
        Path jobInputFile = new Path(this.jobInputPath, this.jobContext.getJobId() + WORK_UNIT_LIST_FILE_EXTENSION);
        Closer closer = Closer.create();
        try {
            // Closer closes in reverse registration order: the writer stack is flushed and closed first, the
            // ParallelRunner last.
            ParallelRunner parallelRunner = closer.register(new ParallelRunner(this.parallelRunnerThreads, this.fs));
            OutputStream outputStream = closer.register(this.fs.create(jobInputFile));
            Writer streamWriter =
                closer.register(new OutputStreamWriter(outputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
            Writer writer = closer.register(new BufferedWriter(streamWriter));

            int multiTaskIdSequence = 0;
            for (WorkUnit workUnit : list) {
                String workUnitFileName;
                if (workUnit instanceof MultiWorkUnit) {
                    workUnitFileName = JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), multiTaskIdSequence++)
                        + AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION;
                } else {
                    workUnitFileName = workUnit.getProp("task.id") + AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION;
                }
                Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
                parallelRunner.serializeToFile(workUnit, workUnitFile);
                writer.write(workUnitFile.toUri().getPath() + "\n");
            }
            return jobInputFile;
        } catch (Throwable t) {
            // Record the primary failure so an exception from close() is suppressed instead of masking it.
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }
    }

    /**
     * Recursively deletes the MR working directory if it exists.
     *
     * <p>Best effort: a failed delete or an {@link IOException} is logged, never thrown, so this is safe to
     * call from {@code finally} blocks.
     */
    private void cleanUpWorkingDirectory() {
        try {
            if (this.fs.exists(this.mrJobDir)) {
                if (this.fs.delete(this.mrJobDir, true)) {
                    LOG.info("Deleted working directory " + this.mrJobDir);
                } else {
                    LOG.warn("Failed to delete working directory " + this.mrJobDir);
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to delete working directory " + this.mrJobDir, e);
        }
    }

    /**
     * Copies the Hadoop job's counters into {@code gobblinMetrics}. The value of each counter in the
     * {@link MetricGroup#JOB} group, then the {@link MetricGroup#TASK} group, is added to the Gobblin counter
     * with the same name. If the job reports no counters, nothing happens.
     *
     * @param gobblinMetrics the metrics whose counters are incremented
     * @throws IOException if the job's counters cannot be retrieved
     */
    @VisibleForTesting
    void countersToMetrics(GobblinMetrics gobblinMetrics) throws IOException {
        Counters hadoopCounters = this.job.getCounters();
        if (hadoopCounters == null) {
            return;
        }
        for (MetricGroup group : new MetricGroup[] {MetricGroup.JOB, MetricGroup.TASK}) {
            for (Counter hadoopCounter : hadoopCounters.getGroup(group.name())) {
                gobblinMetrics.getCounter(hadoopCounter.getName(), new String[0]).inc(hadoopCounter.getValue());
            }
        }
    }

    /**
     * Returns the {@link FileSystem} named by the {@code fs.uri} property. Falls back to {@code file:///} when
     * the property is absent.
     *
     * @throws IOException if the file system cannot be obtained
     */
    private static FileSystem buildFileSystem(Properties properties, Configuration configuration) throws IOException {
        String fsUri = properties.getProperty("fs.uri", "file:///");
        URI uri = URI.create(fsUri);
        return FileSystem.get(uri, configuration);
    }
}
