package org.apache.samza.logging.log4j;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.HttpUtil;
import org.apache.samza.util.Util;

/* loaded from: input_file:org/apache/samza/logging/log4j/StreamAppender.class */
public class StreamAppender extends AppenderSkeleton {
    private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
    private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
    private static final String SOURCE = "log4j-log";
    private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";
    protected static final int DEFAULT_QUEUE_SIZE = 100;
    private static final long DEFAULT_QUEUE_TIMEOUT_S = 2;
    protected static volatile boolean systemInitialized = false;
    protected StreamAppenderMetrics metrics;
    private Thread transferThread;
    private Config config = null;
    private SystemStream systemStream = null;
    private SystemProducer systemProducer = null;
    private String key = null;
    private String streamName = null;
    private int partitionCount = 0;
    private boolean isApplicationMaster = false;
    private Serde<LoggingEvent> serde = null;
    private Logger log = Logger.getLogger(StreamAppender.class);
    private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue(DEFAULT_QUEUE_SIZE);
    protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
    private final AtomicBoolean recursiveCall = new AtomicBoolean(false);

    public String getStreamName() {
        return this.streamName;
    }

    public void setStreamName(String str) {
        this.streamName = str;
    }

    public int getPartitionCount() {
        return this.partitionCount > 0 ? this.partitionCount : new JobConfig(getConfig()).getContainerCount();
    }

    public void setPartitionCount(int i) {
        this.partitionCount = i;
    }

    public void activateOptions() {
        String property = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
        if (property == null) {
            throw new SamzaException("Got null container name from system property: samza.container.name. This is used as the key for the log appender, so can't proceed.");
        }
        this.isApplicationMaster = property.contains(JOB_COORDINATOR_TAG);
        this.key = property;
        if (this.isApplicationMaster) {
            systemInitialized = false;
        } else {
            setupSystem();
            systemInitialized = true;
        }
    }

    public void append(LoggingEvent loggingEvent) {
        try {
            if (this.recursiveCall.get()) {
                if (this.metrics != null) {
                    this.metrics.recursiveCalls.inc();
                    return;
                }
                return;
            }
            this.recursiveCall.set(true);
            if (systemInitialized) {
                if (!this.logQueue.offer(this.serde.toBytes(subLog(loggingEvent)), this.queueTimeoutS, TimeUnit.SECONDS)) {
                    int drainTo = this.logQueue.drainTo(new ArrayList()) + 1;
                    this.log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.", Long.valueOf(this.queueTimeoutS), this.systemStream.toString(), Integer.valueOf(drainTo)));
                    this.metrics.logMessagesDropped.inc(drainTo);
                }
                this.metrics.bufferFillPct.set(Integer.valueOf(Math.round((100.0f * this.logQueue.size()) / 100.0f)));
            } else if (JobModelManager.currentJobModelManager() != null) {
                setupSystem();
                systemInitialized = true;
            } else {
                this.log.trace("Waiting for the JobCoordinator to be instantiated...");
            }
        } catch (Exception e) {
            System.err.println("[StreamAppender] Error sending log message:");
            e.printStackTrace();
        } finally {
            this.recursiveCall.set(false);
        }
    }

    private String subAppend(LoggingEvent loggingEvent) {
        return this.layout == null ? loggingEvent.getRenderedMessage() : this.layout.format(loggingEvent).trim();
    }

    private LoggingEvent subLog(LoggingEvent loggingEvent) {
        return new LoggingEvent(loggingEvent.getFQNOfLoggerClass(), loggingEvent.getLogger(), loggingEvent.getTimeStamp(), loggingEvent.getLevel(), subAppend(loggingEvent), loggingEvent.getThreadName(), loggingEvent.getThrowableInformation(), loggingEvent.getNDC(), loggingEvent.getLocationInformation(), loggingEvent.getProperties());
    }

    public void close() {
        this.log.info("Shutting down the StreamAppender...");
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.transferThread.interrupt();
        try {
            this.transferThread.join();
        } catch (InterruptedException e) {
            this.log.error("Interrupted while waiting for transfer thread to finish.", e);
            Thread.currentThread().interrupt();
        }
        flushSystemProducer();
        if (this.systemProducer != null) {
            this.systemProducer.stop();
        }
    }

