package org.apache.gobblin.runtime.embedded;

import com.codahale.metrics.MetricFilter;
import com.github.rholder.retry.RetryListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.linkedin.data.template.DataTemplate;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javassist.bytecode.ClassFile;
import javax.annotation.Nullable;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.lang3.ClassUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.extractor.InstrumentedExtractorBase;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.api.Configurable;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.GobblinInstancePluginFactory;
import org.apache.gobblin.runtime.api.JobExecutionDriver;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.cli.CliObjectOption;
import org.apache.gobblin.runtime.cli.CliObjectSupport;
import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsGobblinCliFactory;
import org.apache.gobblin.runtime.cli.NotOnCli;
import org.apache.gobblin.runtime.instance.SimpleGobblinInstanceEnvironment;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceDriver;
import org.apache.gobblin.runtime.job_catalog.ImmutableFSJobCatalog;
import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
import org.apache.gobblin.runtime.job_catalog.StaticJobCatalog;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.plugins.GobblinInstancePluginUtils;
import org.apache.gobblin.runtime.plugins.PluginStaticKeys;
import org.apache.gobblin.runtime.plugins.metrics.GobblinMetricsPlugin;
import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
import org.apache.gobblin.runtime.std.DefaultJobLifecycleListenerImpl;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PullFileLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ClassUtil;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/runtime/embedded/EmbeddedGobblin.class */
/**
 * Fluent configurator and launcher for a single Gobblin job run from inside the current JVM.
 *
 * <p>Callers chain the configuration methods (each returns {@code this}) and then call
 * {@link #run()} or {@link #runAsync()} to build the job spec, start a
 * {@link StandardGobblinInstanceDriver} and execute the job.
 */
