package org.apache.seatunnel.engine.checkpoint.storage.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.class */
public class HdfsStorage extends AbstractCheckpointStorage {
    private static final Logger log = LoggerFactory.getLogger(HdfsStorage.class);
    public FileSystem fs;
    private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
    private static final String KERBEROS_KEY = "kerberos";
    private static final String STORAGE_TMP_SUFFIX = "tmp";

    public HdfsStorage(Map<String, String> map) throws CheckpointStorageException {
        initStorage(map);
    }

    public void initStorage(Map<String, String> map) throws CheckpointStorageException {
        Configuration configuration = new Configuration();
        if (map.containsKey(HdfsConstants.HDFS_DEF_FS_NAME)) {
            configuration.set(HdfsConstants.HDFS_DEF_FS_NAME, map.get(HdfsConstants.HDFS_DEF_FS_NAME));
        }
        if (StringUtils.isNotBlank(map.get("storageNameSpace"))) {
            setStorageNameSpace(map.get("storageNameSpace"));
        }
        if (map.containsKey(HdfsConstants.KERBEROS_PRINCIPAL) && map.containsKey(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH)) {
            String str = map.get(HdfsConstants.KERBEROS_PRINCIPAL);
            String str2 = map.get(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH);
            if (StringUtils.isNotBlank(str) && StringUtils.isNotBlank(str2)) {
                configuration.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
                authenticateKerberos(str, str2, configuration);
            }
        }
        try {
            this.fs = FileSystem.get(new JobConf(configuration));
        } catch (IOException e) {
            throw new CheckpointStorageException("Failed to get file system", e);
        }
    }

