package org.apache.gobblin.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Striped;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinHelixDistributeJobExecutionLauncher;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.JobExecutionMonitor;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@Alpha
/* loaded from: input_file:org/apache/gobblin/cluster/HelixRetriggeringJobCallable.class */
public class HelixRetriggeringJobCallable implements Callable {
    private static final Logger log = LoggerFactory.getLogger(HelixRetriggeringJobCallable.class);
    private final GobblinHelixJobScheduler jobScheduler;
    private final MutableJobCatalog jobCatalog;
    private final Properties sysProps;
    private final Properties jobProps;
    private final JobListener jobListener;
    private final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
    private final GobblinHelixMetrics helixMetrics;
    private final Path appWorkDir;
    private final HelixManager jobHelixManager;
    private final Optional<HelixManager> taskDriverHelixManager;
    protected HelixJobsMapping jobsMapping;
    private boolean isDistributeJobEnabled;
    private final String jobUri;
    private final Striped<Lock> locks;
    private final MetricContext metricContext;
    private final EventSubmitter eventSubmitter;
    private GobblinHelixJobLauncher currentJobLauncher = null;
    private JobExecutionMonitor currentJobMonitor = null;
    private boolean jobDeleteAttempted = false;

    public HelixRetriggeringJobCallable(GobblinHelixJobScheduler gobblinHelixJobScheduler, MutableJobCatalog mutableJobCatalog, Properties properties, Properties properties2, JobListener jobListener, GobblinHelixPlanningJobLauncherMetrics gobblinHelixPlanningJobLauncherMetrics, GobblinHelixMetrics gobblinHelixMetrics, Path path, HelixManager helixManager, Optional<HelixManager> optional, HelixJobsMapping helixJobsMapping, Striped<Lock> striped, MetricContext metricContext) {
        this.isDistributeJobEnabled = false;
        this.jobScheduler = gobblinHelixJobScheduler;
        this.jobCatalog = mutableJobCatalog;
        this.sysProps = properties;
        this.jobProps = properties2;
        this.jobListener = jobListener;
        this.planningJobLauncherMetrics = gobblinHelixPlanningJobLauncherMetrics;
        this.helixMetrics = gobblinHelixMetrics;
        this.appWorkDir = path;
        this.jobHelixManager = helixManager;
        this.taskDriverHelixManager = optional;
        this.isDistributeJobEnabled = isDistributeJobEnabled();
        this.jobUri = properties2.getProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI);
        this.jobsMapping = helixJobsMapping;
        this.locks = striped;
        this.metricContext = metricContext;
        this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.runtime").build();
    }

    private boolean isRetriggeringEnabled() {
        return PropertiesUtils.getPropAsBoolean(this.jobProps, "job.retriggering.enabled", "true");
    }

    private boolean isDistributeJobEnabled() {
        Properties properties = new Properties();
        properties.putAll(this.sysProps);
        properties.putAll(this.jobProps);
        return PropertiesUtils.getPropAsBoolean(properties, GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, Boolean.toString(false));
    }

    @Override // java.util.concurrent.Callable
    public Void call() throws JobException {
        boolean propAsBoolean = PropertiesUtils.getPropAsBoolean(this.jobProps, GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, "false");
        try {
            this.jobsMapping.setDistributedJobMode(this.jobUri, this.isDistributeJobEnabled);
            try {
                if (this.isDistributeJobEnabled) {
                    runJobExecutionLauncher();
                } else {
                    runJobLauncherLoop();
                }
                deleteJobSpec();
                return null;
            } catch (Exception e) {
                if (propAsBoolean) {
                    deleteJobSpec();
                }
                throw e;
            }
        } catch (IOException e2) {
            throw new JobException("Could not update jobsMapping", e2);
        }
    }

    private void deleteJobSpec() throws JobException {
        boolean booleanValue = Boolean.valueOf(this.jobProps.getProperty("job.runonce", "false")).booleanValue();
        boolean containsKey = this.jobProps.containsKey("job.schedule");
        if ((booleanValue || !containsKey) && this.jobCatalog != null) {
            try {
                if (!this.jobDeleteAttempted) {
                    log.info("Deleting job spec on {}", this.jobUri);
                    this.jobScheduler.unscheduleJob(this.jobUri);
                    this.jobCatalog.remove(new URI(this.jobUri));
                    this.jobDeleteAttempted = true;
                }
            } catch (URISyntaxException e) {
                log.error("Failed to remove job with bad uri " + this.jobUri, e);
            }
        }
    }

    @VisibleForTesting
    static GobblinHelixJobLauncher buildJobLauncherForCentralizedMode(GobblinHelixJobScheduler gobblinHelixJobScheduler, Properties properties) throws Exception {
        String property = properties.getProperty("job.id");
        if (property != null) {
            Preconditions.checkArgument(!property.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX), "Job Id should not contain ActualJob in centralized mode.");
        }
        return gobblinHelixJobScheduler.m12buildJobLauncher(properties);
    }

    private void runJobLauncherLoop() throws JobException {
        try {
            try {
                this.jobHelixManager.connect();
                while (true) {
                    this.currentJobLauncher = buildJobLauncherForCentralizedMode(this.jobScheduler, this.jobProps);
                    boolean runJob = this.jobScheduler.runJob(this.jobProps, this.jobListener, this.currentJobLauncher);
                    boolean isRetriggeringEnabled = isRetriggeringEnabled();
                    if (!runJob || !isRetriggeringEnabled) {
                        break;
                    } else {
                        log.info("Job {} will be re-triggered.", this.jobProps.getProperty("job.name"));
                    }
                }
            } catch (Exception e) {
                log.error("Failed to run job {}", this.jobProps.getProperty("job.name"), e);
                throw new JobException("Failed to run job " + this.jobProps.getProperty("job.name"), e);
            }
        } finally {
            this.jobHelixManager.disconnect();
            this.currentJobLauncher = null;
        }
    }

    private void runJobExecutionLauncher() throws JobException {
        Closer create = Closer.create();
        try {
            try {
                HelixManager orElse = this.taskDriverHelixManager.orElse(this.jobHelixManager);
                orElse.connect();
                String property = this.jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
                Optional<String> planningJobId = this.jobsMapping.getPlanningJobId(this.jobUri);
                Lock lock = (Lock) this.locks.get(this.jobUri);
                lock.lock();
                try {
                    if (planningJobId.isPresent() && !canRun(planningJobId.get(), orElse)) {
                        new TimingEvent(this.eventSubmitter, "jobSkippedTime").stop(new HashMap(Tag.toMap(Tag.tagValuesToString(HelixUtils.initBaseEventTags(this.jobProps, Lists.newArrayList())))));
                        this.planningJobLauncherMetrics.skippedPlanningJobs.mark();
                        orElse.disconnect();
                        lock.unlock();
                        try {
                            create.close();
                            return;
                        } catch (IOException e) {
                            throw new JobException("Cannot properly close planning job for " + this.jobUri, e);
                        }
                    }
                    log.info("Planning job for {} does not exist. First time run.", this.jobUri);
                    GobblinHelixDistributeJobExecutionLauncher.Builder builder = (GobblinHelixDistributeJobExecutionLauncher.Builder) GobblinConstructorUtils.invokeLongestConstructor(new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(property), new Object[0]);
                    Properties properties = new Properties();
                    properties.putAll(this.jobProps);
                    String createPlanningJobId = HelixJobsMapping.createPlanningJobId(properties);
                    properties.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, createPlanningJobId);
                    properties.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis()));
                    builder.setSysProps(this.sysProps);
                    builder.setJobPlanningProps(properties);
                    builder.setPlanningJobHelixManager(orElse);
                    builder.setAppWorkDir(this.appWorkDir);
                    builder.setJobsMapping(this.jobsMapping);
                    builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
                    builder.setHelixMetrics(this.helixMetrics);
                    boolean z = ConfigUtils.getBoolean(ConfigUtils.propertiesToConfig(properties).withFallback(ConfigUtils.propertiesToConfig(this.sysProps)), GobblinClusterConfigurationKeys.NON_BLOCKING_PLANNING_JOB_ENABLED, false);
                    log.info("Planning job {} started.", createPlanningJobId);
                    GobblinHelixDistributeJobExecutionLauncher build = builder.build();
                    create.register(build);
                    this.jobsMapping.setPlanningJobId(this.jobUri, createPlanningJobId);
                    long currentTimeMillis = System.currentTimeMillis();
                    this.currentJobMonitor = build.m7launchJob((JobSpec) null);
                    HelixUtils.waitJobInitialization(orElse, createPlanningJobId, createPlanningJobId, Duration.ofSeconds(PropertiesUtils.getPropAsLong(this.sysProps, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_SUBMISSION_TIMEOUT_SECONDS, 300L)));
                    orElse.disconnect();
                    lock.unlock();
                    deleteJobSpec();
                    this.currentJobMonitor.get();
                    this.currentJobMonitor = null;
                    if (z) {
                        log.info("Planning job {} submitted successfully.", createPlanningJobId);
                    } else {
                        log.info("Planning job {} finished.", createPlanningJobId);
                        this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(currentTimeMillis);
                    }
                    try {
                        create.close();
                    } catch (IOException e2) {
                        throw new JobException("Cannot properly close planning job for " + this.jobUri, e2);
                    }
                } catch (Throwable th) {
                    orElse.disconnect();
                    lock.unlock();
                    throw th;
                }
            } catch (Throwable th2) {
                try {
                    create.close();
                    throw th2;
                } catch (IOException e3) {
                    throw new JobException("Cannot properly close planning job for " + this.jobUri, e3);
                }
            }
        } catch (Exception e4) {
            if (0 != 0) {
                this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(0L);
            }
            log.error("Failed to run planning job for {}", this.jobUri, e4);
            throw new JobException("Failed to run planning job for " + this.jobUri, e4);
        }
    }

    private boolean canRun(String str, HelixManager helixManager) throws JobException, InterruptedException {
        if (HelixUtils.isJobFinished(str, str, helixManager)) {
            log.info("Previous planning job {} has reached to the final state. Start a new one.", str);
            return true;
        }
        if (!PropertiesUtils.getPropAsBoolean(this.jobProps, GobblinClusterConfigurationKeys.KILL_DUPLICATE_PLANNING_JOB, String.valueOf(true))) {
            log.info("Previous planning job {} has not finished yet. Skip this job.", str);
            return false;
        }
        log.info("Previous planning job {} has not finished yet. Kill it.", str);
        long propAsLong = PropertiesUtils.getPropAsLong(this.sysProps, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS, 300L) * 1000;
        try {
            HelixUtils.deleteWorkflow(str, helixManager, propAsLong);
            return true;
        } catch (HelixException e) {
            log.info("Helix cannot delete previous planning job id {} within {} seconds.", str, Long.valueOf(propAsLong / 1000));
            throw new JobException("Helix cannot delete previous planning job id " + str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancel() throws JobException {
        this.jobScheduler.jobSchedulerMetrics.numCancellationStart.incrementAndGet();
        if (this.isDistributeJobEnabled) {
            if (this.currentJobMonitor != null) {
                this.currentJobMonitor.cancel(false);
            }
        } else if (this.currentJobLauncher != null) {
            this.currentJobLauncher.cancelJob(this.jobListener);
        }
        this.jobScheduler.jobSchedulerMetrics.numCancellationComplete.incrementAndGet();
    }
}
