package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BasicContainerOptConfigurator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.minlog.Log;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.ning.http.client.websocket.WebSocketUpgradeHandler;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.client.apache4.ApacheHttpClient4Handler;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import javax.validation.ConstraintViolationException;
import net.engio.mbassy.bus.MBassador;
import org.apache.bval.BeanValidationContext;
import org.apache.bval.jsr303.ApacheValidationProvider;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.message.BasicHeaderValueParser;
import org.apache.log4j.DTLoggerFactory;
import org.apache.xbean.asm5.tree.ClassNode;
import org.codehaus.jackson.annotate.JsonUnwrapped;
import org.codehaus.jackson.map.ser.std.RawSerializer;
import org.mozilla.javascript.Scriptable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
/* loaded from: input_file:com/datatorrent/stram/StramClient.class */
public class StramClient {
    /** The only application type accepted by {@link #startApplication()}; also the default value of {@code applicationType}. */
    public static final String YARN_APPLICATION_TYPE = "DataTorrent";
    /** Separator between entries of a comma-separated jar list. */
    public static final String LIB_JARS_SEP = ",";
    /** Hadoop/YARN configuration used for the YARN client, file system access and the launch configuration file. */
    private final Configuration conf;
    /** Id of the submitted application; assigned in {@link #startApplication()}. */
    private ApplicationId appId;
    /** Logical plan to launch; validated in the constructor. */
    private final LogicalPlan dag;
    /** When non-null, {@link #startApplication()} copies the state of this earlier application before launching. */
    private String originalAppId;
    /** YARN queue set on the submission context. */
    private String queueName;
    /** Comma-separated archives copied into the application directory; may be null. */
    private String archives;
    /** Comma-separated files copied into the application directory; may be null. */
    private String files;
    /** Extra resources added to the jar set computed by {@link #findJars}; may be null. */
    private LinkedHashSet<String> resources;
    private static final Logger LOG = LoggerFactory.getLogger(StramClient.class);
    // Classes whose containing jars are always resolved by findJars() and shipped with the application.
    private static final Class<?>[] DATATORRENT_CLASSES = {Slice.class, EventLoop.class, Server.class, StreamingAppMaster.class, StreamCodec.class, FSStorageAgent.class, ConstraintViolationException.class, WebSocketUpgradeHandler.class, Kryo.class, ApacheValidationProvider.class, BeanValidationContext.class, ClassUtils.class, MBassador.class, JsonUnwrapped.class, RawSerializer.class, BeanUtils.class, URLEncodedUtils.class, BasicHeaderValueParser.class, Log.class, ClassNode.class, Scriptable.class, ClientHandler.class, ApacheHttpClient4Handler.class};
    // Additional classes needed only when security is enabled; currently empty.
    private static final Class<?>[] DATATORRENT_SECURITY_SPECIFIC_CLASSES = new Class[0];
    // Concatenation of the two arrays above; must be declared after both because static initializers run in order.
    private static final Class<?>[] DATATORRENT_SECURITY_CLASSES = (Class[]) ArrayUtils.addAll(DATATORRENT_CLASSES, DATATORRENT_SECURITY_SPECIFIC_CLASSES);
    /** YARN client; initialized in the constructor, started/stopped via {@link #start()} / {@link #stop()}. */
    private final YarnClient yarnClient = YarnClient.createYarnClient();
    /** Priority of the application master request (compile-time constant 0). */
    private final int amPriority = 0;
    /** Java executable placed first on the application master command line. */
    public String javaCmd = "${JAVA_HOME}/bin/java";
    /** Local log4j properties file to ship with the application; compile-time constant empty string, i.e. none. */
    private final String log4jPropFile = "";
    /** Timeout handed to the completion wait in {@link #monitorApplication()}. */
    private long clientTimeout = StreamingContainerManager.LATENCY_WARNING_THRESHOLD_MILLIS;
    /** Application type set on the submission context; must equal {@link #YARN_APPLICATION_TYPE} at launch. */
    private String applicationType = YARN_APPLICATION_TYPE;

