package org.apache.kylin.spark.common.logging;

import java.io.IOException;
import java.io.Serializable;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import lombok.Generated;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.spark.common.logging.AbstractHdfsLogAppender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.spark.SparkEnv;

/**
 * Log4j2 appender plugin, registered as {@code ExecutorHdfsRollingAppender}, that drains buffered
 * log events of a Spark executor into a log file addressed by a Hadoop {@link Path}.
 *
 * <p>The file currently being written carries a {@code _processing} suffix. Once the file exceeds
 * {@link #getRollingByteSize()} a new timestamped file name is chosen (see
 * {@link #getLogPathAfterRolling(String)}).
 *
 * <p>Only the {@code streaming_job} category is supported; any other category is rejected with an
 * {@link IllegalArgumentException} when the log path is computed.
 */
@Plugin(name = "ExecutorHdfsRollingAppender", category = "Core", elementType = "appender", printObject = true)
public class SparkExecutorHdfsRollingLogAppender extends AbstractHdfsLogAppender {
    /** The only category this appender knows how to lay out on the file system. */
    private static final String STREAMING_JOB_CATEGORY = "streaming_job";

    /** Marker contained in the name of a log file that is still being written. */
    private static final String PROCESSING_MARKER = "_processing";

    /** Path of the log file currently written to; set lazily on the first write. */
    String logPath;

    /** Executor id used in file names; resolved lazily in {@link #getInitLogPath()}. */
    String executorId;

    /** Whether {@link #logPath} has already been computed. */
    boolean logPathInit;

    /** Rolling period; defaults to 5 until the factory overrides it with the configured value. */
    int rollingPeriod = 5;

    private long rollingByteSize;
    private String metadataId;
    private String category;
    private String identifier;
    private String jobName;
    private String project;
    private String jobTimeStamp;

    /**
     * Creates the appender; every argument is forwarded unchanged to the superclass constructor.
     *
     * @param name             appender name
     * @param layout           layout used to render events
     * @param filter           optional filter
     * @param ignoreExceptions presumably Log4j's "ignoreExceptions" flag; confirm against
     *                         {@code AbstractHdfsLogAppender}
     * @param immediateFlush   presumably Log4j's "immediateFlush" flag; confirm against
     *                         {@code AbstractHdfsLogAppender}
     * @param properties       appender properties
     * @param manager          manager handed to the superclass
     */
    protected SparkExecutorHdfsRollingLogAppender(String name, Layout<? extends Serializable> layout, Filter filter,
            boolean ignoreExceptions, boolean immediateFlush, Property[] properties,
            AbstractHdfsLogAppender.HdfsManager manager) {
        super(name, layout, filter, ignoreExceptions, immediateFlush, properties, manager);
    }

    /**
     * Plugin factory invoked by Log4j2 with the attributes of the appender's configuration element.
     *
     * <p>A non-positive {@code rollingByteSize} (including an absent attribute, which arrives as
     * {@code 0}) falls back to {@link AbstractHdfsLogAppender#ROLLING_BYTE_SIZE_DEFAULT}, so a
     * misconfigured negative value is never used as the threshold.
     *
     * @return a fully configured appender instance
     */
    @PluginFactory
    public static SparkExecutorHdfsRollingLogAppender createAppender(
            @PluginAttribute("name") String name,
            @PluginAttribute("workingDir") String workingDir,
            @PluginAttribute("metadataId") String metadataId,
            @PluginAttribute("category") String category,
            @PluginAttribute("identifier") String identifier,
            @PluginAttribute("jobName") String jobName,
            @PluginAttribute("project") String project,
            @PluginAttribute("jobTimeStamp") String jobTimeStamp,
            @PluginAttribute("rollingPeriod") int rollingPeriod,
            @PluginAttribute("logQueueCapacity") int logQueueCapacity,
            @PluginAttribute("flushInterval") int flushInterval,
            @PluginAttribute("rollingByteSize") long rollingByteSize,
            @PluginElement("Layout") Layout<? extends Serializable> layout,
            @PluginElement("Filter") Filter filter,
            @PluginElement("Properties") Property[] properties) {
        AbstractHdfsLogAppender.HdfsManager manager = new AbstractHdfsLogAppender.HdfsManager(name, layout);
        SparkExecutorHdfsRollingLogAppender appender = new SparkExecutorHdfsRollingLogAppender(name, layout, filter,
                false, false, properties, manager);
        appender.setWorkingDir(workingDir);
        appender.setMetadataId(metadataId);
        appender.setCategory(category);
        appender.setIdentifier(identifier);
        appender.setJobName(jobName);
        appender.setProject(project);
        appender.setRollingByteSize(
                rollingByteSize > 0 ? rollingByteSize : AbstractHdfsLogAppender.ROLLING_BYTE_SIZE_DEFAULT);
        appender.setJobTimeStamp(jobTimeStamp);
        appender.setRollingPeriod(rollingPeriod);
        appender.setLogQueueCapacity(logQueueCapacity);
        appender.setFlushInterval(flushInterval);
        return appender;
    }

