package org.apache.samza.logging.log4j2;

import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.SimpleMessage;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.logging.LoggingContextHolder;
import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerdeFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.ReflectionUtil;

@Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
/* loaded from: input_file:org/apache/samza/logging/log4j2/StreamAppender.class */
public class StreamAppender extends AbstractAppender {
    private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
    private static final String SOURCE = "log4j-log";
    private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
    private static final long DEFAULT_QUEUE_TIMEOUT_S = 2;
    private final BlockingQueue<EncodedLogEvent> logQueue;
    private SystemStream systemStream;
    private SystemProducer systemProducer;
    private String key;
    private byte[] keyBytes;
    private String containerName;
    private int partitionCount;
    private Serde<LogEvent> serde;
    private volatile Thread transferThread;
    private Config config;
    private String streamName;
    private final boolean usingAsyncLogger;
    private final LoggingContextHolder loggingContextHolder;
    private final AtomicBoolean recursiveCall;
    protected static final int DEFAULT_QUEUE_SIZE = 100;
    protected volatile boolean systemInitialized;
    protected StreamAppenderMetrics metrics;
    protected long queueTimeoutS;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/logging/log4j2/StreamAppender$ByteArrayEncodedLogEvent.class */
    public class ByteArrayEncodedLogEvent implements EncodedLogEvent<byte[]> {
        final byte[] entryValue;

        public ByteArrayEncodedLogEvent(byte[] bArr) {
            this.entryValue = bArr;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.samza.logging.log4j2.StreamAppender.EncodedLogEvent
        public byte[] getValue() {
            return this.entryValue;
        }

        @Override // org.apache.samza.logging.log4j2.StreamAppender.EncodedLogEvent
        public long getEntryValueSize() {
            return this.entryValue.length;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/samza/logging/log4j2/StreamAppender$EncodedLogEvent.class */
    public interface EncodedLogEvent<T> {
        long getEntryValueSize();

        T getValue();
    }

    protected StreamAppender(String str, Filter filter, Layout<? extends Serializable> layout, boolean z, boolean z2, String str2) {
        this(str, filter, layout, z, z2, str2, LoggingContextHolder.INSTANCE);
    }

    protected StreamAppender(String str, Filter filter, Layout<? extends Serializable> layout, boolean z, boolean z2, String str2, LoggingContextHolder loggingContextHolder) {
        super(str, filter, layout, z);
        this.logQueue = new LinkedBlockingQueue(DEFAULT_QUEUE_SIZE);
        this.systemStream = null;
        this.systemProducer = null;
        this.key = null;
        this.containerName = null;
        this.partitionCount = 0;
        this.serde = null;
        this.config = null;
        this.streamName = null;
        this.recursiveCall = new AtomicBoolean(false);
        this.systemInitialized = false;
        this.queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
        this.streamName = str2;
        this.usingAsyncLogger = z2;
        this.loggingContextHolder = loggingContextHolder;
    }

    public void start() {
        super.start();
        this.containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
        if (this.containerName == null) {
            throw new SamzaException("Got null container name from system property: samza.container.name. This is used as the key for the log appender, so can't proceed.");
        }
        this.key = this.containerName;
        try {
            this.keyBytes = this.key.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SamzaException(String.format("Container name: %s could not be encoded to bytes. %s cannot proceed.", this.key, getName()), e);
        }
    }

    public String getStreamName() {
        return this.streamName;
    }

    protected Config getConfig() {
        if (this.config == null) {
            this.config = this.loggingContextHolder.getConfig();
        }
        return this.config;
    }

    public int getPartitionCount() {
        return this.partitionCount > 0 ? this.partitionCount : new JobConfig(getConfig()).getContainerCount();
    }

    public void setPartitionCount(int i) {
        this.partitionCount = i;
    }

    @PluginFactory
    public static StreamAppender createAppender(@PluginAttribute("name") String str, @PluginElement("Filter") Filter filter, @PluginElement("Layout") Layout layout, @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) boolean z, @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) boolean z2, @PluginAttribute("streamName") String str2) {
        return new StreamAppender(str, filter, layout, z, z2, str2);
    }

