package gobblin.scheduler;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.AbstractIdleService;
import gobblin.configuration.ConfigurationKeys;
import gobblin.runtime.JobException;
import gobblin.runtime.JobLauncher;
import gobblin.runtime.JobLauncherFactory;
import gobblin.runtime.listeners.EmailNotificationJobListener;
import gobblin.runtime.listeners.JobListener;
import gobblin.runtime.listeners.RunOnceJobListener;
import gobblin.util.ExecutorsUtils;
import gobblin.util.JobLauncherUtils;
import gobblin.util.SchedulerUtils;
import gobblin.util.filesystem.PathAlterationDetector;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hadoop.fs.Path;
import org.quartz.CronScheduleBuilder;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/scheduler/JobScheduler.class */
public class JobScheduler extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class);

    // Keys under which scheduleJob(...) stores entries in the Quartz JobDataMap;
    // GobblinJob.execute(...) reads the same keys back when a trigger fires.
    public static final String JOB_SCHEDULER_KEY = "jobScheduler";
    public static final String PROPERTIES_KEY = "jobProps";
    public static final String JOB_LISTENER_KEY = "jobListener";

    // Scheduler-wide configuration passed to the constructor. startUp() may add
    // "jobconf.fullyQualifiedPath" to it when only "jobconf.dir" is present.
    public final Properties properties;

    // Service whose underlying Quartz scheduler is used to schedule and delete jobs.
    private final SchedulerService scheduler;

    // Fixed-size pool that runs jobs which have no "job.schedule" property.
    protected final ExecutorService jobExecutor;

    // Job name -> listener, filled by scheduleJob(...) for jobs that have a schedule.
    // NOTE(review): never read or cleaned up inside this class.
    private final Map<String, JobListener> jobListenerMap = Maps.newHashMap();

    // Job name -> Quartz key for every job currently registered with the Quartz scheduler.
    private final Map<String, JobKey> scheduledJobs = Maps.newHashMap();

    // Comma-separated values of "jobconf.extensions" (default "pull,job"), empty entries dropped.
    public final Set<String> jobConfigFileExtensions;

    // Path built from "jobconf.fullyQualifiedPath"; null when that property is absent at construction.
    public final Path jobConfigFileDirPath;

    // Constructed with the value of "jobconf.monitor.interval" (default 300000).
    // NOTE(review): presumably a polling interval in milliseconds - confirm against PathAlterationDetector.
    public final PathAlterationDetector pathAlterationDetector;

    // Listener bound to jobConfigFileDirPath; null when "jobconf.fullyQualifiedPath" is absent at construction.
    public final PathAlterationListenerAdaptorForMonitor listener;

    // Parsed from "scheduler.wait.for.job.completion".
    // NOTE(review): not read anywhere else in this class.
    private final boolean waitForJobCompletion;

    /**
     * Names three scheduling operations: schedule, reschedule and unschedule.
     *
     * <p>NOTE(review): this enum is not referenced anywhere inside {@code JobScheduler};
     * presumably it is consumed by the job configuration file monitor - confirm against callers.
     */
    public enum Action {
        SCHEDULE,
        RESCHEDULE,
        UNSCHEDULE
    }

    @DisallowConcurrentExecution
    /* loaded from: input_file:gobblin/scheduler/JobScheduler$GobblinJob.class */
    public static class GobblinJob implements Job {
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            JobScheduler.LOG.info("Starting job " + jobExecutionContext.getJobDetail().getKey());
            JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            try {
                ((JobScheduler) jobDataMap.get(JobScheduler.JOB_SCHEDULER_KEY)).runJob((Properties) jobDataMap.get(JobScheduler.PROPERTIES_KEY), (JobListener) jobDataMap.get(JobScheduler.JOB_LISTENER_KEY));
            } catch (Throwable th) {
                throw new JobExecutionException(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:gobblin/scheduler/JobScheduler$NonScheduledJobRunner.class */
    public class NonScheduledJobRunner implements Runnable {
        private final Properties jobProps;
        private final JobListener jobListener;

        public NonScheduledJobRunner(Properties properties, JobListener jobListener) {
            this.jobProps = properties;
            this.jobListener = jobListener;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                JobScheduler.this.runJob(this.jobProps, this.jobListener);
            } catch (JobException e) {
                JobScheduler.LOG.error("Failed to run job " + this.jobProps.getProperty("job.name"), e);
            }
        }
    }

    public JobScheduler(Properties properties, SchedulerService schedulerService) throws Exception {
        this.properties = properties;
        this.scheduler = schedulerService;
        this.jobExecutor = Executors.newFixedThreadPool(Integer.parseInt(properties.getProperty("jobexecutor.threadpool.size", Integer.toString(5))), ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("JobScheduler-%d")));
        this.jobConfigFileExtensions = Sets.newHashSet(Splitter.on(",").omitEmptyStrings().split(this.properties.getProperty("jobconf.extensions", "pull,job")));
        this.pathAlterationDetector = new PathAlterationDetector(Long.parseLong(this.properties.getProperty("jobconf.monitor.interval", Long.toString(300000L))));
        this.waitForJobCompletion = Boolean.parseBoolean(this.properties.getProperty("scheduler.wait.for.job.completion", ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION));
        if (this.properties.containsKey("jobconf.fullyQualifiedPath")) {
            this.jobConfigFileDirPath = new Path(this.properties.getProperty("jobconf.fullyQualifiedPath"));
            this.listener = new PathAlterationListenerAdaptorForMonitor(this.jobConfigFileDirPath, this);
        } else {
            this.jobConfigFileDirPath = null;
            this.listener = null;
        }
    }

    /**
     * Starts the job scheduler.
     *
     * <p>Requires {@link #listener} to be non-null, then waits up to one second for the scheduler
     * service to be running. When {@code jobconf.dir} is configured, the job configuration file
     * monitor is started and all configured jobs are scheduled; if
     * {@code jobconf.fullyQualifiedPath} is missing at that point it is derived from
     * {@code jobconf.dir} with a {@code file://} prefix.
     *
     * <p>NOTE(review): monitoring is started only when {@code jobconf.dir} is present, whereas
     * {@code shutDown()} stops the detector when either {@code jobconf.dir} or
     * {@code jobconf.fullyQualifiedPath} is present - confirm this asymmetry is intended.
     *
     * @throws IllegalStateException if {@link #listener} is {@code null}, or if the scheduler
     *         service is not running within the wait time (the original failure is the cause)
     * @throws Exception if starting the monitor or scheduling the configured jobs fails
     */
    @Override
    protected void startUp() throws Exception {
        LOG.info("Starting the job scheduler");
        Preconditions.checkState(this.listener != null, String.format("Property %s was not defined.", "jobconf.fullyQualifiedPath"));

        // Only the wait itself is guarded: failures of the steps below must surface with their
        // own type and message instead of being reported as "Scheduler service is not running."
        try {
            this.scheduler.awaitRunning(1L, TimeUnit.SECONDS);
        } catch (IllegalStateException | TimeoutException e) {
            throw new IllegalStateException("Scheduler service is not running.", e);
        }

        if (!this.properties.containsKey("jobconf.dir")) {
            return;
        }

        if (!this.properties.containsKey("jobconf.fullyQualifiedPath")) {
            this.properties.setProperty("jobconf.fullyQualifiedPath", "file://" + this.properties.getProperty("jobconf.dir"));
        }
        startGeneralJobConfigFileMonitor();
        scheduleGeneralConfiguredJobs();
    }

    /**
     * Stops the job scheduler: stops the path alteration detector (passing 1000 as the stop
     * interval) when either {@code jobconf.fullyQualifiedPath} or {@code jobconf.dir} is
     * configured, then shuts down the job executor.
     *
     * @throws Exception if stopping the detector fails
     */
    @Override
    protected void shutDown() throws Exception {
        LOG.info("Stopping the job scheduler");

        final boolean jobConfigLocationConfigured =
                this.properties.containsKey("jobconf.fullyQualifiedPath")
                        || this.properties.containsKey("jobconf.dir");
        if (jobConfigLocationConfigured) {
            this.pathAlterationDetector.stop(1000L);
        }

        ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
    }

    /**
     * Schedules a job as a {@link GobblinJob} with no additional job data.
     *
     * <p>This is best effort: a {@link JobException} or {@link RuntimeException} raised while
     * scheduling is logged and not propagated.
     *
     * @param properties properties of the job to schedule
     * @param jobListener listener for the job; may be {@code null}
     * @throws JobException declared for callers; failures are logged by this method instead
     */
    public void scheduleJob(Properties properties, JobListener jobListener) throws JobException {
        final Map<String, Object> noAdditionalJobData = Maps.newHashMap();
        try {
            scheduleJob(properties, jobListener, noAdditionalJobData, GobblinJob.class);
        } catch (JobException | RuntimeException failure) {
            final String jobName = properties.getProperty("job.name", "Unknown job");
            LOG.error("Could not schedule job " + jobName, failure);
        }
    }

    /**
     * Schedules a job with the Quartz scheduler, or runs it once when it has no schedule.
     *
     * <ul>
     *   <li>A job whose name is already in the scheduled set is skipped with a warning.</li>
     *   <li>A job with {@code job.disabled=true} is skipped.</li>
     *   <li>A job without {@code job.schedule} gets {@code job.runonce=true} set on its properties
     *       and is submitted to the job executor.</li>
     *   <li>Otherwise a Quartz job of type {@code cls} is registered with a cron trigger.</li>
     * </ul>
     *
     * @param properties properties of the job; must contain {@code job.name}
     * @param jobListener listener for the job; may be {@code null}
     * @param map extra entries for the Quartz job data map, applied after (and able to override)
     *        the scheduler, properties and listener entries
     * @param cls Quartz job class to instantiate when the trigger fires
     * @throws IllegalArgumentException if {@code job.name} is missing
     * @throws JobException if the Quartz scheduler rejects the job
     */
    public void scheduleJob(Properties properties, JobListener jobListener, Map<String, Object> map, Class<? extends Job> cls) throws JobException {
        Preconditions.checkArgument(properties.containsKey("job.name"), "A job must have a job name specified by job.name");
        final String jobName = properties.getProperty("job.name");

        if (this.scheduledJobs.containsKey(jobName)) {
            LOG.warn("Job " + jobName + " has already been scheduled");
            return;
        }

        if (Boolean.parseBoolean(properties.getProperty("job.disabled", "false"))) {
            LOG.info("Skipping disabled job " + jobName);
            return;
        }

        final boolean hasSchedule = properties.containsKey("job.schedule");
        if (!hasSchedule) {
            // No cron expression: mark the job run-once and hand it to the executor.
            properties.setProperty("job.runonce", "true");
            this.jobExecutor.execute(new NonScheduledJobRunner(properties, jobListener));
            return;
        }

        if (jobListener != null) {
            this.jobListenerMap.put(jobName, jobListener);
        }

        // Everything GobblinJob needs at fire time travels in the job data map.
        final JobDataMap jobData = new JobDataMap();
        jobData.put(JOB_SCHEDULER_KEY, this);
        jobData.put(PROPERTIES_KEY, properties);
        jobData.put(JOB_LISTENER_KEY, jobListener);
        jobData.putAll(map);

        final String jobGroup = Strings.nullToEmpty(properties.getProperty("job.group"));
        final String jobDescription = Strings.nullToEmpty(properties.getProperty("job.description"));
        final JobDetail jobDetail = JobBuilder.newJob(cls)
                .withIdentity(jobName, jobGroup)
                .withDescription(jobDescription)
                .usingJobData(jobData)
                .build();

        try {
            final Trigger cronTrigger = getTrigger(jobDetail.getKey(), properties);
            this.scheduler.getScheduler().scheduleJob(jobDetail, cronTrigger);
            LOG.info(String.format("Scheduled job %s. Next run: %s.", jobDetail.getKey(), cronTrigger.getNextFireTime()));
            this.scheduledJobs.put(jobName, jobDetail.getKey());
        } catch (SchedulerException failure) {
            final String message = "Failed to schedule job " + jobName;
            LOG.error(message, failure);
            throw new JobException(message, failure);
        }
    }

    /**
     * Unschedules and deletes a previously scheduled job. Unknown job names are ignored.
     *
     * <p>The job is dropped from the internal set of scheduled jobs only after the Quartz
     * scheduler has accepted the deletion, so that a failed deletion leaves the job listed and
     * the call can be retried.
     *
     * @param str name of the job to unschedule
     * @throws JobException if the Quartz scheduler fails to delete the job
     */
    public void unscheduleJob(String str) throws JobException {
        if (!this.scheduledJobs.containsKey(str)) {
            return;
        }

        final JobKey jobKey = this.scheduledJobs.get(str);
        try {
            this.scheduler.getScheduler().deleteJob(jobKey);
            // Forget the job only once Quartz no longer knows about it.
            this.scheduledJobs.remove(str);
        } catch (SchedulerException e) {
            LOG.error("Failed to unschedule and delete job " + str, e);
            throw new JobException("Failed to unschedule and delete job " + str, e);
        }
    }

    /**
     * Runs a job immediately using a launcher obtained from {@link JobLauncherFactory}.
     *
     * @param properties properties of the job to run
     * @param jobListener listener for the job run; may be {@code null}
     * @throws JobException wrapping any exception raised while creating the launcher or running
     *         the job
     */
    public void runJob(Properties properties, JobListener jobListener) throws JobException {
        try {
            final JobLauncher launcher = JobLauncherFactory.newJobLauncher(this.properties, properties);
            runJob(properties, jobListener, launcher);
        } catch (Exception failure) {
            final String jobName = properties.getProperty("job.name");
            throw new JobException("Failed to run job " + jobName, failure);
        }
    }

    /**
     * Runs a job immediately through the given launcher.
     *
     * <p>A job with {@code job.disabled=true} is skipped. Otherwise a fresh {@code job.id} is
     * set on the properties and the job is launched. The launcher is registered with a
     * {@link Closer} that is closed when the run ends, whether it succeeded or failed. After a
     * successful launch, a job with {@code job.runonce=true} that is in the scheduled set is
     * removed from it and deleted from the Quartz scheduler.
     *
     * @param properties properties of the job; must contain {@code job.name}
     * @param jobListener listener passed to the launcher; may be {@code null}
     * @param jobLauncher launcher used to run the job; closed by this method once launching starts
     * @throws IllegalArgumentException if {@code job.name} is missing
     * @throws JobException wrapping anything thrown while launching the job, deleting a
     *         run-once job, or closing the launcher
     */
    public void runJob(Properties properties, JobListener jobListener, JobLauncher jobLauncher) throws JobException {
        Preconditions.checkArgument(properties.containsKey("job.name"), "A job must have a job name specified by job.name");
        String jobName = properties.getProperty("job.name");

        if (Boolean.parseBoolean(properties.getProperty("job.disabled", "false"))) {
            LOG.info("Skipping disabled job " + jobName);
            return;
        }

        properties.setProperty("job.id", JobLauncherUtils.newJobId(jobName));

        // try-with-resources guarantees the launcher is closed on the failure path as well.
        try (Closer closer = Closer.create()) {
            closer.register(jobLauncher).launchJob(jobListener);

            boolean runOnce = Boolean.parseBoolean(properties.getProperty("job.runonce", "false"));
            if (runOnce && this.scheduledJobs.containsKey(jobName)) {
                this.scheduler.getScheduler().deleteJob(this.scheduledJobs.remove(jobName));
            }
        } catch (Throwable t) {
            throw new JobException("Failed to launch and run job " + jobName, t);
        }
    }

    /**
     * Returns the names of the jobs currently registered with the Quartz scheduler.
     *
     * <p>The returned collection is the key set of the internal map, i.e. a live view and not a
     * copy.
     *
     * @return names of the scheduled jobs
     */
    public Collection<String> getScheduledJobs() {
        final Set<String> scheduledJobNames = this.scheduledJobs.keySet();
        return scheduledJobNames;
    }

    /**
     * Schedules every job found by {@code loadGeneralJobConfigs()} and registers its properties
     * with the path alteration listener. Jobs with {@code job.runonce=true} get a
     * {@link RunOnceJobListener}, all others an {@link EmailNotificationJobListener}.
     *
     * @throws ConfigurationException if the job configurations cannot be loaded
     * @throws JobException declared by the scheduling call
     * @throws IOException if the job configurations cannot be read
     */
    private void scheduleGeneralConfiguredJobs() throws ConfigurationException, JobException, IOException {
        LOG.info("Scheduling configured jobs");
        for (Properties jobProps : loadGeneralJobConfigs()) {
            final JobListener jobListener;
            if (Boolean.parseBoolean(jobProps.getProperty("job.runonce", "false"))) {
                jobListener = new RunOnceJobListener();
            } else {
                jobListener = new EmailNotificationJobListener();
            }
            scheduleJob(jobProps, jobListener);
            this.listener.addToJobNameMap(jobProps);
        }
    }

    /**
     * Loads the job configurations via {@link SchedulerUtils#loadGenericJobConfigs} using the
     * scheduler properties and logs how many were found.
     *
     * @return the loaded job configurations
     * @throws ConfigurationException if a job configuration is invalid
     * @throws IOException if a job configuration cannot be read
     */
    private List<Properties> loadGeneralJobConfigs() throws ConfigurationException, IOException {
        final List<Properties> jobConfigs = SchedulerUtils.loadGenericJobConfigs(this.properties);
        final int jobConfigCount = jobConfigs.size();
        LOG.info(String.format("Loaded %d job configurations", jobConfigCount));
        return jobConfigs;
    }

    /**
     * Registers the path alteration listener for the job configuration directory on the
     * detector and then starts the detector.
     *
     * @throws Exception if the observer cannot be added or the detector cannot be started
     */
    private void startGeneralJobConfigFileMonitor() throws Exception {
        final PathAlterationDetector detector = this.pathAlterationDetector;
        SchedulerUtils.addPathAlterationObserver(detector, this.listener, this.jobConfigFileDirPath);
        detector.start();
    }

    /**
     * Builds the cron trigger for a job.
     *
     * <p>The trigger identity is the job name plus the job group (empty when {@code job.group}
     * is not set), and the cron expression is taken from {@code job.schedule}.
     *
     * @param jobKey key of the Quartz job the trigger fires
     * @param properties job properties supplying name, group and cron expression
     * @return the trigger for the job
     */
    private Trigger getTrigger(JobKey jobKey, Properties properties) {
        final String triggerName = properties.getProperty("job.name");
        final String triggerGroup = Strings.nullToEmpty(properties.getProperty("job.group"));
        final String cronExpression = properties.getProperty("job.schedule");

        return TriggerBuilder.newTrigger()
                .withIdentity(triggerName, triggerGroup)
                .forJob(jobKey)
                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                .build();
    }
}
