package org.apache.hadoop.yarn.applications.distributedshell;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.class
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
/* loaded from: input_file:hadoop-yarn-applications-distributedshell-0.23.6.jar:org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.class */
public class ApplicationMaster {
    /** Logger for this class; assigned in the static initializer. */
    private static final Log LOG;
    /** Proxy to the ResourceManager; assigned at the start of run(). */
    private AMRMProtocol resourceManager;
    /** Attempt id of this application master; resolved in init(). */
    private ApplicationAttemptId appAttemptID;
    /** Priority attached to container requests; read from the "priority" option in init(). */
    private int requestPriority;
    /** Explicit assertion flag (marked synthetic by the decompiler); assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Host name reported to the ResourceManager on registration. */
    private String appMasterHostname = "";
    /** RPC port reported to the ResourceManager on registration. */
    private int appMasterRpcPort = 0;
    /** Tracking URL reported to the ResourceManager on registration. */
    private String appMasterTrackingUrl = "";
    /** Number of containers the shell command must complete on; overridden by "num_containers". */
    private int numTotalContainers = 1;
    /** Memory requested per container; overridden by "container_memory" and clamped in run(). */
    private int containerMemory = 10;
    /** Counter used to generate the response id of each allocate request. */
    private AtomicInteger rmRequestID = new AtomicInteger();
    /** Set once the number of completed containers equals the total. */
    private boolean appDone = false;
    /** Containers that finished (successfully or not); also bumped by launcher threads on setup failure. */
    private AtomicInteger numCompletedContainers = new AtomicInteger();
    /** Containers handed out by the ResourceManager so far. */
    private AtomicInteger numAllocatedContainers = new AtomicInteger();
    /** Containers that finished with a failure. */
    private AtomicInteger numFailedContainers = new AtomicInteger();
    /** Containers asked for so far; used to compute the next ask. */
    private AtomicInteger numRequestedContainers = new AtomicInteger();
    /** Shell command to run on every container. */
    private String shellCommand = "";
    /** Arguments appended to the shell command. */
    private String shellArgs = "";
    /** Environment passed to every launched container. */
    private Map<String, String> shellEnv = new HashMap<>();
    /** Location of the optional shell script; empty when no script is used. */
    private String shellScriptPath = "";
    /** Timestamp of the shell script, taken from the process environment. */
    private long shellScriptPathTimestamp = 0;
    /** Length of the shell script, taken from the process environment. */
    private long shellScriptPathLen = 0;
    /** Name under which the shell script is localized in a container. */
    private final String ExecShellStringPath = "ExecShellScript.sh";
    /** Container ids sent as releases with every allocate request. */
    private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<>();
    /** Launcher threads started so far; joined at the end of run(). */
    private List<Thread> launchThreads = new ArrayList<>();
    /** Configuration used for RPC; must be initialized before {@link #rpc}. */
    private Configuration conf = new Configuration();
    /** RPC factory used to create the ResourceManager and ContainerManager proxies. */
    private YarnRPC rpc = YarnRPC.create(this.conf);

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster$LaunchContainerRunnable.class
     */
    /* loaded from: input_file:hadoop-yarn-applications-distributedshell-0.23.6.jar:org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster$LaunchContainerRunnable.class */
    public class LaunchContainerRunnable implements Runnable {
        Container container;
        ContainerManager cm;

        public LaunchContainerRunnable(Container container) {
            this.container = container;
        }

        private void connectToCM() {
            ApplicationMaster.LOG.debug("Connecting to ContainerManager for containerid=" + this.container.getId());
            String str = this.container.getNodeId().getHost() + ":" + this.container.getNodeId().getPort();
            InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(str);
            ApplicationMaster.LOG.info("Connecting to ContainerManager at " + str);
            this.cm = (ContainerManager) ApplicationMaster.this.rpc.getProxy(ContainerManager.class, createSocketAddr, ApplicationMaster.this.conf);
        }

