package org.apache.gobblin.azkaban;

import azkaban.jobExecutor.AbstractJob;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.app.ApplicationException;
import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
import org.apache.gobblin.runtime.listeners.CompositeJobListener;
import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.service.modules.orchestration.AzkabanClientParams;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.TimeRangeChecker;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/* loaded from: input_file:org/apache/gobblin/azkaban/AzkabanJobLauncher.class */
public class AzkabanJobLauncher extends AbstractJob implements ApplicationLauncher, JobLauncher {
    public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride";
    public static final String GOBBLIN_CUSTOM_JOB_LISTENERS = "gobblin.custom.job.listeners";
    public static final String TEMPLATE_KEY = "gobblin.template.uri";
    private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
    private static final String AZKABAN_LINK_JOBEXEC_URL = "azkaban.link.jobexec.url";
    private static final String AZKABAN_FLOW_EXEC_ID = "azkaban.flow.execid";
    private static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
    private static final String AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "gobblin.azkaban.SLAInSeconds";
    private static final String DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "-1";
    private final Closer closer;
    private final JobLauncher jobLauncher;
    private final JobListener jobListener;
    private final Properties props;
    private final ApplicationLauncher applicationLauncher;
    private final long ownAzkabanSla;
    private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class);
    private static final String HADOOP_JAVA_JOB = "hadoopJava";
    private static final String JAVA_JOB = "java";
    private static final String GOBBLIN_JOB = "gobblin";
    private static final Set<String> JOB_TYPES_WITH_AUTOMATIC_TOKEN = Sets.newHashSet(new String[]{HADOOP_JAVA_JOB, JAVA_JOB, GOBBLIN_JOB});

    /* JADX WARN: Multi-variable type inference failed */
    public AzkabanJobLauncher(String str, Properties properties) throws Exception {
        super(str, LOG);
        this.closer = Closer.create();
        HadoopUtils.addGobblinSite();
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
        RootMetricContext.get(newArrayList);
        if (properties.containsKey(GOBBLIN_LOG_LEVEL_KEY)) {
            Logger.getLogger("org.apache.gobblin").setLevel(Level.toLevel(properties.getProperty(GOBBLIN_LOG_LEVEL_KEY), Level.INFO));
        }
        this.props = new Properties();
        this.props.putAll(properties);
        this.jobListener = initJobListener();
        Config propertiesToConfig = ConfigUtils.propertiesToConfig(properties);
        for (Map.Entry entry : DynamicConfigGeneratorFactory.createDynamicConfigGenerator(propertiesToConfig).generateDynamicConfig(propertiesToConfig).entrySet()) {
            this.props.put(entry.getKey(), ((ConfigValue) entry.getValue()).unwrapped().toString());
        }
        Configuration configuration = new Configuration();
        String str2 = configuration.get(HADOOP_FS_DEFAULT_NAME);
        if (!Strings.isNullOrEmpty(str2)) {
            if (!this.props.containsKey("fs.uri")) {
                this.props.setProperty("fs.uri", str2);
            }
            if (!this.props.containsKey("state.store.fs.uri")) {
                this.props.setProperty("state.store.fs.uri", str2);
            }
        }
        this.props.setProperty("job.tracking.url", Strings.nullToEmpty(configuration.get(AZKABAN_LINK_JOBEXEC_URL)));
        if (properties.containsKey("type") && JOB_TYPES_WITH_AUTOMATIC_TOKEN.contains(properties.getProperty("type"))) {
            LOG.info("Job type " + properties.getProperty("type") + " provides Hadoop tokens automatically. Using provided tokens.");
            if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
                this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            }
        } else {
            LOG.info(String.format("Job type %s does not provide Hadoop tokens. Negotiating Hadoop tokens.", properties.getProperty("type")));
            File createTempFile = File.createTempFile("mr-azkaban", ".token");
            TokenUtils.getHadoopTokens(new State(properties), Optional.of(createTempFile), new Credentials());
            System.setProperty("HADOOP_TOKEN_FILE_LOCATION", createTempFile.getAbsolutePath());
            System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, createTempFile.getAbsolutePath());
            this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, createTempFile.getAbsolutePath());
            this.props.setProperty("env.HADOOP_TOKEN_FILE_LOCATION", createTempFile.getAbsolutePath());
        }
        Properties properties2 = this.props;
        properties2 = properties2.containsKey(TEMPLATE_KEY) ? ConfigUtils.configToProperties(new PackagedTemplatesJobCatalogDecorator().getTemplate(new URI(properties2.getProperty(TEMPLATE_KEY))).getResolvedConfig(ConfigUtils.propertiesToConfig(properties2))) : properties2;
        GobblinMetrics.addCustomTagsToProperties(properties2, newArrayList);
        if (!properties2.containsKey("launcher.type")) {
            properties2.setProperty("launcher.type", JobLauncherFactory.JobLauncherType.MAPREDUCE.toString());
        }
        this.ownAzkabanSla = Long.parseLong(properties2.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));
        this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(properties2, properties2, (SharedResourcesBroker) null, properties2.containsKey("flow.name") ? addAdditionalMetadataTags(properties2) : Lists.newArrayList()));
        this.applicationLauncher = this.closer.register(new ServiceBasedAppLauncher(properties2, "Azkaban-" + UUID.randomUUID()));
    }

    protected JobListener initJobListener() {
        CompositeJobListener compositeJobListener = new CompositeJobListener();
        try {
            Iterator it = new State(this.props).getPropAsList(GOBBLIN_CUSTOM_JOB_LISTENERS, EmailNotificationJobListener.class.getSimpleName()).iterator();
            while (it.hasNext()) {
                compositeJobListener.addJobListener((JobListener) new ClassAliasResolver(JobListener.class).resolveClass((String) it.next()).newInstance());
            }
            return compositeJobListener;
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public void run() throws Exception {
        if (isCurrentTimeInRange()) {
            if (this.ownAzkabanSla <= 0) {
                runRealJob();
                return;
            }
            LOG.info("Found gobblin defined SLA: " + this.ownAzkabanSla);
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            boolean z = false;
            Future submit = newSingleThreadExecutor.submit(new Callable<Void>() { // from class: org.apache.gobblin.azkaban.AzkabanJobLauncher.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    AzkabanJobLauncher.this.runRealJob();
                    return null;
                }
            });
            try {
                try {
                    submit.get(this.ownAzkabanSla, TimeUnit.SECONDS);
                    newSingleThreadExecutor.shutdown();
                    if (0 != 0) {
                        cancel();
                        throw new RuntimeException("Job failed because it reaches SLA limit: " + this.ownAzkabanSla);
                    }
                } catch (TimeoutException e) {
                    LOG.info("Cancelling job since SLA is reached: " + this.ownAzkabanSla);
                    submit.cancel(true);
                    z = true;
                    cancelJob(this.jobListener);
                    newSingleThreadExecutor.shutdown();
                    if (1 != 0) {
                        cancel();
                        throw new RuntimeException("Job failed because it reaches SLA limit: " + this.ownAzkabanSla);
                    }
                }
            } catch (Throwable th) {
                newSingleThreadExecutor.shutdown();
                if (!z) {
                    throw th;
                }
                cancel();
                throw new RuntimeException("Job failed because it reaches SLA limit: " + this.ownAzkabanSla);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void runRealJob() throws Exception {
        try {
            start();
            launchJob(this.jobListener);
            try {
                stop();
            } finally {
            }
        } catch (Throwable th) {
            try {
                stop();
                throw th;
            } finally {
            }
        }
    }

    public void cancel() throws Exception {
        try {
            stop();
        } finally {
            close();
        }
    }

    public void start() throws ApplicationException {
        this.applicationLauncher.start();
    }

    public void stop() throws ApplicationException {
        this.applicationLauncher.stop();
    }

    public void launchJob(@Nullable JobListener jobListener) throws JobException {
        this.jobLauncher.launchJob(jobListener);
    }

    public void cancelJob(@Nullable JobListener jobListener) throws JobException {
        this.jobLauncher.cancelJob(jobListener);
    }

    public void close() throws IOException {
        this.closer.close();
    }

    private boolean isCurrentTimeInRange() {
        Splitter trimResults = Splitter.on(",").omitEmptyStrings().trimResults();
        if (!this.props.contains("azkaban.execution.days.list") || !this.props.contains("azkaban.execution.time.range")) {
            return true;
        }
        List splitToList = trimResults.splitToList(this.props.getProperty("azkaban.execution.time.range"));
        List splitToList2 = trimResults.splitToList(this.props.getProperty("azkaban.execution.days.list"));
        Preconditions.checkArgument(splitToList.size() == 2, "The property azkaban.execution.days.list should be a comma separated list of two entries");
        return TimeRangeChecker.isTimeInRange(splitToList2, (String) splitToList.get(0), (String) splitToList.get(1), new DateTime(DateTimeZone.forID("America/Los_Angeles")));
    }

    private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties properties) {
        ArrayList newArrayList = Lists.newArrayList();
        String property = properties.getProperty(AZKABAN_FLOW_EXEC_ID, "");
        String property2 = properties.getProperty(AZKABAN_LINK_JOBEXEC_URL, "");
        newArrayList.add(new Tag("flowGroup", properties.getProperty("flow.group", "")));
        newArrayList.add(new Tag("flowName", properties.getProperty("flow.name")));
        newArrayList.add(new Tag("flowExecutionId", properties.getProperty("flow.executionId", property)));
        newArrayList.add(new Tag("jobExecutionId", property));
        newArrayList.add(new Tag("jobGroup", properties.getProperty("job.group", "")));
        newArrayList.add(new Tag("jobName", properties.getProperty("job.name", "")));
        newArrayList.add(new Tag(AzkabanClientParams.MESSAGE, property2));
        LOG.debug(String.format("AzkabanJobLauncher.addAdditionalMetadataTags: metadataTags %s", newArrayList));
        return newArrayList;
    }
}
