package org.apache.kylin.spark.common.logging;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.appender.OutputStreamManager;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.spark.utils.SparkHadoopUtils;

/* loaded from: input_file:org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.class */
public abstract class AbstractHdfsLogAppender extends AbstractOutputStreamAppender<HdfsManager> {
    public static final long ROLLING_BYTE_SIZE_DEFAULT = 524288000;
    private static final double QUEUE_FLUSH_THRESHOLD = 0.2d;
    private final Object flushLogLock;
    private final Object initWriterLock;
    private final Object closeLock;
    private final Object fileSystemLock;
    private volatile FSDataOutputStream outStream;
    private volatile FileSystem fileSystem;
    private ExecutorService appendHdfsService;
    private BlockingDeque<LogEvent> logBufferQue;
    private int logQueueCapacity;
    private int flushInterval;
    private String workingDir;
    private AtomicBoolean finished;
    private long start;

    /* loaded from: input_file:org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender$HdfsManager.class */
    public static class HdfsManager extends OutputStreamManager {
        /* JADX INFO: Access modifiers changed from: protected */
        public HdfsManager(String str, Layout<?> layout) {
            super((OutputStream) null, str, layout, false);
        }

        private OutputStream getOutputStreamQuietly() {
            try {
                return getOutputStream();
            } catch (Exception e) {
                return null;
            }
        }

        protected synchronized void flushDestination() {
            HdfsDataOutputStream outputStreamQuietly = getOutputStreamQuietly();
            if (outputStreamQuietly != null) {
                try {
                    if (outputStreamQuietly instanceof HdfsDataOutputStream) {
                        outputStreamQuietly.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                    } else {
                        ((FSDataOutputStream) outputStreamQuietly).hflush();
                    }
                } catch (IOException e) {
                    throw new AppenderLoggingException("Error flushing stream " + getName(), e);
                }
            }
        }

        protected synchronized boolean closeOutputStream() {
            flush();
            FSDataOutputStream outputStreamQuietly = getOutputStreamQuietly();
            if (outputStreamQuietly == null || outputStreamQuietly == System.out || outputStreamQuietly == System.err) {
                return true;
            }
            try {
                outputStreamQuietly.hsync();
                outputStreamQuietly.close();
                LOGGER.debug("OutputStream closed");
                return true;
            } catch (IOException e) {
                logError("Unable to close stream", e);
                return false;
            }
        }

        public void setOutputStream(OutputStream outputStream) {
            super.setOutputStream(outputStream);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractHdfsLogAppender(String str, Layout<? extends Serializable> layout, Filter filter, boolean z, boolean z2, Property[] propertyArr, HdfsManager hdfsManager) {
        super(str, layout, filter, z, z2, propertyArr, hdfsManager);
        this.flushLogLock = new Object();
        this.initWriterLock = new Object();
        this.closeLock = new Object();
        this.fileSystemLock = new Object();
        this.outStream = null;
        this.fileSystem = null;
        this.appendHdfsService = null;
        this.logBufferQue = null;
        this.logQueueCapacity = 8192;
        this.flushInterval = 5000;
        this.finished = new AtomicBoolean(false);
        this.start = System.currentTimeMillis();
        StatusLogger.getLogger().warn("{} starting ...", getAppenderName());
        StatusLogger.getLogger().warn("hdfsWorkingDir -> {}", getWorkingDir());
        init();
        this.logBufferQue = new LinkedBlockingDeque(getLogQueueCapacity());
        this.appendHdfsService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("logger-thread-%d").build());
        this.appendHdfsService.submit(this::checkAndFlushLog);
        ShutdownHookManager.get().addShutdownHook(() -> {
            stop();
            closeWriter();
        }, 20);
        StatusLogger.getLogger().warn("{} started ...", getAppenderName());
    }

