package org.apache.gobblin.service.modules.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.helix.HelixManager;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.class */
public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {
    /** Logger used by this scheduler; the constructor takes the supplied logger when present, otherwise one for the runtime class. */
    protected final Logger _log;
    /** Optional flow catalog; read when the scheduler is activated and used to remove run-once specs after they execute. */
    protected final Optional<FlowCatalog> flowCatalog;
    /** Optional Helix manager; when present, spec add/update/delete callbacks are skipped while it is not connected. */
    protected final Optional<HelixManager> helixManager;
    /** Orchestrator that is handed specs to run and is notified when a spec is removed. */
    protected final Orchestrator orchestrator;
    /**
     * Specs known to this scheduler, keyed by the spec URI string.
     * NOTE(review): this is a plain hash map and not every method touching it is synchronized — confirm the threading model.
     */
    protected final Map<String, Spec> scheduledFlowSpecs;
    /** Whether this scheduler is currently active; changed only through {@code setActive}. */
    private volatile boolean isActive;
    /** Service name, used as a prefix in some log messages. */
    private String serviceName;

    /**
     * Quartz job that hands a triggered flow back to the {@link JobScheduler} stored in the job data map.
     *
     * <p>The job data map is expected to carry the scheduler under {@code "jobScheduler"}, the job
     * properties under {@code "jobProps"} and the (possibly null) listener under {@code "jobListener"}.
     */
    @DisallowConcurrentExecution
    public static class GobblinServiceJob extends BaseGobblinJob implements InterruptableJob {
        private static final Logger log = LoggerFactory.getLogger(GobblinServiceJob.class);
        private static final Logger _log = LoggerFactory.getLogger(GobblinServiceJob.class);

        /**
         * Runs the flow identified by the job detail by delegating to the scheduler found in the job data map.
         *
         * @param jobExecutionContext Quartz execution context of the fired trigger
         * @throws JobExecutionException wrapping anything thrown while looking up the data or running the job
         */
        public void executeImpl(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            _log.info("Starting FlowSpec " + jobExecutionContext.getJobDetail().getKey());
            JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            try {
                // Casts stay inside the try so that a malformed data map is reported as a JobExecutionException.
                JobScheduler scheduler = (JobScheduler) dataMap.get("jobScheduler");
                Properties jobProps = (Properties) dataMap.get("jobProps");
                JobListener listener = (JobListener) dataMap.get("jobListener");
                scheduler.runJob(jobProps, listener);
            } catch (Throwable failure) {
                throw new JobExecutionException(failure);
            }
        }

        /**
         * Interrupt hook required by {@link InterruptableJob}; it only logs and does not stop any running work.
         */
        public void interrupt() throws UnableToInterruptJobException {
            log.info("Job was interrupted");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler$NonScheduledJobRunner.class */
    /**
     * Runnable that executes a flow once, outside of any schedule, and optionally removes its spec
     * from the flow catalog after the run completes without a {@link JobException}.
     */
    public class NonScheduledJobRunner implements Runnable {
        private final URI specUri;
        private final Properties jobConfig;
        private final JobListener jobListener;
        private final boolean removeSpec;

        /**
         * @param uri URI of the spec to run (and possibly remove afterwards)
         * @param z whether the spec should be removed from the flow catalog after a successful run
         * @param properties job properties passed to {@code runJob}
         * @param jobListener listener passed to {@code runJob}; may be null
         */
        public NonScheduledJobRunner(URI uri, boolean z, Properties properties, JobListener jobListener) {
            this.specUri = uri;
            this.removeSpec = z;
            this.jobConfig = properties;
            this.jobListener = jobListener;
        }

        /**
         * Runs the job; a {@link JobException} is logged and suppresses the catalog removal.
         */
        @Override // java.lang.Runnable
        public void run() {
            GobblinServiceJobScheduler scheduler = GobblinServiceJobScheduler.this;
            try {
                scheduler.runJob(this.jobConfig, this.jobListener);
                // Run-once specs are dropped from the catalog only after the run returned normally.
                if (this.removeSpec && scheduler.flowCatalog.isPresent()) {
                    FlowCatalog catalog = scheduler.flowCatalog.get();
                    catalog.remove(this.specUri, new Properties(), false);
                }
            } catch (JobException jobFailure) {
                scheduler._log.error("Failed to run job " + this.jobConfig.getProperty("job.name"), jobFailure);
            }
        }
    }

    /**
     * Creates a scheduler that uses the given orchestrator.
     *
     * @param str service name, used in log messages
     * @param config scheduler configuration; converted to properties for the superclass
     * @param optional optional Helix manager consulted before handling spec callbacks
     * @param optional2 optional flow catalog
     * @param optional3 optional topology catalog (not used by this constructor)
     * @param orchestrator orchestrator that runs and removes specs
     * @param schedulerService scheduler service passed to the superclass
     * @param optional4 optional logger; when absent a logger for the runtime class is created
     * @throws Exception if the superclass constructor fails
     */
    public GobblinServiceJobScheduler(String str, Config config, Optional<HelixManager> optional, Optional<FlowCatalog> optional2, Optional<TopologyCatalog> optional3, Orchestrator orchestrator, SchedulerService schedulerService, Optional<Logger> optional4) throws Exception {
        super(ConfigUtils.configToProperties(config), schedulerService);
        if (optional4.isPresent()) {
            this._log = optional4.get();
        } else {
            this._log = LoggerFactory.getLogger(getClass());
        }
        this.serviceName = str;
        this.helixManager = optional;
        this.flowCatalog = optional2;
        this.orchestrator = orchestrator;
        this.scheduledFlowSpecs = Maps.newHashMap();
    }

    /**
     * Convenience constructor that builds its own {@link Orchestrator} from the configuration,
     * the topology catalog and the optional logger, then delegates to the full constructor.
     *
     * @param str service name, used in log messages
     * @param config scheduler configuration
     * @param optional optional Helix manager
     * @param optional2 optional flow catalog
     * @param optional3 optional topology catalog, passed to the new orchestrator
     * @param schedulerService scheduler service passed to the superclass
     * @param optional4 optional logger, shared with the new orchestrator
     * @throws Exception if constructing the orchestrator or the scheduler fails
     */
    public GobblinServiceJobScheduler(String str, Config config, Optional<HelixManager> optional, Optional<FlowCatalog> optional2, Optional<TopologyCatalog> optional3, SchedulerService schedulerService, Optional<Logger> optional4) throws Exception {
        this(
            str,
            config,
            optional,
            optional2,
            optional3,
            new Orchestrator(config, optional3, optional4),
            schedulerService,
            optional4);
    }

    /**
     * Activates or deactivates this scheduler. Calling it with the current state is a no-op.
     *
     * <p>On deactivation every cached spec is passed to {@code onDeleteSpec} before the flag is
     * cleared. On activation the flag is set first and, when a flow catalog is present, every spec
     * it returns is re-added; flow specs are re-added with {@code flow.runImmediately} forced to
     * {@code false}.
     *
     * @param z {@code true} to activate, {@code false} to deactivate
     */
    public synchronized void setActive(boolean z) {
        if (this.isActive == z) {
            return;
        }
        if (!z) {
            // onDeleteSpec removes entries from scheduledFlowSpecs, so iterate over a snapshot:
            // walking the live values() view while it shrinks throws ConcurrentModificationException.
            Spec[] snapshot = this.scheduledFlowSpecs.values().toArray(new Spec[0]);
            for (Spec spec : snapshot) {
                onDeleteSpec(spec.getUri(), spec.getVersion());
            }
            this.isActive = z;
            return;
        }
        // The flag is raised before re-adding specs, mirroring the original ordering.
        this.isActive = z;
        if (this.flowCatalog.isPresent()) {
            for (FlowSpec flowSpec : ((FlowCatalog) this.flowCatalog.get()).getSpecsWithTimeUpdate()) {
                if (flowSpec instanceof FlowSpec) {
                    onAddSpec(disableFlowRunImmediatelyOnStart(flowSpec));
                } else {
                    onAddSpec(flowSpec);
                }
            }
        }
    }

    /**
     * Builds a copy of the given flow spec whose configuration has {@code flow.runImmediately}
     * set to {@code "false"}.
     *
     * <p>The property is set on the {@link Properties} object returned by
     * {@code getConfigAsProperties()}; whether that is a private copy is not visible here.
     *
     * @param flowSpec spec to copy
     * @return a new {@link FlowSpec} with the same URI, version, description, template URIs and child specs
     */
    @VisibleForTesting
    protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec flowSpec) {
        Properties flowProps = flowSpec.getConfigAsProperties();
        flowProps.setProperty("flow.runImmediately", "false");
        Config rebuiltConfig = ConfigFactory.parseProperties(flowProps);
        return new FlowSpec(
            flowSpec.getUri(),
            flowSpec.getVersion(),
            flowSpec.getDescription(),
            rebuiltConfig,
            flowProps,
            flowSpec.getTemplateURIs(),
            flowSpec.getChildSpecs());
    }

    /**
     * Starts the scheduler by delegating to the superclass; no additional work is done here.
     *
     * @throws Exception if the superclass start-up fails
     */
    protected void startUp() throws Exception {
        super.startUp();
    }

    /**
     * Schedules a {@link GobblinServiceJob} for the given job properties, attaching the cached
     * spec (looked up by {@code job.name}) under the {@code gobblin.service.flowSpec} key.
     *
     * @param properties job properties; {@code job.name} selects the cached spec
     * @param jobListener listener forwarded to the underlying scheduler; may be null
     * @throws JobException wrapping any exception raised while scheduling
     */
    public synchronized void scheduleJob(Properties properties, JobListener jobListener) throws JobException {
        Spec cachedSpec = this.scheduledFlowSpecs.get(properties.getProperty("job.name"));
        Map<String, Object> additionalJobData = new HashMap<>();
        additionalJobData.put("gobblin.service.flowSpec", cachedSpec);
        try {
            scheduleJob(properties, jobListener, additionalJobData, GobblinServiceJob.class);
        } catch (Exception cause) {
            throw new JobException("Failed to schedule job " + properties.getProperty("job.name"), cause);
        }
    }

    /**
     * Runs the cached spec named by {@code job.name} through the orchestrator.
     *
     * @param properties job properties; {@code job.name} selects the cached spec
     * @param jobListener not used by this implementation
     * @throws JobException wrapping any exception raised during the lookup or orchestration
     */
    public void runJob(Properties properties, JobListener jobListener) throws JobException {
        try {
            Spec specToRun = this.scheduledFlowSpecs.get(properties.getProperty("job.name"));
            this.orchestrator.orchestrate(specToRun);
        } catch (Exception cause) {
            throw new JobException("Failed to run Spec: " + properties.getProperty("job.name"), cause);
        }
    }

    /**
     * Catalog callback for a newly added spec.
     *
     * <p>The call is skipped while a Helix manager is present but not connected, and specs that
     * are not {@link FlowSpec}s are only logged. A flow spec is cached by URI and then either
     * scheduled (when the resulting job properties contain {@code job.schedule}), optionally with
     * an immediate run, or executed once and marked for removal from the catalog.
     *
     * @param spec the spec that was added
     */
    public void onAddSpec(Spec spec) {
        boolean helixNotReady = this.helixManager.isPresent() && !this.helixManager.get().isConnected();
        if (helixNotReady) {
            this._log.info("System not yet initialized. Skipping Spec Addition: " + spec);
            return;
        }
        this._log.info("New Flow Spec detected: " + spec);
        if (!(spec instanceof FlowSpec)) {
            return;
        }
        FlowSpec flowSpec = (FlowSpec) spec;
        try {
            Properties flowProps = flowSpec.getConfigAsProperties();
            // Start from the scheduler-level properties and layer the per-flow settings on top.
            Properties jobProps = new Properties();
            jobProps.putAll(this.properties);
            jobProps.setProperty("job.name", spec.getUri().toString());
            // NOTE(review): this stores the config value's toString() form, which may not be the bare group name — confirm.
            jobProps.setProperty("job.group", flowSpec.getConfig().getValue("flow.group").toString());
            jobProps.setProperty("flow.runImmediately", ConfigUtils.getString(flowSpec.getConfig(), "flow.runImmediately", "false"));
            boolean flowHasSchedule = flowProps.containsKey("job.schedule")
                && StringUtils.isNotBlank(flowProps.getProperty("job.schedule"));
            if (flowHasSchedule) {
                jobProps.setProperty("job.schedule", flowProps.getProperty("job.schedule"));
            }
            this.scheduledFlowSpecs.put(spec.getUri().toString(), spec);

            // Checked on the merged properties: the scheduler-level properties may already carry a schedule.
            if (!jobProps.containsKey("job.schedule")) {
                this._log.info("No FlowSpec schedule found, so running FlowSpec: " + spec);
                this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobProps, null));
                return;
            }

            this._log.info("{} Scheduling flow spec: {} ", this.serviceName, spec);
            scheduleJob(jobProps, null);
            boolean runNow = PropertiesUtils.getPropAsBoolean(jobProps, "flow.runImmediately", "false");
            if (runNow) {
                this._log.info("RunImmediately requested, hence executing FlowSpec: " + spec);
                this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), false, jobProps, null));
            }
        } catch (JobException schedulingFailure) {
            this._log.error("{} Failed to schedule or run FlowSpec {}", new Object[]{this.serviceName, spec, schedulingFailure});
        }
    }

    /**
     * Catalog callback for a deleted spec; delegates to the three-argument overload with empty properties.
     *
     * @param uri URI of the deleted spec
     * @param str version of the deleted spec
     */
    public void onDeleteSpec(URI uri, String str) {
        Properties emptyHeaders = new Properties();
        onDeleteSpec(uri, str, emptyHeaders);
    }

    /**
     * Catalog callback for a deleted spec.
     *
     * <p>The call is skipped while a Helix manager is present but not connected. When the spec is
     * cached it is removed from the orchestrator, dropped from the cache and its job is
     * unscheduled, in that order; otherwise a warning is logged. A {@link JobException} is logged
     * and not rethrown.
     *
     * @param uri URI of the deleted spec
     * @param str version of the deleted spec (only used for logging)
     * @param properties properties forwarded to the orchestrator's remove call
     */
    public void onDeleteSpec(URI uri, String str, Properties properties) {
        boolean helixNotReady = this.helixManager.isPresent() && !this.helixManager.get().isConnected();
        if (helixNotReady) {
            this._log.info("System not yet initialized. Skipping Spec Deletion: " + uri);
            return;
        }
        this._log.info("Spec deletion detected: " + uri + "/" + str);
        try {
            String specKey = uri.toString();
            Spec cachedSpec = this.scheduledFlowSpecs.get(specKey);
            if (cachedSpec == null) {
                this._log.warn(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually", uri));
                return;
            }
            this.orchestrator.remove(cachedSpec, properties);
            this.scheduledFlowSpecs.remove(specKey);
            unscheduleJob(specKey);
        } catch (JobException unscheduleFailure) {
            this._log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", uri), unscheduleFailure);
        }
    }

    /**
     * Catalog callback for an updated spec, handled as a delete followed by an add.
     *
     * <p>The call is skipped while a Helix manager is present but not connected, and specs that
     * are not {@link FlowSpec}s are only logged. A failure in the delete step is logged and does
     * not prevent the add step from running.
     *
     * @param spec the updated spec
     */
    public void onUpdateSpec(Spec spec) {
        boolean helixNotReady = this.helixManager.isPresent() && !this.helixManager.get().isConnected();
        if (helixNotReady) {
            this._log.info("System not yet initialized. Skipping Spec Update: " + spec);
            return;
        }
        this._log.info("Spec changed: " + spec);
        if (!(spec instanceof FlowSpec)) {
            return;
        }
        try {
            onDeleteSpec(spec.getUri(), spec.getVersion());
        } catch (Exception deleteFailure) {
            this._log.error("Failed to update Spec: " + spec, deleteFailure);
        }
        try {
            onAddSpec(spec);
        } catch (Exception addFailure) {
            this._log.error("Failed to update Spec: " + spec, addFailure);
        }
    }

    /**
     * Returns the cache of specs known to this scheduler, keyed by spec URI string.
     *
     * <p>This is the live internal map, not a copy, so changes made by the caller affect the scheduler.
     *
     * @return the internal spec map
     */
    public Map<String, Spec> getScheduledFlowSpecs() {
        return this.scheduledFlowSpecs;
    }

    /**
     * Reports whether this scheduler is currently active.
     *
     * @return the value last set through {@code setActive}
     */
    public boolean isActive() {
        return this.isActive;
    }
}
