package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.service.AbstractService;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.class
 */
/* loaded from: input_file:hadoop-yarn-server-nodemanager-2.0.6-alpha.jar:org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.class */
/**
 * {@link LogHandler} implementation that aggregates the logs of each
 * application to a remote file system.
 *
 * <p>For every started application an {@link AppLogAggregator} is registered
 * in {@link #appLogAggregators} and executed on a cached thread pool. The
 * aggregator is removed from the map, and the file systems opened for the
 * application user are closed, once the aggregator's {@code run()} returns
 * or fails.
 */
public class LogAggregationService extends AbstractService implements LogHandler {
    private static final Log LOG = LogFactory.getLog(LogAggregationService.class);

    /** Expected permissions of the remote root log dir: octal 01777 (decimal 1023). */
    private static final FsPermission TLDIR_PERMISSIONS = FsPermission.createImmutable((short) 01777);

    /** Permissions applied to the per-user, suffixed and per-application dirs: octal 0770 (decimal 504). */
    private static final FsPermission APP_DIR_PERMISSIONS = FsPermission.createImmutable((short) 0770);

    private final Context context;
    private final DeletionService deletionService;
    private final Dispatcher dispatcher;
    private final LocalDirsHandlerService dirsHandler;
    Path remoteRootLogDir;
    String remoteRootLogDirSuffix;
    private NodeId nodeId;

    /** Aggregators that are currently registered, keyed by application. */
    private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
    private final ExecutorService threadPool;

