package org.apache.kylin.spark.common.logging;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import lombok.Generated;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.job.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.spark.SparkEnv;
import org.apache.spark.utils.SparkHadoopUtils;

@Plugin(name = "ExecutorHdfsAppender", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
/* loaded from: input_file:org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.class */
public class SparkExecutorHdfsLogAppender extends AbstractHdfsLogAppender {
    private static final long A_DAY_MILLIS = 86400000;
    private final SimpleDateFormat dateFormat;

    @VisibleForTesting
    String outputPath;

    @VisibleForTesting
    String executorId;

    @VisibleForTesting
    long startTime;

    @VisibleForTesting
    int rollingPeriod;
    private String metadataId;
    private String category;
    private String identifier;
    private String jobName;
    private String project;

    protected SparkExecutorHdfsLogAppender(String str, Layout<? extends Serializable> layout, Filter filter, boolean z, boolean z2, Property[] propertyArr, AbstractHdfsLogAppender.HdfsManager hdfsManager) {
        super(str, layout, filter, z, z2, propertyArr, hdfsManager);
        this.dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault(Locale.Category.FORMAT));
        this.startTime = 0L;
        this.rollingPeriod = 5;
    }

    @PluginFactory
    public static SparkExecutorHdfsLogAppender createAppender(@PluginAttribute("name") String str, @PluginAttribute("workingDir") String str2, @PluginAttribute("metadataId") String str3, @PluginAttribute("category") String str4, @PluginAttribute("identifier") String str5, @PluginAttribute("jobName") String str6, @PluginAttribute("project") String str7, @PluginAttribute("rollingPeriod") int i, @PluginAttribute("logQueueCapacity") int i2, @PluginAttribute("flushInterval") int i3, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginElement("Filter") Filter filter, @PluginElement("Properties") Property[] propertyArr) {
        SparkExecutorHdfsLogAppender sparkExecutorHdfsLogAppender = new SparkExecutorHdfsLogAppender(str, layout, filter, false, false, propertyArr, new AbstractHdfsLogAppender.HdfsManager(str, layout));
        sparkExecutorHdfsLogAppender.setWorkingDir(str2);
        sparkExecutorHdfsLogAppender.setMetadataId(str3);
        sparkExecutorHdfsLogAppender.setCategory(str4);
        sparkExecutorHdfsLogAppender.setIdentifier(str5);
        sparkExecutorHdfsLogAppender.setJobName(str6);
        sparkExecutorHdfsLogAppender.setProject(str7);
        sparkExecutorHdfsLogAppender.setRollingPeriod(i);
        sparkExecutorHdfsLogAppender.setLogQueueCapacity(i2);
        sparkExecutorHdfsLogAppender.setFlushInterval(i3);
        return sparkExecutorHdfsLogAppender;
    }

    @Override // org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender
    void init() {
        StatusLogger.getLogger().warn("metadataIdentifier -> " + getMetadataId());
        StatusLogger.getLogger().warn("category -> " + getCategory());
        StatusLogger.getLogger().warn("identifier -> " + getIdentifier());
        if (null != getProject()) {
            StatusLogger.getLogger().warn("project -> " + getProject());
        }
        if (null != getJobName()) {
            StatusLogger.getLogger().warn("jobName -> " + getJobName());
        }
    }

    @Override // org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender
    String getAppenderName() {
        return "SparkExecutorHdfsLogAppender";
    }

    @Override // org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender
    boolean isSkipCheckAndFlushLog() {
        if (SparkEnv.get() != null || !StringUtils.isBlank(this.executorId)) {
            return false;
        }
        StatusLogger.getLogger().warn("Waiting for spark executor to start");
        try {
            Thread.sleep(1000L);
            return true;
        } catch (InterruptedException e) {
            StatusLogger.getLogger().error("Waiting for spark executor starting is interrupted!", (Throwable) e);
            Thread.currentThread().interrupt();
            return true;
        }
    }

    @Override // org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender
    void doWriteLog(int i, List<LogEvent> list) throws IOException, InterruptedException {
        while (i > 0) {
            LogEvent take = getLogBufferQue().take();
            if (isTimeChanged(take)) {
                updateOutPutDir(take);
                Path path = new Path(this.outputPath);
                StatusLogger.getLogger().warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + System.getenv("SPARK_USER") + " USER is " + System.getenv(IRealizationConstants.HTableUser));
                UserGroupInformation ugi = SparkEnv.getUGI();
                if (ugi != null) {
                    StatusLogger.getLogger().warn("Login user hashcode is " + ugi.hashCode());
                    ugi.doAs(() -> {
                        if (!initHdfsWriter(path, SparkHadoopUtils.newConfigurationWithSparkConf())) {
                            StatusLogger.getLogger().error("Failed to init the hdfs writer!");
                        }
                        doRollingClean(take);
                        return null;
                    });
                } else {
                    if (!initHdfsWriter(path, SparkHadoopUtils.newConfigurationWithSparkConf())) {
                        StatusLogger.getLogger().error("Failed to init the hdfs writer!");
                    }
                    doRollingClean(take);
                }
            }
            list.add(take);
            writeLogEvent(take);
            i--;
        }
    }

    @Override // org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender
    String getLogPathAfterRolling(String str) {
        return null;
    }

    @Override // org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender
    String getLogPathRollingDone(String str) {
        return null;
    }

    @VisibleForTesting
    void updateOutPutDir(LogEvent logEvent) {
        this.outputPath = getOutPutDir(this.dateFormat.format(new Date(logEvent.getTimeMillis())));
    }

    private String getOutPutDir(String str) {
        if (StringUtils.isBlank(this.executorId)) {
            this.executorId = SparkEnv.get() != null ? SparkEnv.get().executorId() : RandomUtil.randomUUIDStr();
            StatusLogger.getLogger().warn("executorId set to " + this.executorId);
        }
        return "job".equals(getCategory()) ? getRootPathName() + "/" + str + "/" + getIdentifier() + "/" + getJobName() + "/executor-" + this.executorId + ".log" : getRootPathName() + "/" + str + "/" + getIdentifier() + "/executor-" + this.executorId + ".log";
    }

    @VisibleForTesting
    void doRollingClean(LogEvent logEvent) throws IOException {
        FileStatus[] listStatus;
        FileSystem fileSystem = getFileSystem();
        String rootPathName = getRootPathName();
        Path path = new Path(rootPathName);
        if (fileSystem.exists(path) && (listStatus = fileSystem.listStatus(path)) != null) {
            String format = this.dateFormat.format(new Date(logEvent.getTimeMillis() - (86400000 * this.rollingPeriod)));
            for (FileStatus fileStatus : listStatus) {
                String name = fileStatus.getPath().getName();
                if (name.compareTo(format) < 0) {
                    Path path2 = new Path(rootPathName + File.separator + name);
                    if (fileSystem.exists(path2)) {
                        fileSystem.delete(path2, true);
                    }
                }
            }
        }
    }

    @VisibleForTesting
    String getRootPathName() {
        String category = getCategory();
        boolean z = -1;
        switch (category.hashCode()) {
            case -2011717117:
                if (category.equals("sparder")) {
                    z = true;
                    break;
                }
                break;
            case 105405:
                if (category.equals("job")) {
                    z = false;
                    break;
                }
                break;
            case 754311776:
                if (category.equals("streaming_job")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs";
            case true:
                return parseHdfsWordingDir() + "/_sparder_logs";
            case true:
                return parseHdfsWordingDir() + "/streaming/spark_logs/" + getProject();
            default:
                throw new IllegalArgumentException("illegal category: " + getCategory());
        }
    }

    public String getIdentifier() {
        try {
            return StringUtils.isBlank(this.identifier) ? SparkEnv.get().conf().getAppId() : this.identifier;
        } catch (Exception e) {
            return null;
        }
    }

    @VisibleForTesting
    boolean isTimeChanged(LogEvent logEvent) {
        if (0 != this.startTime && (logEvent.getTimeMillis() / 86400000) - (this.startTime / 86400000) <= 0) {
            return false;
        }
        this.startTime = logEvent.getTimeMillis();
        return true;
    }

    private String parseHdfsWordingDir() {
        return StringUtils.appendIfMissing(getWorkingDir(), "/", new CharSequence[0]) + StringUtils.replace(getMetadataId(), "/", "-");
    }

    @Generated
    public int getRollingPeriod() {
        return this.rollingPeriod;
    }

    @Generated
    public void setRollingPeriod(int i) {
        this.rollingPeriod = i;
    }

    @Generated
    public String getMetadataId() {
        return this.metadataId;
    }

    @Generated
    public void setMetadataId(String str) {
        this.metadataId = str;
    }

    @Generated
    public String getCategory() {
        return this.category;
    }

    @Generated
    public void setCategory(String str) {
        this.category = str;
    }

    @Generated
    public void setIdentifier(String str) {
        this.identifier = str;
    }

    @Generated
    public String getJobName() {
        return this.jobName;
    }

    @Generated
    public void setJobName(String str) {
        this.jobName = str;
    }

    @Generated
    public String getProject() {
        return this.project;
    }

    @Generated
    public void setProject(String str) {
        this.project = str;
    }
}