    public void append(LogEvent logEvent) {
        try {
            if (this.recursiveCall.get()) {
                if (this.metrics != null) {
                    this.metrics.recursiveCalls.inc();
                    return;
                }
                return;
            }
            try {
                this.recursiveCall.set(true);
                if (this.systemInitialized) {
                    handleEvent(logEvent);
                } else if (this.loggingContextHolder.getConfig() != null) {
                    synchronized (this) {
                        if (!this.systemInitialized) {
                            setupSystem();
                            this.systemInitialized = true;
                        }
                    }
                    handleEvent(logEvent);
                } else {
                    System.out.println("Waiting for config to become available before log can be handled");
                }
                this.recursiveCall.set(false);
            } catch (Exception e) {
                if (this.metrics != null) {
                    this.metrics.logMessagesErrors.inc();
                }
                System.err.println(String.format("[%s] Error sending log message:", getName()));
                e.printStackTrace();
                this.recursiveCall.set(false);
            }
        } catch (Throwable th) {
            this.recursiveCall.set(false);
            throw th;
        }
    }

    private void handleEvent(LogEvent logEvent) throws InterruptedException {
        if (this.usingAsyncLogger) {
            sendEventToSystemProducer(encodeLogEvent(logEvent));
            return;
        }
        if (!this.logQueue.offer(encodeLogEvent(logEvent), this.queueTimeoutS, TimeUnit.SECONDS)) {
            int drainTo = this.logQueue.drainTo(new ArrayList()) + 1;
            System.err.println(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.", Long.valueOf(this.queueTimeoutS), this.systemStream.toString(), Integer.valueOf(drainTo)));
            this.metrics.logMessagesDropped.inc(drainTo);
        }
        this.metrics.bufferFillPct.set(Integer.valueOf(Math.round((100.0f * this.logQueue.size()) / 100.0f)));
    }

    protected EncodedLogEvent encodeLogEvent(LogEvent logEvent) {
        return new ByteArrayEncodedLogEvent(this.serde.toBytes(subLog(logEvent)));
    }

    private Message subAppend(LogEvent logEvent) {
        if (getLayout() == null) {
            return new SimpleMessage(logEvent.getMessage().getFormattedMessage());
        }
        Message serializable = getLayout().toSerializable(logEvent);
        return serializable instanceof Message ? new SimpleMessage(serializable.getFormattedMessage()) : serializable instanceof LogEvent ? new SimpleMessage(((LogEvent) serializable).getMessage().getFormattedMessage()) : new SimpleMessage(serializable.toString());
    }

    protected LogEvent subLog(LogEvent logEvent) {
        return Log4jLogEvent.newBuilder().setLevel(logEvent.getLevel()).setLoggerName(logEvent.getLoggerName()).setLoggerFqcn(logEvent.getLoggerFqcn()).setMessage(subAppend(logEvent)).setThrown(logEvent.getThrown()).setContextData(logEvent.getContextData()).setContextStack(logEvent.getContextStack()).setThreadName(logEvent.getThreadName()).setSource(logEvent.getSource()).setTimeMillis(logEvent.getTimeMillis()).build();
    }

    public void stop() {
        System.out.println(String.format("Shutting down the %s...", getName()));
        if (this.transferThread != null) {
            this.transferThread.interrupt();
            try {
                this.transferThread.join();
            } catch (InterruptedException e) {
                System.err.println("Interrupted while waiting for transfer thread to finish." + e);
                Thread.currentThread().interrupt();
            }
        }
        flushSystemProducer();
        if (this.systemProducer != null) {
            this.systemProducer.stop();
        }
    }

    private void flushSystemProducer() {
        if (this.systemProducer != null) {
            this.systemProducer.flush(SOURCE);
        }
    }

    protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
        return new Log4jSystemConfig(config);
    }

    protected StreamAppenderMetrics getMetrics(MetricsRegistryMap metricsRegistryMap) {
        return new StreamAppenderMetrics(getName(), metricsRegistryMap);
    }

