package org.apache.gobblin.metastore;

import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WritableShimSerialization;
import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

/* loaded from: input_file:org/apache/gobblin/metastore/FsStateStore.class */
/**
 * A {@link StateStore} that persists {@link State} objects as Hadoop {@link SequenceFile}s on a
 * {@link FileSystem}.
 *
 * <p>On-disk layout used by this class:
 * <ul>
 *   <li>a <em>store</em> is a directory directly under {@link #storeRootDir};</li>
 *   <li>a <em>table</em> is a single sequence file inside a store directory;</li>
 *   <li>each record in a table is keyed by a {@link Text} holding the state's id (empty string when the id is
 *       {@code null}) and carries the state itself as the value.</li>
 * </ul>
 *
 * <p>When {@link #useTmpFileForPut} is {@code true}, writes go to a file named {@link #TMP_FILE_PREFIX} +
 * table name, which is renamed to the real table name only after the writer has been closed.
 *
 * @param <T> concrete {@link State} type stored in the tables
 */
public class FsStateStore<T extends State> implements StateStore<T> {
    /** Prefix prepended to a table name to form the name of the temporary file used while writing. */
    public static final String TMP_FILE_PREFIX = "_tmp_";

    /** Hadoop configuration used for all sequence-file reads and writes. */
    protected final Configuration conf;

    /** File system that holds the store directories. */
    protected final FileSystem fs;

    /** Whether {@code put}/{@code putAll} write to a temporary file and rename it into place afterwards. */
    protected boolean useTmpFileForPut;

    /** Root directory under which every store directory lives. */
    protected final String storeRootDir;

    /** Runtime class of {@code T}; used to instantiate states when reading and as the writer's value class. */
    protected final Class<T> stateClass;

    /**
     * Creates a store on the file system identified by {@code fsUri}.
     *
     * @param fsUri        URI of the file system to use
     * @param storeRootDir root directory of the state store on that file system
     * @param stateClass   class of the states kept in this store
     * @throws IOException if the file system cannot be obtained
     */
    public FsStateStore(String fsUri, String storeRootDir, Class<T> stateClass) throws IOException {
        this.conf = getConf(null);
        this.fs = FileSystem.get(URI.create(fsUri), this.conf);
        this.useTmpFileForPut = shouldUseTmpFile(this.fs);
        this.storeRootDir = storeRootDir;
        this.stateClass = stateClass;
    }

    /**
     * Builds the configuration used by this store: a fresh {@link Configuration}, or a copy of {@code base} when
     * one is given, passed through {@link WritableShimSerialization#addToHadoopConfiguration}.
     *
     * @param base configuration to copy, or {@code null} to start from a default one
     * @return a new configuration instance; {@code base} itself is never modified
     */
    private static Configuration getConf(Configuration base) {
        Configuration copy = base == null ? new Configuration() : new Configuration(base);
        WritableShimSerialization.addToHadoopConfiguration(copy);
        return copy;
    }

    /**
     * Decides whether writes should go through a temporary file: true unless the file system's URI scheme is
     * listed in {@link HadoopUtils#FS_SCHEMES_NON_ATOMIC}.
     * NOTE(review): presumably those are schemes where rename is not atomic - confirm against HadoopUtils.
     */
    private static boolean shouldUseTmpFile(FileSystem fileSystem) {
        return !HadoopUtils.FS_SCHEMES_NON_ATOMIC.contains(fileSystem.getUri().getScheme());
    }

    /**
     * Creates a store on an already-opened file system. The store's configuration is a copy of the file
     * system's configuration.
     *
     * @param fs           file system to use
     * @param storeRootDir root directory of the state store on that file system
     * @param stateClass   class of the states kept in this store
     */
    public FsStateStore(FileSystem fs, String storeRootDir, Class<T> stateClass) {
        this.fs = fs;
        this.useTmpFileForPut = shouldUseTmpFile(this.fs);
        this.conf = getConf(this.fs.getConf());
        this.storeRootDir = storeRootDir;
        this.stateClass = stateClass;
    }

    /**
     * Creates a store from a single location string. The file system is resolved from the location, and only
     * the path component of the location is kept as the store root.
     *
     * @param storeUrl   location of the store root, parsed as a Hadoop {@link Path}
     * @param stateClass class of the states kept in this store
     * @throws IOException if the file system cannot be obtained
     */
    public FsStateStore(String storeUrl, Class<T> stateClass) throws IOException {
        this.conf = getConf(null);
        Path storePath = new Path(storeUrl);
        this.fs = storePath.getFileSystem(this.conf);
        this.useTmpFileForPut = shouldUseTmpFile(this.fs);
        this.storeRootDir = storePath.toUri().getPath();
        this.stateClass = stateClass;
    }