    /**
     * Creates the service; no remote file system access happens here.
     *
     * @param dispatcher used to report log-handling init success/failure
     * @param context source of this node's {@link NodeId}
     * @param deletionService passed through to each aggregator
     * @param localDirsHandlerService passed through to each aggregator
     */
    public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService localDirsHandlerService) {
        super(LogAggregationService.class.getName());
        this.dispatcher = dispatcher;
        this.context = context;
        this.deletionService = deletionService;
        this.dirsHandler = localDirsHandlerService;
        this.appLogAggregators = new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
        this.threadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("LogAggregationService #%d").build());
    }

    /**
     * Reads the remote log dir and its suffix from the configuration, then
     * delegates to the superclass.
     *
     * @param configuration service configuration
     */
    public synchronized void init(Configuration configuration) {
        this.remoteRootLogDir = new Path(configuration.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));
        this.remoteRootLogDirSuffix = configuration.get("yarn.nodemanager.remote-app-log-dir-suffix", "logs");
        super.init(configuration);
    }

    /**
     * Captures the node id, verifies (or creates) the remote root log dir and
     * starts the service.
     */
    public synchronized void start() {
        this.nodeId = this.context.getNodeId();
        verifyAndCreateRemoteLogDir(getConfig());
        super.start();
    }

    /** Waits for pending aggregations and then stops the service. */
    public synchronized void stop() {
        LOG.info(getName() + " waiting for pending aggregation during exit");
        stopAggregators();
        super.stop();
    }

    /**
     * Shuts the thread pool down, asks every registered aggregator to finish
     * and waits for the pool to terminate. Each 30 second wait that times out
     * is followed by {@code shutdownNow()}. If the waiting thread is
     * interrupted, the wait is abandoned and the interrupt status restored.
     */
    private void stopAggregators() {
        this.threadPool.shutdown();

        // Politely ask every aggregator to finish.
        for (AppLogAggregator aggregator : this.appLogAggregators.values()) {
            aggregator.finishLogAggregation();
        }

        while (!this.threadPool.isTerminated()) {
            for (ApplicationId appId : this.appLogAggregators.keySet()) {
                LOG.info("Waiting for aggregation to complete for " + appId);
            }
            try {
                if (!this.threadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
                    this.threadPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOG.warn("Aggregation stop interrupted!");
                // Preserve the interrupt for callers and stop waiting instead
                // of silently swallowing the request and looping again.
                Thread.currentThread().interrupt();
                break;
            }
        }

        // Anything still registered did not run to completion.
        for (ApplicationId appId : this.appLogAggregators.keySet()) {
            LOG.warn("Some logs may not have been aggregated for " + appId);
        }
    }

    /**
     * Ensures the remote root log dir exists. If it already exists, only a
     * warning is logged when its permissions differ from
     * {@link #TLDIR_PERMISSIONS}; otherwise the directory is created with
     * those permissions.
     *
     * @param configuration configuration used to obtain the file system
     * @throws YarnException if the file system cannot be obtained, or if the
     *         existence check, permission check or creation fails
     */
    private void verifyAndCreateRemoteLogDir(Configuration configuration) {
        FileSystem remoteFS;
        try {
            remoteFS = FileSystem.get(configuration);
        } catch (IOException e) {
            throw new YarnException("Unable to get Remote FileSystem instance", e);
        }

        boolean remoteExists;
        try {
            remoteExists = remoteFS.exists(this.remoteRootLogDir);
        } catch (IOException e) {
            throw new YarnException("Failed to check for existence of remoteLogDir [" + this.remoteRootLogDir + "]", e);
        }

        if (remoteExists) {
            try {
                FsPermission perms = remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
                if (!perms.equals(TLDIR_PERMISSIONS)) {
                    LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir + "] already exist, but with incorrect permissions. Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms + "]. The cluster may have problems with multiple users.");
                }
            } catch (IOException e) {
                throw new YarnException("Failed while attempting to check permissions for dir [" + this.remoteRootLogDir + "]", e);
            }
            return;
        }

        LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir + "] does not exist. Attempting to create it.");
        try {
            Path qualified = this.remoteRootLogDir.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory());
            remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
            // Set the permissions explicitly after mkdirs as well.
            remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
        } catch (IOException e) {
            throw new YarnException("Failed to create remoteLogDir [" + this.remoteRootLogDir + "]", e);
        }
    }

    /**
     * Resolves the remote log file this node writes for the given application.
     *
     * @param applicationId the application
     * @param str the user name
     * @return the path computed by {@link LogAggregationUtils}
     */
    public Path getRemoteNodeLogFileForApp(ApplicationId applicationId, String str) {
        return LogAggregationUtils.getRemoteNodeLogFileForApp(this.remoteRootLogDir, applicationId, str, this.nodeId, this.remoteRootLogDirSuffix);
    }

    /**
     * Creates a directory and then explicitly sets its permissions.
     *
     * @param fileSystem file system to create the directory on
     * @param path directory to create
     * @param fsPermission permissions to apply
     * @throws IOException if either file system call fails
     */
    public void createDir(FileSystem fileSystem, Path path, FsPermission fsPermission) throws IOException {
        fileSystem.mkdirs(path, new FsPermission(fsPermission));
        fileSystem.setPermission(path, new FsPermission(fsPermission));
    }

    /**
     * Creates, as the given user, the user dir, the suffixed user dir and the
     * application dir below the remote root log dir.
     *
     * @param str the user name
     * @param applicationId the application whose log dir is created
     * @param userGroupInformation identity under which the directories are created
     * @throws YarnException wrapping any failure raised while creating the directories
     */
    public void createAppDir(final String str, final ApplicationId applicationId, UserGroupInformation userGroupInformation) {
        try {
            userGroupInformation.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                    // Each phase has its own try block so that a failure is
                    // logged exactly once, with the message that matches it.
                    FileSystem remoteFS;
                    try {
                        remoteFS = FileSystem.get(LogAggregationService.this.getConfig());
                    } catch (IOException e) {
                        LOG.error("Failed to get remote FileSystem while processing app " + applicationId, e);
                        throw e;
                    }

                    Path userDir = null;
                    try {
                        userDir = LogAggregationUtils.getRemoteLogUserDir(LogAggregationService.this.remoteRootLogDir, str).makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory());
                        LogAggregationService.this.createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
                    } catch (IOException e) {
                        LOG.error("Failed to create user dir [" + userDir + "] while processing app " + applicationId, e);
                        throw e;
                    }

                    Path suffixDir = null;
                    try {
                        suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(LogAggregationService.this.remoteRootLogDir, str, LogAggregationService.this.remoteRootLogDirSuffix).makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory());
                        LogAggregationService.this.createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
                    } catch (IOException e) {
                        LOG.error("Failed to create suffixed user dir [" + suffixDir + "] while processing app " + applicationId, e);
                        throw e;
                    }

                    Path appDir = null;
                    try {
                        appDir = LogAggregationUtils.getRemoteAppLogDir(LogAggregationService.this.remoteRootLogDir, applicationId, str, LogAggregationService.this.remoteRootLogDirSuffix).makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory());
                        LogAggregationService.this.createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
                    } catch (IOException e) {
                        LOG.error("Failed to  create application log dir [" + appDir + "] while processing app " + applicationId, e);
                        throw e;
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            throw new YarnException(e);
        }
    }

    /**
     * Initializes aggregation for an application and dispatches either
     * {@code APPLICATION_LOG_HANDLING_INITED} or, when initialization throws a
     * {@link YarnException}, {@code APPLICATION_LOG_HANDLING_FAILED}.
     */
    private void initApp(ApplicationId applicationId, String str, Credentials credentials, ContainerLogsRetentionPolicy containerLogsRetentionPolicy, Map<ApplicationAccessType, String> map) {
        ApplicationEvent outcome;
        try {
            initAppAggregator(applicationId, str, credentials, containerLogsRetentionPolicy, map);
            outcome = new ApplicationEvent(applicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
        } catch (YarnException e) {
            // Pass the exception along so the stack trace is not lost.
            LOG.warn("Application failed to init aggregation: " + e.getMessage(), e);
            outcome = new ApplicationEvent(applicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
        }
        this.dispatcher.getEventHandler().handle(outcome);
    }

    /**
     * Registers an aggregator for the application, creates its remote
     * directories and schedules the aggregator on the thread pool.
     *
     * @param applicationId the application
     * @param str the user name
     * @param credentials credentials added to the user's UGI; may be {@code null}
     * @param containerLogsRetentionPolicy passed through to the aggregator
     * @param map application ACLs passed through to the aggregator
     * @throws YarnException if an aggregator is already registered for the
     *         application, or if directory creation or scheduling fails
     */
    public void initAppAggregator(final ApplicationId applicationId, String str, Credentials credentials, ContainerLogsRetentionPolicy containerLogsRetentionPolicy, Map<ApplicationAccessType, String> map) {
        final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(str);
        if (credentials != null) {
            userUgi.addCredentials(credentials);
        }

        final AppLogAggregatorImpl aggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), applicationId, userUgi, this.dirsHandler, getRemoteNodeLogFileForApp(applicationId, str), containerLogsRetentionPolicy, map);
        if (this.appLogAggregators.putIfAbsent(applicationId, aggregator) != null) {
            throw new YarnException("Duplicate initApp for " + applicationId);
        }

        try {
            // Directories are only created once the duplicate check has passed.
            createAppDir(str, applicationId, userUgi);
            this.threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        aggregator.run();
                    } finally {
                        // Always unregister and release the user's file systems.
                        LogAggregationService.this.appLogAggregators.remove(applicationId);
                        LogAggregationService.this.closeFileSystems(userUgi);
                    }
                }
            });
        } catch (Exception e) {
            // Undo the registration when setup or scheduling fails.
            this.appLogAggregators.remove(applicationId);
            closeFileSystems(userUgi);
            if (e instanceof YarnException) {
                throw (YarnException) e;
            }
            throw new YarnException(e);
        }
    }

    /**
     * Closes all file systems associated with the given user; a failure is
     * logged and otherwise ignored.
     *
     * @param userGroupInformation the user whose file systems are closed
     */
    public void closeFileSystems(UserGroupInformation userGroupInformation) {
        try {
            FileSystem.closeAllForUGI(userGroupInformation);
        } catch (IOException e) {
            LOG.warn("Failed to close filesystems: ", e);
        }
    }

    /**
     * @return the number of aggregators currently registered
     */
    @InterfaceAudience.Private
    public int getNumAggregators() {
        return this.appLogAggregators.size();
    }

    /**
     * Tells the owning application's aggregator that a container finished,
     * passing along whether its exit code was zero.
     */
    private void stopContainer(ContainerId containerId, int i) {
        AppLogAggregator aggregator = this.appLogAggregators.get(containerId.getApplicationAttemptId().getApplicationId());
        if (aggregator == null) {
            LOG.warn("Log aggregation is not initialized for " + containerId + ", did it fail to start?");
            return;
        }
        aggregator.startContainerLogAggregation(containerId, i == 0);
    }

    /** Asks the application's aggregator, if one is registered, to finish. */
    private void stopApp(ApplicationId applicationId) {
        AppLogAggregator aggregator = this.appLogAggregators.get(applicationId);
        if (aggregator == null) {
            LOG.warn("Log aggregation is not initialized for " + applicationId + ", did it fail to start?");
            return;
        }
        aggregator.finishLogAggregation();
    }

    /**
     * Dispatches application-started, container-finished and
     * application-finished events; any other event type is ignored.
     *
     * @param logHandlerEvent the event to process
     */
    @Override
    public void handle(LogHandlerEvent logHandlerEvent) {
        switch ((LogHandlerEventType) logHandlerEvent.getType()) {
            case APPLICATION_STARTED:
                LogHandlerAppStartedEvent appStarted = (LogHandlerAppStartedEvent) logHandlerEvent;
                initApp(appStarted.getApplicationId(), appStarted.getUser(), appStarted.getCredentials(), appStarted.getLogRetentionPolicy(), appStarted.getApplicationAcls());
                return;
            case CONTAINER_FINISHED:
                LogHandlerContainerFinishedEvent containerFinished = (LogHandlerContainerFinishedEvent) logHandlerEvent;
                stopContainer(containerFinished.getContainerId(), containerFinished.getExitCode());
                return;
            case APPLICATION_FINISHED:
                stopApp(((LogHandlerAppFinishedEvent) logHandlerEvent).getApplicationId());
                return;
            default:
                return;
        }
    }
}
