package co.cask.cdap.runtime.spi.provisioner.emr;

import co.cask.cdap.runtime.spi.SparkCompat;
import co.cask.cdap.runtime.spi.provisioner.Cluster;
import co.cask.cdap.runtime.spi.provisioner.ClusterStatus;
import co.cask.cdap.runtime.spi.provisioner.Node;
import co.cask.cdap.runtime.spi.provisioner.PollingStrategies;
import co.cask.cdap.runtime.spi.provisioner.PollingStrategy;
import co.cask.cdap.runtime.spi.provisioner.ProgramRun;
import co.cask.cdap.runtime.spi.provisioner.Provisioner;
import co.cask.cdap.runtime.spi.provisioner.ProvisionerContext;
import co.cask.cdap.runtime.spi.provisioner.ProvisionerSpecification;
import co.cask.cdap.runtime.spi.ssh.SSHSession;
import com.amazonaws.services.elasticmapreduce.model.ClusterSummary;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/runtime/spi/provisioner/emr/ElasticMapReduceProvisioner.class */
public class ElasticMapReduceProvisioner implements Provisioner {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticMapReduceProvisioner.class);
    private static final ProvisionerSpecification SPEC = new ProvisionerSpecification("aws-emr", "Amazon EMR", "Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data across dynamically scalable Amazon EC2 instances.", new HashMap());
    private static final String CLUSTER_PREFIX = "cdap-";

    public ProvisionerSpecification getSpec() {
        return SPEC;
    }

    public void validateProperties(Map<String, String> map) {
        EMRConf.fromProperties(map);
    }

    public Cluster createCluster(ProvisionerContext provisionerContext) throws Exception {
        if (!SparkCompat.SPARK2_2_11.equals(provisionerContext.getSparkCompat())) {
            throw new UnsupportedOperationException("EMR currently only supports " + SparkCompat.SPARK2_2_11);
        }
        provisionerContext.getSSHContext().setSSHKeyPair(provisionerContext.getSSHContext().generate("ec2-user"));
        EMRConf fromProvisionerContext = EMRConf.fromProvisionerContext(provisionerContext);
        String clusterName = getClusterName(provisionerContext.getProgramRun());
        EMRClient fromConf = EMRClient.fromConf(fromProvisionerContext);
        Throwable th = null;
        try {
            Optional<ClusterSummary> unterminatedClusterByName = fromConf.getUnterminatedClusterByName(clusterName);
            if (unterminatedClusterByName.isPresent()) {
                Cluster cluster = fromConf.getCluster(unterminatedClusterByName.get().getId()).get();
                if (fromConf != null) {
                    if (0 != 0) {
                        try {
                            fromConf.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fromConf.close();
                    }
                }
                return cluster;
            }
            Cluster cluster2 = new Cluster(fromConf.createCluster(clusterName), ClusterStatus.CREATING, Collections.emptyList(), Collections.emptyMap());
            if (fromConf != null) {
                if (0 != 0) {
                    try {
                        fromConf.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    fromConf.close();
                }
            }
            return cluster2;
        } catch (Throwable th4) {
            if (fromConf != null) {
                if (0 != 0) {
                    try {
                        fromConf.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fromConf.close();
                }
            }
            throw th4;
        }
    }

    public ClusterStatus getClusterStatus(ProvisionerContext provisionerContext, Cluster cluster) throws Exception {
        EMRClient fromConf = EMRClient.fromConf(EMRConf.fromProvisionerContext(provisionerContext));
        Throwable th = null;
        try {
            try {
                ClusterStatus clusterStatus = fromConf.getClusterStatus(cluster.getName());
                if (fromConf != null) {
                    if (0 != 0) {
                        try {
                            fromConf.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fromConf.close();
                    }
                }
                return clusterStatus;
            } finally {
            }
        } catch (Throwable th3) {
            if (fromConf != null) {
                if (th != null) {
                    try {
                        fromConf.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fromConf.close();
                }
            }
            throw th3;
        }
    }

    public Cluster getClusterDetail(ProvisionerContext provisionerContext, Cluster cluster) throws Exception {
        EMRClient fromConf = EMRClient.fromConf(EMRConf.fromProvisionerContext(provisionerContext));
        Throwable th = null;
        try {
            try {
                Cluster orElseGet = fromConf.getCluster(cluster.getName()).orElseGet(() -> {
                    return new Cluster(cluster, ClusterStatus.NOT_EXISTS);
                });
                if (fromConf != null) {
                    if (0 != 0) {
                        try {
                            fromConf.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fromConf.close();
                    }
                }
                return orElseGet;
            } finally {
            }
        } catch (Throwable th3) {
            if (fromConf != null) {
                if (th != null) {
                    try {
                        fromConf.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fromConf.close();
                }
            }
            throw th3;
        }
    }

    public void initializeCluster(ProvisionerContext provisionerContext, Cluster cluster) throws Exception {
        SSHSession createSSHSession = createSSHSession(provisionerContext, getMasterExternalIp(cluster));
        Throwable th = null;
        try {
            try {
                LOG.debug("Starting zookeeper server.");
                LOG.debug("Zookeeper server started: {}", createSSHSession.executeAndWait(new String[]{"sudo zookeeper-server start"}));
                if (createSSHSession != null) {
                    if (0 == 0) {
                        createSSHSession.close();
                        return;
                    }
                    try {
                        createSSHSession.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createSSHSession != null) {
                if (th != null) {
                    try {
                        createSSHSession.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createSSHSession.close();
                }
            }
            throw th4;
        }
    }

    private SSHSession createSSHSession(ProvisionerContext provisionerContext, String str) throws IOException {
        try {
            return provisionerContext.getSSHContext().createSSHSession(str);
        } catch (IOException e) {
            if (Throwables.getRootCause(e) instanceof ConnectException) {
                throw new IOException(String.format("Failed to connect to host %s. Ensure that the the provisioner property for \"Additional Master Security Group\" has been configured with an EC2 Security Group that has inbound rules allowing ssh and https (ports 22 and 443).", str), e);
            }
            throw e;
        }
    }

    public void deleteCluster(ProvisionerContext provisionerContext, Cluster cluster) throws Exception {
        EMRClient fromConf = EMRClient.fromConf(EMRConf.fromProvisionerContext(provisionerContext));
        Throwable th = null;
        try {
            try {
                fromConf.deleteCluster(cluster.getName());
                if (fromConf != null) {
                    if (0 == 0) {
                        fromConf.close();
                        return;
                    }
                    try {
                        fromConf.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fromConf != null) {
                if (th != null) {
                    try {
                        fromConf.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fromConf.close();
                }
            }
            throw th4;
        }
    }

    public PollingStrategy getPollingStrategy(ProvisionerContext provisionerContext, Cluster cluster) {
        return PollingStrategies.fixedInterval(20L, TimeUnit.SECONDS);
    }

    private String getMasterExternalIp(Cluster cluster) {
        Node node = (Node) cluster.getNodes().stream().filter(node2 -> {
            return "master".equals(node2.getProperties().get("type"));
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("Cluster has no node of master type: " + cluster);
        });
        String str = (String) node.getProperties().get("ip.external");
        if (str == null) {
            throw new IllegalArgumentException(String.format("External IP is not defined for node '%s' in cluster %s", node.getId(), cluster));
        }
        return str;
    }

    @VisibleForTesting
    static String getClusterName(ProgramRun programRun) {
        String lowerCase = programRun.getApplication().replaceAll("[^A-Za-z0-9\\-]", "").toLowerCase();
        int length = ((51 - CLUSTER_PREFIX.length()) - 1) - programRun.getRun().length();
        if (lowerCase.length() > length) {
            lowerCase = lowerCase.substring(0, length);
        }
        return CLUSTER_PREFIX + lowerCase + "-" + programRun.getRun();
    }
}
