package gobblin.runtime.local;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import gobblin.metrics.event.TimingEvent;
import gobblin.runtime.AbstractJobLauncher;
import gobblin.runtime.FileBasedJobLock;
import gobblin.runtime.JobLock;
import gobblin.runtime.JobState;
import gobblin.runtime.TaskExecutor;
import gobblin.runtime.TaskStateTracker;
import gobblin.runtime.util.TimingEventNames;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.JobLauncherUtils;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/local/LocalJobLauncher.class */
public class LocalJobLauncher extends AbstractJobLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(LocalJobLauncher.class);
    private final TaskExecutor taskExecutor;
    private final TaskStateTracker taskStateTracker;
    private final ServiceManager serviceManager;
    private volatile CountDownLatch countDownLatch;

    public LocalJobLauncher(Properties properties) throws Exception {
        super(properties, ImmutableList.of());
        TimingEvent timingEvent = this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.JOB_LOCAL_SETUP);
        this.taskExecutor = new TaskExecutor(properties);
        this.taskStateTracker = new LocalTaskStateTracker(properties, this.jobContext.getJobState(), this.taskExecutor, this.eventBus);
        this.serviceManager = new ServiceManager(Lists.newArrayList(new Service[]{this.taskExecutor, this.taskStateTracker}));
        this.serviceManager.startAsync().awaitHealthy(5L, TimeUnit.SECONDS);
        startCancellationExecutor();
        timingEvent.stop();
    }

    @Override // gobblin.runtime.AbstractJobLauncher, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.serviceManager.stopAsync().awaitStopped(5L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            LOG.warn("Timed out while waiting for the service manager to be stopped", e);
        } finally {
            super.close();
        }
    }

    @Override // gobblin.runtime.AbstractJobLauncher
    protected void runWorkUnits(List<WorkUnit> list) throws Exception {
        TimingEvent timingEvent = this.eventSubmitter.getTimingEvent("WorkUnitsPreparationTimer");
        List flattenWorkUnits = JobLauncherUtils.flattenWorkUnits(list);
        timingEvent.stop();
        if (flattenWorkUnits.isEmpty()) {
            LOG.warn("No work units to run");
            return;
        }
        String jobId = this.jobContext.getJobId();
        JobState jobState = this.jobContext.getJobState();
        Iterator it = flattenWorkUnits.iterator();
        while (it.hasNext()) {
            ((WorkUnit) it.next()).addAllIfNotExist(jobState);
        }
        TimingEvent timingEvent2 = this.eventSubmitter.getTimingEvent(TimingEventNames.RunJobTimings.WORK_UNITS_RUN);
        this.countDownLatch = new CountDownLatch(flattenWorkUnits.size());
        AbstractJobLauncher.runWorkUnits(this.jobContext.getJobId(), flattenWorkUnits, this.taskStateTracker, this.taskExecutor, this.countDownLatch);
        LOG.info(String.format("Waiting for submitted tasks of job %s to complete...", jobId));
        while (this.countDownLatch.getCount() > 0) {
            LOG.info(String.format("%d out of %d tasks of job %s are running", Long.valueOf(this.countDownLatch.getCount()), Integer.valueOf(flattenWorkUnits.size()), jobId));
            this.countDownLatch.await(1L, TimeUnit.MINUTES);
        }
        timingEvent2.stop();
        if (this.cancellationRequested) {
            synchronized (this.cancellationExecution) {
                if (this.cancellationExecuted) {
                    return;
                }
            }
        }
        LOG.info(String.format("All tasks of job %s have completed", jobId));
        if (jobState.getState() == JobState.RunningState.RUNNING) {
            jobState.setState(JobState.RunningState.SUCCESSFUL);
        }
    }

    @Override // gobblin.runtime.AbstractJobLauncher
    protected JobLock getJobLock() throws IOException {
        return new FileBasedJobLock(FileSystem.get(URI.create(this.jobProps.getProperty("fs.uri", "file:///")), new Configuration()), this.jobProps.getProperty("job.lock.dir"), this.jobContext.getJobName());
    }

    @Override // gobblin.runtime.AbstractJobLauncher
    protected void executeCancellation() {
        if (this.countDownLatch != null) {
            while (this.countDownLatch.getCount() > 0) {
                this.countDownLatch.countDown();
            }
        }
    }
}