    public String storeCheckPoint(PipelineState pipelineState) throws CheckpointStorageException {
        try {
            byte[] serializeCheckPointData = serializeCheckPointData(pipelineState);
            Path path = new Path(getStorageParentDirectory() + pipelineState.getJobId() + "/" + getCheckPointName(pipelineState));
            Path path2 = new Path(getStorageParentDirectory() + pipelineState.getJobId() + "/" + getCheckPointName(pipelineState) + STORAGE_TMP_SUFFIX);
            try {
                try {
                    FSDataOutputStream create = this.fs.create(path2);
                    Throwable th = null;
                    try {
                        try {
                            create.write(serializeCheckPointData);
                            create.hsync();
                            this.fs.rename(path2, path);
                            if (create != null) {
                                if (0 != 0) {
                                    try {
                                        create.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    create.close();
                                }
                            }
                            return path.getName();
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (create != null) {
                            if (th != null) {
                                try {
                                    create.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                create.close();
                            }
                        }
                        throw th3;
                    }
                } finally {
                    try {
                        if (this.fs.exists(path2)) {
                            this.fs.delete(path2, false);
                        }
                    } catch (IOException e) {
                        log.error("Failed to delete tmp file", e);
                    }
                }
            } catch (IOException e2) {
                throw new CheckpointStorageException("Failed to write checkpoint data, state: " + pipelineState, e2);
            }
        } catch (IOException e3) {
            throw new CheckpointStorageException("Failed to serialize checkpoint data,state is :" + pipelineState, e3);
        }
    }

    public List<PipelineState> getAllCheckpoints(String str) throws CheckpointStorageException {
        List<String> fileNames = getFileNames(getStorageParentDirectory() + str);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        ArrayList arrayList = new ArrayList();
        fileNames.forEach(str2 -> {
            try {
                arrayList.add(readPipelineState(str2, str));
            } catch (CheckpointStorageException e) {
                log.error("Failed to read checkpoint data from file: " + str2, e);
            }
        });
        if (arrayList.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        return arrayList;
    }

    public List<PipelineState> getLatestCheckpoint(String str) throws CheckpointStorageException {
        List<String> fileNames = getFileNames(getStorageParentDirectory() + str);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        Set latestPipelineNames = getLatestPipelineNames(fileNames);
        ArrayList arrayList = new ArrayList();
        latestPipelineNames.forEach(str2 -> {
            try {
                arrayList.add(readPipelineState(str2, str));
            } catch (CheckpointStorageException e) {
                log.error("Failed to read pipeline state for file: {}", str2, e);
            }
        });
        if (arrayList.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id:{} " + str);
        }
        return arrayList;
    }

    public PipelineState getLatestCheckpointByJobIdAndPipelineId(String str, String str2) throws CheckpointStorageException {
        List<String> fileNames = getFileNames(getStorageParentDirectory() + str);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        String latestCheckpointFileNameByJobIdAndPipelineId = getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, str2);
        if (latestCheckpointFileNameByJobIdAndPipelineId == null) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str + ", pipeline id is: " + str2);
        }
        return readPipelineState(latestCheckpointFileNameByJobIdAndPipelineId, str);
    }

    public List<PipelineState> getCheckpointsByJobIdAndPipelineId(String str, String str2) throws CheckpointStorageException {
        List<String> fileNames = getFileNames(getStorageParentDirectory() + str);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        ArrayList arrayList = new ArrayList();
        fileNames.forEach(str3 -> {
            if (str2.equals(str3.split("-")[2])) {
                try {
                    arrayList.add(readPipelineState(str3, str));
                } catch (Exception e) {
                    log.error("Failed to read checkpoint data from file " + str3, e);
                }
            }
        });
        return arrayList;
    }

    public void deleteCheckpoint(String str) {
        try {
            this.fs.delete(new Path(getStorageParentDirectory() + str), true);
        } catch (IOException e) {
            log.error("Failed to delete checkpoint for job {}", str, e);
        }
    }

    public PipelineState getCheckpoint(String str, String str2, String str3) throws CheckpointStorageException {
        List<String> fileNames = getFileNames(getStorageParentDirectory() + str);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        for (String str4 : fileNames) {
            if (str2.equals(getPipelineIdByFileName(str4)) && str3.equals(getCheckpointIdByFileName(str4))) {
                try {
                    return readPipelineState(str4, str);
                } catch (Exception e) {
                    log.error("Failed to get checkpoint {} for job {}, pipeline {}", new Object[]{str3, str, str2, e});
                }
            }
        }
        throw new CheckpointStorageException(String.format("No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", str, str2, str3));
    }

    public void deleteCheckpoint(String str, String str2, String str3) throws CheckpointStorageException {
        List<String> fileNames = getFileNames(getStorageParentDirectory() + str);
        if (fileNames.isEmpty()) {
            throw new CheckpointStorageException("No checkpoint found for job, job id is: " + str);
        }
        fileNames.forEach(str4 -> {
            if (str2.equals(getPipelineIdByFileName(str4)) && str3.equals(getCheckpointIdByFileName(str4))) {
                try {
                    this.fs.delete(new Path(str4), false);
                } catch (Exception e) {
                    log.error("Failed to delete checkpoint {} for job {}, pipeline {}", new Object[]{str3, str, str2, e});
                }
            }
        });
    }

    private void authenticateKerberos(String str, String str2, Configuration configuration) throws CheckpointStorageException {
        UserGroupInformation.setConfiguration(configuration);
        try {
            UserGroupInformation.loginUserFromKeytab(str, str2);
        } catch (IOException e) {
            throw new CheckpointStorageException("Failed to login user from keytab : " + str2 + " and kerberos principal : " + str, e);
        }
    }

    private List<String> getFileNames(String str) throws CheckpointStorageException {
        try {
            RemoteIterator listFiles = this.fs.listFiles(new Path(str), false);
            ArrayList arrayList = new ArrayList();
            while (listFiles.hasNext()) {
                LocatedFileStatus locatedFileStatus = (LocatedFileStatus) listFiles.next();
                if (!locatedFileStatus.getPath().getName().endsWith("ser")) {
                    arrayList.add(locatedFileStatus.getPath().getName());
                }
                arrayList.add(locatedFileStatus.getPath().getName());
            }
            return arrayList;
        } catch (IOException e) {
            throw new CheckpointStorageException("Failed to list files from names" + str, e);
        }
    }

    private PipelineState readPipelineState(String str, String str2) throws CheckpointStorageException {
        String str3 = getStorageParentDirectory() + str2 + "/" + str;
        try {
            FSDataInputStream open = this.fs.open(new Path(str3));
            Throwable th = null;
            try {
                try {
                    byte[] bArr = new byte[open.available()];
                    open.read(bArr);
                    PipelineState deserializeCheckPointData = deserializeCheckPointData(bArr);
                    if (open != null) {
                        if (0 != 0) {
                            try {
                                open.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            open.close();
                        }
                    }
                    return deserializeCheckPointData;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new CheckpointStorageException(String.format("Failed to read checkpoint data, file name is %s,job id is %s", str3, str2), e);
        }
    }
}