        @Override // java.lang.Runnable
        public void run() {
            connectToCM();
            ApplicationMaster.LOG.info("Setting up container launch container for containerid=" + this.container.getId());
            ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
            containerLaunchContext.setContainerId(this.container.getId());
            containerLaunchContext.setResource(this.container.getResource());
            try {
                containerLaunchContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
            } catch (IOException e) {
                ApplicationMaster.LOG.info("Getting current user info failed when trying to launch the container" + e.getMessage());
            }
            containerLaunchContext.setEnvironment(ApplicationMaster.this.shellEnv);
            HashMap hashMap = new HashMap();
            if (!ApplicationMaster.this.shellScriptPath.isEmpty()) {
                LocalResource localResource = (LocalResource) Records.newRecord(LocalResource.class);
                localResource.setType(LocalResourceType.FILE);
                localResource.setVisibility(LocalResourceVisibility.APPLICATION);
                try {
                    localResource.setResource(ConverterUtils.getYarnUrlFromURI(new URI(ApplicationMaster.this.shellScriptPath)));
                    localResource.setTimestamp(ApplicationMaster.this.shellScriptPathTimestamp);
                    localResource.setSize(ApplicationMaster.this.shellScriptPathLen);
                    hashMap.put("ExecShellScript.sh", localResource);
                } catch (URISyntaxException e2) {
                    ApplicationMaster.LOG.error("Error when trying to use shell script path specified in env, path=" + ApplicationMaster.this.shellScriptPath);
                    e2.printStackTrace();
                    ApplicationMaster.this.numCompletedContainers.incrementAndGet();
                    ApplicationMaster.this.numFailedContainers.incrementAndGet();
                    return;
                }
            }
            containerLaunchContext.setLocalResources(hashMap);
            Vector vector = new Vector(5);
            vector.add(ApplicationMaster.this.shellCommand);
            if (!ApplicationMaster.this.shellScriptPath.isEmpty()) {
                vector.add("ExecShellScript.sh");
            }
            vector.add(ApplicationMaster.this.shellArgs);
            vector.add("1><LOG_DIR>/stdout");
            vector.add("2><LOG_DIR>/stderr");
            StringBuilder sb = new StringBuilder();
            Iterator it = vector.iterator();
            while (it.hasNext()) {
                sb.append((CharSequence) it.next()).append(" ");
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(sb.toString());
            containerLaunchContext.setCommands(arrayList);
            StartContainerRequest startContainerRequest = (StartContainerRequest) Records.newRecord(StartContainerRequest.class);
            startContainerRequest.setContainerLaunchContext(containerLaunchContext);
            try {
                this.cm.startContainer(startContainerRequest);
            } catch (YarnRemoteException e3) {
                ApplicationMaster.LOG.info("Start container failed for :, containerId=" + this.container.getId());
                e3.printStackTrace();
            }
        }
    }

    /**
     * Entry point: initializes and runs the application master, then exits
     * with 0 on success (or when only usage was printed), 1 on an unexpected
     * error and 2 when the application reports failure.
     *
     * @param strArr command line arguments
     */
    public static void main(String[] strArr) {
        boolean succeeded = false;
        try {
            ApplicationMaster appMaster = new ApplicationMaster();
            LOG.info("Initializing ApplicationMaster");
            boolean shouldRun = appMaster.init(strArr);
            if (!shouldRun) {
                System.exit(0);
            }
            succeeded = appMaster.run();
        } catch (Throwable failure) {
            LOG.fatal("Error running ApplicationMaster", failure);
            System.exit(1);
        }
        if (!succeeded) {
            LOG.info("Application Master failed. exiting");
            System.exit(2);
        } else {
            LOG.info("Application Master completed successfully. exiting");
            System.exit(0);
        }
    }

    /**
     * Dumps the process environment and a listing of the current working
     * directory to both the log and standard output. Failures while listing
     * the directory are logged and otherwise ignored.
     */
    private void dumpOutDebugInfo() {
        LOG.info("Dump debug output");
        for (Map.Entry<String, String> env : System.getenv().entrySet()) {
            String line = "System env: key=" + env.getKey() + ", val=" + env.getValue();
            LOG.info(line);
            System.out.println(line);
        }
        try {
            Process listing = Runtime.getRuntime().exec("ls -al");
            // Drain the output BEFORE waiting for the process: waiting first can
            // block forever once the child fills its stdout pipe.
            try (BufferedReader reader =
                    new BufferedReader(new InputStreamReader(listing.getInputStream()))) {
                String entry;
                while ((entry = reader.readLine()) != null) {
                    LOG.info("System CWD content: " + entry);
                    System.out.println("System CWD content: " + entry);
                }
            }
            listing.waitFor();
        } catch (IOException e) {
            LOG.warn("Could not list the current working directory", e);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while listing the current working directory", e);
        }
    }

