package co.cask.cdap.logging.framework.local;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.LoggingEvent;
import co.cask.cdap.api.logging.AppenderContext;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.logging.appender.LogAppender;
import co.cask.cdap.logging.appender.LogMessage;
import co.cask.cdap.logging.framework.LocalAppenderContext;
import co.cask.cdap.logging.framework.LogPipelineLoader;
import co.cask.cdap.logging.framework.LogPipelineSpecification;
import co.cask.cdap.logging.pipeline.LogProcessorPipelineContext;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.filesystem.LocationFactory;

/* loaded from: input_file:co/cask/cdap/logging/framework/local/LocalLogAppender.class */
public class LocalLogAppender extends LogAppender {
    private static final ILoggingEvent SHUTDOWN_EVENT = new LoggingEvent();
    private static final int EVENT_QUEUE_SIZE = 65536;
    private final CConfiguration cConf;
    private final DatasetFramework datasetFramework;
    private final TransactionSystemClient txClient;
    private final LocationFactory locationFactory;
    private final MetricsCollectionService metricsCollectionService;
    private final List<LocalLogProcessorPipeline> pipelines = new ArrayList();
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean stopped = new AtomicBoolean();
    private final Set<Thread> pipelineThreads = Collections.newSetFromMap(new IdentityHashMap());

    /* loaded from: input_file:co/cask/cdap/logging/framework/local/LocalLogAppender$LocalLogProcessorPipeline.class */
    private final class LocalLogProcessorPipeline extends AbstractExecutionThreadService {
        private final LogProcessorPipelineContext context;
        private final long syncIntervalMillis;
        private final BlockingQueue<ILoggingEvent> eventQueue;
        private long lastSyncTime;
        private Thread appenderThread;

        private LocalLogProcessorPipeline(LogProcessorPipelineContext logProcessorPipelineContext, long j) {
            this.context = logProcessorPipelineContext;
            this.syncIntervalMillis = j;
            this.eventQueue = new ArrayBlockingQueue(LocalLogAppender.EVENT_QUEUE_SIZE);
        }

        String getName() {
            return this.context.getName();
        }

        Thread getAppenderThread() {
            return this.appenderThread;
        }

        protected Executor executor() {
            return new Executor() { // from class: co.cask.cdap.logging.framework.local.LocalLogAppender.LocalLogProcessorPipeline.1
                @Override // java.util.concurrent.Executor
                public void execute(Runnable runnable) {
                    new Thread(runnable, "LocalLogProcessor-" + LocalLogProcessorPipeline.this.getName()).start();
                }
            };
        }

        protected void startUp() throws Exception {
            LocalLogAppender.this.addInfo("Starting log processing pipeline " + getName());
            this.context.start();
            LocalLogAppender.this.addInfo("Log processing pipeline " + getName() + " started");
            this.appenderThread = Thread.currentThread();
        }

        protected void shutDown() throws Exception {
            LocalLogAppender.this.addInfo("Stopping log processing pipeline " + getName());
            for (ILoggingEvent iLoggingEvent : this.eventQueue) {
                if (iLoggingEvent != LocalLogAppender.SHUTDOWN_EVENT) {
                    this.context.getEffectiveLogger(iLoggingEvent.getLoggerName()).callAppenders(iLoggingEvent);
                }
            }
            this.context.stop();
            LocalLogAppender.this.addInfo("Log processing pipeline " + getName() + " stopped");
        }

        protected void triggerShutdown() {
            this.eventQueue.offer(LocalLogAppender.SHUTDOWN_EVENT);
        }

        protected void run() {
            try {
                ILoggingEvent take = this.eventQueue.take();
                while (isRunning()) {
                    callAppenders(take);
                    long timeStamp = this.syncIntervalMillis - (take.getTimeStamp() - this.lastSyncTime);
                    take = timeStamp > 0 ? this.eventQueue.poll(timeStamp, TimeUnit.MILLISECONDS) : null;
                    if (take == null) {
                        sync(System.currentTimeMillis());
                        take = this.eventQueue.take();
                    }
                }
                callAppenders(take);
            } catch (InterruptedException e) {
            }
        }