    /** @return the fixed name of this appender implementation */
    @Override
    String getAppenderName() {
        return "SparkExecutorHdfsRollingLogAppender";
    }

    /**
     * Reports the configured metadata id, category and identifier through the Log4j status logger;
     * project and job name are reported only when they are set.
     */
    @Override
    public void init() {
        StatusLogger.getLogger().warn("metadataIdentifier -> {}", getMetadataId());
        StatusLogger.getLogger().warn("category -> {}", getCategory());
        StatusLogger.getLogger().warn("identifier -> {}", getIdentifier());
        if (getProject() != null) {
            StatusLogger.getLogger().warn("project -> {}", getProject());
        }
        if (getJobName() != null) {
            StatusLogger.getLogger().warn("jobName -> {}", getJobName());
        }
    }

    /**
     * Tells the caller whether this flush round should be skipped.
     *
     * <p>While neither a {@link SparkEnv} nor an executor id is available, this method waits one
     * second and asks for the round to be skipped. If the wait is interrupted, the interrupt flag
     * is restored and the round is still skipped.
     *
     * @return {@code false} when a {@link SparkEnv} exists or the executor id is already known,
     *         {@code true} otherwise
     */
    @Override
    boolean isSkipCheckAndFlushLog() {
        boolean executorReady = SparkEnv.get() != null || StringUtils.isNotBlank(this.executorId);
        if (executorReady) {
            return false;
        }
        StatusLogger.getLogger().warn("Waiting for spark executor to start");
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            StatusLogger.getLogger().error("Waiting for spark executor starting is interrupted!", e);
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Writes {@code eventSize} buffered events to the current log file.
     *
     * <p>On the first call the initial log path is computed. Before writing, the file system is
     * touched under the Spark user (when one is available), the file is rolled if it exceeds the
     * configured byte size, and the writer is opened if it is not initialised yet.
     *
     * @param eventSize   number of events to take from the buffer queue
     * @param transaction receives every event taken from the queue, before it is written
     * @throws IOException          if a file system operation fails
     * @throws InterruptedException if interrupted while waiting for an event or inside {@code doAs}
     */
    @Override
    void doWriteLog(int eventSize, List<LogEvent> transaction) throws IOException, InterruptedException {
        if (!this.logPathInit) {
            setLogPath(getInitLogPath());
            this.logPathInit = true;
        }

        UserGroupInformation ugi = getUGI();
        initFileSystemWithToken(ugi);

        if (needRollingFile(getLogPath(), getRollingByteSize())) {
            StatusLogger.getLogger().debug("current log file size > {}, need to rolling", getRollingByteSize());
            setLogPath(updateOutPutPath(getLogPath()));
        }

        if (!isWriterInited()) {
            Path target = new Path(getLogPath());
            if (ugi != null) {
                // The cast selects the exception-throwing doAs overload; a bare lambda is ambiguous.
                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                    if (!initHdfsWriter(target, new Configuration())) {
                        StatusLogger.getLogger().error("Failed to init the hdfs writer!");
                    }
                    return null;
                });
            } else if (!initHdfsWriter(target, new Configuration())) {
                StatusLogger.getLogger().error("Failed to init the hdfs writer!");
            }
        }