    /**
     * Creates a client for the given plan: stores both arguments, validates the
     * plan and initializes the YARN client with the configuration.
     *
     * @param configuration configuration used for YARN and file system access
     * @param logicalPlan plan to launch; {@code validate()} is invoked on it here
     * @throws Exception propagated from plan validation or YARN client initialization
     */
    public StramClient(Configuration configuration, LogicalPlan logicalPlan) throws Exception {
        dag = logicalPlan;
        conf = configuration;
        dag.validate();
        yarnClient.init(conf);
    }

    /** Starts the underlying YARN client. */
    public void start() {
        yarnClient.start();
    }

    /** Stops the underlying YARN client. */
    public void stop() {
        yarnClient.stop();
    }

    /**
     * Resolves the local jar files needed to deploy the given plan.
     * <p>
     * The classes named by the plan are loaded through the context class loader and
     * expanded with their superclasses and the interfaces those declare; the classes
     * in {@code clsArr} are added as-is. Every class that has a code source is mapped
     * to a jar via {@link JarFinder#getJar(Class)}, and finally the entries of the
     * plan's {@code LIBRARY_JARS} attribute are appended.
     *
     * @param logicalPlan plan supplying the class names and the optional library jar list
     * @param clsArr additional classes whose jars must be included
     * @return jar locations in first-seen order, without duplicates
     * @throws IllegalArgumentException if a class named by the plan cannot be loaded
     * @throws AssertionError if no jar can be resolved for a class that has a code source
     */
    public static LinkedHashSet<String> findJars(LogicalPlan logicalPlan, Class<?>[] clsArr) {
        ArrayList<Class<?>> jarClasses = new ArrayList<>();
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        for (String className : logicalPlan.getClassNames()) {
            try {
                jarClasses.add(contextLoader.loadClass(className));
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Failed to load class " + className, e);
            }
        }

        // Iterate over a snapshot because the list grows while the hierarchy is walked.
        for (Class<?> declared : Lists.newArrayList(jarClasses)) {
            // Walk up the superclass chain; stop at Object (or null for interfaces/primitives).
            for (Class<?> c = declared; c != null && c != Object.class; c = c.getSuperclass()) {
                jarClasses.add(c);
                jarClasses.addAll(Arrays.asList(c.getInterfaces()));
            }
        }

        jarClasses.addAll(Arrays.asList(clsArr));
        if (logicalPlan.isDebug()) {
            LOG.debug("Deploy dependencies: {}", jarClasses);
        }

        LinkedHashSet<String> localJarFiles = new LinkedHashSet<>();
        // Cache of code source location -> jar so each location is resolved only once.
        HashMap<String, String> sourceToJar = new HashMap<>();
        for (Class<?> jarClass : jarClasses) {
            if (jarClass.getProtectionDomain().getCodeSource() == null) {
                // No code source to resolve a jar from; skip.
                continue;
            }
            String sourceLocation = jarClass.getProtectionDomain().getCodeSource().getLocation().toString();
            String jar = sourceToJar.get(sourceLocation);
            if (jar == null) {
                jar = JarFinder.getJar(jarClass);
                sourceToJar.put(sourceLocation, jar);
                LOG.debug("added sourceLocation {} as {}", sourceLocation, jar);
            }
            if (jar == null) {
                throw new AssertionError("Cannot resolve jar file for " + jarClass);
            }
            localJarFiles.add(jar);
        }

        String libJars = (String) logicalPlan.getValue(LogicalPlan.LIBRARY_JARS);
        if (!StringUtils.isEmpty(libJars)) {
            localJarFiles.addAll(Arrays.asList(StringUtils.splitByWholeSeparator(libJars, LIB_JARS_SEP)));
        }
        LOG.info("Local jar file dependencies: {}", localJarFiles);
        return localJarFiles;
    }