    protected void setupStream(SystemFactory systemFactory, String str) {
        if (this.config.getBoolean(CREATE_STREAM_ENABLED, false)) {
            int partitionCount = getPartitionCount();
            System.out.println(String.format("[%s] creating stream ", getName()) + this.streamName + " with partition count " + partitionCount);
            StreamSpec createStreamAppenderStreamSpec = StreamSpec.createStreamAppenderStreamSpec(this.streamName, str, partitionCount);
            SystemAdmin admin = systemFactory.getAdmin(str, this.config);
            admin.start();
            admin.createStream(createStreamAppenderStreamSpec);
            admin.stop();
        }
    }

    protected void setupSystem() {
        this.config = getConfig();
        Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(this.config);
        if (this.streamName == null) {
            this.streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
        }
        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        this.metrics = getMetrics(metricsRegistryMap);
        MetricsReporterLoader.getMetricsReporters(new MetricsConfig(this.config), this.containerName).values().forEach(metricsReporter -> {
            metricsReporter.register(this.containerName, metricsRegistryMap);
            metricsReporter.start();
        });
        String systemName = log4jSystemConfig.getSystemName();
        SystemFactory systemFactory = (SystemFactory) ReflectionUtil.getObj((String) log4jSystemConfig.getSystemFactory(systemName).orElseThrow(() -> {
            return new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j " + getName() + " to use");
        }), SystemFactory.class);
        setSerde(log4jSystemConfig, systemName);
        setupStream(systemFactory, systemName);
        this.systemProducer = systemFactory.getProducer(systemName, this.config, metricsRegistryMap, getClass().getSimpleName());
        this.systemStream = new SystemStream(systemName, this.streamName);
        this.systemProducer.register(SOURCE);
        this.systemProducer.start();
        System.out.println("log4j-log has been registered in " + systemName + ". So all the logs will be sent to " + this.streamName + " in " + systemName + ". Logs are partitioned by " + this.key);
        startTransferThread();
    }

    private void startTransferThread() {
        this.transferThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    sendEventToSystemProducer(this.logQueue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Throwable th) {
                    this.metrics.logMessagesErrors.inc();
                    System.err.println("Error sending " + getName() + " event to SystemProducer " + th);
                }
            }
        });
        this.transferThread.setDaemon(true);
        this.transferThread.setName("Samza " + getName() + " Producer " + this.transferThread.getName());
        this.transferThread.start();
    }

    private void sendEventToSystemProducer(EncodedLogEvent encodedLogEvent) {
        this.metrics.logMessagesBytesSent.inc(encodedLogEvent.getEntryValueSize());
        this.metrics.logMessagesCountSent.inc();
        this.systemProducer.send(SOURCE, decorateLogEvent(encodedLogEvent));
    }

    protected OutgoingMessageEnvelope decorateLogEvent(EncodedLogEvent encodedLogEvent) {
        return new OutgoingMessageEnvelope(this.systemStream, this.keyBytes, encodedLogEvent.getValue());
    }

    protected String getStreamName(String str, String str2) {
        if (str == null) {
            throw new SamzaException("job name is null. Please specify job.name");
        }
        if (str2 == null) {
            str2 = "1";
        }
        return ("__samza_" + str + "_" + str2 + "_logs").replace("-", "_");
    }

    protected void setSerde(Log4jSystemConfig log4jSystemConfig, String str) {
        String canonicalName = LoggingEventJsonSerdeFactory.class.getCanonicalName();
        String streamSerdeName = log4jSystemConfig.getStreamSerdeName(str, this.streamName);
        if (streamSerdeName != null) {
            canonicalName = log4jSystemConfig.getSerdeClass(streamSerdeName);
        }
        if (canonicalName == null) {
            throw new SamzaException("Can not find serializers class for key '" + streamSerdeName + "'. Please specify " + String.format("serializers.registry.%s.class", streamSerdeName) + " property");
        }
        this.serde = ((SerdeFactory) ReflectionUtil.getObj(canonicalName, SerdeFactory.class)).getSerde(str, this.config);
    }

    public Serde<LogEvent> getSerde() {
        return this.serde;
    }
}
