/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.spark.session;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.spark.SparkConf;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link SparkSession} implementation.
 *
 * <p>A session wraps a single {@link HiveSparkClient} obtained from
 * {@link HiveSparkClientFactory} and lazily owns one scratch directory
 * (named after the session id) underneath the Hive HDFS scratch directory.
 */
public class SparkSessionImpl implements SparkSession {
    private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
    private static final String SPARK_DIR = "_spark_session_dir";

    private HiveConf conf;
    private boolean isOpen;
    private final String sessionId;
    private HiveSparkClient hiveSparkClient;

    /**
     * Lazily created session directory. Declared {@code volatile} because
     * {@link #getHDFSSessionDir()} reads it outside of {@link #dirLock}
     * before deciding whether to take the lock.
     */
    private volatile Path scratchDir;

    /** Guards creation and removal of {@link #scratchDir}. */
    private final Object dirLock = new Object();

    /** Creates a closed session with a freshly generated session id. */
    public SparkSessionImpl() {
        this.sessionId = SparkSessionImpl.makeSessionId();
    }

    /**
     * Opens the session by creating the underlying Spark client.
     *
     * <p>The session is flagged as open before the client is created, so
     * {@link #isOpen()} keeps returning {@code true} when this method throws.
     *
     * @param conf configuration stored on the session and used to create the client
     * @throws HiveException wrapping any {@link Throwable} raised while creating the client
     */
    @Override
    public void open(HiveConf conf) throws HiveException {
        this.conf = conf;
        this.isOpen = true;
        try {
            this.hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
        } catch (Throwable e) {
            throw new HiveException("Failed to create spark client.", e);
        }
    }