    /**
     * Copies each named file into the target directory and returns the resulting
     * destination paths as a comma-separated string.
     * <p>
     * A name without a URI scheme, or whose scheme starts with {@code file}, is copied
     * from the local file system; anything else is copied within {@code fileSystem}.
     * Existing destination files are overwritten and sources are kept.
     *
     * @param fileSystem file system holding the target directory
     * @param path target directory
     * @param strArr source file names or URIs
     * @return comma-separated destination paths, in input order
     * @throws IOException if a copy fails or a name is not a valid URI
     */
    private String copyFromLocal(FileSystem fileSystem, Path path, String[] strArr) throws IOException {
        StringBuilder copied = new StringBuilder(strArr.length * (path.toString().length() + 16));
        for (String source : strArr) {
            Path sourcePath = new Path(source);
            Path targetPath = new Path(path, sourcePath.getName());

            URI sourceUri;
            try {
                sourceUri = new URI(source);
            } catch (URISyntaxException e) {
                throw new IOException(e);
            }

            String scheme = sourceUri.getScheme();
            boolean remote = scheme != null && !scheme.startsWith("file");
            if (remote) {
                LOG.info("Copy {} from DFS to {}", source, targetPath);
                FileUtil.copy(fileSystem, sourcePath, fileSystem, targetPath, false, true, this.conf);
            } else {
                LOG.info("Copy {} from local filesystem to {}", source, targetPath);
                fileSystem.copyFromLocalFile(false, true, sourcePath, targetPath);
            }

            if (copied.length() > 0) {
                copied.append(",");
            }
            copied.append(targetPath.toString());
        }
        return copied.toString();
    }

    /**
     * Seeds this application's directory with the state of a previous application.
     * <p>
     * Restores the snapshot found under {@code path}, re-targets it to this plan,
     * saves it under the new application path together with a copy of the previous
     * log, and then copies every sub-directory of {@code path} that does not yet
     * exist under the new application path.
     * <p>
     * The file system handle and both log streams are released even when a step
     * fails part-way through.
     *
     * @param path application directory of the previous application
     * @throws IOException if reading, writing or copying state fails
     * @throws IllegalArgumentException if no snapshot exists under {@code path}
     */
    public void copyInitialState(Path path) throws IOException {
        String newAppDir = this.dag.assertAppPath();
        FSRecoveryHandler sourceHandler = new FSRecoveryHandler(path.toString(), this.conf);
        Object snapshot = sourceHandler.restore();
        if (snapshot == null) {
            throw new IllegalArgumentException("No previous application state found in " + path);
        }

        try (DataInputStream previousLog = sourceHandler.getLog()) {
            // Switch the restored snapshot over to this application.
            ((StreamingContainerManager.CheckpointState) snapshot).setApplicationId(this.dag, this.conf);
            Path checkpointPath = new Path(newAppDir, LogicalPlan.SUBDIR_CHECKPOINTS);

            // newInstance() hands out a handle owned by this method, so it must be closed here.
            try (FileSystem fs = FileSystem.newInstance(path.toUri(), this.conf)) {
                // NOTE(review): presumably clears a checkpoint directory created while the
                // snapshot was re-targeted above -- confirm against the storage agent.
                fs.delete(checkpointPath, true);

                FSRecoveryHandler targetHandler = new FSRecoveryHandler(newAppDir, this.conf);
                targetHandler.save(snapshot);
                try (DataOutputStream newLog = targetHandler.rotateLog()) {
                    IOUtils.copy(previousLog, newLog);
                    newLog.flush();
                }

                // Copy sub-directories that are not yet present in the target.
                for (FileStatus status : fs.listStatus(path)) {
                    if (!status.isDirectory()) {
                        continue;
                    }
                    String targetPath = status.getPath().toString().replace(path.toString(), newAppDir);
                    if (fs.exists(new Path(targetPath))) {
                        LOG.debug("Ignoring {} as it already exists under {}", status.getPath(), targetPath);
                    } else {
                        LOG.debug("Copying {} to {}", status.getPath(), targetPath);
                        FileUtil.copy(fs, status.getPath(), fs, new Path(targetPath), false, this.conf);
                    }
                }
            }
        }
    }