    /**
     * Ensures the directory for {@code storeName} exists, creating it with permission 0755 if necessary.
     *
     * @param storeName name of the store
     * @return {@code true} if the directory already existed or was created
     * @throws IOException on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public boolean create(String storeName) throws IOException {
        Path storePath = new Path(this.storeRootDir, storeName);
        return this.fs.exists(storePath) || this.fs.mkdirs(storePath, new FsPermission((short) 0755));
    }

    /**
     * Creates an empty file for {@code tableName} inside {@code storeName}, creating the store directory first
     * when it is missing.
     *
     * @param storeName name of the store
     * @param tableName name of the table file to create
     * @return {@code false} if the store directory could not be created; otherwise the result of
     *         {@link FileSystem#createNewFile(Path)}
     * @throws IOException if the table file already exists, or on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public boolean create(String storeName, String tableName) throws IOException {
        Path storePath = new Path(this.storeRootDir, storeName);
        if (!this.fs.exists(storePath) && !create(storeName)) {
            return false;
        }
        Path tablePath = new Path(storePath, tableName);
        if (this.fs.exists(tablePath)) {
            throw new IOException(String.format("State file %s already exists for table %s", tablePath, tableName));
        }
        return this.fs.createNewFile(tablePath);
    }

    /**
     * Checks whether the file for {@code tableName} exists inside {@code storeName}.
     *
     * @param storeName name of the store
     * @param tableName name of the table
     * @return {@code true} if the table file exists
     * @throws IOException on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public boolean exists(String storeName, String tableName) throws IOException {
        return this.fs.exists(new Path(new Path(this.storeRootDir, storeName), tableName));
    }

    /**
     * Writes a single state as the sole record of the given table.
     *
     * @param storeName name of the store
     * @param tableName name of the table
     * @param state     state to write
     * @throws IOException if the table file cannot be created, written or renamed into place
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public void put(String storeName, String tableName, T state) throws IOException {
        writeTable(storeName, tableName, java.util.Collections.singletonList(state));
    }

    /**
     * Writes all given states, in iteration order, as the records of the given table.
     *
     * @param storeName name of the store
     * @param tableName name of the table
     * @param states    states to write
     * @throws IOException if the table file cannot be created, written or renamed into place
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public void putAll(String storeName, String tableName, Collection<T> states) throws IOException {
        writeTable(storeName, tableName, states);
    }

    /**
     * Shared write path for {@link #put} and {@link #putAll}: writes the states to the (possibly temporary)
     * table file, closes the writer, and only then renames a temporary file to its final name.
     */
    private void writeTable(String storeName, String tableName, Iterable<? extends T> states) throws IOException {
        String targetName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
        Path storePath = new Path(this.storeRootDir, storeName);
        Path targetPath = new Path(storePath, targetName);
        if (!this.fs.exists(targetPath) && !create(storeName, targetName)) {
            throw new IOException("Failed to create a state file for table " + targetName);
        }

        Closer closer = Closer.create();
        try {
            @SuppressWarnings("deprecation")
            SequenceFile.Writer writer = closer.register(
                    SequenceFile.createWriter(this.fs, this.conf, targetPath, Text.class, this.stateClass,
                            SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
            for (T state : states) {
                writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
            }
        } catch (Throwable t) {
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }

        // The rename must come after the writer is closed so the final table name never refers to a file
        // that is still being written.
        if (this.useTmpFileForPut) {
            renamePath(targetPath, new Path(storePath, tableName));
        }
    }

    /**
     * Renames {@code tmpTablePath} to {@code tablePath} via {@link HadoopUtils#renamePath}. Kept as a separate
     * protected method so subclasses can substitute their own rename behavior.
     *
     * @param tmpTablePath path of the file that was just written
     * @param tablePath    final path of the table file
     * @throws IOException if the rename fails
     */
    protected void renamePath(Path tmpTablePath, Path tablePath) throws IOException {
        HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
    }

    /**
     * Reads the reader's current value, narrowing it to {@code T}. The cast is unchecked; it relies on the
     * table having been written with {@link #stateClass} as its value class.
     */
    @SuppressWarnings("unchecked")
    private T readCurrentValue(GobblinSequenceFileReader reader, T reuse) throws IOException {
        return (T) reader.getCurrentValue(reuse);
    }

    /**
     * Scans the given table for the first record whose key equals {@code stateId}.
     *
     * @param storeName name of the store
     * @param tableName name of the table
     * @param stateId   id to look for
     * @return the matching state with its id set to {@code stateId}, or {@code null} if the table file does
     *         not exist or holds no record with that key
     * @throws IOException if the table cannot be opened or read, or a state cannot be instantiated
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public T get(String storeName, String tableName, String stateId) throws IOException {
        Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
        if (!this.fs.exists(tablePath)) {
            return null;
        }

        Closer closer = Closer.create();
        try {
            @SuppressWarnings("deprecation")
            GobblinSequenceFileReader reader =
                    closer.register(new GobblinSequenceFileReader(this.fs, tablePath, this.conf));
            try {
                Text key = new Text();
                // A single instance is handed back to the reader on every iteration.
                T state = this.stateClass.newInstance();
                while (reader.next(key)) {
                    state = readCurrentValue(reader, state);
                    if (key.toString().equals(stateId)) {
                        state.setId(stateId);
                        return state;
                    }
                }
                return null;
            } catch (Exception e) {
                throw new IOException("failure retrieving state from storeName " + storeName + " tableName " + tableName, e);
            }
        } catch (Throwable t) {
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }
    }

    /**
     * Reads every record of the given table.
     *
     * @param storeName name of the store
     * @param tableName name of the table
     * @return the states in file order, each with its id set from the record key; an empty list if the table
     *         file does not exist
     * @throws IOException if the table cannot be opened or read, or a state cannot be instantiated
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public List<T> getAll(String storeName, String tableName) throws IOException {
        List<T> states = Lists.newArrayList();
        Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
        if (!this.fs.exists(tablePath)) {
            return states;
        }

        Closer closer = Closer.create();
        try {
            @SuppressWarnings("deprecation")
            GobblinSequenceFileReader reader =
                    closer.register(new GobblinSequenceFileReader(this.fs, tablePath, this.conf));
            try {
                Text key = new Text();
                T state = this.stateClass.newInstance();
                while (reader.next(key)) {
                    state = readCurrentValue(reader, state);
                    state.setId(key.toString());
                    states.add(state);
                    // Each returned element needs its own instance, so a fresh one is supplied for the next read.
                    state = this.stateClass.newInstance();
                }
                return states;
            } catch (Exception e) {
                throw new IOException("failure retrieving state from storeName " + storeName + " tableName " + tableName, e);
            }
        } catch (Throwable t) {
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }
    }

    /**
     * Reads every record of every entry listed in the given store directory.
     *
     * @param storeName name of the store
     * @return all states found, grouped by table in listing order; an empty list if the store directory does
     *         not exist
     * @throws IOException on file system or read failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public List<T> getAll(String storeName) throws IOException {
        List<T> states = Lists.newArrayList();
        Path storePath = new Path(this.storeRootDir, storeName);
        if (!this.fs.exists(storePath)) {
            return states;
        }
        for (FileStatus status : this.fs.listStatus(storePath)) {
            states.addAll(getAll(storeName, status.getPath().getName()));
        }
        return states;
    }

    /**
     * Lists the names of the entries in the given store directory that satisfy {@code predicate}.
     *
     * @param storeName name of the store
     * @param predicate filter applied to each entry name
     * @return matching names in listing order; an empty list if the store directory does not exist
     * @throws IOException on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public List<String> getTableNames(String storeName, Predicate<String> predicate) throws IOException {
        return listChildNames(new Path(this.storeRootDir, storeName), predicate);
    }

    /**
     * Lists the names of the entries directly under the store root that satisfy {@code predicate}.
     *
     * @param predicate filter applied to each entry name
     * @return matching names in listing order; an empty list if the store root does not exist
     * @throws IOException on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public List<String> getStoreNames(Predicate<String> predicate) throws IOException {
        return listChildNames(new Path(this.storeRootDir), predicate);
    }

    /** Returns the names of the direct children of {@code dir} accepted by {@code predicate}; empty if absent. */
    private List<String> listChildNames(Path dir, Predicate<String> predicate) throws IOException {
        List<String> names = Lists.newArrayList();
        if (!this.fs.exists(dir)) {
            return names;
        }
        for (FileStatus status : this.fs.listStatus(dir)) {
            String name = status.getPath().getName();
            if (predicate.apply(name)) {
                names.add(name);
            }
        }
        return names;
    }

    /**
     * Makes the contents of table {@code original} available under the name {@code alias} by copying the file
     * with {@link HadoopUtils#copyFile}.
     *
     * @param storeName name of the store
     * @param original  name of the existing table
     * @param alias     name of the table to create as a copy
     * @throws IOException if the original table file does not exist, or the copy fails
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public void createAlias(String storeName, String original, String alias) throws IOException {
        Path storePath = new Path(this.storeRootDir, storeName);
        Path originalPath = new Path(storePath, original);
        if (!this.fs.exists(originalPath)) {
            throw new IOException(String.format("State file %s does not exist for table %s", originalPath, original));
        }
        Path aliasPath = new Path(storePath, alias);
        // Note: this resolves to <store>/_tmp_/<alias>, i.e. a "_tmp_" sub-directory rather than a name prefix.
        Path tmpAliasPath = new Path(aliasPath.getParent(), new Path(TMP_FILE_PREFIX, aliasPath.getName()));
        // NOTE(review): the boolean is presumably an overwrite flag - confirm against HadoopUtils.copyFile.
        HadoopUtils.copyFile(this.fs, originalPath, this.fs, aliasPath, tmpAliasPath, true, this.conf);
    }

    /**
     * Deletes the file of the given table if it exists (non-recursive delete).
     *
     * @param storeName name of the store
     * @param tableName name of the table
     * @throws IOException on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public void delete(String storeName, String tableName) throws IOException {
        Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
        if (this.fs.exists(tablePath)) {
            this.fs.delete(tablePath, false);
        }
    }

    /**
     * Deletes the directory of the given store, and everything in it, if it exists (recursive delete).
     *
     * @param storeName name of the store
     * @throws IOException on file system failure
     */
    @Override // org.apache.gobblin.metastore.StateStore
    public void delete(String storeName) throws IOException {
        Path storePath = new Path(this.storeRootDir, storeName);
        if (this.fs.exists(storePath)) {
            this.fs.delete(storePath, true);
        }
    }
}
