package org.apache.gobblin.cluster;

import com.github.rholder.retry.AttemptTimeLimiters;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Striped;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link JobScheduler} that launches Gobblin jobs through Helix.
 *
 * <p>The scheduler registers itself on the supplied {@link EventBus} during {@link #startUp()} and
 * reacts to new / update / delete / cancel job-configuration events. Jobs that carry a
 * {@code job.schedule} property are scheduled; all others are submitted to the job executor for a
 * single run.
 *
 * <p>NOTE(review): this file was produced by a decompiler; parameter names such as
 * {@code optional}, {@code path} and {@code list} are decompiler-generated and kept only so the
 * signatures stay untouched.
 */
@Alpha
public class GobblinHelixJobScheduler extends JobScheduler implements StandardMetricsBridge {
    private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class);
    private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
    /** Property key holding the job name; used for log and error messages. */
    private static final String JOB_NAME_KEY = "job.name";
    /** Property key whose presence decides between scheduling a job and running it once. */
    private static final String JOB_SCHEDULE_KEY = "job.schedule";
    /** Sleep interval of the polling loops in this class. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    private final Properties commonJobProperties;
    private final HelixManager jobHelixManager;
    private final Optional<HelixManager> taskDriverHelixManager;
    private final EventBus eventBus;
    private final Path appWorkDir;
    private final List<? extends Tag<?>> metadataTags;
    private final ConcurrentHashMap<String, Boolean> jobRunningMap;
    private final MutableJobCatalog jobCatalog;
    private final MetricContext metricContext;
    final GobblinHelixMetrics helixMetrics;
    final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
    final GobblinHelixJobLauncherMetrics launcherMetrics;
    final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
    final HelixJobsMapping jobsMapping;
    final Striped<Lock> locks;
    private final long helixWorkflowListingTimeoutMillis;
    // Written by startUp() and polled by scheduleJob(), possibly on different threads, so it must
    // be volatile for the update to be visible to the polling thread.
    private volatile boolean startServicesCompleted;
    private final long helixJobStopTimeoutMillis;

    /**
     * Runs a job that has no schedule exactly once, recording scheduling-latency metrics just
     * before the launch.
     */
    public class NonScheduledJobRunner implements Runnable {
        private final Properties jobProps;
        private final GobblinHelixJobLauncherListener jobListener;
        /** Wall-clock time at which this runner was created. */
        private final long creationTimeInMillis = System.currentTimeMillis();

        /**
         * @param properties the job properties to run with
         * @param gobblinHelixJobLauncherListener listener passed on to {@code runJob}
         */
        public NonScheduledJobRunner(Properties properties, GobblinHelixJobLauncherListener gobblinHelixJobLauncherListener) {
            this.jobProps = properties;
            this.jobListener = gobblinHelixJobLauncherListener;
        }

        /** Updates the pre-launch metrics and runs the job; a {@link JobException} is logged, not rethrown. */
        @Override
        public void run() {
            try {
                jobSchedulerMetrics.updateTimeBeforeJobLaunching(this.jobProps);
                jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
                runJob(this.jobProps, this.jobListener);
            } catch (JobException e) {
                LOGGER.error("Failed to run job " + this.jobProps.getProperty(JOB_NAME_KEY), e);
            }
        }
    }

    /**
     * Creates the scheduler and its metrics objects.
     *
     * @param config system configuration; also the source of the common job properties
     *     ({@code gobblin.common.job.props}), the metrics window size and the Helix timeouts
     * @param helixManager Helix manager used for job workflows
     * @param optional optional Helix manager used for planning-job workflows
     * @param eventBus bus on which job-configuration events arrive
     * @param path application work directory
     * @param list metadata tags handed to every job launcher built by this scheduler
     * @param schedulerService scheduler service passed to the superclass
     * @param mutableJobCatalog job catalog handed to the retriggering job callable
     * @throws Exception if the superclass or any helper construction fails
     */
    public GobblinHelixJobScheduler(Config config, HelixManager helixManager, Optional<HelixManager> optional, EventBus eventBus, Path path, List<? extends Tag<?>> list, SchedulerService schedulerService, MutableJobCatalog mutableJobCatalog) throws Exception {
        super(ConfigUtils.configToProperties(config), schedulerService);
        this.locks = Striped.lazyWeakLock(256);
        this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(config, COMMON_JOB_PROPS));
        this.jobHelixManager = helixManager;
        this.taskDriverHelixManager = optional;
        this.eventBus = eventBus;
        this.jobRunningMap = new ConcurrentHashMap<>();
        this.appWorkDir = path;
        this.metadataTags = list;
        this.jobCatalog = mutableJobCatalog;
        this.metricContext = Instrumented.getMetricContext(new State(this.properties), getClass());

        int metricsWindowSizeInMin = ConfigUtils.getInt(config, "metrics.timer.window.size.in.minutes", 15);
        this.launcherMetrics = new GobblinHelixJobLauncherMetrics("launcherInScheduler", this.metricContext, metricsWindowSizeInMin);
        this.jobSchedulerMetrics = new GobblinHelixJobSchedulerMetrics(this.jobExecutor, this.metricContext, metricsWindowSizeInMin);
        this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(this.properties), PathUtils.getRootPath(path).toUri(), path.toString());
        this.planningJobLauncherMetrics = new GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler", this.metricContext, metricsWindowSizeInMin, this.jobsMapping);
        this.helixMetrics = new GobblinHelixMetrics("helixMetricsInJobScheduler", this.metricContext, metricsWindowSizeInMin);

        this.startServicesCompleted = false;
        // Both timeouts are configured in seconds and stored in milliseconds.
        this.helixJobStopTimeoutMillis = TimeUnit.SECONDS.toMillis(ConfigUtils.getLong(config, GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS, 10L));
        this.helixWorkflowListingTimeoutMillis = TimeUnit.SECONDS.toMillis(ConfigUtils.getLong(config, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS, 60L));
    }

    /** Returns the launcher, scheduler, planning-launcher and Helix metrics of this scheduler. */
    public Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() {
        return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics, this.planningJobLauncherMetrics, this.helixMetrics);
    }

    /**
     * Registers this scheduler on the event bus, starts the superclass, and then marks start-up
     * as complete so that {@link #scheduleJob(Properties, JobListener)} stops waiting.
     */
    @Override
    protected void startUp() throws Exception {
        this.eventBus.register(this);
        super.startUp();
        this.startServicesCompleted = true;
    }

    /**
     * Schedules a job, first blocking (polling once per second) until start-up has completed.
     *
     * @param properties the job properties
     * @param jobListener listener for the scheduled job
     * @throws JobException if the waiting thread is interrupted or scheduling fails
     */
    public void scheduleJob(Properties properties, JobListener jobListener) throws JobException {
        try {
            while (!this.startServicesCompleted) {
                LOGGER.info("{} service is not fully up, waiting here...", getClass().getName());
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new JobException("Failed to schedule job " + properties.getProperty(JOB_NAME_KEY), e);
        }
        scheduleJob(properties, jobListener, Maps.newHashMap(), GobblinHelixJob.class);
    }

    /**
     * Deletes every stored job mapping when the {@code CLEAN_ALL_DIST_JOBS} property is true;
     * otherwise does nothing.
     */
    protected void startServices() throws Exception {
        boolean cleanAllDistJobs = PropertiesUtils.getPropAsBoolean(this.properties, GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS, String.valueOf(false));
        if (!cleanAllDistJobs) {
            return;
        }
        for (State state : this.jobsMapping.getAllStates()) {
            String jobId = state.getId();
            LOGGER.info("Delete mapping for job {}", jobId);
            this.jobsMapping.deleteMapping(jobId);
        }
    }

    /**
     * Runs the job synchronously on the calling thread.
     *
     * @param properties the job properties
     * @param jobListener listener for the job run
     * @throws JobException if the job run fails
     */
    public void runJob(Properties properties, JobListener jobListener) throws JobException {
        newRetriggeringJobCallable(properties, jobListener).call();
    }

    /** Builds the callable shared by {@link #runJob} and {@link #scheduleJobImmediately}. */
    private HelixRetriggeringJobCallable newRetriggeringJobCallable(Properties jobProps, JobListener jobListener) {
        return new HelixRetriggeringJobCallable(this, this.jobCatalog, this.properties, jobProps, jobListener, this.planningJobLauncherMetrics, this.helixMetrics, this.appWorkDir, this.jobHelixManager, this.taskDriverHelixManager, this.jobsMapping, this.locks);
    }

    /**
     * Builds a Helix job launcher from the scheduler properties overlaid with the job properties
     * (job properties win on key conflicts).
     *
     * <p>NOTE(review): the decompiler reported this method as renamed from
     * {@code buildJobLauncher} (merged with a bridge method) — confirm the intended name against
     * the original sources.
     *
     * @param properties the job properties
     * @return a new launcher for the job
     * @throws Exception if the launcher cannot be constructed
     */
    public GobblinHelixJobLauncher m12buildJobLauncher(Properties properties) throws Exception {
        Properties combinedProps = new Properties();
        combinedProps.putAll(this.properties);
        combinedProps.putAll(properties);
        return new GobblinHelixJobLauncher(combinedProps, this.jobHelixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap, Optional.of(this.helixMetrics));
    }

    /**
     * Submits the job to the job executor and returns a handle to it.
     *
     * <p>The returned future delegates everything except {@code cancel} to the executor's future.
     * {@code cancel} returns {@code false} without doing anything unless {@code isCancelRequested()}
     * is true; otherwise it cancels the job callable and, when {@code mayInterruptIfRunning} is
     * set, also cancels the submitted task.
     *
     * @param properties the job properties
     * @param jobListener listener for the job run
     * @return a future for the submitted job
     */
    public Future<?> scheduleJobImmediately(final Properties properties, JobListener jobListener) {
        final HelixRetriggeringJobCallable retriggeringJob = newRetriggeringJobCallable(properties, jobListener);
        final Future<?> submitted = this.jobExecutor.submit(retriggeringJob);
        return new Future<Object>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (!GobblinHelixJobScheduler.this.isCancelRequested()) {
                    return false;
                }
                boolean cancelled = true;
                try {
                    retriggeringJob.cancel();
                } catch (JobException e) {
                    LOGGER.error("Failed to cancel job " + properties.getProperty(JOB_NAME_KEY), e);
                    cancelled = false;
                }
                if (mayInterruptIfRunning) {
                    cancelled &= submitted.cancel(true);
                }
                return cancelled;
            }

            @Override
            public boolean isCancelled() {
                return submitted.isCancelled();
            }

            @Override
            public boolean isDone() {
                return submitted.isDone();
            }

            @Override
            public Object get() throws InterruptedException, ExecutionException {
                return submitted.get();
            }

            @Override
            public Object get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                return submitted.get(timeout, timeUnit);
            }
        };
    }

    /**
     * Handles a new job configuration: merges it over the common job properties, then either
     * schedules the job (when {@code job.schedule} is present) or submits it for a single run.
     * A {@link JobException} is logged, not rethrown.
     */
    @Subscribe
    public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobConfigArrivalEvent) {
        String jobName = newJobConfigArrivalEvent.getJobName();
        LOGGER.info("Received new job configuration of job " + jobName);
        try {
            Properties jobProps = new Properties();
            jobProps.putAll(this.commonJobProperties);
            jobProps.putAll(newJobConfigArrivalEvent.getJobConfig());
            jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobName);
            this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);

            GobblinHelixJobLauncherListener listener = new GobblinHelixJobLauncherListener(this.launcherMetrics);
            if (jobProps.containsKey(JOB_SCHEDULE_KEY)) {
                LOGGER.info("Scheduling job " + jobName);
                scheduleJob(jobProps, listener);
            } else {
                LOGGER.info("No job schedule found, so running job " + jobName);
                this.jobExecutor.execute(new NonScheduledJobRunner(jobProps, listener));
            }
        } catch (JobException e) {
            LOGGER.error("Failed to schedule or run job " + jobName, e);
        }
    }

    /**
     * Handles an updated job configuration as a delete followed by a new-job arrival. A failure of
     * the delete step is logged and does not prevent the new-job step from running.
     */
    @Subscribe
    public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobConfigArrivalEvent) {
        String jobName = updateJobConfigArrivalEvent.getJobName();
        LOGGER.info("Received update for job configuration of job " + jobName);
        try {
            handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(jobName, updateJobConfigArrivalEvent.getJobConfig()));
        } catch (Exception e) {
            LOGGER.error("Failed to update job " + jobName, e);
        }
        try {
            handleNewJobConfigArrival(new NewJobConfigArrivalEvent(jobName, updateJobConfigArrivalEvent.getJobConfig()));
        } catch (Exception e) {
            LOGGER.error("Failed to update job " + jobName, e);
        }
    }

    /**
     * Polls once per second until the job is no longer flagged as running in the job-running map.
     *
     * @throws InterruptedException if the thread is interrupted while waiting; propagated so the
     *     wait can actually be aborted instead of looping on with the interrupt swallowed
     */
    private void waitForJobCompletion(String jobName) throws InterruptedException {
        while (this.jobRunningMap.getOrDefault(jobName, false)) {
            LOGGER.info("Waiting for job {} to stop...", jobName);
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Handles a deleted job configuration: unschedules the job and, if the job configuration asks
     * for it, cancels the running workflow. A {@link JobException} is logged, not rethrown.
     *
     * @throws InterruptedException if interrupted while stopping or waiting for the job
     */
    @Subscribe
    public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobConfigArrivalEvent) throws InterruptedException {
        String jobName = deleteJobConfigArrivalEvent.getJobName();
        LOGGER.info("Received delete for job configuration of job " + jobName);
        try {
            unscheduleJob(jobName);
            cancelJobIfRequired(deleteJobConfigArrivalEvent);
        } catch (JobException e) {
            LOGGER.error("Failed to unschedule job " + jobName, e);
        }
    }

    /**
     * Handles a cancel request: looks up the planning job id (when the job's distributed-job mode
     * is true) or the actual job id (otherwise) and waits for that Helix workflow to stop.
     *
     * @throws InterruptedException if interrupted while waiting for the workflow to stop
     */
    @Subscribe
    public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobConfigArrivalEvent) throws InterruptedException {
        String jobUri = cancelJobConfigArrivalEvent.getJoburi();
        LOGGER.info("Received cancel for job configuration of job " + jobUri);
        Optional<String> planningJobId = Optional.empty();
        Optional<String> actualJobId = Optional.empty();

        this.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
        try {
            boolean distributedJobMode = this.jobsMapping.getDistributedJobMode(jobUri).map(Boolean::parseBoolean).orElse(false);
            if (distributedJobMode) {
                planningJobId = this.jobsMapping.getPlanningJobId(jobUri);
            } else {
                actualJobId = this.jobsMapping.getActualJobId(jobUri);
            }

            if (planningJobId.isPresent()) {
                LOGGER.info("Cancelling planning job helix workflow: {}", planningJobId.get());
                // NOTE(review): get() throws if no task-driver Helix manager was supplied — confirm
                // that one is always configured whenever distributed job mode is on.
                new TaskDriver(this.taskDriverHelixManager.get()).waitToStop(planningJobId.get(), this.helixJobStopTimeoutMillis);
            }
            if (actualJobId.isPresent()) {
                LOGGER.info("Cancelling actual job helix workflow: {}", actualJobId.get());
                new TaskDriver(this.jobHelixManager).waitToStop(actualJobId.get(), this.helixJobStopTimeoutMillis);
            }
        } catch (IOException e) {
            LOGGER.warn("jobsMapping could not be retrieved for job {}", jobUri, e);
        } finally {
            // Balance the increment above on every exit path, including failures and interrupts.
            this.jobSchedulerMetrics.numCancellationStart.decrementAndGet();
        }
    }

    /**
     * Stops the Helix workflow of a deleted job when the job configuration sets
     * {@code CANCEL_RUNNING_JOB_ON_DELETE}, then waits until the job is no longer marked running.
     * The workflow-id lookup is attempted up to five times, each attempt bounded by the workflow
     * listing timeout; if it still fails, or no workflow id is found, nothing is stopped.
     *
     * @throws InterruptedException if interrupted while stopping or waiting for the job
     */
    private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobConfigArrivalEvent) throws InterruptedException {
        if (!PropertiesUtils.getPropAsBoolean(deleteJobConfigArrivalEvent.getJobConfig(), GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "false")) {
            return;
        }
        final String jobName = deleteJobConfigArrivalEvent.getJobName();
        LOGGER.info("Cancelling workflow: {}", jobName);

        Map<String, String> workflowIdsByJobName;
        try {
            workflowIdsByJobName = RetryerBuilder.<Map<String, String>>newBuilder()
                .retryIfException()
                .withStopStrategy(StopStrategies.stopAfterAttempt(5))
                .withAttemptTimeLimiter(AttemptTimeLimiters.<Map<String, String>>fixedTimeLimit(this.helixWorkflowListingTimeoutMillis, TimeUnit.MILLISECONDS))
                .build()
                .call(() -> HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager, Collections.singletonList(jobName)));
        } catch (ExecutionException | RetryException e) {
            LOGGER.error("Exception encountered when getting workflows from Helix; likely a Helix/Zk issue.", e);
            return;
        }

        String workflowId = workflowIdsByJobName.get(jobName);
        if (!workflowIdsByJobName.containsKey(jobName)) {
            LOGGER.warn("Could not find Helix Workflow Id for job: {}", jobName);
            return;
        }
        new TaskDriver(this.jobHelixManager).waitToStop(workflowId, this.helixJobStopTimeoutMillis);
        LOGGER.info("Stopped workflow: {}", jobName);
        waitForJobCompletion(jobName);
    }
}
