package org.apache.samza.job.yarn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.ClusterResourceManager;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaResource;
import org.apache.samza.clustermanager.SamzaResourceRequest;
import org.apache.samza.clustermanager.SamzaResourceStatus;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.Util;
import org.apache.samza.util.hadoop.HttpFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/job/yarn/YarnClusterResourceManager.class */
public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
    /** YARN priority used for container requests that are pinned to a specific host. */
    private static final int PREFERRED_HOST_PRIORITY = 0;
    /** YARN priority used for container requests that may be satisfied on any host. */
    private static final int ANY_HOST_PRIORITY = 1;
    /** Sentinel ID; has the same value ("-1") that getIDForContainer yields when no running container matches. */
    private final String INVALID_YARN_CONTAINER_ID = "-1";
    /** Asynchronous client used to add/remove container requests and to release assigned containers. */
    private final AMRMClientAsync<AMRMClient.ContainerRequest> amClient;
    /** Hadoop/YARN configuration handed to both clients on init and used to resolve file systems. */
    private final YarnConfiguration yarnConfiguration;
    /** Book-keeping for pending and running YARN containers and for failed container statuses. */
    private final YarnAppState state;
    /** Lifecycle hook invoked from start() (onInit) and stop() (onShutdown). */
    private final SamzaYarnAppMasterLifecycle lifecycle;
    /** Service hook invoked from start() (onInit) and stop() (onShutdown). */
    private final SamzaYarnAppMasterService service;
    /** YARN-specific view of the job config (container label, package path, staging directory, ACLs). */
    private final YarnConfig yarnConfig;
    /** Resources handed out by onContainersAllocated, mapped to their YARN containers until released. */
    private final ConcurrentHashMap<SamzaResource, Container> allocatedResources;
    /** Outstanding Samza requests mapped to the YARN requests created for them, so they can be cancelled. */
    private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap;
    /** Metrics reporter started in start() and stopped in stop(). */
    private final SamzaAppMasterMetrics metrics;
    /** Flipped by start() so that its body runs at most once. */
    private final AtomicBoolean started;
    /** Monitor held while a map lookup/update is combined with the matching amClient call. */
    private final Object lock;
    /** Asynchronous client used to start containers. */
    private final NMClientAsync nmClientAsync;
    private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class);
    /** The job configuration this manager was created with. */
    private final Config config;

    /**
     * Creates a manager from collaborators supplied by the caller instead of building them
     * from the process environment. Package-private.
     *
     * @param aMRMClientAsync client stored as the AM/RM client
     * @param nMClientAsync client stored as the node-manager client
     * @param callback callback forwarded to the superclass
     * @param yarnAppState container book-keeping state
     * @param samzaYarnAppMasterLifecycle lifecycle hook invoked on start/stop
     * @param samzaYarnAppMasterService service hook invoked on start/stop
     * @param samzaAppMasterMetrics metrics reporter started/stopped with this manager
     * @param yarnConfiguration configuration passed to the clients on init
     * @param config job configuration; also wrapped in a {@link YarnConfig}
     */
    YarnClusterResourceManager(AMRMClientAsync aMRMClientAsync, NMClientAsync nMClientAsync, ClusterResourceManager.Callback callback, YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle samzaYarnAppMasterLifecycle, SamzaYarnAppMasterService samzaYarnAppMasterService, SamzaAppMasterMetrics samzaAppMasterMetrics, YarnConfiguration yarnConfiguration, Config config) {
        super(callback);
        // INVALID_YARN_CONTAINER_ID is initialised at its declaration; a final field
        // with an initializer must not be assigned again here.
        this.allocatedResources = new ConcurrentHashMap<>();
        this.requestsMap = new ConcurrentHashMap<>();
        this.started = new AtomicBoolean(false);
        this.lock = new Object();
        this.yarnConfiguration = yarnConfiguration;
        this.metrics = samzaAppMasterMetrics;
        this.yarnConfig = new YarnConfig(config);
        this.config = config;
        this.amClient = aMRMClientAsync;
        this.state = yarnAppState;
        this.lifecycle = samzaYarnAppMasterLifecycle;
        this.service = samzaYarnAppMasterService;
        this.nmClientAsync = nMClientAsync;
    }

    /**
     * Creates a manager whose YARN clients, app state, service and lifecycle are built from
     * the job config and from the container environment variables
     * ({@code CONTAINER_ID}, {@code NM_HOST}, {@code NM_PORT}, {@code NM_HTTP_PORT}).
     *
     * @param config job configuration
     * @param jobModelManager not used by this constructor
     * @param callback callback forwarded to the superclass
     * @param samzaApplicationState application state shared with metrics, service and lifecycle
     * @throws NumberFormatException if the NM port environment variables are missing or not integers
     */
    public YarnClusterResourceManager(Config config, JobModelManager jobModelManager, ClusterResourceManager.Callback callback, SamzaApplicationState samzaApplicationState) {
        super(callback);
        // INVALID_YARN_CONTAINER_ID is initialised at its declaration; a final field
        // with an initializer must not be assigned again here.
        this.allocatedResources = new ConcurrentHashMap<>();
        this.requestsMap = new ConcurrentHashMap<>();
        this.started = new AtomicBoolean(false);
        this.lock = new Object();

        this.yarnConfiguration = new YarnConfiguration();
        this.yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());

        // Copy every per-scheme file-system setting from the job config into the YARN configuration.
        // The lambda parameters carry distinct names: an inner lambda may not redeclare an outer one.
        FileSystemImplConfig fileSystemImplConfig = new FileSystemImplConfig(config);
        fileSystemImplConfig.getSchemes().forEach(scheme -> {
            fileSystemImplConfig.getSchemeConfig(scheme).forEach((confKey, confValue) -> {
                this.yarnConfiguration.set(confKey, confValue);
            });
        });

        MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
        this.metrics = new SamzaAppMasterMetrics(config, samzaApplicationState, metricsRegistryMap);

        // Identity of the container this process runs in, as exported into the environment.
        String containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
        ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
        String nodeHost = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
        String nodePortStr = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
        String nodeHttpPortStr = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());
        int nodePort = Integer.parseInt(nodePortStr);
        int nodeHttpPort = Integer.parseInt(nodeHttpPortStr);

        YarnConfig yarnConfig = new YarnConfig(config);
        this.yarnConfig = yarnConfig;
        this.config = config;
        this.amClient = AMRMClientAsync.createAMRMClientAsync(yarnConfig.getAMPollIntervalMs(), this);
        this.state = new YarnAppState(-1, containerId, nodeHost, nodePort, nodeHttpPort);
        log.info("Initialized YarnAppState: {}", this.state.toString());
        this.service = new SamzaYarnAppMasterService(config, samzaApplicationState, this.state, metricsRegistryMap, this.yarnConfiguration);
        log.info("ContainerID str {}, Nodehost  {} , Nodeport  {} , NodeHttpport {}", containerIdStr, nodeHost, nodePort, nodeHttpPort);

        ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
        this.lifecycle = new SamzaYarnAppMasterLifecycle(clusterManagerConfig.getContainerMemoryMb(), clusterManagerConfig.getNumCores(), samzaApplicationState, this.state, this.amClient);
        this.nmClientAsync = NMClientAsync.createNMClientAsync(this);
    }

    /**
     * Starts metrics, the service, both YARN clients and the lifecycle, in that order.
     * Only the first invocation does any work; later invocations just log and return.
     * If the lifecycle reports that it should shut down after init, an error is
     * reported through the cluster manager callback.
     */
    public void start() {
        final boolean firstStart = this.started.compareAndSet(false, true);
        if (firstStart) {
            this.metrics.start();
            this.service.onInit();
            log.info("Starting YarnContainerManager.");

            // Bring up the resource-manager client first, then the node-manager client.
            this.amClient.init(this.yarnConfiguration);
            this.amClient.start();
            this.nmClientAsync.init(this.yarnConfiguration);
            this.nmClientAsync.start();

            this.lifecycle.onInit();
            if (this.lifecycle.shouldShutdown()) {
                this.clusterManagerCallback.onError(new SamzaException("Invalid resource request."));
            }
            log.info("Finished starting YarnContainerManager");
        } else {
            log.info("Attempting to start an already started ContainerManager");
        }
    }

    /**
     * Translates a Samza resource request into a YARN container request and submits it.
     * A preferred host of {@code "ANY_HOST"} yields a relaxed-locality request at
     * {@code ANY_HOST_PRIORITY}; any other host yields a strict request for that host at
     * {@code PREFERRED_HOST_PRIORITY}. The request is remembered so it can be cancelled later.
     *
     * @param samzaResourceRequest the request to submit
     */
    public void requestResources(SamzaResourceRequest samzaResourceRequest) {
        log.info("Requesting resources on  " + samzaResourceRequest.getPreferredHost() + " for container " + samzaResourceRequest.getContainerID());

        final int memoryMb = samzaResourceRequest.getMemoryMB();
        final int cpuCores = samzaResourceRequest.getNumCores();
        final String nodeLabel = this.yarnConfig.getContainerLabel();
        final String host = samzaResourceRequest.getPreferredHost();
        final Resource capability = Resource.newInstance(memoryMb, cpuCores);

        final AMRMClient.ContainerRequest yarnRequest;
        if (!host.equals("ANY_HOST")) {
            log.info("Making a preferred host request on " + host);
            final String[] nodes = new String[]{host};
            yarnRequest = new AMRMClient.ContainerRequest(capability, nodes, (String[]) null, Priority.newInstance(PREFERRED_HOST_PRIORITY), false, nodeLabel);
        } else {
            log.info("Making a request for ANY_HOST ");
            yarnRequest = new AMRMClient.ContainerRequest(capability, (String[]) null, (String[]) null, Priority.newInstance(ANY_HOST_PRIORITY), true, nodeLabel);
        }

        // Record and submit under the same lock used by cancelResourceRequest.
        synchronized (this.lock) {
            this.requestsMap.put(samzaResourceRequest, yarnRequest);
            this.amClient.addContainerRequest(yarnRequest);
        }
    }

    /**
     * Releases the YARN container backing the given resource, if it is still tracked.
     * Resources that are no longer tracked are only logged.
     *
     * @param samzaResource the resource to give back
     */
    public void releaseResources(SamzaResource samzaResource) {
        log.info("Release resource invoked {} ", samzaResource);
        synchronized (this.lock) {
            final Container yarnContainer = this.allocatedResources.get(samzaResource);
            if (yarnContainer != null) {
                this.amClient.releaseAssignedContainer(yarnContainer.getId());
                this.allocatedResources.remove(samzaResource);
                return;
            }
            log.info("Resource {} already released. ", samzaResource);
        }
    }

    /**
     * Launches a stream processor on the YARN container that backs the given resource.
     * If the resource is no longer tracked, the call only logs. Any failure while looking
     * up or starting the container is logged and reported through
     * {@code onStreamProcessorLaunchFailure}; nothing is thrown to the caller.
     *
     * @param samzaResource the allocated resource to launch on
     * @param commandBuilder supplies the command and environment for the processor
     */
    public void launchStreamProcessor(SamzaResource samzaResource, CommandBuilder commandBuilder) {
        String samzaContainerId = (String) commandBuilder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
        log.info("Received launch request for {} on hostname {}", samzaContainerId, samzaResource.getHost());
        synchronized (this.lock) {
            try {
                Container container = this.allocatedResources.get(samzaResource);
                if (container == null) {
                    log.info("Resource {} already released. ", samzaResource);
                    return;
                }
                // Must run inside the try: runContainer declares IOException, and a failed
                // launch has to reach the failure callback below.
                runContainer(samzaContainerId, container, commandBuilder);
            } catch (Throwable th) {
                log.error("Error in launching stream processor:", th);
                this.clusterManagerCallback.onStreamProcessorLaunchFailure(samzaResource, th);
            }
        }
    }

    /**
     * Finds the key under which a running YARN container is registered.
     *
     * @param str string form of the YARN container ID to look for
     * @return the matching key of {@code state.runningYarnContainers}, or {@code "-1"}
     *         when no running container has that ID
     */
    private String getIDForContainer(String str) {
        for (Map.Entry<String, YarnContainer> running : this.state.runningYarnContainers.entrySet()) {
            final String yarnId = running.getValue().id().toString();
            if (!yarnId.equals(str)) {
                continue;
            }
            return running.getKey();
        }
        // Same value as the "-1" literal callers compare against.
        return INVALID_YARN_CONTAINER_ID;
    }

    /**
     * Withdraws a previously submitted request from YARN, if it is still outstanding.
     * Requests that are no longer tracked are only logged.
     *
     * @param samzaResourceRequest the request to cancel
     */
    public void cancelResourceRequest(SamzaResourceRequest samzaResourceRequest) {
        log.info("Cancelling request {} ", samzaResourceRequest);
        synchronized (this.lock) {
            final AMRMClient.ContainerRequest yarnRequest = this.requestsMap.get(samzaResourceRequest);
            if (yarnRequest != null) {
                this.requestsMap.remove(samzaResourceRequest);
                this.amClient.removeContainerRequest(yarnRequest);
                return;
            }
            log.info("Cancellation of {} already done. ", samzaResourceRequest);
        }
    }

    /**
     * Shuts the manager down: lifecycle, AM client, NM client, service and metrics are
     * stopped in that order. The staging directory is cleaned up afterwards unless the
     * final status is {@code UNDEFINED}.
     *
     * @param samzaAppStatus final application status passed to the lifecycle
     */
    public void stop(SamzaApplicationState.SamzaAppStatus samzaAppStatus) {
        log.info("Stopping AM client ");
        this.lifecycle.onShutdown(samzaAppStatus);
        this.amClient.stop();

        log.info("Stopping the AM service ");
        this.nmClientAsync.stop();

        log.info("Stopping the NM service ");
        this.service.onShutdown();
        this.metrics.stop();

        if (samzaAppStatus == SamzaApplicationState.SamzaAppStatus.UNDEFINED) {
            return;
        }
        cleanupStagingDir();
    }

    /**
     * Removes the job's YARN staging directory, if one is configured.
     * An {@link IOException} during clean-up is logged and otherwise ignored, so a failed
     * clean-up does not abort shutdown.
     */
    private void cleanupStagingDir() {
        String stagingDir = this.yarnConfig.getYarnJobStagingDirectory();
        if (stagingDir == null) {
            return;
        }
        JobContext jobContext = new JobContext();
        jobContext.setAppStagingDir(new Path(stagingDir));
        try {
            FileSystem fileSystem = FileSystem.get(this.yarnConfiguration);
            if (fileSystem != null) {
                YarnJobUtil.cleanupStagingDir(jobContext, fileSystem);
            }
        } catch (IOException e) {
            // The exception is passed as the trailing argument so SLF4J logs its stack trace;
            // the single placeholder is filled by the directory instead of being printed as "{}".
            log.error("Unable to clean up file system for staging directory {}", stagingDir, e);
        }
    }

    /**
     * Handles completed-container notifications from the RM. Every status is converted to
     * a {@link SamzaResourceStatus} and forwarded to the cluster manager callback. A
     * container that was registered as running is removed from the running set, and its
     * status is recorded as failed when the exit status is non-zero.
     *
     * @param list statuses of the containers that completed
     */
    public void onContainersCompleted(List<ContainerStatus> list) {
        final List<SamzaResourceStatus> completed = new ArrayList<>();
        for (ContainerStatus status : list) {
            log.info("Container completed from RM " + status);

            final String yarnContainerId = status.getContainerId().toString();
            completed.add(new SamzaResourceStatus(yarnContainerId, status.getDiagnostics(), status.getExitStatus()));

            final String samzaId = getIDForContainer(yarnContainerId);
            log.info("Completed container had ID: {}", samzaId);

            // Nothing to untrack when the container was never registered as running.
            if (samzaId.equals(INVALID_YARN_CONTAINER_ID) || !this.state.runningYarnContainers.containsKey(samzaId)) {
                continue;
            }
            log.info("Removing container ID {} from completed containers", samzaId);
            this.state.runningYarnContainers.remove(samzaId);
            if (status.getExitStatus() != 0) {
                this.state.failedContainersStatus.put(yarnContainerId, status);
            }
        }
        this.clusterManagerCallback.onResourcesCompleted(completed);
    }

    /**
     * Handles container allocations from the RM. Each container is wrapped in a
     * {@link SamzaResource}, remembered in {@code allocatedResources}, and the whole batch
     * is then announced through the cluster manager callback.
     *
     * @param list the containers that were allocated
     */
    public void onContainersAllocated(List<Container> list) {
        final List<SamzaResource> available = new ArrayList<>();
        for (Container yarnContainer : list) {
            final String host = yarnContainer.getNodeId().getHost();
            log.info("Container allocated from RM on " + host);

            final Resource capability = yarnContainer.getResource();
            final SamzaResource resource = new SamzaResource(capability.getVirtualCores(), capability.getMemory(), host, yarnContainer.getId().toString());
            this.allocatedResources.put(resource, yarnContainer);
            available.add(resource);
        }
        this.clusterManagerCallback.onResourcesAvailable(available);
    }

    /**
     * Shutdown-request callback; intentionally a no-op in this class.
     * (Presumably required by one of the implemented YARN callback-handler interfaces.)
     */
    public void onShutdownRequest() {
    }

    /**
     * Node-update callback; intentionally a no-op in this class.
     *
     * @param list the node reports delivered with the notification (ignored)
     */
    public void onNodesUpdated(List<NodeReport> list) {
    }

    /**
     * Progress callback; this implementation always reports zero.
     *
     * @return always {@code 0.0f}
     */
    public float getProgress() {
        return 0.0f;
    }

    /**
     * Error callback from YARN: logs the failure and forwards it to the cluster manager callback.
     *
     * @param th the error reported by YARN
     */
    public void onError(Throwable th) {
        // No "{}" placeholder: a lone Throwable argument selects the (String, Throwable)
        // overload, which logs the stack trace and would print a placeholder literally.
        log.error("Exception in the Yarn callback", th);
        this.clusterManagerCallback.onError(th);
    }

    /**
     * Handles a container-started notification from the NodeManager. The matching pending
     * container is moved to the running set and a launch-success event is sent to the
     * cluster manager callback. Notifications for containers that are not pending are
     * logged and ignored.
     *
     * @param containerId ID of the YARN container that started
     * @param map service data delivered with the notification (unused)
     */
    public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
        log.info("Received a containerStart notification from the NodeManager for container: {} ", containerId);

        final String samzaId = getPendingSamzaContainerId(containerId);
        if (samzaId != null) {
            final YarnContainer started = this.state.pendingYarnContainers.remove(samzaId);
            log.info("Samza containerId:{} has started", samzaId);
            this.state.runningYarnContainers.put(samzaId, started);

            final SamzaResource launched = new SamzaResource(started.resource().getVirtualCores(), started.resource().getMemory(), started.nodeId().getHost(), containerId.toString());
            this.clusterManagerCallback.onStreamProcessorLaunchSuccess(launched);
        } else {
            log.info("Got an invalid notification from YARN for container: {}", containerId);
        }
    }

    /**
     * Container-status callback from the NodeManager; only logs the container and its state.
     *
     * @param containerId ID of the container the status belongs to
     * @param containerStatus the reported status
     */
    public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
        log.info("Got a status from the NodeManager. Container: {} Status: {}", containerId, containerStatus.getState());
    }

    /**
     * Container-stopped callback from the NodeManager; only logs the container ID.
     *
     * @param containerId ID of the container that stopped
     */
    public void onContainerStopped(ContainerId containerId) {
        log.info("Got a notification from the NodeManager for a stopped container. ContainerId: {}", containerId);
    }

    /**
     * Handles a failed container start. The matching pending container is dropped and a
     * launch failure, wrapping the original error, is reported to the cluster manager
     * callback. Notifications for containers that are not pending are logged and ignored.
     *
     * @param containerId ID of the YARN container that failed to start
     * @param th the reported failure
     */
    public void onStartContainerError(ContainerId containerId, Throwable th) {
        log.error(String.format("Yarn Container: %s could not start.", containerId), th);

        final String samzaId = getPendingSamzaContainerId(containerId);
        if (samzaId != null) {
            final YarnContainer failed = this.state.pendingYarnContainers.remove(samzaId);
            log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", containerId, samzaId);

            final SamzaResource failedResource = new SamzaResource(failed.resource().getVirtualCores(), failed.resource().getMemory(), failed.nodeId().getHost(), containerId.toString());
            log.info("Invoking failure callback for container: {}", containerId);
            this.clusterManagerCallback.onStreamProcessorLaunchFailure(failedResource, new org.apache.samza.clustermanager.SamzaContainerLaunchException(th));
        } else {
            log.info("Got an invalid notification for container: {}", containerId);
        }
    }

    /**
     * Status-query failure callback from the NodeManager; only logs the container and error.
     *
     * @param containerId ID of the container whose status query failed
     * @param th the reported failure
     */
    public void onGetContainerStatusError(ContainerId containerId, Throwable th) {
        // NOTE(review): the Throwable is the trailing argument; depending on the SLF4J version it may be
        // logged as an exception instead of filling the second placeholder — confirm the rendered output.
        log.info("Got an error on getContainerStatus from the NodeManager. ContainerId: {}. Error: {}", containerId, th);
    }

    /**
     * Stop-container failure callback from the NodeManager; only logs the container and error.
     *
     * @param containerId ID of the container that could not be stopped
     * @param th the reported failure
     */
    public void onStopContainerError(ContainerId containerId, Throwable th) {
        // NOTE(review): the Throwable is the trailing argument; depending on the SLF4J version it may be
        // logged as an exception instead of filling the second placeholder — confirm the rendered output.
        log.info("Got an error when stopping container from the NodeManager. ContainerId: {}. Error: {}", containerId, th);
    }

    /**
     * Prepares command, environment and package path for a Samza container, registers the
     * YARN container as pending under the given ID, and asks the NodeManager to start it.
     * When a framework path is configured it replaces the default command path and a
     * {@code JOB_LIB_DIR} export is prepended to the launch command.
     *
     * @param str the Samza container ID being launched
     * @param container the YARN container to launch it in
     * @param commandBuilder supplies the command and environment
     * @throws IOException if preparing or submitting the start request fails
     */
    public void runContainer(String str, Container container, CommandBuilder commandBuilder) throws IOException {
        final String yarnContainerId = ConverterUtils.toString(container.getId());
        log.info("Got available container ID ({}) for container: {}", str, container);

        // Choose command path and optional job-lib export based on the framework path.
        final String fwkPath = JobConfig.getFwkPath(this.config);
        final boolean useFwkPath = fwkPath != null && !fwkPath.isEmpty();
        final String cmdPath = useFwkPath ? fwkPath : "./__package/";
        final String jobLib = useFwkPath ? "export JOB_LIB_DIR=./__package/lib" : "";
        log.info("In runContainer in util: fwkPath= " + fwkPath + ";cmdPath=" + cmdPath + ";jobLib=" + jobLib);

        commandBuilder.setCommandPath(cmdPath);
        final String command = commandBuilder.buildCommand();
        log.info("Container ID {} using command {}", str, command);

        final Map<String, String> env = getEscapedEnvironmentVariablesMap(commandBuilder);
        env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
        printContainerEnvironmentVariables(str, env);
        log.info("Samza FWK path: " + command + "; env=" + env);

        final Path packagePath = new Path(this.yarnConfig.getPackagePath());
        log.info("Starting container ID {} using package path {}", str, packagePath);

        // Register as pending before the async start so the start callbacks can find it.
        this.state.pendingYarnContainers.put(str, new YarnContainer(container));
        final String formattedCommand = getFormattedCommand("<LOG_DIR>", jobLib, command, "stdout", "stderr");
        startContainer(packagePath, container, env, formattedCommand);

        log.info("Claimed container ID {} for container {} on node {} (http://{}/node/containerlogs/{}).", str, yarnContainerId, container.getNodeId().getHost(), container.getNodeHttpAddress(), yarnContainerId);
        log.info("Started container ID {}", str);
    }

    /**
     * Builds the launch context for a container (package archive and extra local resources,
     * environment, security tokens, command and optional ACLs) and submits an asynchronous
     * start request to the NodeManager.
     *
     * @param path location of the job package, registered as the {@code __package} archive
     * @param container the YARN container to start
     * @param map environment variables for the container
     * @param str the shell command to run in the container
     * @throws IOException if the package file status or the credentials cannot be read or written
     */
    private void startContainer(Path path, Container container, Map<String, String> map, String str) throws IOException {
        log.info("Starting container {} {} {} {}", path, container, map, str);

        // Describe the job package as an application-visible archive resource.
        LocalResource packageResource = Records.newRecord(LocalResource.class);
        URL packageUrl = ConverterUtils.getYarnUrlFromPath(path);
        FileStatus fileStatus = path.getFileSystem(this.yarnConfiguration).getFileStatus(path);
        packageResource.setResource(packageUrl);
        log.info("Set package resource in YarnContainerRunner for {}", packageUrl);
        packageResource.setSize(fileStatus.getLen());
        packageResource.setTimestamp(fileStatus.getModificationTime());
        packageResource.setType(LocalResourceType.ARCHIVE);
        packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

        // Drop the AM->RM token first and serialise afterwards. Serialising before the removal
        // would leave the filter without any effect on the bytes handed to the container.
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        Iterator<? extends Token<?>> tokens = credentials.getAllTokens().iterator();
        while (tokens.hasNext()) {
            TokenIdentifier identifier = tokens.next().decodeIdentifier();
            if (identifier != null && identifier.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                tokens.remove();
            }
        }
        DataOutputBuffer tokenBuffer = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(tokenBuffer);
        // Only the first getLength() bytes of the backing array are valid data.
        ByteBuffer allTokens = ByteBuffer.wrap(tokenBuffer.getData(), 0, tokenBuffer.getLength());

        Map<String, LocalResource> localResources = new HashMap<>();
        localResources.put("__package", packageResource);
        localResources.putAll(new LocalizerResourceMapper(new LocalizerResourceConfig(this.config), this.yarnConfiguration).getResourceMap());

        List<String> commands = new ArrayList<>();
        commands.add(str);

        ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
        launchContext.setEnvironment(map);
        launchContext.setTokens(allTokens.duplicate());
        launchContext.setCommands(commands);
        launchContext.setLocalResources(localResources);
        if (UserGroupInformation.isSecurityEnabled()) {
            Map<ApplicationAccessType, String> acls = this.yarnConfig.getYarnApplicationAcls();
            if (!acls.isEmpty()) {
                launchContext.setApplicationACLs(acls);
            }
        }
        log.debug("Setting localResourceMap to {}", localResources);
        log.debug("Setting context to {}", launchContext);

        log.info("Making an async start request for container {}", container);
        this.nmClientAsync.startContainerAsync(container, launchContext);
    }

    /**
     * Logs every environment variable of a container, one {@code name=value} pair per line.
     *
     * @param str the Samza container ID the environment belongs to
     * @param map the environment variables to log
     */
    private void printContainerEnvironmentVariables(String str, Map<String, String> map) {
        final StringBuilder rendered = new StringBuilder();
        map.forEach((name, value) -> rendered.append("\n").append(name).append("=").append(value));
        log.info("Container ID {} using environment variables: {}", str, rendered.toString());
    }

    /**
     * Copies the builder's environment into a new mutable map, passing every value
     * through {@code Util.envVarEscape}. Keys are copied unchanged.
     *
     * @param commandBuilder source of the environment variables
     * @return a new map from variable name to escaped value
     */
    private Map<String, String> getEscapedEnvironmentVariablesMap(CommandBuilder commandBuilder) {
        final Map<String, String> escaped = new HashMap<>();
        for (Map.Entry<String, String> variable : commandBuilder.buildEnvironment().entrySet()) {
            final String escapedValue = Util.envVarEscape(variable.getValue());
            escaped.put(variable.getKey(), escapedValue);
        }
        return escaped;
    }

    /**
     * Assembles the shell line that exports the log directory, optionally runs an extra
     * clause, links {@code logs} to the log directory and execs the command with stdout
     * and stderr redirected into {@code logs/}.
     *
     * @param str the log directory
     * @param str2 extra shell clause inserted after the export; may be empty
     * @param str3 the command to exec
     * @param str4 file name under {@code logs/} that receives stdout
     * @param str5 file name under {@code logs/} that receives stderr
     * @return the formatted shell command
     */
    private String getFormattedCommand(String str, String str2, String str3, String str4, String str5) {
        // A non-empty extra clause is chained with "&&"; an empty one is left empty.
        final String extraClause = str2.isEmpty() ? str2 : "&& " + str2;
        return String.format("export SAMZA_LOG_DIR=%s %s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s", str, extraClause, str, str3, str4, str5);
    }

    /**
     * Looks up the key under which a YARN container is registered as pending.
     *
     * @param containerId the YARN container ID to look for
     * @return the matching key of {@code state.pendingYarnContainers}, or {@code null}
     *         when no pending container has that ID
     */
    private String getPendingSamzaContainerId(ContainerId containerId) {
        for (String samzaId : this.state.pendingYarnContainers.keySet()) {
            // The value is re-read per key, so an entry that has vanished meanwhile is skipped.
            final YarnContainer pending = this.state.pendingYarnContainers.get(samzaId);
            final boolean matches = pending != null && pending.id().equals(containerId);
            if (matches) {
                return samzaId;
            }
        }
        return null;
    }
}