    /**
     * Submits the logical plan to YARN as a new application.
     * <p>
     * Steps, in order:
     * <ol>
     * <li>resolve the jars to ship ({@link #findJars} plus {@code resources});</li>
     * <li>create the YARN application and cap the master memory at the cluster maximum;</li>
     * <li>when security is enabled, attach file system and RM delegation tokens;</li>
     * <li>copy jars, archives and files into the application directory and register
     *     them as local resources;</li>
     * <li>optionally copy the state of {@code originalAppId};</li>
     * <li>write the serialized plan and the launch configuration;</li>
     * <li>build environment and command line of the application master and submit.</li>
     * </ol>
     * File system handles and output streams are closed on every path, including failures.
     *
     * @throws IllegalStateException if the application type is not {@link #YARN_APPLICATION_TYPE}
     * @throws YarnException if a YARN client call fails
     * @throws IOException if file system access fails or no RM principal is configured
     *         while security is enabled
     */
    public void startApplication() throws YarnException, IOException {
        // Constant-first comparison so a null type is reported as invalid instead of raising an NPE.
        if (!YARN_APPLICATION_TYPE.equals(this.applicationType)) {
            throw new IllegalStateException(this.applicationType + " is not a valid application type.");
        }

        LinkedHashSet<String> localJarFiles = findJars(this.dag, UserGroupInformation.isSecurityEnabled() ? DATATORRENT_SECURITY_CLASSES : DATATORRENT_CLASSES);
        if (this.resources != null) {
            localJarFiles.addAll(this.resources);
        }

        LOG.info("Got Cluster metric info from ASM, numNodeManagers={}", this.yarnClient.getYarnClusterMetrics().getNumNodeManagers());
        for (QueueUserACLInfo aclInfo : this.yarnClient.getQueueAclsInfo()) {
            for (QueueACL userAcl : aclInfo.getUserAcls()) {
                LOG.info("User ACL Info for Queue, queueName={}, userAcl={}", aclInfo.getQueueName(), userAcl.name());
            }
        }

        YarnClientApplication newApp = this.yarnClient.createApplication();
        this.appId = newApp.getNewApplicationResponse().getApplicationId();

        int maxMemoryMB = newApp.getNewApplicationResponse().getMaximumResourceCapability().getMemory();
        LOG.info("Max mem capability of resources in this cluster {}", maxMemoryMB);
        int masterMemoryMB = this.dag.getMasterMemoryMB();
        if (masterMemoryMB > maxMemoryMB) {
            LOG.info("AM memory specified above max threshold of cluster. Using max value., specified={}, max={}", masterMemoryMB, maxMemoryMB);
            masterMemoryMB = maxMemoryMB;
        }

        if (this.dag.getAttributes().get(LogicalPlan.APPLICATION_ID) == null) {
            this.dag.setAttribute(LogicalPlan.APPLICATION_ID, this.appId.toString());
        }

        LOG.info("Setting up application submission context for ASM");
        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
        appContext.setApplicationId(this.appId);
        appContext.setApplicationName((String) this.dag.getValue(LogicalPlan.APPLICATION_NAME));
        appContext.setApplicationType(this.applicationType);

        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        if (UserGroupInformation.isSecurityEnabled()) {
            addSecurityTokens(amContainer);
        }

        HashMap<String, LocalResource> localResources = new HashMap<>();
        try (FileSystem fs = StramClientUtils.newFileSystemInstance(this.conf)) {
            Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, this.conf), StramClientUtils.SUBDIR_APPS);
            String configuredAppPath = (String) this.dag.getValue(LogicalPlan.APPLICATION_PATH);
            Path appPath = configuredAppPath == null ? new Path(appsBasePath, this.appId.toString()) : new Path(configuredAppPath);