    /**
     * Parses the command line and the process environment and stores the
     * resulting settings on this instance.
     *
     * @param strArr command line arguments
     * @return {@code false} when only the usage text was printed ("help"),
     *         {@code true} when the application master is ready to run
     * @throws ParseException if the command line cannot be parsed
     * @throws IOException declared for callers; not thrown by the code visible here
     * @throws IllegalArgumentException if no arguments are given, the attempt id
     *         or shell command is missing, or the shell script metadata in the
     *         environment is invalid
     * @throws NumberFormatException if a numeric option or environment value
     *         is not a valid number
     */
    public boolean init(String[] strArr) throws ParseException, IOException {
        Options opts = new Options();
        opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
        opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
        opts.addOption("shell_script", true, "Location of the shell script to be executed");
        opts.addOption("shell_args", true, "Command line args for the shell script");
        opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
        opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
        opts.addOption("priority", true, "Application Priority. Default 0");
        opts.addOption("debug", false, "Dump out debug information");
        opts.addOption("help", false, "Print usage");
        CommandLine cliParser = new GnuParser().parse(opts, strArr);

        if (strArr.length == 0) {
            printUsage(opts);
            throw new IllegalArgumentException("No args specified for application master to initialize");
        }
        if (cliParser.hasOption("help")) {
            printUsage(opts);
            return false;
        }
        if (cliParser.hasOption("debug")) {
            dumpOutDebugInfo();
        }

        // The attempt id comes from the AM container id in the environment,
        // falling back to the command line option.
        Map<String, String> envs = System.getenv();
        if (envs.containsKey("AM_CONTAINER_ID")) {
            this.appAttemptID = ConverterUtils.toContainerId(envs.get("AM_CONTAINER_ID")).getApplicationAttemptId();
        } else if (cliParser.hasOption("app_attempt_id")) {
            this.appAttemptID = ConverterUtils.toApplicationAttemptId(cliParser.getOptionValue("app_attempt_id", ""));
        } else {
            throw new IllegalArgumentException("Application Attempt Id not set in the environment");
        }
        LOG.info("Application master for app, appId=" + this.appAttemptID.getApplicationId().getId()
                + ", clustertimestamp=" + this.appAttemptID.getApplicationId().getClusterTimestamp()
                + ", attemptId=" + this.appAttemptID.getAttemptId());

        if (!cliParser.hasOption("shell_command")) {
            throw new IllegalArgumentException("No shell command specified to be executed by application master");
        }
        this.shellCommand = cliParser.getOptionValue("shell_command");
        if (cliParser.hasOption("shell_args")) {
            this.shellArgs = cliParser.getOptionValue("shell_args");
        }
        if (cliParser.hasOption("shell_env")) {
            for (String rawEntry : cliParser.getOptionValues("shell_env")) {
                String entry = rawEntry.trim();
                int separator = entry.indexOf('=');
                if (separator == -1) {
                    // No '=': the whole entry is a key with an empty value.
                    this.shellEnv.put(entry, "");
                } else {
                    // substring() yields "" when '=' is the last character.
                    this.shellEnv.put(entry.substring(0, separator), entry.substring(separator + 1));
                }
            }
        }

        if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
            this.shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
            if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
                this.shellScriptPathTimestamp = Long.parseLong(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
            }
            if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
                this.shellScriptPathLen = Long.parseLong(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
            }
            // A script path is only usable together with a positive timestamp and length.
            if (!this.shellScriptPath.isEmpty()
                    && (this.shellScriptPathTimestamp <= 0 || this.shellScriptPathLen <= 0)) {
                LOG.error("Illegal values in env for shell script path, path=" + this.shellScriptPath
                        + ", len=" + this.shellScriptPathLen + ", timestamp=" + this.shellScriptPathTimestamp);
                throw new IllegalArgumentException("Illegal values in env for shell script path");
            }
        }

        this.containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
        this.numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
        this.requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
        return true;
    }

