package org.apache.kyuubi.engine.spark;

import java.nio.file.Path;
import java.time.Instant;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kyuubi.KyuubiException;
import org.apache.kyuubi.KyuubiException$;
import org.apache.kyuubi.Logging;
import org.apache.kyuubi.Utils$;
import org.apache.kyuubi.config.ConfigEntry;
import org.apache.kyuubi.config.KyuubiConf;
import org.apache.kyuubi.config.KyuubiConf$;
import org.apache.kyuubi.engine.spark.events.EngineEvent;
import org.apache.kyuubi.engine.spark.events.EngineEvent$;
import org.apache.kyuubi.engine.spark.events.EngineEventsStore;
import org.apache.kyuubi.engine.spark.events.SparkEventHandlerRegister;
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl;
import org.apache.kyuubi.events.EventBus$;
import org.apache.kyuubi.ha.HighAvailabilityConf$;
import org.apache.kyuubi.ha.client.RetryPolicies$;
import org.apache.kyuubi.session.SessionHandle$;
import org.apache.kyuubi.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.kyuubi.shade.io.vertx.core.cli.UsageMessageFormatter;
import org.apache.kyuubi.util.SignalRegister$;
import org.apache.spark.SparkConf;
import org.apache.spark.kyuubi.SparkContextHelper$;
import org.apache.spark.kyuubi.SparkUtilsHelper$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.ui.EngineTab;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;
import scala.util.control.NonFatal$;

/* compiled from: SparkSQLEngine.scala */
/* loaded from: input_file:org/apache/kyuubi/engine/spark/SparkSQLEngine$.class */
public final class SparkSQLEngine$ implements Logging, Serializable {
    /** The singleton instance; assigned as the first statement of the private constructor. */
    public static SparkSQLEngine$ MODULE$;
    /** Cached result of {@code Utils$.MODULE$.currentUser()}; valid only once {@link #bitmap$0} is true. */
    private String user;
    /** Spark configuration; (re)created by {@link #setupConf()}. */
    private SparkConf _sparkConf;
    /** Kyuubi configuration; (re)created by {@link #setupConf()}. */
    private KyuubiConf _kyuubiConf;
    /** The engine created by {@link #startEngine(SparkSession)}; {@code None$} until then. */
    private Option<SparkSQLEngine> currentEngine;
    /** Latch with a count of one that {@link #main(String[])} awaits after starting the engine. */
    private final CountDownLatch org$apache$kyuubi$engine$spark$SparkSQLEngine$$countDownLatch;
    /** Set to true by {@link #main(String[])} once {@link #createSpark()} has returned. */
    private final AtomicBoolean sparkSessionCreated;
    /** Upper bound (237) on the length of the generated executor pod name prefix. */
    private final int EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH;
    /** Backing field for the logger accessor pair required by {@link Logging}. */
    private transient Logger org$apache$kyuubi$Logging$$log_;
    /** True once {@link #user} has been computed. */
    private volatile boolean bitmap$0;

    // Instantiates the singleton when the class is loaded; the constructor itself publishes the
    // instance through MODULE$, which is why the result of `new` is discarded here.
    static {
        new SparkSQLEngine$();
    }

    /**
     * Returns the logger name supplied by the {@link Logging} default implementation.
     *
     * @return the logger name
     */
    @Override // org.apache.kyuubi.Logging
    public String loggerName() {
        // An unqualified loggerName() call would bind to this override and recurse without end.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        return Logging.super.loggerName();
    }

    /**
     * Returns the SLF4J logger supplied by the {@link Logging} default implementation.
     *
     * @return the logger for this object
     */
    @Override // org.apache.kyuubi.Logging
    public Logger logger() {
        // An unqualified logger() call would bind to this override and recurse without end.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        return Logging.super.logger();
    }

