package org.apache.reef.runtime.standalone.driver;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.FileResource;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver;
import org.apache.reef.runtime.standalone.client.parameters.NodeFolder;
import org.apache.reef.runtime.standalone.client.parameters.NodeInfoSet;
import org.apache.reef.runtime.standalone.client.parameters.RootFolder;
import org.apache.reef.runtime.standalone.client.parameters.SshPortNum;
import org.apache.reef.runtime.yarn.driver.REEFEventHandlers;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.CollectionUtils;
import org.apache.reef.util.Optional;

/* loaded from: input_file:org/apache/reef/runtime/standalone/driver/RemoteNodeManager.class */
public final class RemoteNodeManager {
    private static final Logger LOG = Logger.getLogger(RemoteNodeManager.class.getName());
    private final ThreadGroup containerThreads = new ThreadGroup("SshContainerManagerThreadGroup");
    private final Map<String, SshProcessContainer> containers = new HashMap();
    private final ConfigurationSerializer configurationSerializer;
    private final REEFFileNames fileNames;
    private final double jvmHeapFactor;
    private final REEFEventHandlers reefEventHandlers;
    private final String errorHandlerRID;
    private final Set<String> nodeInfoSet;
    private Iterator<String> nodeSetIterator;
    private final ReefRunnableProcessObserver processObserver;
    private final String rootFolder;
    private final String nodeFolder;
    private final int sshPortNum;

    @Inject
    RemoteNodeManager(ConfigurationSerializer configurationSerializer, REEFFileNames rEEFFileNames, RemoteManager remoteManager, REEFEventHandlers rEEFEventHandlers, ReefRunnableProcessObserver reefRunnableProcessObserver, @Parameter(JVMHeapSlack.class) double d, @Parameter(NodeInfoSet.class) Set<String> set, @Parameter(RootFolder.class) String str, @Parameter(NodeFolder.class) String str2, @Parameter(SshPortNum.class) int i) {
        this.configurationSerializer = configurationSerializer;
        this.fileNames = rEEFFileNames;
        this.processObserver = reefRunnableProcessObserver;
        this.errorHandlerRID = remoteManager.getMyIdentifier();
        this.reefEventHandlers = rEEFEventHandlers;
        this.jvmHeapFactor = 1.0d - d;
        this.nodeInfoSet = set;
        this.rootFolder = str;
        this.nodeFolder = str2;
        this.sshPortNum = i;
        this.nodeSetIterator = this.nodeInfoSet.iterator();
        LOG.log(Level.FINEST, "Initialized RemoteNodeManager.");
    }

