package org.apache.hugegraph.computer.core.output.hdfs;

import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.output.AbstractComputerOutput;
import org.apache.hugegraph.computer.core.util.StringEncoding;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/output/hdfs/HdfsOutput.class */
public class HdfsOutput extends AbstractComputerOutput {
    private static final Logger LOG = Log.logger(HdfsOutput.class);
    private FileSystem fs;
    private FSDataOutputStream fileOutputStream;
    private String delimiter;
    private static final String REPLICATION_KEY = "dfs.replication";
    private static final String FILE_PREFIX = "partition_";
    private static final String FILE_SUFFIX = ".csv";

    public void init(Config config, int i) {
        super.init(config, i);
        try {
            Configuration configuration = new Configuration();
            configuration.set(REPLICATION_KEY, String.valueOf((Short) config.get(ComputerOptions.OUTPUT_HDFS_REPLICATION)));
            this.fs = openHDFS(config, configuration);
            this.delimiter = (String) config.get(ComputerOptions.OUTPUT_HDFS_DELIMITER);
            this.fileOutputStream = this.fs.create(buildPath((String) config.get(ComputerOptions.OUTPUT_HDFS_DIR), (String) config.get(ComputerOptions.JOB_ID), i), true);
        } catch (IOException | InterruptedException e) {
            throw new ComputerException("Failed to init hdfs output on partition [%s]", e, new Object[]{Integer.valueOf(i)});
        }
    }

    public void write(Vertex vertex) {
        try {
            if (filter(vertex)) {
                writeString(vertex.id().toString());
                writeString(this.delimiter);
                writeString(constructValueString(vertex));
                writeString(System.lineSeparator());
            }
        } catch (IOException e) {
            throw new ComputerException("Failed to write vertex: {}", new Object[]{vertex.toString(), e});
        }
    }

    protected boolean filter(Vertex vertex) {
        return true;
    }

    protected void writeBytes(byte[] bArr) throws IOException {
        this.fileOutputStream.write(bArr);
    }

    protected void writeString(String str) throws IOException {
        writeBytes(StringEncoding.encode(str));
    }

    protected String constructValueString(Vertex vertex) {
        return vertex.value().string();
    }

    public static Path buildPath(String str, String str2, int i) {
        return new Path(new Path(str, str2), "partition_" + i + ".csv");
    }

    public void mergePartitions(Config config) {
        if (((Boolean) config.get(ComputerOptions.OUTPUT_HDFS_MERGE)).booleanValue()) {
            LOG.info("Merge hdfs output partitions started");
            HdfsOutputMerger hdfsOutputMerger = new HdfsOutputMerger();
            try {
                hdfsOutputMerger.init(config);
                hdfsOutputMerger.merge();
                hdfsOutputMerger.close();
                LOG.info("Merge hdfs output partitions finished");
            } catch (Throwable th) {
                hdfsOutputMerger.close();
                throw th;
            }
        }
    }

    public void close() {
        try {
            if (this.fileOutputStream != null) {
                this.fileOutputStream.close();
            }
            if (this.fs != null) {
                this.fs.close();
            }
        } catch (IOException e) {
            throw new ComputerException("Failed to close hdfs", e);
        }
    }

    public static FileSystem openHDFS(Config config, Configuration configuration) throws IOException, InterruptedException {
        String str = (String) config.get(ComputerOptions.OUTPUT_HDFS_URL);
        Boolean bool = (Boolean) config.get(ComputerOptions.OUTPUT_HDFS_KERBEROS_ENABLE);
        String str2 = (String) config.get(ComputerOptions.OUTPUT_HDFS_CORE_SITE_PATH);
        if (StringUtils.isNotBlank(str2)) {
            configuration.addResource(new Path(str2));
        }
        String str3 = (String) config.get(ComputerOptions.OUTPUT_HDFS_SITE_PATH);
        if (StringUtils.isNotBlank(str3)) {
            configuration.addResource(new Path(str3));
        }
        if (!bool.booleanValue()) {
            return FileSystem.get(URI.create(str), configuration, (String) config.get(ComputerOptions.OUTPUT_HDFS_USER));
        }
        System.setProperty("java.security.krb5.conf", (String) config.get(ComputerOptions.OUTPUT_HDFS_KRB5_CONF));
        String str4 = (String) config.get(ComputerOptions.OUTPUT_HDFS_KERBEROS_PRINCIPAL);
        String str5 = (String) config.get(ComputerOptions.OUTPUT_HDFS_KERBEROS_KEYTAB);
        configuration.set("fs.defaultFS", str);
        configuration.set("hadoop.security.authentication", "kerberos");
        configuration.set("dfs.namenode.kerberos.principal", str4);
        UserGroupInformation.setConfiguration(configuration);
        UserGroupInformation.loginUserFromKeytab(str4, str5);
        return FileSystem.get(configuration);
    }
}