    /**
     * Submits the given work through the session's Spark client.
     *
     * @param driverContext driver context handed to the client
     * @param sparkWork     work to execute
     * @return the job reference returned by the client
     * @throws IllegalStateException if the session is not open
     * @throws Exception             whatever the client's {@code execute} throws
     */
    @Override
    public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
        Preconditions.checkState(this.isOpen, "Session is not open. Can't submit jobs.");
        return this.hiveSparkClient.execute(driverContext, sparkWork);
    }

    /**
     * Estimates the memory available per task (in bytes) and the number of
     * task slots, based on the Spark configuration and the executor count
     * reported by the client.
     *
     * @return a pair of (memory per task in bytes, total cores), or
     *         {@code (-1, -1)} when the client reports no executors
     * @throws Exception whatever the client or the Spark configuration lookups throw
     */
    @Override
    public ObjectPair<Long, Integer> getMemoryAndCores() throws Exception {
        SparkConf sparkConf = this.hiveSparkClient.getSparkConf();
        int numExecutors = this.hiveSparkClient.getExecutorCount();
        if (numExecutors <= 0) {
            return new ObjectPair<Long, Integer>(-1L, -1);
        }

        int executorMemoryInMB = Utils.memoryStringToMb(sparkConf.get("spark.executor.memory", "512m"));
        double memoryFraction = 1.0 - sparkConf.getDouble("spark.storage.memoryFraction", 0.6);
        // Widen to double before multiplying: executors * MB-per-executor can
        // exceed Integer.MAX_VALUE on large clusters.
        long totalMemory =
            (long) ((double) numExecutors * (double) executorMemoryInMB * memoryFraction * 1024.0 * 1024.0);

        int totalCores;
        String masterURL = sparkConf.get("spark.master");
        if (masterURL.startsWith("spark")) {
            // Use the configured default parallelism when present, otherwise ask the
            // client; never report fewer cores than executors.
            totalCores = sparkConf.contains("spark.default.parallelism")
                ? sparkConf.getInt("spark.default.parallelism", 1)
                : this.hiveSparkClient.getDefaultParallelism();
            totalCores = Math.max(totalCores, numExecutors);
        } else {
            int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
            totalCores = numExecutors * coresPerExecutor;
        }

        // Each task occupies spark.task.cpus cores, so this is the number of task slots.
        // NOTE(review): when spark.task.cpus is 0 or exceeds totalCores, one of the two
        // divisions below throws ArithmeticException - confirm callers are fine with that.
        totalCores /= sparkConf.getInt("spark.task.cpus", 1);
        long memoryPerTaskInBytes = totalMemory / (long) totalCores;

        LOG.info("Spark cluster current has executors: " + numExecutors + ", total cores: " + totalCores + ", memory per executor: " + executorMemoryInMB + "M, memoryFraction: " + memoryFraction);
        return new ObjectPair<Long, Integer>(memoryPerTaskInBytes, totalCores);
    }

    /** @return the open flag set by {@link #open(HiveConf)} and cleared by {@link #close()} */
    @Override
    public boolean isOpen() {
        return this.isOpen;
    }

    /** @return the configuration passed to {@link #open(HiveConf)}, or {@code null} if never opened */
    @Override
    public HiveConf getConf() {
        return this.conf;
    }

    /** @return the id generated for this session at construction time */
    @Override
    public String getSessionId() {
        return this.sessionId;
    }

    /**
     * Marks the session closed, closes the Spark client (if any) and deletes
     * the session scratch directory (if one was created).
     *
     * <p>The two cleanup steps are independent: a failure to close the client
     * is logged and does not prevent the scratch directory from being removed.
     * {@link IOException}s are logged, never propagated.
     */
    @Override
    public void close() {
        this.isOpen = false;
        if (this.hiveSparkClient != null) {
            try {
                this.hiveSparkClient.close();
            } catch (IOException e) {
                LOG.error("Failed to close spark session (" + this.sessionId + ").", e);
            } finally {
                this.hiveSparkClient = null;
            }
        }
        try {
            this.cleanScratchDir();
        } catch (IOException e) {
            LOG.error("Failed to clean up scratch dir for spark session (" + this.sessionId + ").", e);
        }
    }

    /**
     * Creates {@code <hdfs scratch dir>/_spark_session_dir/<sessionId>} with the
     * configured scratch-dir permission and registers it for deletion on exit.
     *
     * @return the path of the session directory
     * @throws IOException if the file system cannot be obtained or the call fails
     */
    private Path createScratchDir() throws IOException {
        Path parent = new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR);
        Path sparkDir = new Path(parent, this.sessionId);
        FileSystem fs = sparkDir.getFileSystem(this.conf);
        FsPermission fsPermission =
            new FsPermission(HiveConf.getVar(this.conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
        fs.mkdirs(sparkDir, fsPermission);
        fs.deleteOnExit(sparkDir);
        return sparkDir;
    }

    /**
     * Recursively deletes the session directory, if one exists, and forgets it.
     * Runs under {@link #dirLock} so it cannot interleave with lazy creation in
     * {@link #getHDFSSessionDir()}.
     *
     * @throws IOException if the delete fails; the directory reference is kept in that case
     */
    private void cleanScratchDir() throws IOException {
        synchronized (this.dirLock) {
            Path dir = this.scratchDir;
            if (dir != null) {
                FileSystem fs = dir.getFileSystem(this.conf);
                fs.delete(dir, true);
                this.scratchDir = null;
            }
        }
    }

    /**
     * Returns the session directory, creating it on first use.
     *
     * @return the session directory; never {@code null}
     * @throws IOException if the directory has to be created and creation fails
     */
    @Override
    public Path getHDFSSessionDir() throws IOException {
        // Single volatile read on the fast path; re-check under the lock before creating.
        Path dir = this.scratchDir;
        if (dir == null) {
            synchronized (this.dirLock) {
                dir = this.scratchDir;
                if (dir == null) {
                    dir = this.createScratchDir();
                    this.scratchDir = dir;
                }
            }
        }
        return dir;
    }

    /** @return a new random UUID string suitable as a session id */
    public static String makeSessionId() {
        return UUID.randomUUID().toString();
    }
}