    private void release(String str) {
        synchronized (this.containers) {
            SshProcessContainer sshProcessContainer = this.containers.get(str);
            if (null != sshProcessContainer) {
                LOG.log(Level.INFO, "Releasing Container with containerId [{0}]", sshProcessContainer);
                if (sshProcessContainer.isRunning()) {
                    sshProcessContainer.close();
                }
                this.containers.remove(str);
            } else {
                LOG.log(Level.INFO, "Ignoring release request for unknown containerID [{0}]", str);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onResourceLaunchRequest(ResourceLaunchEvent resourceLaunchEvent) {
        String node;
        String substring;
        String substring2;
        Session session;
        SshProcessContainer withRemoteConnection;
        LOG.log(Level.INFO, "RemoteNodeManager:onResourceLaunchRequest");
        try {
            synchronized (this.nodeSetIterator) {
                node = getNode();
            }
            if (node.indexOf(64) < 0) {
                substring = System.getProperty("user.name");
                substring2 = node;
            } else {
                substring = node.substring(0, node.indexOf(64));
                substring2 = node.substring(node.indexOf(64) + 1, node.length());
            }
            String str = System.getProperty("user.home") + "/.ssh/id_dsa";
            synchronized (this.containers) {
                try {
                    JSch jSch = new JSch();
                    jSch.addIdentity(str);
                    session = jSch.getSession(substring, substring2, this.sshPortNum);
                    Properties properties = new Properties();
                    properties.put("StrictHostKeyChecking", "no");
                    session.setConfig(properties);
                    try {
                        session.connect();
                        LOG.log(Level.FINEST, "Established connection with {0}", substring2);
                        withRemoteConnection = this.containers.get(resourceLaunchEvent.getIdentifier()).withRemoteConnection(session, node);
                        withRemoteConnection.addGlobalFiles(this.fileNames.getGlobalFolder());
                        withRemoteConnection.addLocalFiles(getLocalFiles(resourceLaunchEvent));
                    } catch (JSchException e) {
                        throw new RuntimeException("Unable to connect to " + node + ". Check your authorized_keys settings. It should contain the public key of " + str, e);
                    }
                } catch (JSchException | IOException e2) {
                    LOG.log(Level.WARNING, "Failed to establish connection with {0}@{1}:\n Exception:{2}", new Object[]{substring, substring2, e2});
                }
                try {
                    this.configurationSerializer.toFile(resourceLaunchEvent.getEvaluatorConf(), new File(withRemoteConnection.getFolder(), this.fileNames.getEvaluatorConfigurationPath()));
                    ChannelExec openChannel = session.openChannel("exec");
                    openChannel.setCommand("mkdir " + this.nodeFolder);
                    openChannel.connect();
                    ArrayList arrayList = new ArrayList(Arrays.asList("scp", "-r", withRemoteConnection.getFolder().toString(), node + ":~/" + this.nodeFolder + "/" + withRemoteConnection.getContainerID()));
                    LOG.log(Level.INFO, "Copying files: {0}", arrayList);
                    try {
                        new ProcessBuilder(arrayList).start().waitFor();
                        List<String> launchCommand = getLaunchCommand(resourceLaunchEvent, withRemoteConnection.getMemory());
                        LOG.log(Level.FINEST, "Launching container: {0}", withRemoteConnection);
                        withRemoteConnection.run(launchCommand);
                    } catch (InterruptedException e3) {
                        throw new RuntimeException("Copying Interrupted: ", e3);
                    }
                } catch (IOException | BindException e4) {
                    throw new RuntimeException("Unable to write configuration.", e4);
                }
            }
        } catch (Exception e5) {
            throw new RuntimeException("Unable to get remote node", e5);
        }
    }

    private String getNode() {
        if (!this.nodeSetIterator.hasNext()) {
            this.nodeSetIterator = this.nodeInfoSet.iterator();
        }
        return this.nodeSetIterator.next().trim();
    }

    private static List<File> getLocalFiles(ResourceLaunchEvent resourceLaunchEvent) {
        ArrayList arrayList = new ArrayList();
        Iterator it = resourceLaunchEvent.getFileSet().iterator();
        while (it.hasNext()) {
            arrayList.add(new File(((FileResource) it.next()).getPath()).getAbsoluteFile());
        }
        return arrayList;
    }

    private List<String> getLaunchCommand(ResourceLaunchEvent resourceLaunchEvent, int i) {
        EvaluatorProcess configurationFileName = resourceLaunchEvent.getProcess().setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath());
        return configurationFileName.isOptionSet() ? configurationFileName.getCommandLine() : configurationFileName.setMemory((int) (this.jvmHeapFactor * i)).getCommandLine();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onResourceRequest(ResourceRequestEvent resourceRequestEvent) {
        Optional<String> selectNode = selectNode(resourceRequestEvent);
        String str = selectNode.isPresent() ? (String) selectNode.get() : getNode() + ":" + String.valueOf(this.sshPortNum);
        String str2 = str + "-" + String.valueOf(System.currentTimeMillis());
        this.containers.put(str2, new SshProcessContainer(this.errorHandlerRID, str, str2, new File(this.rootFolder, str2), ((Integer) resourceRequestEvent.getMemorySize().get()).intValue(), ((Integer) resourceRequestEvent.getVirtualCores().get()).intValue(), null, this.fileNames, this.nodeFolder, this.processObserver, this.containerThreads));
        this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder().setIdentifier(str2).setNodeId(str).setResourceMemory(((Integer) resourceRequestEvent.getMemorySize().get()).intValue()).setVirtualCores(((Integer) resourceRequestEvent.getVirtualCores().get()).intValue()).setRuntimeName("STANDALONE").build());
        updateRuntimeStatus();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onResourceReleaseRequest(ResourceReleaseEvent resourceReleaseEvent) {
        synchronized (this.containers) {
            LOG.log(Level.FINEST, "Release container: {0}", resourceReleaseEvent.getIdentifier());
            release(resourceReleaseEvent.getIdentifier());
        }
    }

    public synchronized void close() {
        synchronized (this.containers) {
            if (this.containers.isEmpty()) {
                LOG.log(Level.FINEST, "Clean shutdown with no outstanding containers.");
            } else {
                LOG.log(Level.WARNING, "Dirty shutdown with outstanding containers.");
                for (SshProcessContainer sshProcessContainer : this.containers.values()) {
                    LOG.log(Level.WARNING, "Force shutdown of: {0}", sshProcessContainer);
                    sshProcessContainer.close();
                }
            }
        }
    }

    private Optional<String> selectNode(ResourceRequestEvent resourceRequestEvent) {
        if (CollectionUtils.isNotEmpty(resourceRequestEvent.getNodeNameList())) {
            Iterator it = resourceRequestEvent.getNodeNameList().iterator();
            if (it.hasNext()) {
                return Optional.of((String) it.next());
            }
        }
        if (CollectionUtils.isNotEmpty(resourceRequestEvent.getRackNameList())) {
            Iterator it2 = resourceRequestEvent.getRackNameList().iterator();
            if (it2.hasNext()) {
                return Optional.of((String) it2.next());
            }
        }
        return Optional.empty();
    }

    private synchronized void updateRuntimeStatus() {
        this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder().setName("STANDALONE").setState(State.RUNNING).build());
    }
}