    public boolean requiresLayout() {
        return false;
    }

    public void flushSystemProducer() {
        if (this.systemProducer != null) {
            this.systemProducer.flush(SOURCE);
        }
    }

    protected Config getConfig() {
        try {
            return new MapConfig(new Map[]{this.isApplicationMaster ? JobModelManager.currentJobModelManager().jobModel().getConfig() : ((JobModel) SamzaObjectMapper.getObjectMapper().readValue(HttpUtil.read(new URL(System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL())), 30000, new ExponentialSleepStrategy()), JobModel.class)).getConfig(), ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true")});
        } catch (IOException e) {
            throw new SamzaException("can not read the config", e);
        }
    }

    protected void setupSystem() {
        this.config = getConfig();
        Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(this.config);
        if (this.streamName == null) {
            this.streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
        }
        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        this.metrics = new StreamAppenderMetrics("stream-appender", metricsRegistryMap);
        String systemName = log4jSystemConfig.getSystemName();
        String systemFactory = log4jSystemConfig.getSystemFactory(systemName);
        if (systemFactory == null) {
            throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
        }
        SystemFactory systemFactory2 = (SystemFactory) Util.getObj(systemFactory, SystemFactory.class);
        setSerde(log4jSystemConfig, systemName, this.streamName);
        if (this.config.getBoolean(CREATE_STREAM_ENABLED, false)) {
            System.out.println("[StreamAppender] creating stream " + this.streamName + " with partition count " + getPartitionCount());
            StreamSpec createStreamAppenderStreamSpec = StreamSpec.createStreamAppenderStreamSpec(this.streamName, systemName, getPartitionCount());
            SystemAdmin admin = systemFactory2.getAdmin(systemName, this.config);
            admin.start();
            admin.createStream(createStreamAppenderStreamSpec);
            admin.stop();
        }
        this.systemProducer = systemFactory2.getProducer(systemName, this.config, metricsRegistryMap);
        this.systemStream = new SystemStream(systemName, this.streamName);
        this.systemProducer.register(SOURCE);
        this.systemProducer.start();
        this.log.info("log4j-log has been registered in " + systemName + ". So all the logs will be sent to " + this.streamName + " in " + systemName + ". Logs are partitioned by " + this.key);
        startTransferThread();
    }

    private void startTransferThread() {
        try {
            byte[] bytes = this.key.getBytes("UTF-8");
            this.transferThread = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        this.systemProducer.send(SOURCE, new OutgoingMessageEnvelope(this.systemStream, bytes, this.logQueue.take()));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Throwable th) {
                        this.log.error("Error sending StreamAppender event to SystemProducer", th);
                    }
                }
            });
            this.transferThread.setDaemon(true);
            this.transferThread.setName("Samza StreamAppender Producer " + this.transferThread.getName());
            this.transferThread.start();
        } catch (UnsupportedEncodingException e) {
            throw new SamzaException(String.format("Container name: %s could not be encoded to bytes. StreamAppender cannot proceed.", this.key), e);
        }
    }

    protected static String getStreamName(String str, String str2) {
        if (str == null) {
            throw new SamzaException("job name is null. Please specify job.name");
        }
        if (str2 == null) {
            str2 = "1";
        }
        return ("__samza_" + str + "_" + str2 + "_logs").replace("-", "_");
    }

    private void setSerde(Log4jSystemConfig log4jSystemConfig, String str, String str2) {
        String canonicalName = LoggingEventJsonSerdeFactory.class.getCanonicalName();
        String streamSerdeName = log4jSystemConfig.getStreamSerdeName(str, str2);
        if (streamSerdeName != null) {
            canonicalName = log4jSystemConfig.getSerdeClass(streamSerdeName);
        }
        if (canonicalName == null) {
            throw new SamzaException("Can not find serializers class for key '" + streamSerdeName + "'. Please specify " + String.format(SerializerConfig.SERDE_FACTORY_CLASS(), streamSerdeName) + " property");
        }
        this.serde = ((SerdeFactory) Util.getObj(canonicalName, SerdeFactory.class)).getSerde(str, this.config);
    }

    public Serde<LoggingEvent> getSerde() {
        return this.serde;
    }
}