        for (int remaining = eventSize; remaining > 0; remaining--) {
            LogEvent event = getLogBufferQue().take();
            transaction.add(event);
            writeLogEvent(event);
        }
    }

    /**
     * Logs the login user together with the {@code SPARK_USER} and {@code USER} environment
     * variables, then returns whatever {@code SparkEnv.getUGI()} provides.
     *
     * @return the user group information from {@code SparkEnv.getUGI()}; callers treat
     *         {@code null} as "no user available"
     * @throws IOException if the login user cannot be determined
     */
    private UserGroupInformation getUGI() throws IOException {
        StatusLogger.getLogger().warn(" out login user is {} SPARK_USER is {} USER is {}",
                UserGroupInformation.getLoginUser(), System.getenv("SPARK_USER"), System.getenv("USER"));
        return SparkEnv.getUGI();
    }

    /**
     * Calls {@code getFileSystem()} as the given user; only logs a warning when no user is given.
     *
     * @param userGroupInformation user to act as, may be {@code null}
     */
    private void initFileSystemWithToken(UserGroupInformation userGroupInformation)
            throws IOException, InterruptedException {
        if (userGroupInformation == null) {
            StatusLogger.getLogger().warn("UserGroupInformation is null");
            return;
        }
        // The cast selects the exception-throwing doAs overload; a bare lambda is ambiguous.
        userGroupInformation.doAs((PrivilegedExceptionAction<Void>) () -> {
            getFileSystem();
            return null;
        });
    }

    /**
     * Builds the path of the next in-progress log file, placed in the same directory as
     * {@code currentPath} and named after the executor id and the current time in milliseconds.
     *
     * @param currentPath path of the file being rolled
     * @return path of the new {@code .log_processing} file
     */
    @Override
    String getLogPathAfterRolling(String currentPath) {
        String parentDir = new Path(currentPath).getParent().toString();
        return String.format(Locale.ROOT, "%s/executor-%s.%s.log_processing", parentDir, this.executorId,
                System.currentTimeMillis());
    }

    /**
     * Derives the final name of a rolled file by removing every {@code _processing} occurrence.
     *
     * @param processingPath path of the in-progress file
     * @return the path without the in-progress marker
     */
    @Override
    String getLogPathRollingDone(String processingPath) {
        return StringUtils.replace(processingPath, PROCESSING_MARKER, "");
    }

    /**
     * Computes the first log path of this appender.
     *
     * <p>If the executor id is still blank it is taken from {@link SparkEnv}, or replaced by a
     * random UUID when no {@link SparkEnv} exists.
     *
     * @return {@code <root>/<identifier>/<jobTimeStamp>/executor-<executorId>.<jobTimeStamp>.log_processing}
     * @throws IllegalArgumentException if the category is not {@code streaming_job}
     */
    private String getInitLogPath() {
        if (StringUtils.isBlank(this.executorId)) {
            SparkEnv env = SparkEnv.get();
            this.executorId = env != null ? env.executorId() : UUID.randomUUID().toString();
            StatusLogger.getLogger().warn("executorId set to {}", this.executorId);
        }
        requireSupportedCategory();
        return String.format(Locale.ROOT, "%s/%s/%s/executor-%s.%s.log_processing", getRootPathName(),
                getIdentifier(), getJobTimeStamp(), this.executorId, getJobTimeStamp());
    }

    /**
     * @return {@code <workingDir>/<metadataId>/streaming/spark_logs/<project>}, where slashes in
     *         the metadata id are replaced by dashes
     * @throws IllegalArgumentException if the category is not {@code streaming_job}
     */
    @VisibleForTesting
    String getRootPathName() {
        requireSupportedCategory();
        return String.format(Locale.ROOT, "%s/streaming/spark_logs/%s", parseHdfsWorkingDir(), getProject());
    }

    /**
     * Rejects every category other than {@code streaming_job}, including {@code null}.
     *
     * @throws IllegalArgumentException if the configured category is not supported
     */
    private void requireSupportedCategory() {
        if (!STREAMING_JOB_CATEGORY.equals(getCategory())) {
            throw new IllegalArgumentException("illegal category: " + getCategory());
        }
    }

    /**
     * Returns the configured identifier, falling back to the Spark application id.
     *
     * @return the configured identifier when it is not blank; otherwise the application id from
     *         the {@link SparkEnv} configuration, or {@code null} when no {@link SparkEnv} exists
     *         or the application id cannot be read
     */
    public String getIdentifier() {
        if (StringUtils.isNotBlank(this.identifier)) {
            return this.identifier;
        }
        try {
            SparkEnv env = SparkEnv.get();
            if (env == null) {
                // Expected before the executor has started; not worth a stack trace.
                return null;
            }
            return env.conf().getAppId();
        } catch (Exception e) {
            // Best effort: callers tolerate a missing identifier, but leave a trace of the cause.
            StatusLogger.getLogger().warn("Failed to resolve the spark application id as identifier", e);
            return null;
        }
    }

    /** @return the working directory (with a trailing slash) followed by the dash-separated metadata id */
    private String parseHdfsWorkingDir() {
        String baseDir = StringUtils.appendIfMissing(getWorkingDir(), "/");
        String flattenedMetadataId = StringUtils.replace(getMetadataId(), "/", "-");
        return baseDir + flattenedMetadataId;
    }

    /** @return path of the log file currently written to */
    @Generated
    public String getLogPath() {
        return this.logPath;
    }

    /** @param logPath path of the log file to write to */
    @Generated
    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    /** @return the configured rolling period */
    @Generated
    public int getRollingPeriod() {
        return this.rollingPeriod;
    }

    /** @param rollingPeriod the rolling period to use */
    @Generated
    public void setRollingPeriod(int rollingPeriod) {
        this.rollingPeriod = rollingPeriod;
    }

    /** @param rollingByteSize byte size passed to the rolling check */
    @Generated
    public void setRollingByteSize(long rollingByteSize) {
        this.rollingByteSize = rollingByteSize;
    }

    /** @return byte size passed to the rolling check */
    @Generated
    public long getRollingByteSize() {
        return this.rollingByteSize;
    }

    /** @return the metadata id used as part of the root path */
    @Generated
    public String getMetadataId() {
        return this.metadataId;
    }

    /** @param metadataId the metadata id used as part of the root path */
    @Generated
    public void setMetadataId(String metadataId) {
        this.metadataId = metadataId;
    }

    /** @return the configured category */
    @Generated
    public String getCategory() {
        return this.category;
    }

    /** @param category the category; only {@code streaming_job} is supported */
    @Generated
    public void setCategory(String category) {
        this.category = category;
    }

    /** @param identifier explicit identifier; when blank the Spark application id is used instead */
    @Generated
    public void setIdentifier(String identifier) {
        this.identifier = identifier;
    }

    /** @return the configured job name, may be {@code null} */
    @Generated
    public String getJobName() {
        return this.jobName;
    }

    /** @param jobName the job name */
    @Generated
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the configured project, may be {@code null} */
    @Generated
    public String getProject() {
        return this.project;
    }

    /** @param project the project used as the last segment of the root path */
    @Generated
    public void setProject(String project) {
        this.project = project;
    }

    /** @return the job timestamp used in the initial log path */
    @Generated
    public String getJobTimeStamp() {
        return this.jobTimeStamp;
    }

    /** @param jobTimeStamp the job timestamp used in the initial log path */
    @Generated
    public void setJobTimeStamp(String jobTimeStamp) {
        this.jobTimeStamp = jobTimeStamp;
    }
}
