package org.apache.inlong.sort.standalone.sink.hive;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/hive/PartitionCreateRunnable.class */
public class PartitionCreateRunnable implements Runnable {
    public static final Logger LOG = InlongLoggerFactory.getLogger(PartitionCreateRunnable.class);
    private final HiveSinkContext context;
    private final HdfsIdConfig idConfig;
    private final String strPartitionValue;
    private final long partitionTime;
    private boolean isForce;
    private PartitionState state = PartitionState.INIT;

    public PartitionCreateRunnable(HiveSinkContext hiveSinkContext, HdfsIdConfig hdfsIdConfig, String str, long j, boolean z) {
        this.context = hiveSinkContext;
        this.idConfig = hdfsIdConfig;
        this.strPartitionValue = str;
        this.partitionTime = j;
        this.isForce = z;
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("start to PartitionCreateRunnable:id:{},partition:{}", this.idConfig.getInlongGroupId(), this.strPartitionValue);
        this.state = PartitionState.CREATING;
        HdfsIdFile hdfsIdFile = null;
        try {
            try {
                HdfsIdFile hdfsIdFile2 = new HdfsIdFile(this.context, this.idConfig, this.idConfig.parsePartitionPath(this.partitionTime));
                if (this.isForce) {
                    process(hdfsIdFile2);
                } else {
                    if (!canArchive(hdfsIdFile2, hdfsIdFile2.getIntmpPath()) || !canArchive(hdfsIdFile2, hdfsIdFile2.getInPath())) {
                        LOG.info("inlongGroupId:{},partition:{} can not archived.", this.idConfig.getInlongGroupId(), this.strPartitionValue);
                        if (hdfsIdFile2 != null) {
                            hdfsIdFile2.close();
                            return;
                        }
                        return;
                    }
                    process(hdfsIdFile2);
                }
                this.state = PartitionState.CREATED;
                if (hdfsIdFile2 != null) {
                    hdfsIdFile2.close();
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                this.state = PartitionState.ERROR;
                if (0 != 0) {
                    hdfsIdFile.close();
                }
            }
        } catch (Throwable th) {
            if (0 != 0) {
                hdfsIdFile.close();
            }
            throw th;
        }
    }

    private boolean canArchive(HdfsIdFile hdfsIdFile, Path path) throws FileNotFoundException, IOException {
        FileStatus[] listStatus = hdfsIdFile.getFs().listStatus(path);
        long currentTimeMillis = System.currentTimeMillis();
        long fileArchiveDelayMinute = currentTimeMillis - (this.context.getFileArchiveDelayMinute() * 60000);
        LOG.info("start to PartitionCreateRunnable id:{},currentTime:{},fileArchiveDelayTime:{},FileArchiveDelayMinute:{},MINUTE_MS:{}", new Object[]{this.idConfig.getInlongGroupId(), Long.valueOf(currentTimeMillis), Long.valueOf(fileArchiveDelayMinute), Long.valueOf(this.context.getFileArchiveDelayMinute()), 60000L});
        for (FileStatus fileStatus : listStatus) {
            Path path2 = fileStatus.getPath();
            if (!path2.equals(hdfsIdFile.getIntmpFilePath())) {
                if (fileStatus.getModificationTime() > fileArchiveDelayMinute) {
                    this.state = PartitionState.ERROR;
                    LOG.info("error PartitionCreateRunnable id:{},fileStatus:{},getModificationTime:{},fileArchiveDelayTime:{}", new Object[]{this.idConfig.getInlongGroupId(), path2.toString(), Long.valueOf(fileStatus.getModificationTime()), Long.valueOf(fileArchiveDelayMinute)});
                    LOG.info("inlongGroupId:{},partition:{} can not archived in path:{}.", new Object[]{this.idConfig.getInlongGroupId(), this.strPartitionValue, path});
                    return false;
                }
                LOG.info("ok PartitionCreateRunnable id:{},fileStatus:{},getModificationTime:{},fileArchiveDelayTime:{}", new Object[]{this.idConfig.getInlongGroupId(), path2.toString(), Long.valueOf(fileStatus.getModificationTime()), Long.valueOf(fileArchiveDelayMinute)});
            }
        }
        return true;
    }

    private void process(HdfsIdFile hdfsIdFile) throws FileNotFoundException, IOException {
        DistributedFileSystem fs = hdfsIdFile.getFs();
        FileStatus[] listStatus = fs.listStatus(hdfsIdFile.getIntmpPath());
        long currentTimeMillis = System.currentTimeMillis() - (this.context.getFileArchiveDelayMinute() * 60000);
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.getModificationTime() <= currentTimeMillis) {
                Path path = fileStatus.getPath();
                fs.rename(path, new Path(hdfsIdFile.getInPath(), path.getName()));
            }
        }
        FileStatus[] listStatus2 = fs.listStatus(hdfsIdFile.getInPath());
        for (FileStatus fileStatus2 : listStatus2) {
            Path path2 = fileStatus2.getPath();
            if (path2.getName().lastIndexOf(HdfsIdFile.OUTTMP_FILE_POSTFIX) >= 0) {
                fs.delete(path2, true);
            }
        }
        long j = 0;
        ArrayList arrayList = new ArrayList();
        for (FileStatus fileStatus3 : listStatus2) {
            if (fileStatus3.getLen() > 0) {
                if (j < this.context.getMaxOutputFileSizeGb() * HiveSinkContext.GB_BYTES) {
                    arrayList.add(fileStatus3.getPath());
                } else {
                    concatInFiles2OuttmpFile(hdfsIdFile, arrayList, fs);
                    j = 0;
                    arrayList.clear();
                }
            }
        }
        if (arrayList.size() > 0) {
            concatInFiles2OuttmpFile(hdfsIdFile, arrayList, fs);
            arrayList.clear();
        }
        FileStatus[] listStatus3 = fs.listStatus(hdfsIdFile.getInPath());
        for (FileStatus fileStatus4 : listStatus3) {
            Path path3 = fileStatus4.getPath();
            if (path3.getName().lastIndexOf(HdfsIdFile.OUTTMP_FILE_POSTFIX) >= 0) {
                String name = path3.getName();
                fs.rename(path3, new Path(hdfsIdFile.getOutPath(), name.substring(0, name.length() - HdfsIdFile.OUTTMP_FILE_POSTFIX.length())));
            }
        }
        for (FileStatus fileStatus5 : listStatus3) {
            fs.delete(fileStatus5.getPath(), true);
        }
        try {
            Connection hiveConnection = this.context.getHiveConnection();
            try {
                Statement createStatement = hiveConnection.createStatement();
                String format = String.format("ALTER TABLE %s.%s ADD IF NOT EXISTS PARTITION (dt='%s') LOCATION '%s'", this.context.getHiveDatabase(), this.idConfig.getHiveTableName(), this.strPartitionValue, hdfsIdFile.getOutPath().toString());
                LOG.info("create partition sql:{}", format);
                createStatement.executeUpdate(format);
                createStatement.close();
                if (hiveConnection != null) {
                    hiveConnection.close();
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    private void concatInFiles2OuttmpFile(HdfsIdFile hdfsIdFile, List<Path> list, DistributedFileSystem distributedFileSystem) throws IOException {
        Path path = new Path(hdfsIdFile.getInPath(), HdfsIdFile.getFileName(this.context, System.currentTimeMillis()) + HdfsIdFile.OUTTMP_FILE_POSTFIX);
        LOG.info("start to concat outtmp file:{},inFiles:{}", path, list);
        FSDataOutputStream create = distributedFileSystem.create(path, true);
        create.flush();
        create.close();
        distributedFileSystem.concat(path, (Path[]) list.toArray(new Path[list.size()]));
    }

    public PartitionState getState() {
        return this.state;
    }

    public void setState(PartitionState partitionState) {
        this.state = partitionState;
    }

    public HiveSinkContext getContext() {
        return this.context;
    }

    public HdfsIdConfig getIdConfig() {
        return this.idConfig;
    }

    public String getStrPartitionValue() {
        return this.strPartitionValue;
    }

    public boolean isForce() {
        return this.isForce;
    }

    public long getPartitionTime() {
        return this.partitionTime;
    }

    public void setForce(boolean z) {
        this.isForce = z;
    }
}
