package gobblin.runtime;

import com.google.common.base.CaseFormat;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import gobblin.commit.CommitSequence;
import gobblin.commit.CommitSequenceStore;
import gobblin.commit.DeliverySemantics;
import gobblin.configuration.WorkUnitState;
import gobblin.converter.initializer.ConverterInitializerFactory;
import gobblin.metastore.StateStore;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.GobblinMetricsRegistry;
import gobblin.metrics.MetricContext;
import gobblin.metrics.Tag;
import gobblin.metrics.event.EventSubmitter;
import gobblin.metrics.event.TimingEvent;
import gobblin.runtime.JobState;
import gobblin.runtime.listeners.CloseableJobListener;
import gobblin.runtime.listeners.JobExecutionEventSubmitterListener;
import gobblin.runtime.listeners.JobListener;
import gobblin.runtime.listeners.JobListeners;
import gobblin.runtime.locks.JobLock;
import gobblin.runtime.locks.JobLockEventListener;
import gobblin.runtime.locks.JobLockException;
import gobblin.runtime.locks.JobLockFactory;
import gobblin.runtime.util.JobMetrics;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.ClusterNameTags;
import gobblin.util.ExecutorsUtils;
import gobblin.util.JobLauncherUtils;
import gobblin.writer.initializer.WriterInitializerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/AbstractJobLauncher.class */
public abstract class AbstractJobLauncher implements JobLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobLauncher.class);
    public static final String TASK_STATE_STORE_TABLE_SUFFIX = ".tst";
    public static final String JOB_STATE_FILE_NAME = "job.state";
    public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
    public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
    protected final Properties jobProps;
    protected final JobContext jobContext;
    protected final ExecutorService cancellationExecutor;
    protected final Optional<MetricContext> runtimeMetricContext;
    protected final EventSubmitter eventSubmitter;
    protected Optional<JobLock> jobLockOptional = Optional.absent();
    protected final Object cancellationRequest = new Object();
    protected volatile boolean cancellationRequested = false;
    protected final Object cancellationExecution = new Object();
    protected volatile boolean cancellationExecuted = false;
    protected final EventBus eventBus = new EventBus(AbstractJobLauncher.class.getSimpleName());
    private final List<JobListener> mandatoryJobListeners = Lists.newArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/runtime/AbstractJobLauncher$JobListenerAction.class */
    public interface JobListenerAction {
        void apply(JobListener jobListener, JobContext jobContext) throws Exception;
    }

    public AbstractJobLauncher(Properties properties, List<? extends Tag<?>> list) throws Exception {
        Preconditions.checkArgument(properties.containsKey("job.name"), "A job must have a job name specified by job.name");
        this.jobProps = new Properties();
        this.jobProps.putAll(properties);
        this.jobContext = new JobContext(this.jobProps, LOG);
        this.eventBus.register(this.jobContext);
        this.cancellationExecutor = Executors.newSingleThreadExecutor(ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("CancellationExecutor")));
        this.runtimeMetricContext = this.jobContext.getJobMetricsOptional().transform(new Function<JobMetrics, MetricContext>() { // from class: gobblin.runtime.AbstractJobLauncher.1
            public MetricContext apply(JobMetrics jobMetrics) {
                return jobMetrics.getMetricContext();
            }
        });
        List<Tag<?>> addClusterNameTags = addClusterNameTags(list);
        this.eventSubmitter = buildEventSubmitter(addClusterNameTags);
        GobblinMetrics.addCustomTagToState(this.jobContext.getJobState(), addClusterNameTags);
        this.mandatoryJobListeners.add(new JobExecutionEventSubmitterListener(new JobExecutionEventSubmitter(this.eventSubmitter)));
    }

    @Override // gobblin.runtime.JobLauncher
    public void cancelJob(JobListener jobListener) throws JobException {
        synchronized (this.cancellationRequest) {
            if (this.cancellationRequested) {
                return;
            }
            this.cancellationRequested = true;
            this.cancellationRequest.notify();
            synchronized (this.cancellationExecution) {
                while (!this.cancellationExecuted) {
                    try {
                        this.cancellationExecution.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                notifyListeners(this.jobContext, jobListener, "JobCancelTimer", new JobListenerAction() { // from class: gobblin.runtime.AbstractJobLauncher.2
                    @Override // gobblin.runtime.AbstractJobLauncher.JobListenerAction
                    public void apply(JobListener jobListener2, JobContext jobContext) throws Exception {
                        jobListener2.onJobCancellation(jobContext);
                    }
                });
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // gobblin.runtime.JobLauncher
    public void launchJob(JobListener jobListener) throws JobException {
        Closer create;
        Throwable th;
        Optional fromNullable;
        String jobId = this.jobContext.getJobId();
        JobState jobState = this.jobContext.getJobState();
        try {
            TimingEvent timingEvent = this.eventSubmitter.getTimingEvent("FullJobExecutionTimer");
            try {
                try {
                    if (!tryLockJob(this.jobProps)) {
                        this.eventSubmitter.submit("LockInUse");
                        throw new JobException(String.format("Previous instance of job %s is still running, skipping this scheduled run", this.jobContext.getJobName()));
                    }
                    try {
                        create = Closer.create();
                        th = null;
                        try {
                            notifyListeners(this.jobContext, jobListener, "JobPrepareTimer", new JobListenerAction() { // from class: gobblin.runtime.AbstractJobLauncher.3
                                @Override // gobblin.runtime.AbstractJobLauncher.JobListenerAction
                                public void apply(JobListener jobListener2, JobContext jobContext) throws Exception {
                                    jobListener2.onJobPrepare(jobContext);
                                }
                            });
                            if (this.jobContext.getSemantics() == DeliverySemantics.EXACTLY_ONCE) {
                                executeUnfinishedCommitSequences(jobState.getJobName());
                            }
                            TimingEvent timingEvent2 = this.eventSubmitter.getTimingEvent("WorkUnitsCreationTimer");
                            fromNullable = Optional.fromNullable(this.jobContext.getSource().getWorkunits(jobState));
                            timingEvent2.stop();
                        } catch (Throwable th2) {
                            if (create != null) {
                                if (0 != 0) {
                                    try {
                                        create.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    create.close();
                                }
                            }
                            throw th2;
                        }
                    } catch (Throwable th4) {
                        jobState.setState(JobState.RunningState.FAILED);
                        LOG.error(("Failed to launch and run job " + jobId) + ": " + th4, th4);
                        try {
                            TimingEvent timingEvent3 = this.eventSubmitter.getTimingEvent("JobCleanupTimer");
                            cleanupStagingData(jobState);
                            timingEvent3.stop();
                            this.jobContext.storeJobExecutionInfo();
                            unlockJob();
                        } finally {
                            unlockJob();
                        }
                    }
                    if (!fromNullable.isPresent()) {
                        this.eventSubmitter.submit("WorkUnitsMissing");
                        jobState.setState(JobState.RunningState.FAILED);
                        throw new JobException("Failed to get work units for job " + jobId);
                    }
                    if (((List) fromNullable.get()).isEmpty()) {
                        this.eventSubmitter.submit("WorkUnitsEmpty");
                        LOG.warn("No work units have been created for job " + jobId);
                        if (create != null) {
                            if (0 != 0) {
                                try {
                                    create.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                create.close();
                            }
                        }
                        try {
                            TimingEvent timingEvent4 = this.eventSubmitter.getTimingEvent("JobCleanupTimer");
                            cleanupStagingData(jobState);
                            timingEvent4.stop();
                            this.jobContext.storeJobExecutionInfo();
                            unlockJob();
                            if (this.jobContext.getJobMetricsOptional().isPresent()) {
                                JobMetrics.remove(jobState);
                                return;
                            }
                            return;
                        } finally {
                        }
                    }
                    create.register(WriterInitializerFactory.newInstace(jobState, (Collection) fromNullable.get())).initialize();
                    create.register(ConverterInitializerFactory.newInstance(jobState, (Collection) fromNullable.get())).initialize();
                    TimingEvent timingEvent5 = this.eventSubmitter.getTimingEvent("JobMrStagingDataCleanTimer");
                    cleanLeftoverStagingData((List) fromNullable.get(), jobState);
                    timingEvent5.stop();
                    jobState.setStartTime(System.currentTimeMillis());
                    jobState.setState(JobState.RunningState.RUNNING);
                    try {
                        LOG.info("Starting job " + jobId);
                        notifyListeners(this.jobContext, jobListener, "JobStartTimer", new JobListenerAction() { // from class: gobblin.runtime.AbstractJobLauncher.4
                            @Override // gobblin.runtime.AbstractJobLauncher.JobListenerAction
                            public void apply(JobListener jobListener2, JobContext jobContext) throws Exception {
                                jobListener2.onJobStart(jobContext);
                            }
                        });
                        TimingEvent timingEvent6 = this.eventSubmitter.getTimingEvent("WorkUnitsPreparationTimer");
                        prepareWorkUnits(JobLauncherUtils.flattenWorkUnits((List) fromNullable.get()), jobState);
                        timingEvent6.stop();
                        this.jobContext.storeJobExecutionInfo();
                        TimingEvent timingEvent7 = this.eventSubmitter.getTimingEvent("JobRunTimer");
                        runWorkUnits((List) fromNullable.get());
                        timingEvent7.stop();
                        this.eventSubmitter.submit(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, "JOB_" + jobState.getState()));
                        if (jobState.getState() == JobState.RunningState.CANCELLED) {
                            LOG.info(String.format("Job %s has been cancelled, aborting now", jobId));
                            long currentTimeMillis = System.currentTimeMillis();
                            jobState.setEndTime(currentTimeMillis);
                            jobState.setDuration(currentTimeMillis - jobState.getStartTime());
                            if (create != null) {
                                if (0 != 0) {
                                    try {
                                        create.close();
                                    } catch (Throwable th6) {
                                        th.addSuppressed(th6);
                                    }
                                } else {
                                    create.close();
                                }
                            }
                            try {
                                TimingEvent timingEvent8 = this.eventSubmitter.getTimingEvent("JobCleanupTimer");
                                cleanupStagingData(jobState);
                                timingEvent8.stop();
                                this.jobContext.storeJobExecutionInfo();
                                unlockJob();
                                timingEvent.stop();
                                if (this.jobContext.getJobMetricsOptional().isPresent()) {
                                    JobMetrics.remove(jobState);
                                    return;
                                }
                                return;
                            } finally {
                            }
                        }
                        TimingEvent timingEvent9 = this.eventSubmitter.getTimingEvent("JobCommitTimer");
                        this.jobContext.finalizeJobStateBeforeCommit();
                        this.jobContext.commit();
                        postProcessJobState(jobState);
                        timingEvent9.stop();
                        long currentTimeMillis2 = System.currentTimeMillis();
                        jobState.setEndTime(currentTimeMillis2);
                        jobState.setDuration(currentTimeMillis2 - jobState.getStartTime());
                        if (create != null) {
                            if (0 != 0) {
                                try {
                                    create.close();
                                } catch (Throwable th7) {
                                    th.addSuppressed(th7);
                                }
                            } else {
                                create.close();
                            }
                        }
                        try {
                            TimingEvent timingEvent10 = this.eventSubmitter.getTimingEvent("JobCleanupTimer");
                            cleanupStagingData(jobState);
                            timingEvent10.stop();
                            this.jobContext.storeJobExecutionInfo();
                            unlockJob();
                            timingEvent.stop();
                            Iterator<JobState.DatasetState> it = this.jobContext.getDatasetStatesByUrns().values().iterator();
                            while (true) {
                                if (!it.hasNext()) {
                                    break;
                                } else if (it.next().getState() == JobState.RunningState.FAILED) {
                                    jobState.setState(JobState.RunningState.FAILED);
                                    break;
                                }
                            }
                            notifyListeners(this.jobContext, jobListener, "JobCompleteTimer", new JobListenerAction() { // from class: gobblin.runtime.AbstractJobLauncher.5
                                @Override // gobblin.runtime.AbstractJobLauncher.JobListenerAction
                                public void apply(JobListener jobListener2, JobContext jobContext) throws Exception {
                                    jobListener2.onJobCompletion(jobContext);
                                }
                            });
                            if (jobState.getState() == JobState.RunningState.FAILED) {
                                notifyListeners(this.jobContext, jobListener, "JobFailedTimer", new JobListenerAction() { // from class: gobblin.runtime.AbstractJobLauncher.6
                                    @Override // gobblin.runtime.AbstractJobLauncher.JobListenerAction
                                    public void apply(JobListener jobListener2, JobContext jobContext) throws Exception {
                                        jobListener2.onJobFailure(jobContext);
                                    }
                                });
                                throw new JobException(String.format("Job %s failed", jobId));
                            }
                        } finally {
                            unlockJob();
                        }
                    } catch (Throwable th8) {
                        long currentTimeMillis3 = System.currentTimeMillis();
                        jobState.setEndTime(currentTimeMillis3);
                        jobState.setDuration(currentTimeMillis3 - jobState.getStartTime());
                        throw th8;
                    }
                } catch (Throwable th9) {
                    try {
                        TimingEvent timingEvent11 = this.eventSubmitter.getTimingEvent("JobCleanupTimer");
                        cleanupStagingData(jobState);
                        timingEvent11.stop();
                        this.jobContext.storeJobExecutionInfo();
                        unlockJob();
                        throw th9;
                    } finally {
                        unlockJob();
                    }
                }
            } finally {
                timingEvent.stop();
            }
        } finally {
            if (this.jobContext.getJobMetricsOptional().isPresent()) {
                JobMetrics.remove(jobState);
            }
        }
    }

    private void executeUnfinishedCommitSequences(String str) throws IOException {
        Preconditions.checkState(this.jobContext.getCommitSequenceStore().isPresent());
        CommitSequenceStore commitSequenceStore = (CommitSequenceStore) this.jobContext.getCommitSequenceStore().get();
        for (String str2 : commitSequenceStore.get(str)) {
            Optional optional = commitSequenceStore.get(str, str2);
            if (optional.isPresent()) {
                ((CommitSequence) optional.get()).execute();
            }
            commitSequenceStore.delete(str, str2);
        }
    }

    @Deprecated
    protected void postProcessTaskStates(List<TaskState> list) {
    }

    protected void postProcessJobState(JobState jobState) {
        postProcessTaskStates(jobState.getTaskStates());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.cancellationExecutor.shutdownNow();
        try {
            this.jobContext.getSource().shutdown(this.jobContext.getJobState());
        } finally {
            if (GobblinMetrics.isEnabled(this.jobProps)) {
                GobblinMetricsRegistry.getInstance().remove(this.jobContext.getJobId());
            }
        }
    }

    protected abstract void runWorkUnits(List<WorkUnit> list) throws Exception;

    protected JobLock getJobLock(Properties properties, JobLockEventListener jobLockEventListener) throws JobLockException {
        return JobLockFactory.getJobLock(properties, jobLockEventListener);
    }

    protected abstract void executeCancellation();

    /* JADX INFO: Access modifiers changed from: protected */
    public void startCancellationExecutor() {
        this.cancellationExecutor.execute(new Runnable() { // from class: gobblin.runtime.AbstractJobLauncher.7
            @Override // java.lang.Runnable
            public void run() {
                synchronized (AbstractJobLauncher.this.cancellationRequest) {
                    while (!AbstractJobLauncher.this.cancellationRequested) {
                        try {
                            AbstractJobLauncher.this.cancellationRequest.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    AbstractJobLauncher.LOG.info("Cancellation has been requested for job " + AbstractJobLauncher.this.jobContext.getJobId());
                    AbstractJobLauncher.this.executeCancellation();
                    AbstractJobLauncher.LOG.info("Cancellation has been executed for job " + AbstractJobLauncher.this.jobContext.getJobId());
                }
                synchronized (AbstractJobLauncher.this.cancellationExecution) {
                    AbstractJobLauncher.this.cancellationExecuted = true;
                    AbstractJobLauncher.this.jobContext.getJobState().setState(JobState.RunningState.CANCELLED);
                    AbstractJobLauncher.this.cancellationExecution.notifyAll();
                }
            }
        });
    }

    private void prepareWorkUnits(List<WorkUnit> list, JobState jobState) {
        int i = 0;
        for (WorkUnit workUnit : list) {
            workUnit.setProp("job.id", this.jobContext.getJobId());
            int i2 = i;
            i++;
            String newTaskId = JobLauncherUtils.newTaskId(this.jobContext.getJobId(), i2);
            workUnit.setId(newTaskId);
            workUnit.setProp("task.id", newTaskId);
            jobState.incrementTaskCount();
            jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState)));
        }
    }

    private boolean tryLockJob(Properties properties) {
        try {
            if (this.jobContext.isJobLockEnabled()) {
                this.jobLockOptional = Optional.of(getJobLock(properties, new JobLockEventListener() { // from class: gobblin.runtime.AbstractJobLauncher.8
                    @Override // gobblin.runtime.locks.JobLockEventListener
                    public void onLost() {
                        AbstractJobLauncher.this.executeCancellation();
                    }
                }));
            }
            if (this.jobLockOptional.isPresent()) {
                if (!((JobLock) this.jobLockOptional.get()).tryLock()) {
                    return false;
                }
            }
            return true;
        } catch (JobLockException e) {
            LOG.error(String.format("Failed to acquire job lock for job %s: %s", this.jobContext.getJobId(), e), e);
            return false;
        }
    }

    private void unlockJob() {
        try {
            if (this.jobLockOptional.isPresent()) {
                try {
                    ((JobLock) this.jobLockOptional.get()).unlock();
                    try {
                        ((JobLock) this.jobLockOptional.get()).close();
                    } catch (IOException e) {
                        LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e), e);
                    }
                } catch (JobLockException e2) {
                    LOG.error(String.format("Failed to unlock for job %s: %s", this.jobContext.getJobId(), e2), e2);
                    try {
                        ((JobLock) this.jobLockOptional.get()).close();
                    } catch (IOException e3) {
                        LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e3), e3);
                    }
                }
            }
        } catch (Throwable th) {
            try {
                ((JobLock) this.jobLockOptional.get()).close();
            } catch (IOException e4) {
                LOG.error(String.format("Failed to close job lock for job %s: %s", this.jobContext.getJobId(), e4), e4);
            }
            throw th;
        }
    }

    private CloseableJobListener getParallelCombinedJobListener(JobState jobState, JobListener jobListener) {
        ArrayList newArrayList = Lists.newArrayList(this.mandatoryJobListeners);
        newArrayList.add(jobListener);
        for (String str : jobState.getPropAsSet("job.listeners", "")) {
            try {
                newArrayList.add(Class.forName(str).newInstance());
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                LOG.warn(String.format("JobListener could not be created due to %s", str), e);
            }
        }
        return JobListeners.parallelJobListener(newArrayList);
    }

    private static List<Tag<?>> addClusterNameTags(List<? extends Tag<?>> list) {
        return ImmutableList.builder().addAll(list).addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags())).build();
    }

    private EventSubmitter buildEventSubmitter(List<? extends Tag<?>> list) {
        return new EventSubmitter.Builder(this.runtimeMetricContext, "gobblin.runtime").addMetadata(Tag.toMap(Tag.tagValuesToString(list))).build();
    }

    public static void runWorkUnits(String str, String str2, JobState jobState, List<WorkUnit> list, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, StateStore<TaskState> stateStore, Logger logger) throws IOException, InterruptedException {
        if (list.isEmpty()) {
            logger.warn("No work units to run in container " + str2);
            return;
        }
        Iterator<WorkUnit> it = list.iterator();
        while (it.hasNext()) {
            String prop = it.next().getProp("task.id");
            if (stateStore.exists(str, prop + TASK_STATE_STORE_TABLE_SUFFIX)) {
                stateStore.delete(str, prop + TASK_STATE_STORE_TABLE_SUFFIX);
            }
        }
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        List<Task> runWorkUnits = runWorkUnits(str, jobState, list, taskStateTracker, taskExecutor, countDownLatch);
        logger.info(String.format("Waiting for submitted tasks of job %s to complete in container %s...", str, str2));
        while (countDownLatch.getCount() > 0) {
            logger.info(String.format("%d out of %d tasks of job %s are running in container %s", Long.valueOf(countDownLatch.getCount()), Integer.valueOf(list.size()), str, str2));
            if (countDownLatch.await(10L, TimeUnit.SECONDS)) {
                break;
            }
        }
        logger.info(String.format("All assigned tasks of job %s have completed in container %s", str, str2));
        boolean z = false;
        for (Task task : runWorkUnits) {
            logger.info("Writing task state for task " + task.getTaskId());
            stateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_TABLE_SUFFIX, task.getTaskState());
            if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.FAILED) {
                z = true;
            }
        }
        if (z) {
            for (Task task2 : runWorkUnits) {
                if (task2.getTaskState().contains("task.failure.exception")) {
                    logger.error(String.format("Task %s failed due to exception: %s", task2.getTaskId(), task2.getTaskState().getProp("task.failure.exception")));
                }
            }
            throw new IOException(String.format("Not all tasks running in container %s completed successfully", str2));
        }
    }

    public static List<Task> runWorkUnits(String str, JobState jobState, List<WorkUnit> list, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor, CountDownLatch countDownLatch) {
        ArrayList newArrayList = Lists.newArrayList();
        for (WorkUnit workUnit : list) {
            String prop = workUnit.getProp("task.id");
            WorkUnitState workUnitState = new WorkUnitState(workUnit, jobState);
            workUnitState.setId(prop);
            workUnitState.setProp("job.id", str);
            workUnitState.setProp("task.id", prop);
            Task task = new Task(new TaskContext(workUnitState), taskStateTracker, taskExecutor, Optional.of(countDownLatch));
            taskStateTracker.registerNewTask(task);
            newArrayList.add(task);
            taskExecutor.execute(task);
        }
        new EventSubmitter.Builder(JobMetrics.get(str).getMetricContext(), "gobblin.runtime").build().submit("TasksSubmitted", new String[]{"tasksCount", Integer.toString(list.size())});
        return newArrayList;
    }

    /* JADX WARN: Finally extract failed */
    private void cleanLeftoverStagingData(List<WorkUnit> list, JobState jobState) throws JobException {
        if (jobState.getPropAsBoolean("cleanup.staging.data.by.initializer", false)) {
            return;
        }
        try {
            if (!canCleanStagingData(jobState)) {
                LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
                return;
            }
            try {
                if (this.jobContext.shouldCleanupStagingDataPerTask()) {
                    Closer create = Closer.create();
                    HashMap newHashMap = Maps.newHashMap();
                    try {
                        try {
                            Iterator it = JobLauncherUtils.flattenWorkUnits(list).iterator();
                            while (it.hasNext()) {
                                JobLauncherUtils.cleanTaskStagingData(new WorkUnitState((WorkUnit) it.next(), jobState), LOG, create, newHashMap);
                            }
                            create.close();
                        } catch (Throwable th) {
                            throw create.rethrow(th);
                        }
                    } catch (Throwable th2) {
                        create.close();
                        throw th2;
                    }
                } else {
                    JobLauncherUtils.cleanJobStagingData(jobState, LOG);
                }
            } catch (Throwable th3) {
                LOG.error("Failed to clean leftover staging data", th3);
            }
        } catch (IOException e) {
            throw new JobException("Failed to check unfinished commit sequences", e);
        }
    }

    private void cleanupStagingData(JobState jobState) throws JobException {
        if (jobState.getPropAsBoolean("cleanup.staging.data.by.initializer", false)) {
            return;
        }
        try {
            if (!canCleanStagingData(jobState)) {
                LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
            } else if (this.jobContext.shouldCleanupStagingDataPerTask()) {
                cleanupStagingDataPerTask(jobState);
            } else {
                cleanupStagingDataForEntireJob(jobState);
            }
        } catch (IOException e) {
            throw new JobException("Failed to check unfinished commit sequences", e);
        }
    }

    private boolean canCleanStagingData(JobState jobState) throws IOException {
        return (this.jobContext.getSemantics() == DeliverySemantics.EXACTLY_ONCE && ((CommitSequenceStore) this.jobContext.getCommitSequenceStore().get()).exists(jobState.getJobName())) ? false : true;
    }

    private static void cleanupStagingDataPerTask(JobState jobState) {
        Closer create = Closer.create();
        HashMap newHashMap = Maps.newHashMap();
        try {
            for (TaskState taskState : jobState.getTaskStates()) {
                try {
                    JobLauncherUtils.cleanTaskStagingData(taskState, LOG, create, newHashMap);
                } catch (IOException e) {
                    LOG.error(String.format("Failed to clean staging data for task %s: %s", taskState.getTaskId(), e), e);
                }
            }
        } finally {
            try {
                create.close();
            } catch (IOException e2) {
                LOG.error("Failed to clean staging data", e2);
            }
        }
    }

    private static void cleanupStagingDataForEntireJob(JobState jobState) {
        try {
            JobLauncherUtils.cleanJobStagingData(jobState, LOG);
        } catch (IOException e) {
            LOG.error("Failed to clean staging data for job " + jobState.getJobId(), e);
        }
    }

    private void notifyListeners(JobContext jobContext, JobListener jobListener, String str, JobListenerAction jobListenerAction) throws JobException {
        TimingEvent timingEvent = this.eventSubmitter.getTimingEvent(str);
        try {
            try {
                CloseableJobListener parallelCombinedJobListener = getParallelCombinedJobListener(this.jobContext.getJobState(), jobListener);
                Throwable th = null;
                try {
                    try {
                        jobListenerAction.apply(parallelCombinedJobListener, jobContext);
                        if (parallelCombinedJobListener != null) {
                            if (0 != 0) {
                                try {
                                    parallelCombinedJobListener.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                parallelCombinedJobListener.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (parallelCombinedJobListener != null) {
                        if (th != null) {
                            try {
                                parallelCombinedJobListener.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            parallelCombinedJobListener.close();
                        }
                    }
                    throw th3;
                }
            } catch (Exception e) {
                throw new JobException("Failed to execute all JobListeners", e);
            }
        } finally {
            timingEvent.stop();
        }
    }
}