    public FileSystem getFileSystem() {
        return null == this.fileSystem ? getFileSystem(SparkHadoopUtils.newConfigurationWithSparkConf()) : this.fileSystem;
    }

    private FileSystem getFileSystem(Configuration configuration) {
        synchronized (this.fileSystemLock) {
            if (null == this.fileSystem) {
                try {
                    if (Objects.isNull(this.workingDir) || this.workingDir.isEmpty()) {
                        this.workingDir = System.getProperty("kylin.hdfs.working.dir");
                        StatusLogger.getLogger().warn("hdfsWorkingDir -> " + getWorkingDir());
                    }
                    this.fileSystem = new Path(this.workingDir).getFileSystem(configuration);
                } catch (IOException e) {
                    StatusLogger.getLogger().error("Failed to create the file system, ", e);
                    throw new RuntimeException("Failed to create the file system, ", e);
                }
            }
        }
        return this.fileSystem;
    }

    public boolean isWriterInited() {
        boolean z;
        synchronized (this.initWriterLock) {
            z = null != this.outStream;
        }
        return z;
    }

    abstract void init();

    abstract String getAppenderName();

    public void append(LogEvent logEvent) {
        try {
            if (!this.logBufferQue.offer(logEvent, 10L, TimeUnit.SECONDS)) {
                StatusLogger.getLogger().error("LogEvent cannot put into the logBufferQue, log event content:");
                printLoggingEvent(logEvent);
            }
        } catch (InterruptedException e) {
            StatusLogger.getLogger().warn("Append logging event interrupted!", e);
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        synchronized (this.closeLock) {
            if (!isStopped()) {
                this.finished.set(true);
                try {
                    if (this.appendHdfsService != null && !this.appendHdfsService.isShutdown()) {
                        ExecutorServiceUtil.shutdownGracefully(this.appendHdfsService, 60);
                    }
                } catch (Exception e) {
                    while (!getLogBufferQue().isEmpty()) {
                        try {
                            printLoggingEvent(getLogBufferQue().take());
                        } catch (Exception e2) {
                            StatusLogger.getLogger().error("clear the logging buffer queue failed!", e2);
                        }
                    }
                    StatusLogger.getLogger().error(String.format(Locale.ROOT, "close %s failed!", getAppenderName()), e);
                }
                StatusLogger.getLogger().warn("{} closed ...", getAppenderName());
            }
        }
    }

    private void closeWriter() {
        ((HdfsManager) getManager()).close();
        this.outStream = null;
    }

    abstract boolean isSkipCheckAndFlushLog();

    private void clearLogBufferQueueWhenBlocked() {
        if (this.logBufferQue.size() >= getLogQueueCapacity()) {
            for (int logQueueCapacity = getLogQueueCapacity() / 5; logQueueCapacity > 0; logQueueCapacity--) {
                try {
                    printLoggingEvent(this.logBufferQue.take());
                } catch (Exception e) {
                    StatusLogger.getLogger().error("Take event interrupted!", e);
                }
            }
        }
    }

    private void printLoggingEvent(LogEvent logEvent) {
        try {
            getLayout().encode(logEvent, getManager());
            if (null != logEvent.getThrown()) {
                for (StackTraceElement stackTraceElement : logEvent.getThrown().getStackTrace()) {
                    StatusLogger.getLogger().error(stackTraceElement);
                }
            }
        } catch (Exception e) {
            StatusLogger.getLogger().error("print logging event failed!", e);
        }
    }

    protected void checkAndFlushLog() {
        do {
            ArrayList newArrayList = Lists.newArrayList();
            try {
                if (!isSkipCheckAndFlushLog()) {
                    int size = getLogBufferQue().size();
                    if (this.finished.get()) {
                        if (size > 0) {
                            flushLog(size, newArrayList);
                        }
                        return;
                    } else if (size > getLogQueueCapacity() * QUEUE_FLUSH_THRESHOLD || System.currentTimeMillis() - this.start > getFlushInterval()) {
                        this.start = System.currentTimeMillis();
                        flushLog(size, newArrayList);
                    } else {
                        Thread.sleep(getFlushInterval() / 100);
                    }
                }
            } catch (Exception e) {
                newArrayList.forEach(this::printLoggingEvent);
                clearLogBufferQueueWhenBlocked();
                StatusLogger.getLogger().error("Error occurred when consume event", e);
            }
        } while (!isStopped());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean initHdfsWriter(Path path, Configuration configuration) {
        synchronized (this.initWriterLock) {
            StatusLogger.getLogger().warn("init hdfs writer...");
            closeWriter();
            int i = 10;
            while (true) {
                int i2 = i;
                i--;
                if (i2 <= 0) {
                    break;
                }
                try {
                    this.fileSystem = getFileSystem(configuration);
                    this.outStream = this.fileSystem.exists(path) ? this.fileSystem.append(path, 8192) : this.fileSystem.create(path, false);
                    break;
                } catch (Exception e) {
                    StatusLogger.getLogger().error("fail to create stream for path: " + path, e);
                    try {
                        this.initWriterLock.wait(1000L);
                    } catch (InterruptedException e2) {
                        StatusLogger.getLogger().warn("Init writer interrupted!", e2);
                        Thread.currentThread().interrupt();
                    }
                }
            }
            if (null == this.outStream) {
                return false;
            }
            ((HdfsManager) getManager()).setOutputStream(this.outStream);
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeLogEvent(LogEvent logEvent) {
        if (null != logEvent) {
            getLayout().encode(logEvent, getManager());
        }
    }

    abstract void doWriteLog(int i, List<LogEvent> list) throws IOException, InterruptedException;

    private void flush() {
        ((HdfsManager) getManager()).flush();
    }

    protected void flushLog(int i, List<LogEvent> list) throws IOException, InterruptedException {
        if (i <= 0) {
            return;
        }
        synchronized (this.flushLogLock) {
            if (i > getLogBufferQue().size()) {
                i = getLogBufferQue().size();
            }
            doWriteLog(i, list);
            flush();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean needRollingFile(String str, Long l) throws IOException {
        Path path = new Path(str);
        FileSystem fileSystem = getFileSystem();
        StatusLogger.getLogger().debug("log file path {}, {}", str, Boolean.valueOf(fileSystem.exists(path)));
        if (!fileSystem.exists(path)) {
            return false;
        }
        long length = fileSystem.getContentSummary(path).getLength();
        StatusLogger.getLogger().debug("log file length {}", Long.valueOf(length));
        return length > l.longValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String updateOutPutPath(String str) throws IOException {
        String logPathAfterRolling;
        synchronized (this.initWriterLock) {
            StatusLogger.getLogger().debug("start rolling log file {}", str);
            getFileSystem().rename(new Path(str), new Path(getLogPathRollingDone(str)));
            logPathAfterRolling = getLogPathAfterRolling(str);
            this.outStream = null;
            StatusLogger.getLogger().debug("end rolling log file {}", logPathAfterRolling);
        }
        return logPathAfterRolling;
    }

    abstract String getLogPathAfterRolling(String str);

    abstract String getLogPathRollingDone(String str);

    @Generated
    public BlockingDeque<LogEvent> getLogBufferQue() {
        return this.logBufferQue;
    }

    @Generated
    public int getLogQueueCapacity() {
        return this.logQueueCapacity;
    }

    @Generated
    public void setLogQueueCapacity(int i) {
        this.logQueueCapacity = i;
    }

    @Generated
    public int getFlushInterval() {
        return this.flushInterval;
    }

    @Generated
    public void setFlushInterval(int i) {
        this.flushInterval = i;
    }

    @Generated
    public String getWorkingDir() {
        return this.workingDir;
    }

    @Generated
    public void setWorkingDir(String str) {
        this.workingDir = str;
    }
}
