package gobblin.runtime.spec_store;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import gobblin.runtime.api.GobblinInstanceEnvironment;
import gobblin.runtime.api.Spec;
import gobblin.runtime.api.SpecNotFoundException;
import gobblin.runtime.api.SpecSerDe;
import gobblin.runtime.api.SpecStore;
import gobblin.util.PathUtils;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/runtime/spec_store/FSSpecStore.class */
public class FSSpecStore implements SpecStore {
    protected final Logger log;
    protected final Config sysConfig;
    protected final FileSystem fs;
    protected final String fsSpecStoreDir;
    protected final Path fsSpecStoreDirPath;
    protected final SpecSerDe specSerDe;

    public FSSpecStore(GobblinInstanceEnvironment gobblinInstanceEnvironment, SpecSerDe specSerDe) throws IOException {
        this(gobblinInstanceEnvironment.getSysConfig().getConfig(), specSerDe, (Optional<Logger>) Optional.absent());
    }

    public FSSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
        this(config, specSerDe, (Optional<Logger>) Optional.absent());
    }

    public FSSpecStore(GobblinInstanceEnvironment gobblinInstanceEnvironment, SpecSerDe specSerDe, Optional<Logger> optional) throws IOException {
        this(gobblinInstanceEnvironment.getSysConfig().getConfig(), specSerDe, optional);
    }

    public FSSpecStore(Config config, SpecSerDe specSerDe, Optional<Logger> optional) throws IOException {
        Preconditions.checkArgument(config.hasPath("specStore.fs.dir"), "FS SpecStore path must be specified.");
        this.log = optional.isPresent() ? (Logger) optional.get() : LoggerFactory.getLogger(getClass());
        this.sysConfig = config;
        this.specSerDe = specSerDe;
        this.fsSpecStoreDir = this.sysConfig.getString("specStore.fs.dir");
        this.fsSpecStoreDirPath = new Path(this.fsSpecStoreDir);
        this.log.info("FSSpecStore directory is: " + this.fsSpecStoreDir);
        try {
            this.fs = this.fsSpecStoreDirPath.getFileSystem(new Configuration());
            if (this.fs.exists(this.fsSpecStoreDirPath)) {
                return;
            }
            this.log.info("FSSpecStore directory: " + this.fsSpecStoreDir + " did not exist. Creating it.");
            this.fs.mkdirs(this.fsSpecStoreDirPath);
        } catch (IOException e) {
            throw new RuntimeException("Unable to detect job config directory file system: " + e, e);
        }
    }

    @Override // gobblin.runtime.api.SpecStore
    public boolean exists(URI uri) throws IOException {
        Preconditions.checkArgument(null != uri, "Spec URI should not be null");
        for (FileStatus fileStatus : this.fs.listStatus(this.fsSpecStoreDirPath)) {
            if (StringUtils.startsWith(fileStatus.getPath().getName(), uri.toString())) {
                return true;
            }
        }
        return false;
    }

    @Override // gobblin.runtime.api.SpecStore
    public void addSpec(Spec spec) throws IOException {
        Preconditions.checkArgument(null != spec, "Spec should not be null");
        this.log.info(String.format("Adding Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
        writeSpecToFile(getPathForURI(this.fsSpecStoreDirPath, spec.getUri(), spec.getVersion()), spec);
    }

    @Override // gobblin.runtime.api.SpecStore
    public boolean deleteSpec(Spec spec) throws IOException {
        Preconditions.checkArgument(null != spec, "Spec should not be null");
        return deleteSpec(spec.getUri(), spec.getVersion());
    }

    @Override // gobblin.runtime.api.SpecStore
    public boolean deleteSpec(URI uri) throws IOException {
        Preconditions.checkArgument(null != uri, "Spec URI should not be null");
        try {
            return deleteSpec(uri, getSpec(uri).getVersion());
        } catch (SpecNotFoundException e) {
            throw new IOException(String.format("Issue in removing Spec: %s", uri), e);
        }
    }

    @Override // gobblin.runtime.api.SpecStore
    public boolean deleteSpec(URI uri, String str) throws IOException {
        Preconditions.checkArgument(null != uri, "Spec URI should not be null");
        Preconditions.checkArgument(null != str, "Version should not be null");
        try {
            this.log.info(String.format("Deleting Spec with URI: %s in FSSpecStore: %s", uri, this.fsSpecStoreDirPath));
            Path pathForURI = getPathForURI(this.fsSpecStoreDirPath, uri, str);
            if (this.fs.exists(pathForURI)) {
                return this.fs.delete(pathForURI, false);
            }
            this.log.warn("No file with URI:" + uri + " is found. Deletion failed.");
            return false;
        } catch (IOException e) {
            throw new IOException(String.format("Issue in removing Spec: %s for Version: %s", uri, str), e);
        }
    }

    @Override // gobblin.runtime.api.SpecStore
    public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
        Preconditions.checkArgument(null != spec, "Spec should not be null");
        this.log.info(String.format("Updating Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
        writeSpecToFile(getPathForURI(this.fsSpecStoreDirPath, spec.getUri(), spec.getVersion()), spec);
        return spec;
    }

    @Override // gobblin.runtime.api.SpecStore
    public Spec getSpec(URI uri) throws IOException, SpecNotFoundException {
        Preconditions.checkArgument(null != uri, "Spec URI should not be null");
        Spec spec = null;
        for (Spec spec2 : getAllVersionsOfSpec(uri)) {
            if (null == spec) {
                spec = spec2;
            } else if (null != spec2.getVersion() && spec2.getVersion().compareTo(spec2.getVersion()) > 0) {
                spec = spec2;
            }
        }
        if (null == spec) {
            throw new SpecNotFoundException(uri);
        }
        return spec;
    }

    @Override // gobblin.runtime.api.SpecStore
    public Spec getSpec(URI uri, String str) throws IOException, SpecNotFoundException {
        Preconditions.checkArgument(null != uri, "Spec URI should not be null");
        Preconditions.checkArgument(null != str, "Version should not be null");
        Path pathForURI = getPathForURI(this.fsSpecStoreDirPath, uri, str);
        if (this.fs.exists(pathForURI)) {
            return readSpecFromFile(pathForURI);
        }
        throw new SpecNotFoundException(uri);
    }

    @Override // gobblin.runtime.api.SpecStore
    public Collection<Spec> getAllVersionsOfSpec(URI uri) throws IOException, SpecNotFoundException {
        Preconditions.checkArgument(null != uri, "Spec URI should not be null");
        Collection<Spec> specs = getSpecs();
        ArrayList newArrayList = Lists.newArrayList();
        for (Spec spec : specs) {
            if (spec.getUri().equals(uri)) {
                newArrayList.add(spec);
            }
        }
        if (newArrayList.size() == 0) {
            throw new SpecNotFoundException(uri);
        }
        return newArrayList;
    }

    @Override // gobblin.runtime.api.SpecStore
    public Collection<Spec> getSpecs() throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        try {
            getSpecs(this.fsSpecStoreDirPath, newArrayList);
            return newArrayList;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void getSpecs(Path path, Collection<Spec> collection) throws IOException {
        for (FileStatus fileStatus : this.fs.listStatus(path)) {
            if (fileStatus.isDirectory()) {
                getSpecs(fileStatus.getPath(), collection);
            } else {
                collection.add(readSpecFromFile(fileStatus.getPath()));
            }
        }
    }

    protected Spec readSpecFromFile(Path path) throws IOException {
        FSDataInputStream open = this.fs.open(path);
        Throwable th = null;
        try {
            try {
                Spec deserialize = this.specSerDe.deserialize(IOUtils.toByteArray(open));
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                return deserialize;
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    protected void writeSpecToFile(Path path, Spec spec) throws IOException {
        if (this.fs.exists(path)) {
            this.fs.delete(path, true);
        }
        byte[] serialize = this.specSerDe.serialize(spec);
        FSDataOutputStream create = this.fs.create(path);
        Throwable th = null;
        try {
            create.write(serialize);
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    protected Path getPathForURI(Path path, URI uri, String str) {
        return PathUtils.addExtension(PathUtils.mergePaths(path, new Path(uri)), new String[]{str});
    }
}
