package org.apache.inlong.sort.standalone.sink.hive;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/hive/PartitionLeaderElectionRunnable.class */
/**
 * Task that, for every configured id whose HDFS token file is held by this node, lists the
 * partitions of the id's Hive table and submits a {@link PartitionCreateRunnable} for each
 * partition value inside the scan window that the table does not contain yet.
 *
 * <p>The token file is {@code <hdfsPath><idRootPath>/token}; its content is the id of the node
 * that currently owns the token (see {@link #hasToken(HdfsIdConfig)}).
 */
public class PartitionLeaderElectionRunnable implements Runnable {

    public static final Logger LOG = InlongLoggerFactory.getLogger(PartitionLeaderElectionRunnable.class);

    /** Milliseconds per minute; a {@code long} so minute-based delays cannot overflow {@code int}. */
    private static final long MINUTE_MS = 60000L;

    private final HiveSinkContext context;

    /** Submitted partition tasks, keyed by {@code <uid>.<partitionValue>}. */
    private final Map<String, PartitionCreateRunnable> partitionCreateMap = new ConcurrentHashMap<>();

    /**
     * Creates the task.
     *
     * @param hiveSinkContext sink context supplying the Hive connection, id configuration,
     *                        partition-create pool and token settings
     */
    public PartitionLeaderElectionRunnable(HiveSinkContext hiveSinkContext) {
        this.context = hiveSinkContext;
    }

