package org.apache.gobblin.runtime.job_catalog;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import java.beans.ConstructorProperties;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.JobCatalog;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.runtime.template.HOCONInputStreamJobTemplate;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PullFileLoader;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.class */
/**
 * A read-only {@link JobCatalog} whose job specs are loaded from pull files under a
 * job configuration directory on a Hadoop {@link FileSystem}.
 *
 * <p>The directory is resolved by {@link ConfigAccessor} from the system configuration.
 * Unless the configured polling interval is {@code -1}, a
 * {@link PathAlterationObserverScheduler} is created for that directory and is started and
 * stopped together with this catalog.
 */
public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog {
    protected final FileSystem fs;
    protected final Config sysConfig;
    protected static final Logger LOGGER = LoggerFactory.getLogger(ImmutableFSJobCatalog.class);
    protected final PullFileLoader loader;
    protected final Path jobConfDirPath;
    private final JobSpecConverter converter;
    /** Scheduler watching the job configuration directory; {@code null} when the polling interval is -1. */
    protected final PathAlterationObserverScheduler pathAlterationDetector;
    /** Config prefix that {@link JobSpecConverter#apply(Config)} removes from the job config it stores. */
    public static final String FS_CATALOG_KEY_PREFIX = "gobblin.fsJobCatalog";
    /** Config key read for the job spec version. */
    public static final String VERSION_KEY_IN_JOBSPEC = "gobblin.fsJobCatalog.version";
    /** Config key read for the job spec description. */
    public static final String DESCRIPTION_KEY_IN_JOBSPEC = "gobblin.fsJobCatalog.description";

    /**
     * Reads the catalog-related settings out of a {@link Config}: polling interval, job
     * configuration directory (as string, {@link Path} and {@link FileSystem}) and the set of
     * accepted job configuration file extensions.
     */
    public static class ConfigAccessor {
        private static final String POLLING_INTERVAL_KEY = "jobconf.monitor.interval";
        private static final long DEFAULT_POLLING_INTERVAL = 30000L;
        private static final String FULLY_QUALIFIED_PATH_KEY = "jobconf.fullyQualifiedPath";
        private static final String JOB_CONF_DIR_KEY = "jobconf.dir";
        private static final String EXTENSIONS_KEY = "jobconf.extensions";
        private static final String DEFAULT_EXTENSIONS = "pull,job";

        private final Config cfg;
        private final long pollingInterval;
        private final String jobConfDir;
        private final Path jobConfDirPath;
        private final FileSystem jobConfDirFileSystem;
        private final Set<String> jobConfigurationFileExtensions;

        /**
         * @param config configuration holding either {@code jobconf.fullyQualifiedPath} or
         *     {@code jobconf.dir}
         * @throws IllegalArgumentException if neither directory property is present
         * @throws RuntimeException if the file system of the job configuration directory cannot
         *     be obtained
         */
        public ConfigAccessor(Config config) {
            this.cfg = config;
            this.pollingInterval = this.cfg.hasPath(POLLING_INTERVAL_KEY)
                    ? this.cfg.getLong(POLLING_INTERVAL_KEY)
                    : DEFAULT_POLLING_INTERVAL;

            if (this.cfg.hasPath(FULLY_QUALIFIED_PATH_KEY)) {
                this.jobConfDir = this.cfg.getString(FULLY_QUALIFIED_PATH_KEY);
            } else if (this.cfg.hasPath(JOB_CONF_DIR_KEY)) {
                // A plain directory is interpreted as a local path and turned into a file:// location.
                this.jobConfDir = "file://" + new File(this.cfg.getString(JOB_CONF_DIR_KEY)).getAbsolutePath();
            } else {
                throw new IllegalArgumentException("Expected jobconf.fullyQualifiedPath or jobconf.dir properties.");
            }
            this.jobConfDirPath = new Path(this.jobConfDir);

            try {
                this.jobConfDirFileSystem = this.jobConfDirPath.getFileSystem(new Configuration());
            } catch (IOException e) {
                throw new RuntimeException("Unable to detect job config directory file system: " + e, e);
            }

            // Parsing the extension list does no I/O, so it stays outside the IOException handler.
            this.jobConfigurationFileExtensions = ImmutableSet.copyOf(
                    Splitter.on(",").omitEmptyStrings().trimResults().split(getJobConfigurationFileExtensionsString()));
        }

        /**
         * Returns the comma-separated extension list: the lower-cased value of
         * {@code jobconf.extensions} when present, otherwise {@code "pull,job"}.
         */
        private String getJobConfigurationFileExtensionsString() {
            // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
            return this.cfg.hasPath(EXTENSIONS_KEY)
                    ? this.cfg.getString(EXTENSIONS_KEY).toLowerCase(Locale.ROOT)
                    : DEFAULT_EXTENSIONS;
        }

        /** @return the configuration this accessor was built from */
        public Config getCfg() {
            return this.cfg;
        }

        /** @return the configured polling interval, or 30000 when none is configured */
        public long getPollingInterval() {
            return this.pollingInterval;
        }

        /** @return the job configuration directory as a string */
        public String getJobConfDir() {
            return this.jobConfDir;
        }

        /** @return the job configuration directory as a Hadoop {@link Path} */
        public Path getJobConfDirPath() {
            return this.jobConfDirPath;
        }

        /** @return the file system obtained for the job configuration directory */
        public FileSystem getJobConfDirFileSystem() {
            return this.jobConfDirFileSystem;
        }

        /** @return the immutable set of accepted job configuration file extensions */
        public Set<String> getJobConfigurationFileExtensions() {
            return this.jobConfigurationFileExtensions;
        }
    }

    /**
     * Converts a loaded job {@link Config} into a {@link JobSpec} whose URI is the job file's
     * path relative to the job configuration directory.
     */
    public static class JobSpecConverter implements Function<Config, JobSpec> {
        private final Path jobConfigDirPath;
        private final Optional<String> extensionToStrip;

        /**
         * Computes the job URI for a job file path.
         *
         * @param path path of the job configuration file
         * @return the path relativized against the job configuration directory, with the
         *     configured extension removed when one is present
         */
        public URI computeURI(Path path) {
            URI relativeUri = PathUtils.relativizePath(path, this.jobConfigDirPath).toUri();
            if (!this.extensionToStrip.isPresent()) {
                return relativeUri;
            }
            return PathUtils.removeExtension(new Path(relativeUri), new String[] {this.extensionToStrip.get()}).toUri();
        }

        /**
         * Builds a {@link JobSpec} from a job configuration.
         *
         * <p>The job URI is derived from {@code job.config.path}. Description and version come
         * from {@link #DESCRIPTION_KEY_IN_JOBSPEC} and {@link #VERSION_KEY_IN_JOBSPEC} when
         * present, otherwise from {@code "Gobblin job " + uri} and
         * {@link HOCONInputStreamJobTemplate#DEFAULT_VERSION}. Everything under
         * {@link #FS_CATALOG_KEY_PREFIX} is removed from the config stored in the spec.
         *
         * @param config the loaded job configuration
         * @return the resulting job spec
         * @throws RuntimeException if {@code job.template} is present but is not a valid URI
         */
        @Override
        @Nullable
        public JobSpec apply(Config config) {
            URI jobUri = computeURI(new Path(config.getString("job.config.path")));

            String description = config.hasPath(ImmutableFSJobCatalog.DESCRIPTION_KEY_IN_JOBSPEC)
                    ? config.getString(ImmutableFSJobCatalog.DESCRIPTION_KEY_IN_JOBSPEC)
                    : "Gobblin job " + jobUri;
            String version = config.hasPath(ImmutableFSJobCatalog.VERSION_KEY_IN_JOBSPEC)
                    ? config.getString(ImmutableFSJobCatalog.VERSION_KEY_IN_JOBSPEC)
                    : HOCONInputStreamJobTemplate.DEFAULT_VERSION;

            JobSpec.Builder builder = JobSpec.builder(jobUri)
                    .withConfig(config.withoutPath(ImmutableFSJobCatalog.FS_CATALOG_KEY_PREFIX))
                    .withDescription(description)
                    .withVersion(version);

            if (config.hasPath("job.template")) {
                try {
                    builder.withTemplate(new URI(config.getString("job.template")));
                } catch (URISyntaxException e) {
                    throw new RuntimeException("Bad job template URI " + e, e);
                }
            }
            return builder.build();
        }

        /**
         * @param path the job configuration directory against which job paths are relativized
         * @param optional extension to strip from computed job URIs, if any
         */
        @ConstructorProperties({"jobConfigDirPath", "extensionToStrip"})
        public JobSpecConverter(Path path, Optional<String> optional) {
            this.jobConfigDirPath = path;
            this.extensionToStrip = optional;
        }
    }

    /**
     * Creates a catalog from a system configuration, without an externally supplied observer.
     *
     * @param config system configuration
     * @throws IOException if setting up the catalog fails
     */
    public ImmutableFSJobCatalog(Config config) throws IOException {
        this(config, (PathAlterationObserver) null);
    }

    /**
     * Creates a catalog from an instance environment, using its system config, metric context
     * and instrumentation flag.
     *
     * @param gobblinInstanceEnvironment the environment supplying configuration and metrics
     * @throws IOException if setting up the catalog fails
     */
    public ImmutableFSJobCatalog(GobblinInstanceEnvironment gobblinInstanceEnvironment) throws IOException {
        this(gobblinInstanceEnvironment.getSysConfig().getConfig(), null,
                Optional.of(gobblinInstanceEnvironment.getMetricContext()),
                gobblinInstanceEnvironment.isInstrumentationEnabled());
    }

    /**
     * Creates a catalog from an instance environment with an externally supplied observer.
     *
     * @param gobblinInstanceEnvironment the environment supplying configuration and metrics
     * @param pathAlterationObserver observer to register with the scheduler; may be {@code null}
     * @throws IOException if setting up the catalog fails
     */
    public ImmutableFSJobCatalog(GobblinInstanceEnvironment gobblinInstanceEnvironment, PathAlterationObserver pathAlterationObserver) throws IOException {
        this(gobblinInstanceEnvironment.getSysConfig().getConfig(), pathAlterationObserver,
                Optional.of(gobblinInstanceEnvironment.getMetricContext()),
                gobblinInstanceEnvironment.isInstrumentationEnabled());
    }

    /**
     * Creates a catalog from a system configuration with an externally supplied observer and no
     * parent metric context; instrumentation follows {@link GobblinMetrics#isEnabled(Config)}.
     *
     * @param config system configuration
     * @param pathAlterationObserver observer to register with the scheduler; may be {@code null}
     * @throws IOException if setting up the catalog fails
     */
    public ImmutableFSJobCatalog(Config config, PathAlterationObserver pathAlterationObserver) throws IOException {
        this(config, pathAlterationObserver, Optional.<MetricContext>absent(), GobblinMetrics.isEnabled(config));
    }

    /**
     * Primary constructor: resolves the job configuration directory, builds the pull file loader
     * and converter, and, unless the polling interval is {@code -1}, registers a listener for
     * the directory with a new {@link PathAlterationObserverScheduler}.
     *
     * @param config system configuration
     * @param pathAlterationObserver observer to register with the scheduler; may be {@code null}
     * @param optional parent metric context, if any
     * @param z whether instrumentation is enabled
     * @throws IOException if registering the path alteration observer fails
     */
    public ImmutableFSJobCatalog(Config config, PathAlterationObserver pathAlterationObserver, Optional<MetricContext> optional, boolean z) throws IOException {
        super(Optional.of(LOGGER), optional, z, Optional.of(config));
        this.sysConfig = config;

        ConfigAccessor configAccessor = new ConfigAccessor(this.sysConfig);
        this.jobConfDirPath = configAccessor.getJobConfDirPath();
        this.fs = configAccessor.getJobConfDirFileSystem();

        // Reuse the file system ConfigAccessor already resolved for this same path instead of
        // performing a second lookup.
        this.loader = new PullFileLoader(this.jobConfDirPath, this.fs,
                configAccessor.getJobConfigurationFileExtensions(), PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
        this.converter = new JobSpecConverter(this.jobConfDirPath, getInjectedExtension());

        long pollingInterval = configAccessor.getPollingInterval();
        if (pollingInterval == -1) {
            // An interval of -1 means no scheduler is created; startUp/shutDown then skip it.
            this.pathAlterationDetector = null;
        } else {
            this.pathAlterationDetector = new PathAlterationObserverScheduler(pollingInterval);
            Optional<PathAlterationObserver> observer = Optional.fromNullable(pathAlterationObserver);
            FSPathAlterationListenerAdaptor listener = new FSPathAlterationListenerAdaptor(
                    this.jobConfDirPath, this.loader, this.sysConfig, this.listeners, this.converter);
            this.pathAlterationDetector.addPathAlterationObserver(listener, observer, this.jobConfDirPath);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the base catalog and then the path alteration scheduler, when one exists.
     *
     * @throws IOException if start-up fails
     */
    @Override
    public void startUp() throws IOException {
        super.startUp();
        if (this.pathAlterationDetector != null) {
            this.pathAlterationDetector.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the path alteration scheduler, when one exists, and then shuts down the base catalog.
     *
     * @throws IOException if shutdown fails
     * @throws RuntimeException if the thread is interrupted while stopping; the interrupt flag
     *     is restored before the exception is thrown
     */
    @Override
    public void shutDown() throws IOException {
        try {
            if (this.pathAlterationDetector != null) {
                this.pathAlterationDetector.stop();
            }
            super.shutDown();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to stop " + ImmutableFSJobCatalog.class.getName(), e);
        }
    }

    /**
     * Loads every pull file under the loader's root directory and exposes the results as job
     * specs.
     *
     * <p>The returned list is a {@link Lists#transform} view over the loaded configs, so the
     * conversion to {@link JobSpec} is applied when elements are accessed.
     *
     * @return job specs for all pull files found
     */
    @Override
    public synchronized List<JobSpec> getJobs() {
        List<Config> jobConfigs = Lists.newArrayList(
                this.loader.loadPullFilesRecursively(this.loader.getRootDirectory(), this.sysConfig, shouldLoadGlobalConf()));
        return Lists.transform(jobConfigs, this.converter);
    }

    /**
     * Loads the single job spec identified by {@code uri}.
     *
     * @param uri job URI, resolved against the job configuration directory
     * @return the job spec built from the corresponding pull file
     * @throws JobSpecNotFoundException if the pull file does not exist
     * @throws RuntimeException if reading the pull file fails for another I/O reason
     */
    @Override
    public synchronized JobSpec getJobSpec(URI uri) throws JobSpecNotFoundException {
        try {
            Path jobFilePath = getPathForURI(this.jobConfDirPath, uri);
            return this.converter.apply(this.loader.loadPullFile(jobFilePath, this.sysConfig, shouldLoadGlobalConf()));
        } catch (FileNotFoundException e) {
            throw new JobSpecNotFoundException(uri);
        } catch (IOException e) {
            // Chain the cause so the original stack trace is not lost.
            throw new RuntimeException("IO exception thrown on loading single job configuration file:" + e.getMessage(), e);
        }
    }

    /**
     * Flag passed to the pull file loader on every load; this implementation always returns
     * {@code true}.
     */
    public boolean shouldLoadGlobalConf() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resolves a job URI to a file path by merging it onto the given base path.
     *
     * @param path base directory
     * @param uri job URI
     * @return the merged path
     */
    public Path getPathForURI(Path path, URI uri) {
        return PathUtils.mergePaths(path, new Path(uri));
    }

    /**
     * Extension handed to the {@link JobSpecConverter} for stripping from job URIs; absent in
     * this implementation. Called from the constructor.
     */
    protected Optional<String> getInjectedExtension() {
        return Optional.absent();
    }
}
