package org.apache.kylin.query.monitor;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kylin.common.KylinConfig;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/query/monitor/SparderContextCanary.class */
public class SparderContextCanary {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SparderContextCanary.class);
    private static volatile boolean isStarted = false;
    private static final int THRESHOLD_TO_RESTART_SPARK = KylinConfig.getInstanceFromEnv().getThresholdToRestartSparder();
    private static final int PERIOD_MINUTES = KylinConfig.getInstanceFromEnv().getSparderCanaryPeriodMinutes();
    private static volatile int errorAccumulated = 0;
    private static volatile long lastResponseTime = -1;
    private static volatile boolean sparderRestarting = false;

    private SparderContextCanary() {
    }

    public static int getErrorAccumulated() {
        return errorAccumulated;
    }

    public long getLastResponseTime() {
        return lastResponseTime;
    }

    public boolean isSparderRestarting() {
        return sparderRestarting;
    }

    public static void init() {
        if (isStarted) {
            return;
        }
        synchronized (SparderContextCanary.class) {
            if (!isStarted) {
                isStarted = true;
                logger.info("Start monitoring Sparder");
                Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(SparderContextCanary::monitor, PERIOD_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);
            }
        }
    }

    public static boolean isError() {
        return errorAccumulated >= THRESHOLD_TO_RESTART_SPARK;
    }

    public static void monitor() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (SparderContext.isSparkAvailable()) {
                try {
                    try {
                        JavaSparkContext fromSparkContext = JavaSparkContext.fromSparkContext(SparderContext.getOriginalSparkSession().sparkContext());
                        fromSparkContext.setLocalProperty("spark.scheduler.pool", "vip_tasks");
                        logger.info("SparderContextCanary numberCount returned successfully with value {}, takes {} ms.", Long.valueOf(((Long) numberCount(fromSparkContext).get(KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), TimeUnit.MILLISECONDS)).longValue()), Long.valueOf(System.currentTimeMillis() - System.currentTimeMillis()));
                        errorAccumulated = 0;
                    } catch (Exception e) {
                        errorAccumulated++;
                        logger.error("SparderContextCanary numberCount occurs exception.", (Throwable) e);
                    }
                } catch (ExecutionException e2) {
                    logger.error("SparderContextCanary numberCount occurs exception, need to restart immediately.", (Throwable) e2);
                    errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
                } catch (TimeoutException e3) {
                    errorAccumulated++;
                    logger.error("SparderContextCanary numberCount timeout, didn't return in {} ms, error {} times.", Integer.valueOf(KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs()), Integer.valueOf(errorAccumulated));
                }
            } else {
                logger.info("Sparder is unavailable, need to restart immediately.");
                errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
            }
            lastResponseTime = System.currentTimeMillis() - currentTimeMillis;
            logger.debug("Sparder context errorAccumulated:{}", Integer.valueOf(errorAccumulated));
            if (isError()) {
                sparderRestarting = true;
                try {
                    logger.warn("Repairing sparder context");
                    SparderContext.restartSpark();
                } catch (Throwable th) {
                    logger.error("Restart sparder context failed.", th);
                }
                sparderRestarting = false;
            }
        } catch (Throwable th2) {
            logger.error("Error when monitoring Sparder.", th2);
        }
    }

    private static JavaFutureAction<Long> numberCount(JavaSparkContext javaSparkContext) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(Integer.valueOf(i));
        }
        return javaSparkContext.parallelize(arrayList, 1).countAsync();
    }
}