            String libJarsCsv = copyFromLocal(fs, appPath, localJarFiles.toArray(new String[0]));
            LOG.info("libjars: {}", libJarsCsv);
            this.dag.getAttributes().put(LogicalPlan.LIBRARY_JARS, libJarsCsv);
            LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, libJarsCsv, localResources, fs);

            if (this.archives != null) {
                String archivesCsv = copyFromLocal(fs, appPath, this.archives.split(","));
                LOG.info("archives: {}", archivesCsv);
                this.dag.getAttributes().put(LogicalPlan.ARCHIVES, archivesCsv);
                LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.ARCHIVE, archivesCsv, localResources, fs);
            }

            if (this.files != null) {
                String filesCsv = copyFromLocal(fs, appPath, this.files.split(","));
                LOG.info("files: {}", filesCsv);
                this.dag.getAttributes().put(LogicalPlan.FILES, filesCsv);
                LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, filesCsv, localResources, fs);
            }

            this.dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, appPath.toString());
            if (this.dag.getAttributes().get(Context.OperatorContext.STORAGE_AGENT) == null) {
                this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), this.conf));
            }
            if (this.dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR) == null) {
                this.dag.setAttribute(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR, new BasicContainerOptConfigurator());
            }

            // Ship a custom log4j configuration only when one is configured (none by default).
            if (!this.log4jPropFile.isEmpty()) {
                Path log4jSource = new Path(this.log4jPropFile);
                Path log4jTarget = new Path(appPath, "log4j.props");
                fs.copyFromLocalFile(false, true, log4jSource, log4jTarget);
                FileStatus log4jStatus = fs.getFileStatus(log4jTarget);
                LocalResource log4jResource = Records.newRecord(LocalResource.class);
                log4jResource.setType(LocalResourceType.FILE);
                log4jResource.setVisibility(LocalResourceVisibility.APPLICATION);
                log4jResource.setResource(ConverterUtils.getYarnUrlFromURI(log4jTarget.toUri()));
                log4jResource.setTimestamp(log4jStatus.getModificationTime());
                log4jResource.setSize(log4jStatus.getLen());
                localResources.put("log4j.properties", log4jResource);
            }

            if (this.originalAppId != null) {
                Path originalAppPath = new Path(appsBasePath, this.originalAppId);
                LOG.info("Restart from {}", originalAppPath);
                copyInitialState(originalAppPath);
            }

            // Persist the plan and the launch configuration; streams are closed even if writing fails.
            Path serializedPlanPath = new Path(appPath, LogicalPlan.SER_FILE_NAME);
            try (FSDataOutputStream planOut = fs.create(serializedPlanPath, true)) {
                LogicalPlan.write(this.dag, planOut);
            }
            try (FSDataOutputStream configOut = fs.create(new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME), true)) {
                this.conf.writeXml(configOut);
            }
            LaunchContainerRunnable.addFileToLocalResources(LogicalPlan.SER_FILE_NAME, fs.getFileStatus(serializedPlanPath), LocalResourceType.FILE, localResources);
            amContainer.setLocalResources(localResources);

            LOG.info("Set the environment for the application master");
            HashMap<String, String> env = new HashMap<>();
            env.put("CLASSPATH", buildClasspath());
            env.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
            amContainer.setEnvironment(env);

            ArrayList<String> commands = new ArrayList<>();
            commands.add(buildMasterCommand(masterMemoryMB));
            amContainer.setCommands(commands);

            Resource capability = Records.newRecord(Resource.class);
            capability.setMemory(masterMemoryMB);
            appContext.setResource(capability);
            appContext.setAMContainerSpec(amContainer);

            Priority priority = Records.newRecord(Priority.class);
            priority.setPriority(this.amPriority);
            appContext.setPriority(priority);
            appContext.setQueue(this.queueName);

            LOG.info(Objects.toStringHelper("Submitting application: ").add("name", appContext.getApplicationName()).add("queue", appContext.getQueue()).add("user", UserGroupInformation.getLoginUser()).add("resource", appContext.getResource()).toString());
            this.yarnClient.submitApplication(appContext);
        }
    }

    /** Obtains file system and RM delegation tokens and attaches them to the AM launch context. */
    private void addSecurityTokens(ContainerLaunchContext amContainer) throws YarnException, IOException {
        Credentials credentials = new Credentials();
        String tokenRenewer = this.conf.get("yarn.resourcemanager.principal");
        if (tokenRenewer == null || tokenRenewer.length() == 0) {
            throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
        }

        try (FileSystem fs = StramClientUtils.newFileSystemInstance(this.conf)) {
            Token<?>[] tokens = fs.addDelegationTokens(tokenRenewer, credentials);
            if (tokens != null) {
                for (Token<?> token : tokens) {
                    LOG.info("Got dt for {}; {}", fs.getUri(), token);
                }
            }
        }

        new StramClientUtils.ClientRMHelper(this.yarnClient, this.conf).addRMDelegationToken(tokenRenewer, credentials);
        DataOutputBuffer tokenBuffer = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(tokenBuffer);
        amContainer.setTokens(ByteBuffer.wrap(tokenBuffer.getData(), 0, tokenBuffer.getLength()));
    }

    /** Builds the AM CLASSPATH: {@code ./*} followed by the YARN application classpath entries. */
    private String buildClasspath() {
        StringBuilder classPath = new StringBuilder("./*");
        String configured = this.conf.get("yarn.application.classpath");
        String[] entries = StringUtils.isBlank(configured) ? YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH : configured.split(",");
        for (String entry : entries) {
            // Trim before filtering so an entry padded with whitespace is excluded as well.
            String trimmed = entry.trim();
            if ("$HADOOP_CLIENT_CONF_DIR".equals(trimmed)) {
                continue;
            }
            classPath.append(':').append(trimmed);
        }
        return classPath.toString();
    }

    /** Assembles the single shell command line that launches the application master JVM. */
    private String buildMasterCommand(int masterMemoryMB) {
        ArrayList<CharSequence> vargs = new ArrayList<>(30);
        LOG.info("Setting up app master command");
        vargs.add(this.javaCmd);
        if (this.dag.isDebug()) {
            vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n");
        }
        if (this.dag.getMasterJVMOptions() != null) {
            vargs.add(this.dag.getMasterJVMOptions());
        }
        vargs.add("-Djava.io.tmpdir=" + new Path(ApplicationConstants.Environment.PWD.$(), "./tmp"));
        // Heap is sized to three quarters of the container memory.
        vargs.add("-Xmx" + ((masterMemoryMB * 3) / 4) + "m");
        vargs.add("-XX:+HeapDumpOnOutOfMemoryError");
        // NOTE(review): java.io.tmpdir is read from this (client) JVM, not from the AM container.
        vargs.add("-XX:HeapDumpPath=" + System.getProperty("java.io.tmpdir") + "/dt-heap-" + this.appId.getId() + ".bin");
        vargs.add("-Dhadoop.root.logger=" + (this.dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
        vargs.add("-Dhadoop.log.dir=<LOG_DIR>");
        vargs.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, this.dag.assertAppPath()));
        if (this.dag.isDebug()) {
            vargs.add("-Dlog4j.debug=true");
        }
        String loggersLevel = this.conf.get(DTLoggerFactory.DT_LOGGERS_LEVEL);
        if (loggersLevel != null) {
            vargs.add(String.format("-D%s=%s", DTLoggerFactory.DT_LOGGERS_LEVEL, loggersLevel));
        }
        vargs.add(StreamingAppMaster.class.getName());
        vargs.add("1><LOG_DIR>/AppMaster.stdout");
        vargs.add("2><LOG_DIR>/AppMaster.stderr");

        StringBuilder command = new StringBuilder(9 * vargs.size());
        for (CharSequence arg : vargs) {
            command.append(arg).append(" ");
        }
        LOG.info("Completed setting up app master command {}", command);
        return command.toString();
    }

    /**
     * Fetches the current YARN report for the application id held by this client.
     *
     * @return the report returned by the YARN client
     * @throws YarnException if the YARN client call fails
     * @throws IOException if communication with YARN fails
     */
    public ApplicationReport getApplicationReport() throws YarnException, IOException {
        ApplicationReport report = yarnClient.getApplicationReport(appId);
        return report;
    }

    /**
     * Asks YARN to kill the application id held by this client.
     *
     * @throws YarnException if the YARN client call fails
     * @throws IOException if communication with YARN fails
     */
    public void killApplication() throws YarnException, IOException {
        yarnClient.killApplication(appId);
    }

    /**
     * Sets the timeout passed to the completion wait in {@link #monitorApplication()}.
     *
     * @param j new timeout value
     */
    public void setClientTimeout(long j) {
        clientTimeout = j;
    }

    /**
     * Waits for the application to complete, logging every application report received.
     * <p>
     * The status callback never requests an early exit, so the wait ends only as
     * decided by the helper's {@code waitForCompletion}, which is given {@code clientTimeout}.
     *
     * @return the result of {@code waitForCompletion}
     * @throws YarnException if a YARN client call fails
     * @throws IOException if communication with YARN fails
     */
    public boolean monitorApplication() throws YarnException, IOException {
        StramClientUtils.ClientRMHelper.AppStatusCallback reportLogger = new StramClientUtils.ClientRMHelper.AppStatusCallback() {
            @Override
            public boolean exitLoop(ApplicationReport applicationReport) {
                StringBuilder message = new StringBuilder("Got application report from ASM for");
                message.append(", appId=").append(appId.getId());
                message.append(", clientToken=").append(applicationReport.getClientToAMToken());
                message.append(", appDiagnostics=").append(applicationReport.getDiagnostics());
                message.append(", appMasterHost=").append(applicationReport.getHost());
                message.append(", appQueue=").append(applicationReport.getQueue());
                message.append(", appMasterRpcPort=").append(applicationReport.getRpcPort());
                message.append(", appStartTime=").append(applicationReport.getStartTime());
                message.append(", yarnAppState=").append(applicationReport.getYarnApplicationState().toString());
                message.append(", distributedFinalState=").append(applicationReport.getFinalApplicationStatus().toString());
                message.append(", appTrackingUrl=").append(applicationReport.getTrackingUrl());
                message.append(", appUser=").append(applicationReport.getUser());
                LOG.info(message.toString());
                // Never ask for an early exit.
                return false;
            }
        };
        StramClientUtils.ClientRMHelper rmHelper = new StramClientUtils.ClientRMHelper(yarnClient, conf);
        return rmHelper.waitForCompletion(appId, reportLogger, clientTimeout);
    }

    /**
     * Sets the application type placed on the submission context.
     *
     * @param str application type; {@link #startApplication()} accepts only {@link #YARN_APPLICATION_TYPE}
     */
    public void setApplicationType(String str) {
        applicationType = str;
    }

    /**
     * Sets the id of a previous application whose state is copied on launch.
     *
     * @param str previous application id, or {@code null} to start without prior state
     */
    public void setOriginalAppId(String str) {
        originalAppId = str;
    }

    /**
     * Returns the YARN queue name currently held by this client.
     *
     * @return the queue name, possibly {@code null} if never set
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * Sets the YARN queue placed on the submission context.
     *
     * @param str queue name
     */
    public void setQueueName(String str) {
        queueName = str;
    }

    /**
     * Sets extra resources that are added to the resolved jar set on launch.
     *
     * @param linkedHashSet resource locations, or {@code null} for none; the set is stored without copying
     */
    public void setResources(LinkedHashSet<String> linkedHashSet) {
        resources = linkedHashSet;
    }

    /**
     * Sets the archives to copy into the application directory on launch.
     *
     * @param str comma-separated archive locations, or {@code null} for none
     */
    public void setArchives(String str) {
        archives = str;
    }

    /**
     * Sets the files to copy into the application directory on launch.
     *
     * @param str comma-separated file locations, or {@code null} for none
     */
    public void setFiles(String str) {
        files = str;
    }
}