    /**
     * Drops finished tasks, then checks every configured id and schedules creation of its missing
     * partitions. Any {@link Exception} is logged and ends the current pass; it is not rethrown.
     */
    @Override
    public void run() {
        LOG.info("start to PartitionLeaderElectionRunnable.");
        removeFinishedTasks();
        // try-with-resources closes the statement and the connection on every path, including failures.
        try (Connection hiveConnection = this.context.getHiveConnection();
                Statement statement = hiveConnection.createStatement()) {
            ExecutorService partitionCreatePool = this.context.getPartitionCreatePool();
            for (Map.Entry<String, HdfsIdConfig> entry : this.context.getIdConfigMap().entrySet()) {
                LOG.info("start to PartitionLeaderElectionRunnable check id token:{}", entry.getKey());
                HdfsIdConfig idConfig = entry.getValue();
                if (!hasToken(idConfig)) {
                    continue;
                }
                Set<String> existingPartitions = listPartitionValues(statement, idConfig);
                LOG.info("find id:{},partitions:{}", entry.getKey(), existingPartitions);
                submitMissingPartitions(idConfig, existingPartitions, partitionCreatePool);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    /** Removes every tracked task whose state is neither {@code INIT} nor {@code CREATING}. */
    private void removeFinishedTasks() {
        this.partitionCreateMap.values().removeIf(task -> {
            PartitionState state = task.getState();
            return state != PartitionState.INIT && state != PartitionState.CREATING;
        });
    }

    /**
     * Runs {@code show partitions} for the id's Hive table and collects, for each returned row,
     * the text after the first {@code '='}; rows without {@code '='} are ignored.
     */
    private Set<String> listPartitionValues(Statement statement, HdfsIdConfig idConfig) throws SQLException {
        Set<String> partitionValues = new HashSet<>();
        // NOTE(review): the table name is concatenated into the statement; it comes from the id
        // configuration and an identifier cannot be bound as a parameter here.
        try (ResultSet resultSet = statement.executeQuery("show partitions " + idConfig.getHiveTableName())) {
            while (resultSet.next()) {
                String partition = resultSet.getString(1);
                if (partition == null) {
                    continue;
                }
                int separatorIndex = partition.indexOf('=');
                if (separatorIndex >= 0) {
                    partitionValues.add(partition.substring(separatorIndex + 1));
                }
            }
        }
        return partitionValues;
    }

    /**
     * Walks the scan window in steps of the partition interval and, for each partition value not
     * in {@code existingPartitions}, either updates the force flag of the already tracked task or
     * creates, tracks and submits a new one.
     */
    private void submitMissingPartitions(HdfsIdConfig idConfig, Set<String> existingPartitions,
            ExecutorService partitionCreatePool) {
        long currentTime = System.currentTimeMillis();
        long beginScanTime = currentTime - ((2 * idConfig.getMaxPartitionOpenDelayHour()) * HdfsIdConfig.HOUR_MS);
        long endScanTime = currentTime - (this.context.getMaxFileOpenDelayMinute() * MINUTE_MS);
        // Partitions older than this are requested with the force flag set.
        long forceTime = currentTime - (idConfig.getMaxPartitionOpenDelayHour() * HdfsIdConfig.HOUR_MS);
        long partitionIntervalMs = idConfig.getPartitionIntervalMs();
        LOG.info("start to PartitionLeaderElectionRunnable scan:beginScanTime:{},endScanTime:{},getPartitionIntervalMs:{}",
                beginScanTime, endScanTime, partitionIntervalMs);
        if (partitionIntervalMs <= 0) {
            // A non-positive step would never advance the scan below.
            LOG.error("skip partition scan of inlongGroupId:{},inlongStreamId:{} because partitionIntervalMs:{} is not positive",
                    idConfig.getInlongGroupId(), idConfig.getInlongStreamId(), partitionIntervalMs);
            return;
        }
        String uid = InlongId.generateUid(idConfig.getInlongGroupId(), idConfig.getInlongStreamId());
        for (long partitionTime = beginScanTime; partitionTime < endScanTime; partitionTime += partitionIntervalMs) {
            String partitionValue = idConfig.parsePartitionField(partitionTime);
            if (existingPartitions.contains(partitionValue)) {
                continue;
            }
            boolean isForce = partitionTime < forceTime;
            String taskKey = uid + "." + partitionValue;
            PartitionCreateRunnable trackedTask = this.partitionCreateMap.get(taskKey);
            LOG.info("start to PartitionLeaderElectionRunnable createTask:{},isForce:{}", taskKey, isForce);
            if (trackedTask != null) {
                trackedTask.setForce(isForce);
                continue;
            }
            PartitionCreateRunnable newTask =
                    new PartitionCreateRunnable(this.context, idConfig, partitionValue, partitionTime, isForce);
            this.partitionCreateMap.put(taskKey, newTask);
            try {
                partitionCreatePool.execute(newTask);
            } catch (RuntimeException e) {
                // The pool refused the task; untrack it so a later pass can submit it again
                // (presumably it would otherwise stay in its initial state forever — verify).
                this.partitionCreateMap.remove(taskKey, newTask);
                throw e;
            }
        }
    }

    /**
     * Decides whether this node owns the token file of the given id, (re)writing the file with
     * this node's id whenever ownership is taken or renewed.
     *
     * <ul>
     * <li>token file missing: write it, return {@code true};</li>
     * <li>token file older than the token overtime, or at least {@code HiveSinkContext.KB_BYTES}
     * long: overwrite it, return {@code true};</li>
     * <li>token file names another node: return {@code false};</li>
     * <li>token file names this node: rewrite it, return {@code true}.</li>
     * </ul>
     *
     * @param hdfsIdConfig id configuration providing the id root path
     * @return {@code true} if this node holds the token; {@code false} if another node holds it
     *         or any {@link Exception} occurred (the exception is logged, not rethrown)
     */
    private boolean hasToken(HdfsIdConfig hdfsIdConfig) {
        String nodeId = this.context.getNodeId();
        String hdfsPath = this.context.getHdfsPath();
        String tokenFile = (hdfsPath + hdfsIdConfig.getIdRootPath()) + "/token";
        FileSystem fileSystem = null;
        try {
            fileSystem = new Path(hdfsPath).getFileSystem(new Configuration());
            Path tokenPath = new Path(tokenFile);
            if (!fileSystem.exists(tokenPath)) {
                writeToken(fileSystem, tokenPath, nodeId);
                LOG.info("node:{} get id token:inlongGroupId:{},inlongStreamId:{} because token file is not existed.",
                        nodeId, hdfsIdConfig.getInlongGroupId(), hdfsIdConfig.getInlongStreamId());
                return true;
            }

            FileStatus fileStatus = fileSystem.getFileStatus(tokenPath);
            long tokenOvertime = this.context.getTokenOvertimeMinute() * MINUTE_MS;
            long currentTime = System.currentTimeMillis();
            long accessTime = fileStatus.getAccessTime();
            long modifiedTime = fileStatus.getModificationTime();
            if (currentTime - modifiedTime >= tokenOvertime || fileStatus.getLen() >= HiveSinkContext.KB_BYTES) {
                writeToken(fileSystem, tokenPath, nodeId);
                LOG.info("node:{} get id token:inlongGroupId:{},inlongStreamId:{} because leader is overtime:currentTime:{},accessTime:{},modifiedTime:{},tokenOvertime:{}.",
                        nodeId, hdfsIdConfig.getInlongGroupId(), hdfsIdConfig.getInlongStreamId(),
                        currentTime, accessTime, modifiedTime, tokenOvertime);
                return true;
            }

            // The length is below HiveSinkContext.KB_BYTES here (checked above).
            int tokenLength = (int) fileStatus.getLen();
            String leader = readToken(fileSystem, tokenPath, tokenLength);
            LOG.info("node:{},leader:{},containerNameLength:{},leaderNameLength:{},leaderBytesLength:{}",
                    nodeId, leader, nodeId.length(), leader.length(), tokenLength);
            if (!leader.equals(nodeId)) {
                LOG.info("node:{},leader:{},inlongGroupId:{},inlongStreamId:{} because leader is the other container:currentTime:{},accessTime:{},modifiedTime:{},tokenOvertime:{}.",
                        nodeId, leader, hdfsIdConfig.getInlongGroupId(), hdfsIdConfig.getInlongStreamId(),
                        currentTime, accessTime, modifiedTime, tokenOvertime);
                return false;
            }

            // This node is already the leader: rewrite the file so its modification time is refreshed.
            writeToken(fileSystem, tokenPath, nodeId);
            LOG.info("node:{} get id token:inlongGroupId:{},inlongStreamId:{} because leader rewrite file:currentTime:{},accessTime:{},modifiedTime:{},tokenOvertime:{}.",
                    nodeId, hdfsIdConfig.getInlongGroupId(), hdfsIdConfig.getInlongStreamId(),
                    currentTime, accessTime, modifiedTime, tokenOvertime);
            return true;
        } catch (Exception e) {
            LOG.error("node:{} is fail to get id token:inlongGroupId:{},inlongStreamId:{} because of error:{}",
                    nodeId, hdfsIdConfig.getInlongGroupId(), hdfsIdConfig.getInlongStreamId(), e);
            return false;
        } finally {
            // Closed on every path; a close failure is logged and never changes the result.
            closeQuietly(fileSystem);
        }
    }

    /** Overwrites the token file with the UTF-8 bytes of {@code nodeId}, closing the stream on every path. */
    private static void writeToken(FileSystem fileSystem, Path tokenPath, String nodeId) throws IOException {
        try (FSDataOutputStream output = fileSystem.create(tokenPath, true)) {
            output.write(nodeId.getBytes(StandardCharsets.UTF_8));
            output.flush();
        }
    }

    /** Reads exactly {@code length} bytes of the token file and decodes them as UTF-8. */
    private static String readToken(FileSystem fileSystem, Path tokenPath, int length) throws IOException {
        byte[] content = new byte[length];
        try (FSDataInputStream input = fileSystem.open(tokenPath)) {
            // readFully either fills the buffer or throws; a single read() may return fewer bytes or -1.
            input.readFully(content);
        }
        return new String(content, StandardCharsets.UTF_8);
    }

    /** Closes the file system if it is non-null, logging (not propagating) an {@link IOException}. */
    private static void closeQuietly(FileSystem fileSystem) {
        if (fileSystem == null) {
            return;
        }
        // NOTE(review): Path.getFileSystem may hand out a shared cached instance; closing it here
        // matches the previous behavior but could affect other users of that instance — confirm.
        try {
            fileSystem.close();
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
        }
    }
}
