/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.common.persistence;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HDFSResourceStore
extends ResourceStore {
    private static final Logger logger = LoggerFactory.getLogger(HDFSResourceStore.class);
    private Path hdfsMetaPath;
    private FileSystem fs;
    private static final String HDFS_SCHEME = "hdfs";

    public HDFSResourceStore(KylinConfig kylinConfig) throws Exception {
        this(kylinConfig, kylinConfig.getMetadataUrl());
    }

    public HDFSResourceStore(KylinConfig kylinConfig, StorageURL metadataUrl) throws Exception {
        super(kylinConfig);
        Preconditions.checkState((boolean)HDFS_SCHEME.equals(metadataUrl.getScheme()));
        String path = metadataUrl.getParameter("path");
        if (path == null) {
            path = kylinConfig.getHdfsWorkingDirectory() + "tmp_metadata";
            logger.warn("Missing path, fall back to " + path);
        }
        this.fs = HadoopUtil.getFileSystem(path);
        Path metadataPath = new Path(path);
        if (!this.fs.exists(metadataPath)) {
            logger.warn("Path not exist in HDFS, create it: " + path);
            this.createMetaFolder(metadataPath);
        }
        this.hdfsMetaPath = metadataPath;
        logger.info("hdfs meta path : " + this.hdfsMetaPath.toString());
    }

    private void createMetaFolder(Path metaDirName) throws Exception {
        if (!this.fs.exists(metaDirName)) {
            this.fs.mkdirs(metaDirName);
        }
        logger.info("hdfs meta path created: " + metaDirName.toString());
    }

    @Override
    protected NavigableSet<String> listResourcesImpl(String folderPath) throws IOException {
        Path p = this.getRealHDFSPath(folderPath);
        if (!this.fs.exists(p) || !this.fs.isDirectory(p)) {
            return null;
        }
        TreeSet<String> r = new TreeSet<String>();
        FileStatus[] statuses = this.fs.listStatus(p);
        String prefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
        for (FileStatus status : statuses) {
            r.add(prefix + status.getPath().getName());
        }
        return r;
    }

    @Override
    protected boolean existsImpl(String resPath) throws IOException {
        Path p = this.getRealHDFSPath(resPath);
        return this.fs.exists(p) && this.fs.isFile(p);
    }

    @Override
    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException {
        NavigableSet<String> resources = this.listResources(folderPath);
        if (resources == null) {
            return Collections.emptyList();
        }
        ArrayList result = Lists.newArrayListWithCapacity((int)resources.size());
        try {
            for (String res : resources) {
                RawResource resource;
                long ts = this.getResourceTimestampImpl(res);
                if (timeStart > ts || ts >= timeEndExclusive || (resource = this.getResourceImpl(res)) == null) continue;
                result.add(resource);
            }
        }
        catch (IOException ex) {
            for (RawResource rawResource : result) {
                IOUtils.closeQuietly((InputStream)rawResource.inputStream);
            }
            throw ex;
        }
        return result;
    }

    @Override
    protected RawResource getResourceImpl(String resPath) throws IOException {
        Path p = this.getRealHDFSPath(resPath);
        if (this.fs.exists(p) && this.fs.isFile(p)) {
            if (this.fs.getFileStatus(p).getLen() == 0L) {
                logger.warn("Zero length file: " + p.toString());
            }
            FSDataInputStream in = this.fs.open(p);
            long t = in.readLong();
            return new RawResource((InputStream)in, t);
        }
        return null;
    }

    @Override
    protected long getResourceTimestampImpl(String resPath) throws IOException {
        long l;
        Path p = this.getRealHDFSPath(resPath);
        if (!this.fs.exists(p) || !this.fs.isFile(p)) {
            return 0L;
        }
        FSDataInputStream in = null;
        try {
            long t;
            in = this.fs.open(p);
            l = t = in.readLong();
        }
        catch (Exception e) {
            try {
                throw new IOException("Put resource fail", e);
            }
            catch (Throwable throwable) {
                IOUtils.closeQuietly(in);
                throw throwable;
            }
        }
        IOUtils.closeQuietly((InputStream)in);
        return l;
    }

    @Override
    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
        logger.trace("res path : " + resPath);
        Path p = this.getRealHDFSPath(resPath);
        logger.trace("put resource : " + p.toUri());
        FSDataOutputStream out = null;
        try {
            out = this.fs.create(p, true);
            out.writeLong(ts);
            IOUtils.copy((InputStream)content, (OutputStream)out);
        }
        catch (Exception e) {
            try {
                throw new IOException("Put resource fail", e);
            }
            catch (Throwable throwable) {
                IOUtils.closeQuietly(out);
                throw throwable;
            }
        }
        IOUtils.closeQuietly((OutputStream)out);
    }

    @Override
    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
        Path p = this.getRealHDFSPath(resPath);
        if (!this.fs.exists(p)) {
            if (oldTS != 0L) {
                throw new IllegalStateException("For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
            }
        } else {
            long realLastModify = this.getResourceTimestamp(resPath);
            if (realLastModify != oldTS) {
                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but found " + realLastModify);
            }
        }
        this.putResourceImpl(resPath, new ByteArrayInputStream(content), newTS);
        return newTS;
    }

    @Override
    protected void deleteResourceImpl(String resPath) throws IOException {
        try {
            Path p = this.getRealHDFSPath(resPath);
            if (this.fs.exists(p)) {
                this.fs.delete(p, true);
            }
        }
        catch (Exception e) {
            throw new IOException("Delete resource fail", e);
        }
    }

    @Override
    protected String getReadableResourcePathImpl(String resPath) {
        return this.getRealHDFSPath(resPath).toString();
    }

    private Path getRealHDFSPath(String resourcePath) {
        if (resourcePath.equals("/")) {
            return this.hdfsMetaPath;
        }
        if (resourcePath.startsWith("/") && resourcePath.length() > 1) {
            resourcePath = resourcePath.substring(1, resourcePath.length());
        }
        return new Path(this.hdfsMetaPath, resourcePath);
    }
}

