package org.apache.samza.logging.log4j;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Util;

/* loaded from: input_file:org/apache/samza/logging/log4j/StreamAppender.class */
public class StreamAppender extends AppenderSkeleton {
    private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
    private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
    private static final String SOURCE = "log4j-log";
    protected static volatile boolean systemInitialized = false;
    private Config config = null;
    private SystemStream systemStream = null;
    private SystemProducer systemProducer = null;
    private String key = null;
    private String streamName = null;
    private boolean isApplicationMaster = false;
    private Serde<LoggingEvent> serde = null;
    private Logger log = Logger.getLogger(StreamAppender.class);
    private final AtomicBoolean recursiveCall = new AtomicBoolean(false);

    public String getStreamName() {
        return this.streamName;
    }

    public void setStreamName(String str) {
        this.streamName = str;
    }

    public void activateOptions() {
        String property = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
        if (property == null) {
            throw new SamzaException("Got null container name from system property: samza.container.name. This is used as the key for the log appender, so can't proceed.");
        }
        this.isApplicationMaster = property.contains(JOB_COORDINATOR_TAG);
        this.key = property;
        if (this.isApplicationMaster) {
            systemInitialized = false;
        } else {
            setupSystem();
            systemInitialized = true;
        }
    }

    public void append(LoggingEvent loggingEvent) {
        try {
            if (this.recursiveCall.get()) {
                return;
            }
            try {
                this.recursiveCall.set(true);
                if (systemInitialized) {
                    this.systemProducer.send(SOURCE, new OutgoingMessageEnvelope(this.systemStream, this.key.getBytes("UTF-8"), this.serde.toBytes(subLog(loggingEvent))));
                } else if (JobModelManager.currentJobModelManager() != null) {
                    setupSystem();
                    systemInitialized = true;
                } else {
                    this.log.trace("Waiting for the JobCoordinator to be instantiated...");
                }
            } catch (UnsupportedEncodingException e) {
                throw new SamzaException("can not send the log messages", e);
            }
        } finally {
            this.recursiveCall.set(false);
        }
    }

    private String subAppend(LoggingEvent loggingEvent) {
        return this.layout == null ? loggingEvent.getRenderedMessage() : this.layout.format(loggingEvent).trim();
    }

    private LoggingEvent subLog(LoggingEvent loggingEvent) {
        return new LoggingEvent(loggingEvent.getFQNOfLoggerClass(), loggingEvent.getLogger(), loggingEvent.getTimeStamp(), loggingEvent.getLevel(), subAppend(loggingEvent), loggingEvent.getThreadName(), loggingEvent.getThrowableInformation(), loggingEvent.getNDC(), loggingEvent.getLocationInformation(), loggingEvent.getProperties());
    }

    public void close() {
        this.log.info("Shutting down the StreamAppender...");
        if (this.closed) {
            return;
        }
        this.closed = true;
        flushSystemProducer();
        if (this.systemProducer != null) {
            this.systemProducer.stop();
        }
    }

    public boolean requiresLayout() {
        return false;
    }

    public void flushSystemProducer() {
        if (this.systemProducer != null) {
            this.systemProducer.flush(SOURCE);
        }
    }

    protected Config getConfig() {
        Config config;
        try {
            if (this.isApplicationMaster) {
                config = JobModelManager.currentJobModelManager().jobModel().getConfig();
            } else {
                config = ((JobModel) SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL())), 30000), JobModel.class)).getConfig();
            }
            return config;
        } catch (IOException e) {
            throw new SamzaException("can not read the config", e);
        }
    }

    protected void setupSystem() {
        this.config = getConfig();
        Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(this.config);
        if (this.streamName == null) {
            this.streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
        }
        String systemName = log4jSystemConfig.getSystemName();
        String systemFactory = log4jSystemConfig.getSystemFactory(systemName);
        if (systemFactory == null) {
            throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
        }
        SystemFactory systemFactory2 = (SystemFactory) Util.getObj(systemFactory);
        setSerde(log4jSystemConfig, systemName, this.streamName);
        this.systemProducer = systemFactory2.getProducer(systemName, this.config, new MetricsRegistryMap());
        this.systemStream = new SystemStream(systemName, this.streamName);
        this.systemProducer.register(SOURCE);
        this.systemProducer.start();
        this.log.info("log4j-log has been registered in " + systemName + ". So all the logs will be sent to " + this.streamName + " in " + systemName + ". Logs are partitioned by " + this.key);
    }

    protected static String getStreamName(String str, String str2) {
        if (str == null) {
            throw new SamzaException("job name is null. Please specify job.name");
        }
        if (str2 == null) {
            str2 = "1";
        }
        return ("__samza_" + str + "_" + str2 + "_logs").replace("-", "_");
    }

    private void setSerde(Log4jSystemConfig log4jSystemConfig, String str, String str2) {
        String canonicalName = LoggingEventJsonSerdeFactory.class.getCanonicalName();
        String streamSerdeName = log4jSystemConfig.getStreamSerdeName(str, str2);
        if (streamSerdeName != null) {
            canonicalName = log4jSystemConfig.getSerdeClass(streamSerdeName);
        }
        if (canonicalName == null) {
            throw new SamzaException("Can not find serializers class for key '" + streamSerdeName + "'. Please specify " + String.format(SerializerConfig.SERDE(), streamSerdeName) + " property");
        }
        this.serde = ((SerdeFactory) Util.getObj(canonicalName)).getSerde(str, this.config);
    }

    public Serde<LoggingEvent> getSerde() {
        return this.serde;
    }
}