        void append(ILoggingEvent iLoggingEvent) {
            if (isRunning()) {
                if (iLoggingEvent.getLevel().isGreaterOrEqual(this.context.getEffectiveLogger(iLoggingEvent.getLoggerName()).getEffectiveLevel())) {
                    try {
                        this.eventQueue.put(iLoggingEvent);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        private void callAppenders(ILoggingEvent iLoggingEvent) {
            if (iLoggingEvent == LocalLogAppender.SHUTDOWN_EVENT) {
                return;
            }
            Logger effectiveLogger = this.context.getEffectiveLogger(iLoggingEvent.getLoggerName());
            try {
                effectiveLogger.callAppenders(iLoggingEvent);
            } catch (Throwable th) {
                LocalLogAppender.this.addError("Exception raised when appending to logger " + effectiveLogger.getName() + " with message " + iLoggingEvent.getFormattedMessage(), th);
            }
        }

        private void sync(long j) {
            try {
                this.context.sync();
                this.lastSyncTime = j;
            } catch (IOException e) {
                LocalLogAppender.this.addError("Exception raised when syncing log processing pipeline " + getName(), e);
            }
        }
    }

    @Inject
    LocalLogAppender(CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, LocationFactory locationFactory, MetricsCollectionService metricsCollectionService) {
        this.cConf = cConfiguration;
        this.datasetFramework = datasetFramework;
        this.txClient = transactionSystemClient;
        this.locationFactory = locationFactory;
        this.metricsCollectionService = metricsCollectionService;
        setName(getClass().getName());
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            Map load = new LogPipelineLoader(this.cConf).load(new Provider<AppenderContext>() { // from class: co.cask.cdap.logging.framework.local.LocalLogAppender.1
                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public AppenderContext m21get() {
                    return new LocalAppenderContext(LocalLogAppender.this.datasetFramework, LocalLogAppender.this.txClient, LocalLogAppender.this.locationFactory, LocalLogAppender.this.metricsCollectionService);
                }
            });
            long j = this.cConf.getLong("log.process.pipeline.event.delay.ms");
            for (LogPipelineSpecification logPipelineSpecification : load.values()) {
                LocalLogProcessorPipeline localLogProcessorPipeline = new LocalLogProcessorPipeline(new LogProcessorPipelineContext(this.cConf, logPipelineSpecification.getName(), logPipelineSpecification.getContext(), logPipelineSpecification.getContext().getMetricsContext(), logPipelineSpecification.getContext().getInstanceId()), j);
                localLogProcessorPipeline.startAndWait();
                this.pipelineThreads.add(localLogProcessorPipeline.getAppenderThread());
                this.pipelines.add(localLogProcessorPipeline);
            }
            super.start();
        }
    }

    public void stop() {
        if (this.stopped.compareAndSet(false, true)) {
            super.stop();
            for (LocalLogProcessorPipeline localLogProcessorPipeline : this.pipelines) {
                try {
                    localLogProcessorPipeline.stopAndWait();
                } catch (Throwable th) {
                    addError("Exception raised when stopping log processing pipeline " + localLogProcessorPipeline.getName(), th);
                }
            }
        }
    }

    public void doAppend(ILoggingEvent iLoggingEvent) {
        if (this.pipelineThreads.contains(Thread.currentThread())) {
            return;
        }
        super.doAppend((Object) iLoggingEvent);
    }

    @Override // co.cask.cdap.logging.appender.LogAppender
    protected void appendEvent(LogMessage logMessage) {
        logMessage.prepareForDeferredProcessing();
        logMessage.getCallerData();
        Iterator<LocalLogProcessorPipeline> it = this.pipelines.iterator();
        while (it.hasNext()) {
            it.next().append(logMessage);
        }
    }

    public void appendEvent(ILoggingEvent iLoggingEvent) {
        iLoggingEvent.prepareForDeferredProcessing();
        iLoggingEvent.getCallerData();
        Iterator<LocalLogProcessorPipeline> it = this.pipelines.iterator();
        while (it.hasNext()) {
            it.next().append(iLoggingEvent);
        }
    }
}
