package org.apache.kylin.engine.spark.common.logging;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.spark.SparkEnv;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.utils.SparkHadoopUtils;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:WEB-INF/lib/kylin-spark-common-4.0.4.jar:org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.class */
/**
 * Appender that writes Spark executor {@link LoggingEvent}s to a per-executor log file
 * whose location is derived from the configured category, identifier, project and job name.
 *
 * <p>The target file is switched whenever an event falls into a new day (or hour, when
 * {@link #rollingByHour} is set); on every switch, dated directories older than
 * {@link #rollingPeriod} days are removed from the log root.
 *
 * <p>Buffering, the HDFS writer, the file system handle and the working directory are
 * supplied by {@link AbstractHdfsLogAppender}, which is not visible in this file.
 */
public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
    private static final long A_DAY_MILLIS = 86400000L;
    private static final long A_HOUR_MILLIS = 3600000L;

    /** Category value selecting the per-project job log layout. */
    private static final String CATEGORY_JOB = "job";
    /** Category value selecting the sparder log layout. */
    private static final String CATEGORY_SPARDER = "sparder";

    /** Full path of the log file currently written to; refreshed by {@link #updateOutPutDir}. */
    @VisibleForTesting
    String outPutPath;

    /** Executor id used in the log file name; resolved lazily on first path computation. */
    @VisibleForTesting
    String executorId;
    private String metadataIdentifier;
    private String category;
    private String identifier;
    private String jobName;
    private String project;
    // No time zone is set on these formatters, so they format in the JVM default time zone.
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
    private final SimpleDateFormat hourFormat = new SimpleDateFormat("HH", Locale.ROOT);

    /** Timestamp (epoch millis) of the event that triggered the last roll-over; 0 means "never rolled". */
    @VisibleForTesting
    long startTime = 0;

    /** When true the output file rolls every hour instead of every day. */
    @VisibleForTesting
    boolean rollingByHour = false;

    /** Number of days of dated log directories to keep when cleaning. */
    @VisibleForTesting
    int rollingPeriod = 5;

    public String getProject() {
        return this.project;
    }

    public void setProject(String str) {
        this.project = str;
    }

    public String getJobName() {
        return this.jobName;
    }

    public void setJobName(String str) {
        this.jobName = str;
    }

    public void setIdentifier(String str) {
        this.identifier = str;
    }

    public String getIdentifier() {
        return this.identifier;
    }

    public void setCategory(String str) {
        this.category = str;
    }

    public String getCategory() {
        return this.category;
    }

    public void setMetadataIdentifier(String str) {
        this.metadataIdentifier = str;
    }

    public String getMetadataIdentifier() {
        return this.metadataIdentifier;
    }

    /**
     * Reports the configured appender properties through {@link LogLog}.
     * Project and job name are only reported when they are set.
     */
    @Override
    void init() {
        LogLog.warn("metadataIdentifier -> " + getMetadataIdentifier());
        LogLog.warn("category -> " + getCategory());
        LogLog.warn("identifier -> " + getIdentifier());
        if (null != getProject()) {
            LogLog.warn("project -> " + getProject());
        }
        if (null != getJobName()) {
            LogLog.warn("jobName -> " + getJobName());
        }
    }

    @Override
    String getAppenderName() {
        return "SparkExecutorHdfsAppender";
    }

    /**
     * Tells the caller to skip flushing while neither a {@link SparkEnv} nor an executor id
     * is available yet.
     *
     * @return {@code false} when the executor is ready; {@code true} after waiting one second
     *         (or after being interrupted, in which case the interrupt flag is restored)
     */
    @Override
    boolean isSkipCheckAndFlushLog() {
        if (SparkEnv.get() != null || !StringUtils.isBlank(this.executorId)) {
            return false;
        }
        LogLog.warn("Waiting for spark executor to start");
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            LogLog.error("Waiting for spark executor starting is interrupted!", e);
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Takes {@code i} events from the buffer queue, rolls the output file when an event
     * crosses a day/hour boundary, records each event in {@code list} and writes it out.
     *
     * @param i    number of events to take from the buffer queue
     * @param list receives every event taken, in the order it was written
     * @throws IOException          if resolving the login user or writing an event fails
     * @throws InterruptedException if interrupted while waiting on the buffer queue
     */
    @Override
    void doWriteLog(int i, List<LoggingEvent> list) throws IOException, InterruptedException {
        for (int remaining = i; remaining > 0; remaining--) {
            final LoggingEvent event = getLogBufferQue().take();
            if (isTimeChanged(event)) {
                rollToNewFile(event);
            }
            list.add(event);
            writeLogEvent(event);
        }
    }

    /** Points the writer at the file for {@code event}'s time bucket and purges expired directories. */
    private void rollToNewFile(final LoggingEvent event) throws IOException {
        updateOutPutDir(event);
        final Path target = new Path(this.outPutPath);
        // NOTE(review): the message labels the last value "USER"; presumably
        // IRealizationConstants.HTableUser resolves to "USER" - confirm.
        LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + System.getenv("SPARK_USER") + " USER is " + System.getenv(IRealizationConstants.HTableUser));
        SparkHadoopUtil.get().runAsSparkUser(new AbstractFunction0<BoxedUnit>() {
            @Override
            public BoxedUnit apply() {
                if (!SparkExecutorHdfsAppender.this.initHdfsWriter(target, SparkHadoopUtils.newConfigurationWithSparkConf())) {
                    LogLog.error("Failed to init the hdfs writer!");
                }
                try {
                    SparkExecutorHdfsAppender.this.doRollingClean(event);
                } catch (IOException e) {
                    // Cleaning is best effort: report it, but keep logging.
                    LogLog.error("Failed to clean up expired log directories!", e);
                }
                return BoxedUnit.UNIT;
            }
        });
    }

    /**
     * Recomputes {@link #outPutPath} from the event timestamp: a {@code yyyy-MM-dd} directory,
     * plus an {@code HH} sub-directory when rolling by hour.
     */
    @VisibleForTesting
    void updateOutPutDir(LoggingEvent loggingEvent) {
        Date eventTime = new Date(loggingEvent.getTimeStamp());
        String timeDir = this.dateFormat.format(eventTime);
        if (this.rollingByHour) {
            timeDir = timeDir + "/" + this.hourFormat.format(eventTime);
        }
        this.outPutPath = getOutPutDir(timeDir);
        LogLog.warn("Update to " + this.outPutPath);
    }

    /**
     * Builds the log file path for the given dated sub-directory.
     * Resolves {@link #executorId} on first use: the Spark executor id when a
     * {@link SparkEnv} exists, otherwise a random UUID.
     */
    private String getOutPutDir(String str) {
        if (StringUtils.isBlank(this.executorId)) {
            SparkEnv env = SparkEnv.get();
            this.executorId = env != null ? env.executorId() : UUID.randomUUID().toString();
            LogLog.warn("executorId set to " + this.executorId);
        }
        boolean jobCategory = CATEGORY_JOB.equals(getCategory());
        String dir = getRootPathName() + "/" + str + "/" + getIdentifier();
        if (jobCategory) {
            // Job logs get one more level: the job name.
            dir = dir + "/" + getJobName();
        }
        return dir + "/executor-" + this.executorId + ".log";
    }

    /**
     * Deletes every child of the log root whose name sorts before the {@code yyyy-MM-dd}
     * date lying {@link #rollingPeriod} days before the event.
     *
     * @param loggingEvent event whose timestamp is the reference point for the retention window
     * @throws IOException if listing or deleting on the file system fails
     */
    @VisibleForTesting
    void doRollingClean(LoggingEvent loggingEvent) throws IOException {
        FileSystem fileSystem = getFileSystem();
        String rootPathName = getRootPathName();
        Path rootPath = new Path(rootPathName);
        if (!fileSystem.exists(rootPath)) {
            return;
        }
        FileStatus[] children = fileSystem.listStatus(rootPath);
        if (children == null) {
            return;
        }
        // Multiply in long: an int product overflows for rollingPeriod >= 25, which would
        // move the cutoff into the future and delete every dated directory.
        long retentionMillis = A_DAY_MILLIS * this.rollingPeriod;
        String oldestKeptDay = this.dateFormat.format(new Date(loggingEvent.getTimeStamp() - retentionMillis));
        for (FileStatus child : children) {
            String name = child.getPath().getName();
            if (name.compareTo(oldestKeptDay) < 0) {
                // Join with "/" like the rest of this class; File.separator is OS specific.
                Path expired = new Path(rootPathName + "/" + name);
                if (fileSystem.exists(expired)) {
                    fileSystem.delete(expired, true);
                }
            }
        }
    }

    /**
     * Returns the root directory under which dated log directories are created.
     *
     * @throws IllegalArgumentException if the category is neither {@code "job"} nor {@code "sparder"}
     */
    @VisibleForTesting
    String getRootPathName() {
        String currentCategory = getCategory();
        if (CATEGORY_JOB.equals(currentCategory)) {
            return getHdfsWorkingDir() + "/" + getProject() + "/spark_logs/executor/";
        }
        if (CATEGORY_SPARDER.equals(currentCategory)) {
            return parseHdfsWorkingDir() + "/_sparder_logs";
        }
        throw new IllegalArgumentException("illegal category: " + getCategory());
    }

    /**
     * Reports whether the event belongs to a later hour/day bucket than the last roll-over.
     * A positive answer also records the event timestamp as the new {@link #startTime}.
     */
    @VisibleForTesting
    boolean isTimeChanged(LoggingEvent loggingEvent) {
        return isNeedRolling(loggingEvent, this.rollingByHour ? A_HOUR_MILLIS : A_DAY_MILLIS);
    }

    /**
     * Compares the bucket index (epoch millis divided by {@code periodMillis}) of the event
     * with that of {@link #startTime}; rolls on the very first event as well.
     */
    private boolean isNeedRolling(LoggingEvent loggingEvent, long periodMillis) {
        // NOTE(review): buckets are aligned to the UTC epoch while directory names are
        // formatted in the JVM default time zone, so the two can disagree near local
        // midnight - confirm whether that is intended.
        long eventTime = loggingEvent.getTimeStamp();
        if (this.startTime != 0 && eventTime / periodMillis <= this.startTime / periodMillis) {
            return false;
        }
        this.startTime = eventTime;
        return true;
    }

    /**
     * Qualifies the HDFS working directory and appends the sanitized metadata identifier
     * (':' replaced by '-'; a leading "../" form is flattened), ending with a slash.
     *
     * @throws IllegalArgumentException if the working directory is not absolute
     * @throws RuntimeException         wrapping any {@link IOException} from the file system lookup
     */
    private String parseHdfsWorkingDir() {
        String hdfsWorkingDir = getHdfsWorkingDir();
        Path workingDir = new Path(hdfsWorkingDir);
        if (!workingDir.isAbsolute()) {
            throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + hdfsWorkingDir);
        }
        try {
            Path qualified = workingDir.getFileSystem(HadoopUtil.getCurrentConfiguration()).makeQualified(workingDir);
            String metadataDir = getMetadataIdentifier().replace(':', '-');
            if (metadataDir.startsWith("../")) {
                metadataDir = metadataDir.replace("../", "").replace('/', '-');
            }
            String result = new Path(qualified, metadataDir).toString();
            return result.endsWith("/") ? result : result + "/";
        } catch (IOException e) {
            throw new RuntimeException("Failed to qualify hdfs working dir " + hdfsWorkingDir, e);
        }
    }
}