    /**
     * Prints the usage text for the given options under the name
     * "ApplicationMaster".
     *
     * @param options the command line options to describe
     */
    private void printUsage(Options options) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("ApplicationMaster", options);
    }

    /**
     * Registers with the ResourceManager, then repeatedly asks for containers,
     * launches the shell command on every allocated container and tracks
     * completions until all containers are done. Finally reports the outcome
     * to the ResourceManager.
     *
     * @return {@code true} if no container failed, {@code false} otherwise
     * @throws YarnRemoteException if a call to the ResourceManager fails
     */
    public boolean run() throws YarnRemoteException {
        LOG.info("Starting ApplicationMaster");
        this.resourceManager = connectToRM();
        RegisterApplicationMasterResponse registration = registerToRM();
        int minMem = registration.getMinimumResourceCapability().getMemory();
        int maxMem = registration.getMaximumResourceCapability().getMemory();
        LOG.info("Min mem capabililty of resources in this cluster " + minMem);
        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
        // Clamp the requested memory into the range the cluster supports.
        if (this.containerMemory < minMem) {
            LOG.info("Container memory specified below min threshold of cluster. Using min value., specified=" + this.containerMemory + ", min=" + minMem);
            this.containerMemory = minMem;
        } else if (this.containerMemory > maxMem) {
            LOG.info("Container memory specified above max threshold of cluster. Using max value., specified=" + this.containerMemory + ", max=" + maxMem);
            this.containerMemory = maxMem;
        }

        int loopCounter = -1;
        while (this.numCompletedContainers.get() < this.numTotalContainers && !this.appDone) {
            loopCounter++;
            logApplicationState(loopCounter);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                LOG.info("Sleep interrupted " + e.getMessage());
            }

            // Ask only for the containers that have not been requested yet.
            int askCount = this.numTotalContainers - this.numRequestedContainers.get();
            this.numRequestedContainers.addAndGet(askCount);
            List<ResourceRequest> asks = new ArrayList<>();
            if (askCount > 0) {
                asks.add(setupContainerAskForRM(askCount));
            }
            LOG.info("Asking RM for containers, askCount=" + askCount);
            AMResponse amResponse = sendContainerAskToRM(asks);

            launchAllocatedContainers(amResponse.getAllocatedContainers());
            LOG.info("Current available resources in the cluster " + amResponse.getAvailableResources());
            recordCompletedContainers(amResponse.getCompletedContainersStatuses());

            if (this.numCompletedContainers.get() == this.numTotalContainers) {
                this.appDone = true;
            }
            logApplicationState(loopCounter);
        }

        waitForLaunchThreads();
        return reportFinishToRM();
    }

    /** Logs the container counters for the given iteration of the main loop. */
    private void logApplicationState(int loopCounter) {
        LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + this.appDone + ", total=" + this.numTotalContainers + ", requested=" + this.numRequestedContainers + ", completed=" + this.numCompletedContainers + ", failed=" + this.numFailedContainers + ", currentAllocated=" + this.numAllocatedContainers);
    }

    /** Counts the newly allocated containers and starts one launcher thread per container. */
    private void launchAllocatedContainers(List<Container> allocatedContainers) {
        LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
        this.numAllocatedContainers.addAndGet(allocatedContainers.size());
        for (Container allocated : allocatedContainers) {
            LOG.info("Launching shell command on a new container., containerId=" + allocated.getId() + ", containerNode=" + allocated.getNodeId().getHost() + ":" + allocated.getNodeId().getPort() + ", containerNodeURI=" + allocated.getNodeHttpAddress() + ", containerState" + allocated.getState() + ", containerResourceMemory" + allocated.getResource().getMemory());
            Thread launcher = new Thread(new LaunchContainerRunnable(allocated));
            this.launchThreads.add(launcher);
            launcher.start();
        }
    }

    /** Updates the completed/failed/requested counters from the reported container statuses. */
    private void recordCompletedContainers(List<ContainerStatus> completedStatuses) {
        LOG.info("Got response from RM for container ask, completedCnt=" + completedStatuses.size());
        for (ContainerStatus status : completedStatuses) {
            LOG.info("Got container status for containerID= " + status.getContainerId() + ", state=" + status.getState() + ", exitStatus=" + status.getExitStatus() + ", diagnostics=" + status.getDiagnostics());
            // Explicit form of an assertion, driven by the class's own assertion flag.
            if (!$assertionsDisabled && status.getState() != ContainerState.COMPLETE) {
                throw new AssertionError();
            }
            int exitStatus = status.getExitStatus();
            if (0 == exitStatus) {
                this.numCompletedContainers.incrementAndGet();
                LOG.info("Container completed successfully., containerId=" + status.getContainerId());
            } else if (-100 != exitStatus) {
                // The shell command itself failed: counted as completed and failed.
                this.numCompletedContainers.incrementAndGet();
                this.numFailedContainers.incrementAndGet();
            } else {
                // NOTE(review): -100 presumably marks a container that was aborted
                // rather than run to completion - confirm. Undoing the counters
                // makes the next loop iteration request a replacement.
                this.numAllocatedContainers.decrementAndGet();
                this.numRequestedContainers.decrementAndGet();
            }
        }
    }

    /** Waits up to ten seconds for each launcher thread to finish. */
    private void waitForLaunchThreads() {
        for (Thread launcher : this.launchThreads) {
            try {
                launcher.join(10000L);
            } catch (InterruptedException e) {
                // The interrupt status is intentionally not restored, so the remaining
                // joins and the final call to the ResourceManager proceed as before.
                LOG.info("Exception thrown in thread join: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Sends the final application status to the ResourceManager.
     *
     * @return {@code true} if no container failed
     */
    private boolean reportFinishToRM() throws YarnRemoteException {
        LOG.info("Application completed. Signalling finish to RM");
        FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
        finishReq.setAppAttemptId(this.appAttemptID);
        boolean success = this.numFailedContainers.get() == 0;
        if (success) {
            finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
        } else {
            finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
            finishReq.setDiagnostics("Diagnostics., total=" + this.numTotalContainers + ", completed=" + this.numCompletedContainers.get() + ", allocated=" + this.numAllocatedContainers.get() + ", failed=" + this.numFailedContainers.get());
        }
        this.resourceManager.finishApplicationMaster(finishReq);
        return success;
    }

    /**
     * Creates an RPC proxy to the ResourceManager at the configured scheduler
     * address.
     *
     * @return the proxy used to talk to the ResourceManager
     */
    private AMRMProtocol connectToRM() {
        YarnConfiguration yarnConf = new YarnConfiguration(this.conf);
        InetSocketAddress rmAddress =
                yarnConf.getSocketAddr("yarn.resourcemanager.scheduler.address", "0.0.0.0:8030", 8030);
        LOG.info("Connecting to ResourceManager at " + rmAddress);
        Object proxy = this.rpc.getProxy(AMRMProtocol.class, rmAddress, this.conf);
        return (AMRMProtocol) proxy;
    }

    /**
     * Registers this application master with the ResourceManager, reporting
     * its attempt id, host, RPC port and tracking URL.
     *
     * @return the registration response from the ResourceManager
     * @throws YarnRemoteException if the registration call fails
     */
    private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
        RegisterApplicationMasterRequest request = Records.newRecord(RegisterApplicationMasterRequest.class);
        request.setTrackingUrl(this.appMasterTrackingUrl);
        request.setRpcPort(this.appMasterRpcPort);
        request.setHost(this.appMasterHostname);
        request.setApplicationAttemptId(this.appAttemptID);
        RegisterApplicationMasterResponse response = this.resourceManager.registerApplicationMaster(request);
        return response;
    }

    /**
     * Builds a request for {@code i} containers on any host ("*"), using the
     * configured request priority and container memory.
     *
     * @param i number of containers to ask for
     * @return the populated resource request
     */
    private ResourceRequest setupContainerAskForRM(int i) {
        Priority askPriority = Records.newRecord(Priority.class);
        askPriority.setPriority(this.requestPriority);

        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(this.containerMemory);

        ResourceRequest ask = Records.newRecord(ResourceRequest.class);
        ask.setHostName("*");
        ask.setNumContainers(i);
        ask.setPriority(askPriority);
        ask.setCapability(capability);
        return ask;
    }

    /**
     * Sends an allocate request to the ResourceManager carrying the given
     * asks, the released container ids and the current progress.
     *
     * @param list container requests to send; may be empty
     * @return the ResourceManager's response to the allocate call
     * @throws YarnRemoteException if the allocate call fails
     */
    private AMResponse sendContainerAskToRM(List<ResourceRequest> list) throws YarnRemoteException {
        AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
        allocateRequest.setResponseId(this.rmRequestID.incrementAndGet());
        allocateRequest.setApplicationAttemptId(this.appAttemptID);
        allocateRequest.addAllAsks(list);
        allocateRequest.addAllReleases(this.releasedContainers);
        // Divide as float: integer division would report 0 until every
        // container has completed.
        float progress = (float) this.numCompletedContainers.get() / this.numTotalContainers;
        allocateRequest.setProgress(progress);
        LOG.info("Sending request to RM for containers, requestedSet=" + list.size() + ", releasedSet=" + this.releasedContainers.size() + ", progress=" + allocateRequest.getProgress());
        for (ResourceRequest ask : list) {
            LOG.info("Requested container ask: " + ask.toString());
        }
        for (ContainerId released : this.releasedContainers) {
            LOG.info("Released container, id=" + released.getId());
        }
        return this.resourceManager.allocate(allocateRequest).getAMResponse();
    }

    static {
        $assertionsDisabled = !ApplicationMaster.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(ApplicationMaster.class);
    }
}