public class EmbeddedGobblin {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedGobblin.class);
    /** Splits {@code <key>:<value>} strings on the first ':' only, so the value may itself contain ':'. */
    private static final Splitter KEY_VALUE_SPLITTER = Splitter.on(":").limit(2);
    /** Builder for the job spec, created in the constructor from the job name; used when no job file is set. */
    private final JobSpec.Builder specBuilder;
    /** Job configuration supplied through {@link #setConfiguration(String, String)}. */
    private final Map<String, String> userConfigMap;
    /** Job configuration generated by this class itself (e.g. by {@link #mrMode()}). */
    private final Map<String, String> builtConfigMap;
    /** Result of {@link #getDefaultSysConfig()}; used as the fallback for both job and system configuration. */
    private final Config defaultSysConfig;
    /** System configuration overrides supplied through {@link #sysConfig(String, String)}. */
    private final Map<String, String> sysConfigOverrides;
    /** Jar path to priority; {@link #getPrioritizedDistributedJars()} orders jars by ascending priority value. */
    private final Map<String, Integer> distributedJars;
    /** Hook run at the start of {@link #runAsync()}; a no-op unless replaced by {@link #mrMode()}. */
    private Runnable distributeJarsFunction;
    /** Optional template used to resolve the job configuration; may be null. */
    private JobTemplate template;
    /** Logger handed to the Gobblin instance environment, driver, job catalog and lifecycle listener. */
    private Logger useLog;
    /** Maximum time {@link #runAsync()} waits for the job to be launched. */
    private FullTimeout launchTimeout;
    /** Maximum time {@link #run()} waits for the job result. */
    private FullTimeout jobTimeout;
    /** Maximum time to wait for the instance driver to terminate after it is asked to stop. */
    private FullTimeout shutdownTimeout;
    /** Plugin factories added to the instance driver builder in {@link #runAsync()}. */
    private List<GobblinInstancePluginFactory> plugins;
    /** Optional pull file; when present the job spec is loaded from it instead of {@link #specBuilder}. */
    private Optional<Path> jobFile;

    /* loaded from: input_file:org/apache/gobblin/runtime/embedded/EmbeddedGobblin$CliFactory.class */
    /** CLI factory that exposes {@link EmbeddedGobblin}'s constructor and public methods as command-line options. */
    public static class CliFactory extends ConstructorAndPublicMethodsGobblinCliFactory {
        /** Usage line reported for the embedded Gobblin CLI. */
        private static final String USAGE = "-jobName <jobName> [OPTIONS]";

        public CliFactory() {
            super(EmbeddedGobblin.class);
        }

        /** @return the usage string shown for this CLI */
        public String getUsageString() {
            return USAGE;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/runtime/embedded/EmbeddedGobblin$EmbeddedJobLifecycleListener.class */
    /**
     * Lifecycle listener that records the driver of the launched job and lets another thread block
     * until that launch has happened.
     */
    public static class EmbeddedJobLifecycleListener extends DefaultJobLifecycleListenerImpl {
        /** Guards {@link #running} / {@link #jobDriver} updates and backs {@link #runningStateCondition}. */
        private final Lock lock;
        /** Signalled by {@link #onJobLaunch} once a job has been launched. */
        private final Condition runningStateCondition;
        private volatile boolean running;
        private JobExecutionDriver jobDriver;

        public EmbeddedJobLifecycleListener(Logger logger) {
            super(logger);
            this.lock = new ReentrantLock();
            this.runningStateCondition = this.lock.newCondition();
            this.running = false;
        }

        /**
         * Blocks until {@link #onJobLaunch} has been called or the timeout elapses.
         *
         * @param j maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return {@code true} if a job was launched within the timeout, {@code false} otherwise
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean awaitStarted(long j, TimeUnit timeUnit) throws InterruptedException {
            this.lock.lock();
            try {
                long startMillis = System.currentTimeMillis();
                long budgetMillis = timeUnit.toMillis(j);
                // Loop to tolerate spurious wake-ups: re-check the flag and the remaining budget each time.
                while (!this.running) {
                    long remainingMillis = budgetMillis - (System.currentTimeMillis() - startMillis);
                    if (remainingMillis < 0) {
                        return false;
                    }
                    this.runningStateCondition.await(remainingMillis, TimeUnit.MILLISECONDS);
                }
                return true;
            } finally {
                // Single release point: unlocking here AND before the return would release a lock
                // that is no longer held and throw IllegalMonitorStateException.
                this.lock.unlock();
            }
        }

        /**
         * Records the launched job's driver and wakes up a thread waiting in {@link #awaitStarted}.
         *
         * @throws IllegalStateException if a job driver has already been recorded
         */
        @Override // org.apache.gobblin.runtime.std.DefaultJobLifecycleListenerImpl, org.apache.gobblin.runtime.api.JobLifecycleListener
        public void onJobLaunch(JobExecutionDriver jobExecutionDriver) {
            if (this.jobDriver != null) {
                throw new IllegalStateException("OnJobLaunch called when a job was already running.");
            }
            super.onJobLaunch(jobExecutionDriver);
            this.lock.lock();
            try {
                this.running = true;
                this.jobDriver = jobExecutionDriver;
                this.runningStateCondition.signal();
            } finally {
                this.lock.unlock();
            }
        }

        /** @return the driver recorded by {@link #onJobLaunch}, or {@code null} if no job has been launched yet */
        public JobExecutionDriver getJobDriver() {
            return this.jobDriver;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/runtime/embedded/EmbeddedGobblin$FullTimeout.class */
    /** Immutable value pairing a timeout amount with its {@link TimeUnit}. */
    public static class FullTimeout {
        private final long timeout;
        private final TimeUnit timeUnit;

        @ConstructorProperties({"timeout", "timeUnit"})
        public FullTimeout(long j, TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
            this.timeout = j;
        }

        /** @return the timeout amount, expressed in {@link #getTimeUnit()} */
        public long getTimeout() {
            return this.timeout;
        }

        /** @return the unit of {@link #getTimeout()} */
        public TimeUnit getTimeUnit() {
            return this.timeUnit;
        }

        /** Two timeouts are equal when both the amount and the unit match (a null unit only equals a null unit). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof FullTimeout)) {
                return false;
            }
            FullTimeout other = (FullTimeout) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            if (getTimeout() != other.getTimeout()) {
                return false;
            }
            TimeUnit mine = getTimeUnit();
            TimeUnit theirs = other.getTimeUnit();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /** Hook that lets subclasses opt out of equality with plain {@code FullTimeout} instances. */
        protected boolean canEqual(Object obj) {
            return obj instanceof FullTimeout;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            long amount = getTimeout();
            // Fold the 64-bit amount into 32 bits, then mix in the unit (43 stands in for a null unit).
            int result = prime + (int) (amount ^ (amount >>> 32));
            TimeUnit unit = getTimeUnit();
            int unitHash = (unit == null) ? 43 : unit.hashCode();
            return result * prime + unitHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("EmbeddedGobblin.FullTimeout(timeout=");
            text.append(getTimeout());
            text.append(", timeUnit=");
            text.append(getTimeUnit());
            text.append(")");
            return text.toString();
        }
    }

    /** Creates an instance whose job is named {@code "EmbeddedGobblin"}. */
    public EmbeddedGobblin() {
        this("EmbeddedGobblin");
    }

    /**
     * Creates an instance for a job with the given name.
     *
     * <p>Defaults set here: launch timeout 10 seconds, job timeout 10 days, shutdown timeout
     * 10 seconds, no plugins, no job file, and a no-op jar distribution hook.
     *
     * @param str the job name passed to the {@link JobSpec.Builder}
     */
    @CliObjectSupport(argumentNames = {"jobName"})
    public EmbeddedGobblin(String str) {
        this.useLog = log;
        this.launchTimeout = new FullTimeout(10L, TimeUnit.SECONDS);
        this.jobTimeout = new FullTimeout(10L, TimeUnit.DAYS);
        this.shutdownTimeout = new FullTimeout(10L, TimeUnit.SECONDS);
        this.plugins = Lists.newArrayList();
        this.jobFile = Optional.absent();
        HadoopUtils.addGobblinSite();
        this.specBuilder = new JobSpec.Builder(str);
        this.userConfigMap = Maps.newHashMap();
        this.builtConfigMap = Maps.newHashMap();
        this.sysConfigOverrides = Maps.newHashMap();
        // NOTE(review): getDefaultSysConfig() is protected and overridable, so an override runs
        // before the subclass's own fields are initialised.
        this.defaultSysConfig = getDefaultSysConfig();
        this.distributedJars = Maps.newHashMap();
        // Must run after distributedJars is assigned: it registers jars into that map.
        loadCoreGobblinJarsToDistributedJars();
        // Default hook does nothing; mrMode() swaps in one that publishes the jar list.
        this.distributeJarsFunction = new Runnable() { // from class: org.apache.gobblin.runtime.embedded.EmbeddedGobblin.1
            @Override // java.lang.Runnable
            public void run() {
            }
        };
    }

    /**
     * Switches the job to the MapReduce launcher.
     *
     * <p>Sets the {@code launcher.type} system override, points {@code fs.uri} at the file system
     * obtained from a fresh Hadoop {@link Configuration}, picks a timestamped {@code mr.job.root.dir}
     * under {@code /tmp}, and installs a hook that publishes the prioritized jar list as
     * {@code job.jars} when the job is started.
     *
     * @return this instance
     * @throws IOException if the file system cannot be obtained
     */
    public EmbeddedGobblin mrMode() throws IOException {
        String launcherType = JobLauncherFactory.JobLauncherType.MAPREDUCE.name();
        this.sysConfigOverrides.put("launcher.type", launcherType);

        String fsUri = FileSystem.get(new Configuration()).getUri().toString();
        this.builtConfigMap.put("fs.uri", fsUri);

        String jobRootDir = "/tmp/EmbeddedGobblin_" + System.currentTimeMillis();
        this.builtConfigMap.put("mr.job.root.dir", jobRootDir);

        // Evaluated lazily at launch time so jars registered after this call are still included.
        this.distributeJarsFunction = new Runnable() {
            @Override
            public void run() {
                List<String> orderedJars = EmbeddedGobblin.this.getPrioritizedDistributedJars();
                String joinedJars = Joiner.on(",").join(orderedJars);
                EmbeddedGobblin.this.sysConfigOverrides.put("job.jars", joinedJars);
            }
        };
        return this;
    }

    /**
     * Registers a jar for distribution with priority 0.
     *
     * @param str path of the jar
     * @return this instance
     */
    public EmbeddedGobblin distributeJar(String str) {
        final int defaultPriority = 0;
        return distributeJarWithPriority(str, defaultPriority);
    }

    /**
     * Registers the jar that contains the given class for distribution.
     *
     * <p>If no containing jar can be found (for example when the class is loaded from a classes
     * directory rather than a jar), nothing is registered and a warning is logged. Registering a
     * {@code null} path would later make the {@code job.jars} join in {@link #mrMode()} fail.
     *
     * @param cls class whose containing jar should be distributed
     * @param i priority of the jar; jars are ordered by ascending priority value
     * @return this instance
     */
    public EmbeddedGobblin distributeJarByClassWithPriority(Class<?> cls, int i) {
        String containingJar = ClassUtil.findContainingJar(cls);
        if (containingJar == null) {
            log.warn("Could not find a jar containing class {}; it will not be distributed.", cls);
            return this;
        }
        return distributeJarWithPriority(containingJar, i);
    }

    /**
     * Registers a jar for distribution with the given priority. If the jar is already registered,
     * the smaller of the existing and the new priority value is kept.
     *
     * @param str path of the jar
     * @param i priority of the jar; jars are ordered by ascending priority value
     * @return this instance
     */
    public synchronized EmbeddedGobblin distributeJarWithPriority(String str, int i) {
        int effectivePriority = i;
        if (this.distributedJars.containsKey(str)) {
            int existingPriority = this.distributedJars.get(str).intValue();
            effectivePriority = Math.min(i, existingPriority);
        }
        this.distributedJars.put(str, Integer.valueOf(effectivePriority));
        return this;
    }

    /**
     * Sets the template used by {@link #runAsync()} to resolve the job configuration when no job file is set.
     *
     * @param jobTemplate the template to use
     * @return this instance
     */
    public EmbeddedGobblin setTemplate(JobTemplate jobTemplate) {
        this.template = jobTemplate;
        return this;
    }

    /**
     * Looks up a template by URI through a {@link PackagedTemplatesJobCatalogDecorator} and uses it.
     *
     * @param str URI of the template
     * @return this instance
     * @throws URISyntaxException if {@code str} is not a valid URI
     * @throws SpecNotFoundException if the catalog cannot find the template
     * @throws JobTemplate.TemplateException if the template cannot be loaded
     */
    public EmbeddedGobblin setTemplate(String str) throws URISyntaxException, SpecNotFoundException, JobTemplate.TemplateException {
        PackagedTemplatesJobCatalogDecorator templateCatalog = new PackagedTemplatesJobCatalogDecorator();
        URI templateUri = new URI(str);
        return setTemplate(templateCatalog.getTemplate(templateUri));
    }

    /**
     * Adds a plugin factory to be registered with the instance driver in {@link #runAsync()}.
     *
     * @param gobblinInstancePluginFactory the plugin factory to add
     * @return this instance
     */
    public EmbeddedGobblin usePlugin(GobblinInstancePluginFactory gobblinInstancePluginFactory) {
        this.plugins.add(gobblinInstancePluginFactory);
        return this;
    }

    /**
     * Instantiates a plugin factory from its alias and adds it.
     *
     * @param str alias of the plugin
     * @return this instance
     * @throws ClassNotFoundException if the plugin class cannot be found
     * @throws IllegalAccessException if the plugin class cannot be accessed
     * @throws InstantiationException if the plugin class cannot be instantiated
     */
    public EmbeddedGobblin usePlugin(String str) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        GobblinInstancePluginFactory pluginFactory = GobblinInstancePluginUtils.instantiatePluginByAlias(str);
        return usePlugin(pluginFactory);
    }

    /**
     * Overrides a system (instance-level) configuration entry.
     *
     * @param str configuration key
     * @param str2 configuration value
     * @return this instance
     */
    public EmbeddedGobblin sysConfig(String str, String str2) {
        this.sysConfigOverrides.put(str, str2);
        return this;
    }

    /**
     * Overrides a system configuration entry given as {@code <key>:<value>}. Only the first ':' is
     * treated as the separator.
     *
     * @param str the entry in {@code <key>:<value>} form
     * @return this instance
     * @throws RuntimeException if {@code str} contains no ':'
     */
    public EmbeddedGobblin sysConfig(String str) {
        List<String> keyAndValue = KEY_VALUE_SPLITTER.splitToList(str);
        if (keyAndValue.size() == 2) {
            return sysConfig(keyAndValue.get(0), keyAndValue.get(1));
        }
        throw new RuntimeException("Cannot parse " + str + ". Expected <key>:<value>.");
    }

    /**
     * Sets the pull file from which {@link #runAsync()} loads the job spec.
     *
     * @param str path of the job file
     * @return this instance
     */
    public EmbeddedGobblin jobFile(String str) {
        Path jobFilePath = new Path(str);
        this.jobFile = Optional.of(jobFilePath);
        return this;
    }

    /**
     * Enables keytab-based Hadoop login: registers the keytab login plugin and stores the login
     * user and keytab file as system configuration.
     *
     * @param str credentials in {@code <login-user>:<keytab-file>} form; must contain exactly one ':'
     * @return this instance
     * @throws RuntimeException if {@code str} cannot be parsed or the plugin cannot be instantiated
     */
    @CliObjectOption(description = "Authenticate using kerberos. Format: \"<login-user>:<keytab-file>\".")
    public EmbeddedGobblin kerberosAuthentication(String str) {
        List<String> userAndKeytab = Splitter.on(":").splitToList(str);
        if (userAndKeytab.size() != 2) {
            throw new RuntimeException("Cannot parse " + str + ". Expected <login-user>:<keytab-file>");
        }
        // Only the reflective plugin lookup can raise ReflectiveOperationException.
        try {
            usePlugin(PluginStaticKeys.HADOOP_LOGIN_FROM_KEYTAB_ALIAS);
        } catch (ReflectiveOperationException e) {
            String message = String.format("Could not instantiate %s. Make sure gobblin-runtime-hadoop is in your classpath.", PluginStaticKeys.HADOOP_LOGIN_FROM_KEYTAB_ALIAS);
            throw new RuntimeException(message, e);
        }
        sysConfig(PluginStaticKeys.LOGIN_USER_FULL_KEY, userAndKeytab.get(0));
        sysConfig(PluginStaticKeys.LOGIN_USER_KEYTAB_FILE_FULL_KEY, userAndKeytab.get(1));
        return this;
    }

    /**
     * Sets a job configuration entry. User entries take precedence over generated and default
     * configuration when the job spec is built in {@link #runAsync()}.
     *
     * @param str configuration key
     * @param str2 configuration value
     * @return this instance
     */
    public EmbeddedGobblin setConfiguration(String str, String str2) {
        this.userConfigMap.put(str, str2);
        return this;
    }

    /**
     * Sets a job configuration entry given as {@code <key>:<value>}. Only the first ':' is treated
     * as the separator.
     *
     * @param str the entry in {@code <key>:<value>} form
     * @return this instance
     * @throws RuntimeException if {@code str} contains no ':'
     */
    public EmbeddedGobblin setConfiguration(String str) {
        List<String> keyAndValue = KEY_VALUE_SPLITTER.splitToList(str);
        if (keyAndValue.size() == 2) {
            return setConfiguration(keyAndValue.get(0), keyAndValue.get(1));
        }
        throw new RuntimeException("Cannot parse " + str + ". Expected <key>:<value>.");
    }

    /**
     * Sets how long {@link #run()} waits for the job result.
     *
     * @param j timeout amount
     * @param timeUnit unit of {@code j}
     * @return this instance
     */
    public EmbeddedGobblin setJobTimeout(long j, TimeUnit timeUnit) {
        this.jobTimeout = new FullTimeout(j, timeUnit);
        return this;
    }

    /**
     * Sets the job timeout from an ISO-8601 period string (e.g. {@code PT1H30M}).
     *
     * <p>The whole period is converted to seconds. {@code Period.getSeconds()} would return only
     * the seconds <em>field</em>, so {@code PT1H} would have produced a timeout of 0 seconds.
     *
     * @param str period in the format accepted by {@link Period#parse(String)}
     * @return this instance
     * @throws UnsupportedOperationException if the period contains years or months, which have no
     *     fixed length (per Joda-Time's {@code toStandardDuration} contract)
     */
    public EmbeddedGobblin setJobTimeout(String str) {
        long totalSeconds = Period.parse(str).toStandardDuration().getStandardSeconds();
        return setJobTimeout(totalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Sets how long {@link #runAsync()} waits for the job to be launched before aborting.
     *
     * @param j timeout amount
     * @param timeUnit unit of {@code j}
     * @return this instance
     */
    public EmbeddedGobblin setLaunchTimeout(long j, TimeUnit timeUnit) {
        this.launchTimeout = new FullTimeout(j, timeUnit);
        return this;
    }

    /**
     * Sets the launch timeout from an ISO-8601 period string (e.g. {@code PT2M}).
     *
     * <p>The whole period is converted to seconds. {@code Period.getSeconds()} would return only
     * the seconds <em>field</em>, so {@code PT2M} would have produced a timeout of 0 seconds.
     *
     * @param str period in the format accepted by {@link Period#parse(String)}
     * @return this instance
     * @throws UnsupportedOperationException if the period contains years or months, which have no
     *     fixed length (per Joda-Time's {@code toStandardDuration} contract)
     */
    public EmbeddedGobblin setLaunchTimeout(String str) {
        long totalSeconds = Period.parse(str).toStandardDuration().getStandardSeconds();
        return setLaunchTimeout(totalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Sets how long to wait for the instance driver to terminate after it has been asked to stop.
     *
     * @param j timeout amount
     * @param timeUnit unit of {@code j}
     * @return this instance
     */
    public EmbeddedGobblin setShutdownTimeout(long j, TimeUnit timeUnit) {
        this.shutdownTimeout = new FullTimeout(j, timeUnit);
        return this;
    }

    /**
     * Sets the shutdown timeout from an ISO-8601 period string (e.g. {@code PT1M}).
     *
     * <p>The whole period is converted to seconds. {@code Period.getSeconds()} would return only
     * the seconds <em>field</em>, so {@code PT1M} would have produced a timeout of 0 seconds.
     *
     * @param str period in the format accepted by {@link Period#parse(String)}
     * @return this instance
     * @throws UnsupportedOperationException if the period contains years or months, which have no
     *     fixed length (per Joda-Time's {@code toStandardDuration} contract)
     */
    public EmbeddedGobblin setShutdownTimeout(String str) {
        long totalSeconds = Period.parse(str).toStandardDuration().getStandardSeconds();
        return setShutdownTimeout(totalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Enables the state store and sets its directory through the job configuration.
     *
     * @param str value for {@code state.store.dir}
     * @return this instance
     */
    public EmbeddedGobblin useStateStore(String str) {
        final String enabledKey = "state.store.enabled";
        final String dirKey = "state.store.dir";
        setConfiguration(enabledKey, "true");
        setConfiguration(dirKey, str);
        return this;
    }

    /**
     * Registers the Gobblin metrics plugin and sets the {@code metrics.enabled} system override to {@code true}.
     *
     * @return this instance
     */
    public EmbeddedGobblin enableMetrics() {
        GobblinMetricsPlugin.Factory metricsPluginFactory = new GobblinMetricsPlugin.Factory();
        usePlugin(metricsPluginFactory);
        String enabled = String.valueOf(true);
        sysConfig("metrics.enabled", enabled);
        return this;
    }

    /**
     * Loads the default configuration from the classpath resource {@code embedded/embedded.conf}.
     * Called from the constructor; subclasses may override it to supply different defaults.
     *
     * @return the parsed default configuration
     */
    protected Config getDefaultSysConfig() {
        return ConfigFactory.parseResources("embedded/embedded.conf");
    }

    /**
     * Launches the job and blocks until it finishes or the job timeout elapses.
     *
     * @return the result of the job execution
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the job does not launch or finish within the configured timeouts
     * @throws ExecutionException if the job execution fails
     */
    @NotOnCli
    public JobExecutionResult run() throws InterruptedException, TimeoutException, ExecutionException {
        JobExecutionDriver launchedDriver = runAsync();
        long timeoutAmount = this.jobTimeout.getTimeout();
        TimeUnit timeoutUnit = this.jobTimeout.getTimeUnit();
        return launchedDriver.get(timeoutAmount, timeoutUnit);
    }

    /**
     * Builds the job spec, starts a Gobblin instance driver and waits for the job to be launched.
     *
     * <p>Steps:
     * <ol>
     *   <li>Runs the jar distribution hook.</li>
     *   <li>Builds the job spec, either from the job file (user configuration layered on top of the
     *       pull file) or from user configuration falling back to generated and default
     *       configuration, optionally resolved through the template.</li>
     *   <li>Starts the instance driver with all registered plugins and waits up to the launch
     *       timeout for the job to start.</li>
     *   <li>On launch, registers a callback that stops the instance driver once the job completes
     *       or fails, and returns the job driver.</li>
     * </ol>
     *
     * @return the driver of the launched job
     * @throws TimeoutException if the job does not start within the launch timeout, or the driver
     *     does not terminate within the shutdown timeout while aborting
     * @throws InterruptedException if the calling thread is interrupted while waiting for the launch
     * @throws RuntimeException if the job file cannot be loaded or the template cannot be resolved
     */
    @NotOnCli
    public JobExecutionDriver runAsync() throws TimeoutException, InterruptedException {
        JobSpec jobSpec;
        this.distributeJarsFunction.run();
        Config builtWithDefaults = ConfigFactory.parseMap(this.builtConfigMap).withFallback(this.defaultSysConfig);
        Config userConfig = ConfigFactory.parseMap(this.userConfigMap);
        if (this.jobFile.isPresent()) {
            try {
                Path jobFilePath = this.jobFile.get();
                PullFileLoader loader = new PullFileLoader(jobFilePath.getParent(), jobFilePath.getFileSystem(new Configuration()), PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
                Config jobConfig = userConfig.withFallback(loader.loadPullFile(jobFilePath, builtWithDefaults, false));
                jobSpec = new ImmutableFSJobCatalog.JobSpecConverter(jobFilePath.getParent(), Optional.absent()).apply(jobConfig);
            } catch (IOException e) {
                throw new RuntimeException("Failed to run embedded Gobblin.", e);
            }
        } else {
            Config jobConfig = userConfig.withFallback(builtWithDefaults);
            if (this.template != null) {
                try {
                    jobConfig = this.template.getResolvedConfig(jobConfig);
                } catch (JobTemplate.TemplateException | SpecNotFoundException e) {
                    throw new RuntimeException(e);
                }
            }
            jobSpec = this.specBuilder.withConfig(jobConfig).build();
        }
        try {
            SimpleGobblinInstanceEnvironment instanceEnvironment = new SimpleGobblinInstanceEnvironment("EmbeddedGobblinInstance", this.useLog, getSysConfig());
            // Explicit type witness: Optional<SimpleGobblinInstanceEnvironment> cannot be cast to
            // Optional<GobblinInstanceEnvironment>.
            StandardGobblinInstanceDriver.Builder driverBuilder = new StandardGobblinInstanceDriver.Builder(Optional.<GobblinInstanceEnvironment>of(instanceEnvironment))
                .withLog(this.useLog)
                .withJobCatalog(new StaticJobCatalog(Optional.of(this.useLog), Lists.<JobSpec>newArrayList(new ResolvedJobSpec(jobSpec))))
                .withImmediateJobScheduler();
            for (GobblinInstancePluginFactory pluginFactory : this.plugins) {
                driverBuilder.addPlugin(pluginFactory);
            }
            final StandardGobblinInstanceDriver instanceDriver = driverBuilder.build();
            EmbeddedJobLifecycleListener launchListener = new EmbeddedJobLifecycleListener(this.useLog);
            instanceDriver.registerJobLifecycleListener(launchListener);
            instanceDriver.startAsync();
            if (launchListener.awaitStarted(this.launchTimeout.getTimeout(), this.launchTimeout.getTimeUnit())) {
                // Tear the instance driver down as soon as the job finishes, whatever the outcome.
                Futures.addCallback(launchListener.getJobDriver(), new FutureCallback<JobExecutionResult>() {
                    @Override
                    public void onSuccess(@Nullable JobExecutionResult jobExecutionResult) {
                        stopGobblinInstanceDriver();
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        stopGobblinInstanceDriver();
                    }

                    private void stopGobblinInstanceDriver() {
                        try {
                            instanceDriver.stopAsync();
                            instanceDriver.awaitTerminated(EmbeddedGobblin.this.shutdownTimeout.getTimeout(), EmbeddedGobblin.this.shutdownTimeout.getTimeUnit());
                        } catch (TimeoutException shutdownTimeoutException) {
                            // Best effort: report with the cause instead of failing the callback.
                            EmbeddedGobblin.log.error("Failed to shutdown Gobblin instance driver.", shutdownTimeoutException);
                        }
                    }
                });
                return launchListener.getJobDriver();
            }
            log.warn("Timeout waiting for job to start. Aborting.");
            instanceDriver.stopAsync();
            instanceDriver.awaitTerminated(this.shutdownTimeout.getTimeout(), this.shutdownTimeout.getTimeUnit());
            throw new TimeoutException("Timeout waiting for job to start.");
        } catch (JobTemplate.TemplateException | SpecNotFoundException e) {
            throw new RuntimeException("Failed to resolved template.", e);
        }
    }

    /**
     * Builds the instance-level configuration: explicit system overrides layered on top of the
     * default system configuration.
     */
    private Configurable getSysConfig() {
        Config overrides = ConfigFactory.parseMap(this.sysConfigOverrides);
        Config effectiveSysConfig = overrides.withFallback(this.defaultSysConfig);
        return DefaultConfigurableImpl.createFromConfig(effectiveSysConfig);
    }

    /**
     * Registers the jars containing a fixed set of classes for distribution. Each listed class
     * stands in for the jar it is loaded from. All are registered with priority 0 except the jar
     * containing {@link Escaper}, which gets priority -10 and therefore sorts ahead of the others.
     */
    private void loadCoreGobblinJarsToDistributedJars() {
        // Registration order is kept exactly as listed.
        Class<?>[] representativeClasses = {
            State.class,
            ConstructState.class,
            InstrumentedExtractorBase.class,
            MetricContext.class,
            GobblinMetrics.class,
            FsStateStore.class,
            Task.class,
            PathUtils.class,
            ReadableInstant.class,
            Escaper.class,
            MetricFilter.class,
            DataTemplate.class,
            ClassUtils.class,
            SchemaBuilder.class,
            RetryListener.class,
            ConfigFactory.class,
            Reflections.class,
            ClassFile.class
        };
        for (Class<?> representative : representativeClasses) {
            int priority = (representative == Escaper.class) ? -10 : 0;
            distributeJarByClassWithPriority(representative, priority);
        }
    }

    /**
     * Returns the registered jar paths ordered by ascending priority value.
     *
     * @return a view over a sorted snapshot of the registered jars, mapped to their paths
     */
    @VisibleForTesting
    protected List<String> getPrioritizedDistributedJars() {
        Comparator<Map.Entry<String, Integer>> byAscendingPriority = new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> left, Map.Entry<String, Integer> right) {
                int leftPriority = left.getValue().intValue();
                int rightPriority = right.getValue().intValue();
                return Integer.compare(leftPriority, rightPriority);
            }
        };
        Function<Map.Entry<String, Integer>, String> toJarPath = new Function<Map.Entry<String, Integer>, String>() {
            @Override
            public String apply(Map.Entry<String, Integer> jarEntry) {
                return jarEntry.getKey();
            }
        };

        // Sort a snapshot of the entries so the backing map is left untouched.
        List<Map.Entry<String, Integer>> jarEntries = Lists.newArrayList(this.distributedJars.entrySet());
        Collections.sort(jarEntries, byAscendingPriority);
        return Lists.transform(jarEntries, toJarPath);
    }
}