    /**
     * Logs a lazily evaluated message through the {@link Logging} default implementation of
     * {@code debug}.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.debug(function0);
    }

    /**
     * Logs a lazily evaluated message and a throwable through the {@link Logging} default
     * implementation of {@code debug}.
     *
     * @param function0 supplier of the message
     * @param th        throwable to attach to the log record
     */
    @Override // org.apache.kyuubi.Logging
    public void debug(Function0<Object> function0, Throwable th) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.debug(function0, th);
    }

    /**
     * Logs a lazily evaluated message through the {@link Logging} default implementation of
     * {@code info}.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.info(function0);
    }

    /**
     * Logs a lazily evaluated message and a throwable through the {@link Logging} default
     * implementation of {@code info}.
     *
     * @param function0 supplier of the message
     * @param th        throwable to attach to the log record
     */
    @Override // org.apache.kyuubi.Logging
    public void info(Function0<Object> function0, Throwable th) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.info(function0, th);
    }

    /**
     * Logs a lazily evaluated message through the {@link Logging} default implementation of
     * {@code warn}.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.warn(function0);
    }

    /**
     * Logs a lazily evaluated message and a throwable through the {@link Logging} default
     * implementation of {@code warn}.
     *
     * @param function0 supplier of the message
     * @param th        throwable to attach to the log record
     */
    @Override // org.apache.kyuubi.Logging
    public void warn(Function0<Object> function0, Throwable th) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.warn(function0, th);
    }

    /**
     * Logs a lazily evaluated message and a throwable through the {@link Logging} default
     * implementation of {@code error}.
     *
     * @param function0 supplier of the message
     * @param th        throwable to attach to the log record
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0, Throwable th) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.error(function0, th);
    }

    /**
     * Logs a lazily evaluated message through the {@link Logging} default implementation of
     * {@code error}.
     *
     * @param function0 supplier of the message
     */
    @Override // org.apache.kyuubi.Logging
    public void error(Function0<Object> function0) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.error(function0);
    }

    /**
     * Forwards to the {@link Logging} default implementation of
     * {@code initializeLoggerIfNecessary}.
     *
     * @param z flag passed through unchanged to the default implementation
     */
    @Override // org.apache.kyuubi.Logging
    public void initializeLoggerIfNecessary(boolean z) {
        // Qualified with Logging.super so the call does not re-enter this override.
        // NOTE(review): assumes Logging declares a default body for this method — confirm.
        Logging.super.initializeLoggerIfNecessary(z);
    }

    /**
     * Accessor for the logger backing field required by {@link Logging}.
     *
     * @return the stored logger, or {@code null} if none has been assigned yet
     */
    @Override // org.apache.kyuubi.Logging
    public Logger org$apache$kyuubi$Logging$$log_() {
        return this.org$apache$kyuubi$Logging$$log_;
    }

    /**
     * Mutator for the logger backing field required by {@link Logging}.
     *
     * @param logger the logger to store
     */
    @Override // org.apache.kyuubi.Logging
    public void org$apache$kyuubi$Logging$$log__$eq(Logger logger) {
        this.org$apache$kyuubi$Logging$$log_ = logger;
    }

    /** Returns the current {@link SparkConf}; it is assigned by {@link #setupConf()}. */
    private SparkConf _sparkConf() {
        return this._sparkConf;
    }

    /** Replaces the stored {@link SparkConf}. */
    private void _sparkConf_$eq(SparkConf sparkConf) {
        this._sparkConf = sparkConf;
    }

    /** Returns the current {@link KyuubiConf}; it is assigned by {@link #setupConf()}. */
    private KyuubiConf _kyuubiConf() {
        return this._kyuubiConf;
    }

    /** Replaces the stored {@link KyuubiConf}. */
    private void _kyuubiConf_$eq(KyuubiConf kyuubiConf) {
        this._kyuubiConf = kyuubiConf;
    }

    /**
     * Public read-only view of the engine's Kyuubi configuration.
     *
     * @return the same instance returned by the private {@code _kyuubiConf()} accessor
     */
    public KyuubiConf kyuubiConf() {
        return _kyuubiConf();
    }

    /**
     * Returns the engine holder.
     *
     * @return {@code None$} until {@link #startEngine(SparkSession)} stores an engine
     */
    public Option<SparkSQLEngine> currentEngine() {
        return this.currentEngine;
    }

    /**
     * Replaces the engine holder.
     *
     * @param option the new value of the holder
     */
    public void currentEngine_$eq(Option<SparkSQLEngine> option) {
        this.currentEngine = option;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.kyuubi.engine.spark.SparkSQLEngine$] */
    private String user$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.user = Utils$.MODULE$.currentUser();
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.user;
    }

    /**
     * Returns the lazily computed user name.
     *
     * @return the cached value when the {@code bitmap$0} flag is already set, otherwise the
     *         result of {@link #user$lzycompute()}
     */
    private String user() {
        if (this.bitmap$0) {
            return this.user;
        }
        return user$lzycompute();
    }

    /**
     * Returns the latch that {@link #main(String[])} blocks on after the engine is started.
     *
     * @return the latch created with a count of one in the constructor
     */
    public CountDownLatch org$apache$kyuubi$engine$spark$SparkSQLEngine$$countDownLatch() {
        return this.org$apache$kyuubi$engine$spark$SparkSQLEngine$$countDownLatch;
    }

    /** Returns the flag that {@link #main(String[])} sets once {@link #createSpark()} has returned. */
    private AtomicBoolean sparkSessionCreated() {
        return this.sparkSessionCreated;
    }

    /**
     * Returns the maximum length allowed for a generated executor pod name prefix.
     *
     * @return the limit, initialised to 237 in the constructor
     */
    public int EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH() {
        return this.EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH;
    }

    /**
     * Looks up the {@link SparkSession} bound to a Kyuubi session of the running engine.
     *
     * <p>Asserts that an engine has been stored, then resolves the session through the engine's
     * backend service and session manager.
     *
     * @param str UUID string converted to a session handle via {@code SessionHandle$.fromUUID}
     * @return the Spark session held by the matching {@link SparkSessionImpl}
     */
    public SparkSession getSparkSession(String str) {
        Predef$.MODULE$.assert(currentEngine().isDefined());
        SparkSQLEngine engine = (SparkSQLEngine) currentEngine().get();
        SparkSessionImpl session = (SparkSessionImpl) engine.backendService().sessionManager().getSession(SessionHandle$.MODULE$.fromUUID(str));
        return session.spark();
    }

    /**
     * Builds fresh Spark and Kyuubi configurations and applies the engine's defaults.
     *
     * <p>Most values are applied with {@code setIfMissing}; the redaction regex and the REPL
     * class output directory are always overwritten. Every {@code spark.kyuubi.*} entry is then
     * copied into the Kyuubi configuration under a {@code kyuubi.} prefix, and the resulting
     * Kyuubi configuration is dumped when debug logging is enabled.
     */
    public void setupConf() {
        _sparkConf_$eq(new SparkConf());
        _kyuubiConf_$eq(new KyuubiConf(KyuubiConf$.MODULE$.apply$default$1()));

        // Use the configured REPL class dir if present, otherwise Spark's local dir.
        String replBaseDir = (String) _sparkConf().getOption("spark.repl.classdir").getOrElse(() -> {
            return SparkUtilsHelper$.MODULE$.getLocalDir(MODULE$._sparkConf());
        });
        Path replOutputDir = Utils$.MODULE$.createTempDir("repl", replBaseDir);

        _sparkConf().setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true");
        _sparkConf().setIfMissing("spark.master", "local");
        // Extend whatever redaction pattern is in effect instead of replacing it.
        _sparkConf().set("spark.redaction.regex", _sparkConf().get("spark.redaction.regex", "(?i)secret|password|token|access[.]key") + "|zookeeper.auth.digest");
        _sparkConf().set("spark.repl.class.outputDir", replOutputDir.toFile().getAbsolutePath());
        _sparkConf().setIfMissing("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", "20");
        _sparkConf().setIfMissing("spark.app.name", "kyuubi_" + user() + "_spark_" + Instant.now());
        String catalogImpl = KyuubiSparkUtil$.MODULE$.hiveClassesArePresent() ? "hive" : "in-memory";
        _sparkConf().setIfMissing("spark.sql.catalogImplementation", catalogImpl);

        kyuubiConf().setIfMissing((ConfigEntry<ConfigEntry<Object>>) KyuubiConf$.MODULE$.FRONTEND_THRIFT_BINARY_BIND_PORT(), (ConfigEntry<Object>) BoxesRunTime.boxToInteger(0));
        kyuubiConf().setIfMissing((ConfigEntry<ConfigEntry<String>>) HighAvailabilityConf$.MODULE$.HA_ZK_CONN_RETRY_POLICY(), (ConfigEntry<String>) RetryPolicies$.MODULE$.N_TIME().toString());

        if (Utils$.MODULE$.isOnK8s()) {
            kyuubiConf().setIfMissing((ConfigEntry<ConfigEntry<Object>>) KyuubiConf$.MODULE$.FRONTEND_CONNECTION_URL_USE_HOSTNAME(), (ConfigEntry<Object>) BoxesRunTime.boxToBoolean(false));
            _sparkConf().setIfMissing("spark.kubernetes.executor.podNamePrefix", generateExecutorPodNamePrefixForK8s(user()));
            // The driver host default is only supplied outside K8s cluster mode.
            if (!isOnK8sClusterMode()) {
                _sparkConf().setIfMissing("spark.driver.host", Utils$.MODULE$.findLocalInetAddress().getHostAddress());
            }
        }
        // Likewise, the UI port default is only supplied outside K8s cluster mode.
        if (!isOnK8sClusterMode()) {
            _sparkConf().setIfMissing("spark.ui.port", "0");
        }

        // getAllWithPrefix yields keys with "spark.kyuubi." stripped; re-prefix them with "kyuubi.".
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(_sparkConf().getAllWithPrefix("spark.kyuubi."))).foreach(entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            String suffix = (String) entry._1();
            return MODULE$.kyuubiConf().set("kyuubi." + suffix, (String) entry._2());
        });

        if (logger().isDebugEnabled()) {
            kyuubiConf().getAll().foreach(entry -> {
                $anonfun$setupConf$3(entry);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Creates (or obtains) the {@link SparkSession} for this engine and prepares it for use.
     *
     * <p>The {@code kyuubi.engine.credentials} value is read and removed from the Kyuubi
     * configuration and blanked in the Spark configuration before the session is built. A
     * non-empty value is then passed to
     * {@code SparkTBinaryFrontendService$.renewDelegationToken}. Finally the engine-level and
     * session-level initialize SQL lists are concatenated and handed to
     * {@code KyuubiSparkUtil$.initializeSparkSession}, and the engine URL is published as a
     * SparkContext local property.
     *
     * @return the initialised Spark session
     */
    public SparkSession createSpark() {
        final String credentialsKey = "kyuubi.engine.credentials";

        // Capture the credentials first; the key is scrubbed from both configurations below.
        Option<String> credentials = kyuubiConf().getOption(credentialsKey);
        kyuubiConf().unset(credentialsKey);
        _sparkConf().set("spark." + credentialsKey, "");

        SparkSession session = SparkSession$.MODULE$.builder().config(_sparkConf()).getOrCreate();

        credentials.filter(value -> {
            return BoxesRunTime.boxToBoolean($anonfun$createSpark$1(value));
        }).foreach(value -> {
            $anonfun$createSpark$2(session, value);
            return BoxedUnit.UNIT;
        });

        Seq initializeSql = (Seq) ((TraversableLike) kyuubiConf().get(KyuubiConf$.MODULE$.ENGINE_SPARK_INITIALIZE_SQL())).$plus$plus((GenTraversableOnce) kyuubiConf().get(KyuubiConf$.MODULE$.ENGINE_SESSION_SPARK_INITIALIZE_SQL()), Seq$.MODULE$.canBuildFrom());
        KyuubiSparkUtil$.MODULE$.initializeSparkSession(session, initializeSql);

        session.sparkContext().setLocalProperty("kyuubi.engine.url", KyuubiSparkUtil$.MODULE$.engineUrl());
        return session;
    }

    /**
     * Creates a {@link SparkSQLEngine} for the given session, records it as the current engine,
     * and runs the start-up sequence in {@code $anonfun$startEngine$1} on it.
     *
     * @param sparkSession the session the new engine wraps
     */
    public void startEngine(SparkSession sparkSession) {
        SparkSQLEngine engine = new SparkSQLEngine(sparkSession);
        currentEngine_$eq(new Some(engine));
        currentEngine().foreach(current -> {
            $anonfun$startEngine$1(sparkSession, current);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Engine entry point: creates the Spark session, starts the engine, and blocks until the
     * count-down latch is released.
     *
     * <p>Before any work is done, the time elapsed since {@code kyuubi.engine.submit.time} is
     * compared with {@code ENGINE_INIT_TIMEOUT}; if it already exceeds the timeout a
     * {@link KyuubiException} is thrown. A watchdog thread is started that interrupts this thread
     * if the session has not been created by the deadline.
     *
     * <p>The Spark session, once created, is stopped on every exit path, normal or exceptional.
     *
     * @param strArr command-line arguments; not read by this method
     * @throws KyuubiException      if the init timeout has already elapsed, or engine start-up
     *                              fails with one
     * @throws InterruptedException if this thread is interrupted before the session is created
     *                              (the message then names the init timeout), or while waiting
     */
    public void main(String[] strArr) {
        long startedAt = System.currentTimeMillis();
        Option<String> submitTimeOpt = kyuubiConf().getOption("kyuubi.engine.submit.time");
        // Without a recorded submit time the elapsed time is treated as zero.
        long submittedAt = submitTimeOpt instanceof Some
                ? new StringOps(Predef$.MODULE$.augmentString((String) ((Some) submitTimeOpt).value())).toLong()
                : startedAt;
        long initTimeout = BoxesRunTime.unboxToLong(kyuubiConf().get(KyuubiConf$.MODULE$.ENGINE_INIT_TIMEOUT()));
        long elapsed = startedAt - submittedAt;
        if (elapsed > initTimeout) {
            throw new KyuubiException(new StringBuilder(77).append("The total engine initialization time (").append(elapsed).append(" ms)").append(" exceeds ").append(KyuubiConf$.MODULE$.ENGINE_INIT_TIMEOUT().key()).append(" (").append(initTimeout).append(" ms),").append(" and submitted at ").append(submittedAt).append(".").toString(), KyuubiException$.MODULE$.$lessinit$greater$default$2());
        }

        // Stays null until createSpark() returns, so the finally block knows whether to stop it.
        SparkSession spark = null;
        try {
            startInitTimeoutChecker(submittedAt, initTimeout);
            spark = createSpark();
            sparkSessionCreated().set(true);
            try {
                startEngine(spark);
                // Park the main thread until the latch is counted down.
                org$apache$kyuubi$engine$spark$SparkSQLEngine$$countDownLatch().await();
            } catch (KyuubiException e) {
                Option<SparkSQLEngine> engineOpt = currentEngine();
                if (engineOpt instanceof Some) {
                    SparkSQLEngine engine = (SparkSQLEngine) ((Some) engineOpt).value();
                    engine.stop();
                    EngineEvent apply = EngineEvent$.MODULE$.apply(engine);
                    // Copy of the event with the 11th field set to "now" and the 13th to the
                    // failure message (presumably end time and diagnostic — confirm against
                    // EngineEvent).
                    EngineEvent failureEvent = apply.copy(apply.copy$default$1(), apply.copy$default$2(), apply.copy$default$3(), apply.copy$default$4(), apply.copy$default$5(), apply.copy$default$6(), apply.copy$default$7(), apply.copy$default$8(), apply.copy$default$9(), apply.copy$default$10(), System.currentTimeMillis(), apply.copy$default$12(), e.getMessage(), apply.copy$default$14());
                    EventBus$.MODULE$.post(failureEvent);
                    error(() -> {
                        return failureEvent;
                    }, e);
                } else {
                    error(() -> {
                        return "Current SparkSQLEngine is not created.";
                    });
                }
                throw e;
            }
        } catch (Throwable th) {
            // An interrupt that arrives before the session exists is attributed to the watchdog.
            if (th instanceof InterruptedException && !sparkSessionCreated().get()) {
                String timeoutMsg = new StringBuilder(114).append("The Engine main thread was interrupted, possibly due to `createSpark` timeout.").append(" The `").append(KyuubiConf$.MODULE$.ENGINE_INIT_TIMEOUT().key()).append("` is (").append(initTimeout).append(" ms) ").append(" and submitted at ").append(submittedAt).append(".").toString();
                error(() -> {
                    return timeoutMsg;
                }, th);
                throw new InterruptedException(timeoutMsg);
            }
            // KyuubiExceptions were already logged by the inner handler; pass them through.
            if (th instanceof KyuubiException) {
                throw ((KyuubiException) th);
            }
            error(() -> {
                return new StringBuilder(36).append("Failed to instantiate SparkSession: ").append(th.getMessage()).toString();
            }, th);
            throw th;
        } finally {
            if (spark != null) {
                spark.stop();
            }
        }
    }

    /**
     * Starts a daemon watchdog that interrupts the calling thread if the Spark session has not
     * been created within the allowed time.
     *
     * <p>The watchdog polls every 500 ms until either {@code j2} milliseconds have passed since
     * {@code j} or the session-created flag is set. If the flag is still unset at that point, the
     * thread that called this method is interrupted.
     *
     * @param j  reference start time in epoch milliseconds
     * @param j2 timeout in milliseconds, measured from {@code j}
     */
    private void startInitTimeoutChecker(long j, long j2) {
        Thread callerThread = Thread.currentThread();
        Thread checker = new Thread(() -> {
            try {
                while (System.currentTimeMillis() - j < j2 && !MODULE$.sparkSessionCreated().get()) {
                    Thread.sleep(500L);
                }
            } catch (InterruptedException e) {
                // The watchdog itself was interrupted: restore the flag and stop watching
                // without touching the caller.
                Thread.currentThread().interrupt();
                return;
            }
            if (!MODULE$.sparkSessionCreated().get()) {
                callerThread.interrupt();
            }
        }, "CreateSparkTimeoutChecker");
        // Daemon, so a still-sleeping watchdog never keeps the JVM alive.
        checker.setDaemon(true);
        checker.start();
    }

    /**
     * Reports whether {@code Utils$.isOnK8s()} holds and the process environment contains
     * {@code SPARK_APPLICATION_ID}.
     *
     * @return true only when both conditions hold
     */
    private boolean isOnK8sClusterMode() {
        if (!Utils$.MODULE$.isOnK8s()) {
            return false;
        }
        return package$.MODULE$.env().contains("SPARK_APPLICATION_ID");
    }

    /**
     * Builds the executor pod name prefix {@code kyuubi-<user>-<epochMillis>} for Kubernetes.
     *
     * <p>The user name is trimmed and lower-cased, every character outside {@code [a-z0-9-]}
     * becomes a hyphen, runs of hyphens collapse to one, and a leading hyphen is dropped. If the
     * result is longer than {@link #EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH()}, a
     * {@code kyuubi-<random UUID>} prefix is returned instead.
     *
     * @param str the user name to embed; must not be null
     * @return the generated prefix
     */
    @VisibleForTesting
    public String generateExecutorPodNamePrefixForK8s(String str) {
        // The separator is a plain hyphen; spell it out rather than borrowing an equal-valued
        // constant from an unrelated CLI help formatter.
        String sanitizedUser = str.trim().toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9\\-]", "-")
                .replaceAll("-+", "-")
                .replaceAll("^-", "");
        String prefix = "kyuubi-" + sanitizedUser + "-" + Instant.now().toEpochMilli();
        if (prefix.length() <= EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH()) {
            return prefix;
        }
        return "kyuubi-" + UUID.randomUUID();
    }

    /**
     * Factory method: wraps the given session in a new {@link SparkSQLEngine}.
     *
     * @param sparkSession the session passed to the engine constructor
     * @return a newly constructed engine
     */
    public SparkSQLEngine apply(SparkSession sparkSession) {
        return new SparkSQLEngine(sparkSession);
    }

    /**
     * Extractor counterpart of {@link #apply(SparkSession)}.
     *
     * @param sparkSQLEngine the engine to take apart; may be null
     * @return {@code None$} for a null engine, otherwise a {@code Some} holding the engine's
     *         Spark session
     */
    public Option<SparkSession> unapply(SparkSQLEngine sparkSQLEngine) {
        return sparkSQLEngine == null ? None$.MODULE$ : new Some(sparkSQLEngine.spark());
    }

    /**
     * Serialization hook that substitutes the singleton for any deserialized copy.
     *
     * @return {@link #MODULE$}
     */
    private Object readResolve() {
        return MODULE$;
    }

    /**
     * Logs one Kyuubi configuration entry at debug level as {@code KyuubiConf: <key> = <value>}.
     *
     * @param tuple2 a (key, value) pair of strings
     * @throws MatchError if the pair itself is null
     */
    public static final /* synthetic */ void $anonfun$setupConf$3(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String key = (String) tuple2._1();
        String value = (String) tuple2._2();
        MODULE$.debug(() -> {
            return "KyuubiConf: " + key + " = " + value;
        });
    }

    /**
     * Predicate used by {@link #createSpark()} to discard empty credential strings.
     *
     * @param str the credential string to test
     * @return true when the string has at least one character
     */
    public static final /* synthetic */ boolean $anonfun$createSpark$1(String str) {
        return !str.isEmpty();
    }

    /**
     * Passes the given credentials string and the session's SparkContext to
     * {@code SparkTBinaryFrontendService$.renewDelegationToken}.
     *
     * @param sparkSession session whose SparkContext is handed to the renewal call
     * @param str          the credentials string taken from the Kyuubi configuration
     */
    public static final /* synthetic */ void $anonfun$createSpark$2(SparkSession sparkSession, String str) {
        SparkTBinaryFrontendService$.MODULE$.renewDelegationToken(sparkSession.sparkContext(), str);
    }

    /**
     * Runs the engine start-up sequence in three independent phases.
     *
     * <ol>
     *   <li>Register the event loggers. A non-fatal failure here is logged as a warning and
     *       start-up continues; a fatal one is rethrown.</li>
     *   <li>Initialize the engine and post its event. Any failure is wrapped in a
     *       {@link KyuubiException} whose message starts with
     *       {@code "Failed to initialize SparkSQLEngine: "}.</li>
     *   <li>Start the engine, build the {@link EngineTab}, log and post the engine event, and
     *       register a shutdown hook that stops the engine. Any failure is wrapped in a
     *       {@link KyuubiException} whose message starts with
     *       {@code "Failed to start SparkSQLEngine: "}.</li>
     * </ol>
     *
     * <p>Phases 2 and 3 are deliberately separate try blocks, so that a start failure is wrapped
     * once and is not additionally reported as an initialization failure.
     *
     * @param sparkSession   the session the engine runs on
     * @param sparkSQLEngine the engine to initialize and start
     * @throws KyuubiException if initialization or start fails
     */
    public static final /* synthetic */ void $anonfun$startEngine$1(SparkSession sparkSession, SparkSQLEngine sparkSQLEngine) {
        // Phase 1: event loggers (best effort).
        try {
            initLoggerEventHandler$1(MODULE$.kyuubiConf(), sparkSession);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                // Fatal errors are never swallowed.
                throw th;
            }
            Throwable nonFatal = (Throwable) unapply.get();
            MODULE$.warn(() -> {
                return new StringBuilder(41).append("Failed to initialize LoggerEventHandler: ").append(nonFatal.getMessage()).toString();
            }, nonFatal);
        }

        // Phase 2: initialization.
        try {
            sparkSQLEngine.initialize(MODULE$.kyuubiConf());
            EventBus$.MODULE$.post(EngineEvent$.MODULE$.apply(sparkSQLEngine));
        } catch (Throwable th) {
            throw new KyuubiException(new StringBuilder(37).append("Failed to initialize SparkSQLEngine: ").append(th.getMessage()).toString(), th);
        }

        // Phase 3: start.
        try {
            sparkSQLEngine.start();
            // Constructed for its side effects only; the instance is not retained here.
            new EngineTab(new Some(sparkSQLEngine), SparkContextHelper$.MODULE$.getSparkUI(sparkSession.sparkContext()), new EngineEventsStore(SparkContextHelper$.MODULE$.getKvStore(sparkSession.sparkContext())), MODULE$.kyuubiConf());
            EngineEvent startedEvent = EngineEvent$.MODULE$.apply(sparkSQLEngine);
            MODULE$.info(() -> {
                return startedEvent;
            });
            EventBus$.MODULE$.post(startedEvent);
            Utils$.MODULE$.addShutdownHook(() -> {
                sparkSQLEngine.stop();
            }, Utils$.MODULE$.SPARK_CONTEXT_SHUTDOWN_PRIORITY() + 2);
        } catch (Throwable th) {
            throw new KyuubiException(new StringBuilder(32).append("Failed to start SparkSQLEngine: ").append(th.getMessage()).toString(), th);
        }
    }

    /**
     * Registers the event loggers for the given session by delegating to
     * {@code SparkEventHandlerRegister.registerEventLoggers}.
     *
     * @param kyuubiConf   configuration passed to the registration call
     * @param sparkSession session the register is constructed with
     */
    private static final void initLoggerEventHandler$1(KyuubiConf kyuubiConf, SparkSession sparkSession) {
        new SparkEventHandlerRegister(sparkSession).registerEventLoggers(kyuubiConf);
    }

    /**
     * Builds the singleton: publishes it through {@link #MODULE$}, initialises the fields,
     * registers the logger with {@code SignalRegister$}, and populates the configurations.
     */
    private SparkSQLEngine$() {
        // Published first because setupConf() below runs lambdas that reach back through MODULE$.
        MODULE$ = this;
        Logging.$init$(this);
        this.currentEngine = None$.MODULE$;
        this.org$apache$kyuubi$engine$spark$SparkSQLEngine$$countDownLatch = new CountDownLatch(1);
        this.sparkSessionCreated = new AtomicBoolean(false);
        this.EXECUTOR_POD_NAME_PREFIX_MAX_LENGTH = 237;
        SignalRegister$.MODULE$.registerLogger(logger());
        // Must come last: it reads fields set above and calls user() and other members.
        setupConf();
    }
}
