package org.apache.gobblin.service.modules.scheduler;

import com.codahale.metrics.MetricFilter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.class */
public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {
    /** Config path whose presence (checked with {@code hasPath} in the constructor) marks this instance as the nominated DR handler. */
    public static final String GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED = "gobblin.service.drNominatedInstance";
    /** Instance logger: the one passed to the constructor when present, otherwise one created for the runtime class. */
    protected final Logger _log;
    /** Optional catalog of flow specs; specs are loaded from it when the scheduler becomes active. */
    protected final Optional<FlowCatalog> flowCatalog;
    /** Optional Helix manager; when present but not connected, spec add/update/delete callbacks are skipped. */
    protected final Optional<HelixManager> helixManager;
    /** Compiles flow specs (via its spec compiler) and orchestrates/removes them. */
    protected final Orchestrator orchestrator;
    /** Constructor flag (bound to the name "statelessRestAPIEnabled"); when true, onAddSpec skips the quota check. */
    protected final Boolean warmStandbyEnabled;
    /** Optional quota manager consulted by onAddSpec for flows that have no "job.schedule". */
    protected final Optional<UserQuotaManager> quotaManager;
    /** Specs currently known to this scheduler, keyed by spec URI string. */
    protected final Map<String, Spec> scheduledFlowSpecs;
    /** Value of the "modified_time" property last accepted for each spec, keyed by spec URI string. */
    protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
    /** Page size used when loading specs from the catalog; read from "load.spec.batch.size" (default 500). */
    protected volatile int loadSpecsBatchSize;
    /** Threshold in days passed to isWithinRange; read from "skip.scheduling.flows.after.num.days" (default 365). */
    protected int skipSchedulingFlowsAfterNumDays;
    /** Whether this instance is active; when false, onAddSpec ignores specs and onDeleteSpec skips deletion. */
    private volatile boolean isActive;
    /** Service name supplied to the constructor; used as a prefix in scheduling log messages. */
    private String serviceName;
    // Backing values for the gauges declared below. All are initialised to -1 in the constructor and
    // updated by scheduleSpecsFromCatalog() and onAddSpec(); durations are System.nanoTime() differences.
    private volatile Long perSpecGetRateValue;
    private volatile Long timeToInitializeSchedulerValue;
    private volatile Long timeToObtainSpecUrisValue;
    private volatile Long individualGetSpecSpeedValue;
    private volatile Long eachCompleteAddSpecValue;
    private volatile Long eachSpecCompilationValue;
    private volatile Long eachScheduleJobValue;
    private volatile Long totalGetSpecTimeValue;
    private volatile Long totalAddSpecTimeValue;
    private volatile int numJobsScheduledDuringStartupValue;
    // Gauges exposing the values above. The constructor creates one per instance and registers them on
    // metricContext only if no gauge matching the first gauge's name is registered yet.
    private final ContextAwareGauge getSpecsPerSpecRateNanos;
    private final ContextAwareGauge batchSize;
    private final ContextAwareGauge timeToInitalizeSchedulerNanos;
    private final ContextAwareGauge timeToObtainSpecUrisNanos;
    private final ContextAwareGauge individualGetSpecSpeedNanos;
    private final ContextAwareGauge eachCompleteAddSpecNanos;
    private final ContextAwareGauge eachSpecCompilationNanos;
    private final ContextAwareGauge eachScheduleJobNanos;
    private final ContextAwareGauge totalGetSpecTimeNanos;
    private final ContextAwareGauge totalAddSpecTimeNanos;
    private final ContextAwareGauge numJobsScheduledDuringStartup;
    /** Metric context shared by every instance of this class. */
    private static final MetricContext metricContext = Instrumented.getMetricContext(new State(), GobblinServiceJobScheduler.class);
    /** Marked once per GobblinServiceJob execution, whether it succeeds or fails. */
    private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter("GobblinService.ScheduledFlows");
    /** Marked once per NonScheduledJobRunner run, whether it succeeds or fails. */
    private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter("GobblinService.NonScheduledFlows");
    /** True when the DR-nominated config path is present; enables clearing DR-tagged flow state at startup. */
    private boolean isNominatedDRHandler;
    /** Tag passed to FlowCatalog#getSpecURISWithTag to select the specs whose state the DR handler clears. */
    public static final String DR_FILTER_TAG = "dr";

    @DisallowConcurrentExecution
    /* loaded from: input_file:org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler$GobblinServiceJob.class */
    public static class GobblinServiceJob extends BaseGobblinJob implements InterruptableJob {
        private static final Logger log = LoggerFactory.getLogger(GobblinServiceJob.class);
        private static final Logger _log = LoggerFactory.getLogger(GobblinServiceJob.class);

        public void executeImpl(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            JobExecutionException jobExecutionException;
            _log.info("Starting FlowSpec " + jobExecutionContext.getJobDetail().getKey());
            JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            try {
                try {
                    ((JobScheduler) jobDataMap.get("jobScheduler")).runJob((Properties) jobDataMap.get("jobProps"), (JobListener) jobDataMap.get("jobListener"));
                    GobblinServiceJobScheduler.scheduledFlows.mark();
                } finally {
                }
            } catch (Throwable th) {
                GobblinServiceJobScheduler.scheduledFlows.mark();
                throw th;
            }
        }

        public void interrupt() throws UnableToInterruptJobException {
            log.info("Job was interrupted");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler$NonScheduledJobRunner.class */
    public class NonScheduledJobRunner implements Runnable {
        private final URI specUri;
        private final Properties jobConfig;
        private final JobListener jobListener;
        private final boolean removeSpec;

        public NonScheduledJobRunner(URI uri, boolean z, Properties properties, JobListener jobListener) {
            this.specUri = uri;
            this.jobConfig = properties;
            this.jobListener = jobListener;
            this.removeSpec = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    try {
                        GobblinServiceJobScheduler.this.runJob(this.jobConfig, this.jobListener);
                        if (GobblinServiceJobScheduler.this.flowCatalog.isPresent() && this.removeSpec) {
                            Object syncObject = ((FlowCatalog) GobblinServiceJobScheduler.this.flowCatalog.get()).getSyncObject(this.specUri.toString());
                            if (syncObject != null) {
                                synchronized (syncObject) {
                                    while (!((FlowCatalog) GobblinServiceJobScheduler.this.flowCatalog.get()).exists(this.specUri)) {
                                        syncObject.wait();
                                    }
                                }
                            }
                            ((FlowCatalog) GobblinServiceJobScheduler.this.flowCatalog.get()).remove(this.specUri, new Properties(), false);
                            GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(this.specUri.toString());
                            GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(this.specUri.toString());
                        }
                        GobblinServiceJobScheduler.nonScheduledFlows.mark();
                    } catch (InterruptedException e) {
                        GobblinServiceJobScheduler.this._log.error("Failed to delete the spec " + this.specUri, e);
                        GobblinServiceJobScheduler.nonScheduledFlows.mark();
                    }
                } catch (JobException e2) {
                    GobblinServiceJobScheduler.this._log.error("Failed to run job " + this.jobConfig.getProperty("job.name"), e2);
                    GobblinServiceJobScheduler.nonScheduledFlows.mark();
                }
            } catch (Throwable th) {
                GobblinServiceJobScheduler.nonScheduledFlows.mark();
                throw th;
            }
        }
    }

    @Inject
    public GobblinServiceJobScheduler(@Named("serviceName") String str, Config config, Optional<HelixManager> optional, Optional<FlowCatalog> optional2, Optional<TopologyCatalog> optional3, Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> optional4, Optional<Logger> optional5, @Named("statelessRestAPIEnabled") boolean z) throws Exception {
        super(ConfigUtils.configToProperties(config), schedulerService);
        this.loadSpecsBatchSize = -1;
        this.perSpecGetRateValue = -1L;
        this.timeToInitializeSchedulerValue = -1L;
        this.timeToObtainSpecUrisValue = -1L;
        this.individualGetSpecSpeedValue = -1L;
        this.eachCompleteAddSpecValue = -1L;
        this.eachSpecCompilationValue = -1L;
        this.eachScheduleJobValue = -1L;
        this.totalGetSpecTimeValue = -1L;
        this.totalAddSpecTimeValue = -1L;
        this.numJobsScheduledDuringStartupValue = -1;
        this.getSpecsPerSpecRateNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.getSpecsDuringStartupPerSpecRateNanos", () -> {
            return this.perSpecGetRateValue;
        });
        this.batchSize = metricContext.newContextAwareGauge("GobblinService.jobScheduler.loadSpecBatchSize", () -> {
            return Integer.valueOf(this.loadSpecsBatchSize);
        });
        this.timeToInitalizeSchedulerNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.timeToInitializeSchedulerNanos", () -> {
            return this.timeToInitializeSchedulerValue;
        });
        this.timeToObtainSpecUrisNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.timeToObtainSpecUrisNanos", () -> {
            return this.timeToObtainSpecUrisValue;
        });
        this.individualGetSpecSpeedNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.individualGetSpecSpeedNanos", () -> {
            return this.individualGetSpecSpeedValue;
        });
        this.eachCompleteAddSpecNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.eachCompleteAddSpecNanos", () -> {
            return this.eachCompleteAddSpecValue;
        });
        this.eachSpecCompilationNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.eachSpecCompilationNanos", () -> {
            return this.eachSpecCompilationValue;
        });
        this.eachScheduleJobNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.eachScheduleJobNanos", () -> {
            return this.eachScheduleJobValue;
        });
        this.totalGetSpecTimeNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.totalGetSpecTimeNanos", () -> {
            return this.totalGetSpecTimeValue;
        });
        this.totalAddSpecTimeNanos = metricContext.newContextAwareGauge("GobblinService.jobScheduler.totalAddSpecTimeNanos", () -> {
            return this.totalAddSpecTimeValue;
        });
        this.numJobsScheduledDuringStartup = metricContext.newContextAwareGauge("GobblinService.jobScheduler.numJobsScheduledDuringStartup", () -> {
            return Integer.valueOf(this.numJobsScheduledDuringStartupValue);
        });
        this._log = optional5.isPresent() ? (Logger) optional5.get() : LoggerFactory.getLogger(getClass());
        this.serviceName = str;
        this.flowCatalog = optional2;
        this.helixManager = optional;
        this.orchestrator = orchestrator;
        this.scheduledFlowSpecs = Maps.newHashMap();
        this.lastUpdatedTimeForFlowSpec = Maps.newHashMap();
        this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty("load.spec.batch.size", String.valueOf(500)));
        this.skipSchedulingFlowsAfterNumDays = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty("skip.scheduling.flows.after.num.days", String.valueOf(365)));
        this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED) && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
        this.warmStandbyEnabled = Boolean.valueOf(z);
        this.quotaManager = optional4;
        if (metricContext.getGauges(MetricFilter.contains("GobblinService.jobScheduler.getSpecsDuringStartupPerSpecRateNanos")).isEmpty()) {
            metricContext.register(this.getSpecsPerSpecRateNanos);
            metricContext.register(this.batchSize);
            metricContext.register(this.timeToInitalizeSchedulerNanos);
            metricContext.register(this.timeToObtainSpecUrisNanos);
            metricContext.register(this.individualGetSpecSpeedNanos);
            metricContext.register(this.eachCompleteAddSpecNanos);
            metricContext.register(this.eachSpecCompilationNanos);
            metricContext.register(this.eachScheduleJobNanos);
            metricContext.register(this.totalGetSpecTimeNanos);
            metricContext.register(this.totalAddSpecTimeNanos);
            metricContext.register(this.numJobsScheduledDuringStartup);
        }
    }

    /**
     * Convenience constructor that builds the {@link Orchestrator} itself (from the config, flow
     * status generator, topology catalog, DAG manager and logger) and delegates to the main
     * constructor.
     *
     * @param str service name
     * @param config service configuration
     * @param flowStatusGenerator passed to the new Orchestrator
     * @param optional optional Helix manager
     * @param optional2 optional flow catalog
     * @param optional3 optional topology catalog, passed to the new Orchestrator
     * @param optional4 optional DAG manager, passed to the new Orchestrator
     * @param optional5 optional quota manager
     * @param schedulerService scheduler service handed to the parent scheduler
     * @param optional6 optional logger, shared with the new Orchestrator
     * @param z forwarded as the main constructor's final boolean argument
     * @throws Exception if the Orchestrator or the main constructor fails
     */
    public GobblinServiceJobScheduler(String str, Config config, FlowStatusGenerator flowStatusGenerator, Optional<HelixManager> optional, Optional<FlowCatalog> optional2, Optional<TopologyCatalog> optional3, Optional<DagManager> optional4, Optional<UserQuotaManager> optional5, SchedulerService schedulerService, Optional<Logger> optional6, boolean z) throws Exception {
        this(str, config, optional, optional2, optional3, new Orchestrator(config, flowStatusGenerator, optional3, optional4, optional6), schedulerService, optional5, optional6, z);
    }

    /**
     * Switches this scheduler between active and inactive.
     *
     * <p>On activation, when a flow catalog is present, a new thread waits for the spec compiler to
     * become healthy and then schedules the specs found in the catalog. On deactivation, the spec
     * caches are cleared and all jobs are unscheduled. Calling with the current state is a no-op.
     *
     * @param z the desired active state
     * @throws RuntimeException if unscheduling the jobs fails during deactivation
     */
    public synchronized void setActive(boolean z) {
        if (this.isActive == z) {
            return;
        }
        if (z) {
            this.isActive = true;
            if (this.flowCatalog.isPresent()) {
                new Thread(() -> {
                    try {
                        this.orchestrator.getSpecCompiler().awaitHealthy();
                        scheduleSpecsFromCatalog();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status before failing the thread.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }).start();
            }
            return;
        }
        try {
            this.scheduledFlowSpecs.clear();
            // Keep the modification-time cache in step with the spec cache, as every other removal path does.
            this.lastUpdatedTimeForFlowSpec.clear();
            unscheduleAllJobs();
            this.isActive = false;
        } catch (SchedulerException e) {
            this._log.error("Not all jobs were unscheduled", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Adds a spec loaded from the catalog at startup, unless it is not a {@link FlowSpec} or its next
     * scheduled run falls outside {@code skipSchedulingFlowsAfterNumDays}.
     *
     * <p>Specs flagged {@code flow.runImmediately} are added with that flag forced to false, so they
     * are not re-run just because the catalog is being loaded.
     *
     * @param spec the spec to add
     * @return {@code true} if the spec was handed to {@code onAddSpec}, {@code false} if it was skipped
     */
    private boolean addSpecHelperMethod(Spec spec) {
        if (!(spec instanceof FlowSpec)) {
            this._log.debug("Not scheduling spec {} during startup as it is not a FlowSpec.", spec);
            return false;
        }
        FlowSpec flowSpec = (FlowSpec) spec;
        if (flowSpec.getConfig().hasPath("job.schedule") && !isWithinRange(flowSpec.getConfig().getString("job.schedule"), this.skipSchedulingFlowsAfterNumDays)) {
            // This is the branch the "outside of threshold" message actually describes.
            this._log.debug("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
            return false;
        }
        boolean runImmediately = PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), "flow.runImmediately", "false");
        onAddSpec(runImmediately ? disableFlowRunImmediatelyOnStart(flowSpec) : spec);
        return true;
    }

    /**
     * Tells whether the next fire time of a cron expression (evaluated in UTC) is fewer than
     * {@code i} days away, with the distance rounded to the nearest whole day.
     *
     * @param str Quartz cron expression
     * @param i threshold in days (exclusive)
     * @return {@code true} if the next fire time is within the threshold, or if the expression cannot
     *     be parsed (so that the scheduling path reports the invalid expression);
     *     {@code false} if it is outside the threshold or the expression never fires again
     */
    @VisibleForTesting
    public static boolean isWithinRange(String str, int i) {
        final double millisPerDay = 8.64E7d;
        try {
            CronExpression cronExpression = new CronExpression(str);
            cronExpression.setTimeZone(TimeZone.getTimeZone("UTC"));
            // Use a single "now" for both the lookup and the distance computation.
            Date now = new Date();
            Date nextFireTime = cronExpression.getNextValidTimeAfter(now);
            if (nextFireTime == null) {
                // The expression has no future fire time, so there is nothing to schedule within any range.
                LoggerFactory.getLogger(GobblinServiceJobScheduler.class).warn("Cron expression {} has no future fire time", str);
                return false;
            }
            long daysUntilNextFire = Math.round((nextFireTime.getTime() - now.getTime()) / millisPerDay);
            return daysUntilNextFire < i;
        } catch (ParseException e) {
            LoggerFactory.getLogger(GobblinServiceJobScheduler.class).warn("Could not parse cron expression {}", str, e);
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Loads every spec from the flow catalog and adds it to this scheduler, recording timing metrics
     * along the way.
     *
     * <p>Steps: (1) snapshot all spec URIs; (2) if this instance is the nominated DR handler, clear
     * the state of DR-tagged specs; (3) load specs page by page and add them; (4) individually load
     * and add any URI from the snapshot that no page returned; (5) publish the totals to the gauges.
     * A failure to add a single spec is logged and does not stop the load.
     *
     * <p>Requires the flow catalog to be present.
     *
     * @throws RuntimeException if the spec URIs (all, or the DR-tagged ones) cannot be read
     */
    public void scheduleSpecsFromCatalog() {
        FlowCatalog catalog = this.flowCatalog.get();
        int size = catalog.getSize();
        int numScheduled = 0;
        this._log.info("Scheduling specs from catalog: {} flows in the catalog, will skip scheduling flows with next run after {} days", size, this.skipSchedulingFlowsAfterNumDays);
        long startNanos = System.nanoTime();
        long totalGetNanos = 0;
        long totalAddNanos = 0;

        // Snapshot of every URI, used afterwards to detect specs the paginated load missed.
        HashSet<URI> urisLeftToSchedule = new HashSet<>();
        try {
            Iterator<URI> specUris = catalog.getSpecURIs();
            while (specUris.hasNext()) {
                urisLeftToSchedule.add(specUris.next());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.timeToObtainSpecUrisValue = System.nanoTime() - startNanos;

        if (this.isNominatedDRHandler) {
            try {
                clearRunningFlowState(catalog.getSpecURISWithTag(DR_FILTER_TAG));
            } catch (IOException e) {
                throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
            }
        }

        int offset = 0;
        while (offset < size) {
            int pageSize = this.loadSpecsBatchSize;
            if (pageSize <= 0) {
                // A non-positive page size would never advance the offset; fall back to the per-URI load below.
                this._log.warn("Invalid spec batch size {}; loading the remaining specs individually", pageSize);
                break;
            }
            long fetchStart = System.nanoTime();
            Collection<Spec> batch = catalog.getSpecsPaginated(offset, pageSize);
            long fetchNanos = System.nanoTime() - fetchStart;
            for (Spec spec : batch) {
                try {
                    if (addSpecHelperMethod(spec)) {
                        // eachCompleteAddSpecValue is refreshed by onAddSpec for the spec just added.
                        totalAddNanos += this.eachCompleteAddSpecValue;
                        numScheduled++;
                    }
                } catch (Exception e) {
                    this._log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
                }
                urisLeftToSchedule.remove(spec.getUri());
            }
            offset += pageSize;
            totalGetNanos += fetchNanos;
            // Only (mostly) full batches update the per-spec rate, so a small trailing batch does not skew it.
            if (!batch.isEmpty() && batch.size() >= Math.round(0.75d * pageSize)) {
                this.perSpecGetRateValue = fetchNanos / batch.size();
            }
        }

        // Specs can be missed when the catalog changes while it is being paged; load those one by one.
        for (URI uri : urisLeftToSchedule) {
            try {
                long fetchStart = System.nanoTime();
                Spec spec = catalog.getSpecWrapper(uri);
                long fetchNanos = System.nanoTime() - fetchStart;
                this.individualGetSpecSpeedValue = fetchNanos;
                totalGetNanos += fetchNanos;
                if (addSpecHelperMethod(spec)) {
                    totalAddNanos += this.eachCompleteAddSpecValue;
                    numScheduled++;
                }
            } catch (Exception e) {
                this._log.error("Could not schedule spec uri {} from flowCatalog due to ", uri, e);
            }
        }

        // Reset the per-item gauges now that the load is finished, then publish the totals.
        this.perSpecGetRateValue = -1L;
        this.individualGetSpecSpeedValue = -1L;
        this.totalGetSpecTimeValue = totalGetNanos;
        this.totalAddSpecTimeValue = totalAddNanos;
        this.numJobsScheduledDuringStartupValue = numScheduled;
        catalog.getMetrics().updateGetSpecTime(startNanos);
        this.timeToInitializeSchedulerValue = System.nanoTime() - startNanos;
    }

    /**
     * Issues a delete (with an empty version string) for every spec URI produced by the iterator.
     *
     * @param it URIs of the specs to delete
     */
    private void clearRunningFlowState(Iterator<URI> it) {
        it.forEachRemaining(specUri -> onDeleteSpec(specUri, ""));
    }

    /**
     * Builds a copy of the given flow spec whose {@code flow.runImmediately} property is "false".
     *
     * <p>NOTE(review): the property is set on the object returned by
     * {@code flowSpec.getConfigAsProperties()}; whether that also alters the original spec depends on
     * whether that getter returns a copy — confirm against FlowSpec.
     *
     * @param flowSpec the spec to copy
     * @return a new FlowSpec with the same URI, version, description, template URIs and child specs
     */
    @VisibleForTesting
    protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec flowSpec) {
        Properties overriddenProps = flowSpec.getConfigAsProperties();
        overriddenProps.setProperty("flow.runImmediately", "false");
        Config overriddenConfig = ConfigFactory.parseProperties(overriddenProps);
        return new FlowSpec(
            flowSpec.getUri(),
            flowSpec.getVersion(),
            flowSpec.getDescription(),
            overriddenConfig,
            overriddenProps,
            flowSpec.getTemplateURIs(),
            flowSpec.getChildSpecs());
    }

    /**
     * Starts the scheduler by delegating to the parent implementation; nothing is added here.
     *
     * @throws Exception if the parent start-up fails
     */
    protected void startUp() throws Exception {
        super.startUp();
    }

    /**
     * Schedules a {@link GobblinServiceJob} for the given job properties, attaching the cached flow
     * spec (looked up by the {@code job.name} property) as additional job data.
     *
     * @param properties job properties; {@code job.name} must be the spec URI string
     * @param jobListener listener forwarded to the parent scheduler; may be {@code null}
     * @throws JobException wrapping any failure raised while scheduling
     */
    public synchronized void scheduleJob(Properties properties, JobListener jobListener) throws JobException {
        Map<String, Object> additionalJobData = new HashMap<>();
        String jobName = properties.getProperty("job.name");
        additionalJobData.put("gobblin.service.flowSpec", this.scheduledFlowSpecs.get(jobName));
        try {
            scheduleJob(properties, jobListener, additionalJobData, GobblinServiceJob.class);
        } catch (Exception cause) {
            String message = "Failed to schedule job " + properties.getProperty("job.name");
            throw new JobException(message, cause);
        }
    }

    /**
     * Orchestrates the cached flow spec whose URI string equals the {@code job.name} property.
     *
     * @param properties job properties carrying {@code job.name}
     * @param jobListener not used by this implementation
     * @throws JobException wrapping any failure raised by the lookup or the orchestrator
     */
    public void runJob(Properties properties, JobListener jobListener) throws JobException {
        try {
            Spec cachedSpec = this.scheduledFlowSpecs.get(properties.getProperty("job.name"));
            this.orchestrator.orchestrate(cachedSpec);
        } catch (Exception cause) {
            String message = "Failed to run Spec: " + properties.getProperty("job.name");
            throw new JobException(message, cause);
        }
    }

    /**
     * Handles a newly added (or updated) spec: compiles it, then either schedules it, runs it once,
     * or ignores it.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>Helix present but not connected, or spec not a {@link FlowSpec}: returns {@code null}.</li>
     *   <li>Explain-only spec, failed compilation, or inactive scheduler: nothing is scheduled; the
     *       compilation result is returned.</li>
     *   <li>A cached version with a newer modification time (or the same time, unless run-immediately
     *       is set) exists: the spec is ignored; the compilation result is returned.</li>
     *   <li>Spec has {@code job.schedule}: it is scheduled, and additionally run once when
     *       {@code flow.runImmediately} is true. A scheduling failure evicts the spec from the caches
     *       and returns {@code null}.</li>
     *   <li>Otherwise: the spec is run once and removed from the catalog afterwards.</li>
     * </ul>
     *
     * @param spec the spec that was added
     * @return a response wrapping the compiled DAG's string form ({@code null} content when the
     *     DAG is null or empty), or {@code null} as described above
     * @throws RuntimeException if the quota check fails with an I/O error
     */
    public AddSpecResponse onAddSpec(Spec spec) {
        long startNanos = System.nanoTime();
        if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
            this._log.info("System not yet initialized. Skipping Spec Addition: " + spec);
            return null;
        }
        this._log.info("New Flow Spec detected: " + spec);
        if (!(spec instanceof FlowSpec)) {
            return null;
        }
        // Typed as FlowSpec: isExplain() and createJobConfig() are not available on plain Spec.
        FlowSpec flowSpec = (FlowSpec) spec;
        URI flowSpecUri = flowSpec.getUri();
        String flowSpecKey = flowSpecUri.toString();
        Properties jobConfig = createJobConfig(flowSpec);
        boolean isExplain = flowSpec.isExplain();

        long compileStartNanos = System.nanoTime();
        Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
        this.eachSpecCompilationValue = System.nanoTime() - compileStartNanos;
        String response = (dag != null && !dag.isEmpty()) ? dag.toString() : null;
        boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);

        boolean active = this.isActive;
        if (isExplain || !compileSuccess || !active) {
            this._log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}, master: {}", spec, isExplain, compileSuccess, active);
            return new AddSpecResponse<>(response);
        }

        boolean hasSchedule = jobConfig.containsKey("job.schedule");
        // Quota is only checked here for run-once flows, and only when warm standby is disabled.
        if (!this.warmStandbyEnabled && !hasSchedule && this.quotaManager.isPresent()) {
            try {
                this.quotaManager.get().checkQuota(dag.getStartNodes());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        Long modifiedTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty("modified_time", "0"));
        boolean runImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, "flow.runImmediately", "false");
        // A modification time of 0 means "unknown", in which case the spec is never treated as stale.
        if (modifiedTime != 0L && this.scheduledFlowSpecs.containsKey(flowSpecKey) && this.lastUpdatedTimeForFlowSpec.containsKey(flowSpecKey)) {
            Long cachedModifiedTime = this.lastUpdatedTimeForFlowSpec.get(flowSpecKey);
            boolean cachedIsNewer = cachedModifiedTime.compareTo(modifiedTime) > 0;
            boolean sameVersionNoRerun = cachedModifiedTime.equals(modifiedTime) && !runImmediately;
            if (cachedIsNewer || sameVersionNoRerun) {
                this._log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}", spec, modifiedTime, cachedModifiedTime);
                this.eachCompleteAddSpecValue = System.nanoTime() - startNanos;
                return new AddSpecResponse<>(response);
            }
        }

        this.scheduledFlowSpecs.put(flowSpecKey, spec);
        this.lastUpdatedTimeForFlowSpec.put(flowSpecKey, modifiedTime);
        if (hasSchedule) {
            this._log.info("{} Scheduling flow spec: {} ", this.serviceName, spec);
            try {
                long scheduleStartNanos = System.nanoTime();
                scheduleJob(jobConfig, null);
                this.eachScheduleJobValue = System.nanoTime() - scheduleStartNanos;
                if (runImmediately) {
                    this._log.info("RunImmediately requested, hence executing FlowSpec: " + spec);
                    this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
                }
            } catch (JobException e) {
                this._log.error("{} Failed to schedule or run FlowSpec {}", this.serviceName, spec, e);
                // Roll back the cache entries added above so a later retry is not treated as stale.
                this.scheduledFlowSpecs.remove(flowSpecKey);
                this.lastUpdatedTimeForFlowSpec.remove(flowSpecKey);
                this.eachCompleteAddSpecValue = System.nanoTime() - startNanos;
                return null;
            }
        } else {
            this._log.info("No FlowSpec schedule found, so running FlowSpec: " + spec);
            this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
        }
        this.eachCompleteAddSpecValue = System.nanoTime() - startNanos;
        return new AddSpecResponse<>(response);
    }

    /**
     * Drops a spec from both caches and unschedules its job.
     *
     * @param uri URI of the spec to unschedule
     * @param str spec version, used only in the log message
     * @throws JobException if the spec is not in the cache, or if unscheduling the job fails
     */
    private void unscheduleSpec(URI uri, String str) throws JobException {
        final String specKey = uri.toString();
        if (this.scheduledFlowSpecs.containsKey(specKey)) {
            this._log.info("Unscheduling flowSpec " + uri + "/" + str);
            this.scheduledFlowSpecs.remove(specKey);
            this.lastUpdatedTimeForFlowSpec.remove(specKey);
            unscheduleJob(specKey);
            return;
        }
        throw new JobException(String.format("Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually", uri));
    }

    /**
     * Handles deletion of a spec, delegating to the three-argument overload with empty headers.
     *
     * @param uri URI of the deleted spec
     * @param str version of the deleted spec
     */
    public void onDeleteSpec(URI uri, String str) {
        onDeleteSpec(uri, str, new Properties());
    }

    /**
     * Handles deletion of a spec: unschedules it and asks the orchestrator to remove it.
     *
     * <p>Nothing happens when Helix is present but not connected, or when this scheduler is not
     * active. Failures while unscheduling or removing are logged as warnings, not propagated.
     *
     * @param uri URI of the deleted spec
     * @param str version of the deleted spec, used only in log messages
     * @param properties forwarded to the orchestrator's remove call
     */
    public void onDeleteSpec(URI uri, String str, Properties properties) {
        boolean helixDisconnected = this.helixManager.isPresent() && !this.helixManager.get().isConnected();
        if (helixDisconnected) {
            this._log.info("System not yet initialized. Skipping Spec Deletion: " + uri);
            return;
        }

        this._log.info("Spec deletion detected: " + uri + "/" + str);
        if (this.isActive) {
            try {
                // Look the spec up before unscheduling, since unscheduling evicts it from the cache.
                Spec cachedSpec = this.scheduledFlowSpecs.get(uri.toString());
                unscheduleSpec(uri, str);
                this.orchestrator.remove(cachedSpec, properties);
            } catch (JobException | IOException failure) {
                this._log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", uri), failure);
            }
        } else {
            this._log.info("Skipping deletion of this spec {}/{} for non-leader host", uri, str);
        }
    }

    /**
     * Handles an updated spec by re-adding it through {@code onAddSpec}.
     *
     * <p>Skipped when Helix is present but not connected; specs that are not {@link FlowSpec}s are
     * only logged. Any exception from {@code onAddSpec} is logged and swallowed.
     *
     * @param spec the updated spec
     */
    public void onUpdateSpec(Spec spec) {
        boolean helixDisconnected = this.helixManager.isPresent() && !this.helixManager.get().isConnected();
        if (helixDisconnected) {
            this._log.info("System not yet initialized. Skipping Spec Update: " + spec);
            return;
        }

        this._log.info("Spec changed: " + spec);
        if (!(spec instanceof FlowSpec)) {
            return;
        }
        try {
            onAddSpec(spec);
        } catch (Exception failure) {
            this._log.error("Failed to update Spec: " + spec, failure);
        }
    }

    /**
     * Builds the job properties for a flow spec: this scheduler's base properties plus the job name
     * (spec URI), job group, run-immediately flag and, when non-blank, the schedule.
     *
     * <p>NOTE(review): the job group is taken from {@code getValue("flow.group").toString()}, i.e. the
     * string form of the config value object rather than {@code getString} — confirm that this is
     * the intended group name.
     *
     * @param flowSpec the spec to derive the job configuration from
     * @return a new Properties instance
     */
    private Properties createJobConfig(FlowSpec flowSpec) {
        Properties jobConfig = new Properties();
        Properties specProps = flowSpec.getConfigAsProperties();
        jobConfig.putAll(this.properties);

        String jobName = flowSpec.getUri().toString();
        jobConfig.setProperty("job.name", jobName);

        String jobGroup = flowSpec.getConfig().getValue("flow.group").toString();
        jobConfig.setProperty("job.group", jobGroup);

        String runImmediately = ConfigUtils.getString(flowSpec.getConfig(), "flow.runImmediately", "false");
        jobConfig.setProperty("flow.runImmediately", runImmediately);

        if (specProps.containsKey("job.schedule")) {
            String schedule = specProps.getProperty("job.schedule");
            if (StringUtils.isNotBlank(schedule)) {
                jobConfig.setProperty("job.schedule", schedule);
            }
        }
        return jobConfig;
    }

    /**
     * Returns the live map of cached specs keyed by spec URI string (the internal map itself, not a
     * copy).
     */
    public Map<String, Spec> getScheduledFlowSpecs() {
        return this.scheduledFlowSpecs;
    }

    /**
     * Returns the live map of last accepted modification times keyed by spec URI string (the
     * internal map itself, not a copy).
     */
    public Map<String, Long> getLastUpdatedTimeForFlowSpec() {
        return this.lastUpdatedTimeForFlowSpec;
    }

    /** Returns whether this scheduler is currently active, as last set by {@code setActive}. */
    public boolean isActive() {
        return this.isActive;
    }
}
