package org.apache.linkis.orchestrator.computation.monitor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.common.utils.Logging;
import org.apache.linkis.common.utils.Utils$;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf$;
import org.apache.linkis.governance.common.entity.NodeExistStatus;
import org.apache.linkis.governance.common.protocol.engineconn.RequestEngineStatusBatch;
import org.apache.linkis.governance.common.protocol.engineconn.ResponseEngineStatusBatch;
import org.apache.linkis.governance.common.utils.GovernanceConstant$;
import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf$;
import org.apache.linkis.orchestrator.computation.execute.CodeExecTaskExecutor;
import org.apache.linkis.orchestrator.computation.execute.EngineConnTaskInfo;
import org.apache.linkis.orchestrator.listener.task.EngineQuitedUnexpectedlyEvent;
import org.apache.linkis.orchestrator.plans.physical.ExecTask;
import org.apache.linkis.rpc.Sender$;
import org.apache.linkis.server.BDPJettyServerHelper$;
import org.apache.linkis.server.package$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.ResizableArray;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: EngineConnMonitor.scala */
/* loaded from: input_file:org/apache/linkis/orchestrator/computation/monitor/EngineConnMonitor$.class */
public final class EngineConnMonitor$ implements Logging {
    public static EngineConnMonitor$ MODULE$;
    private final long org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$ENGINECONN_LASTUPDATE_TIMEOUT;
    private Logger logger;
    private volatile boolean bitmap$0;

    static {
        new EngineConnMonitor$();
    }

    public void trace(Function0<String> function0) {
        Logging.trace$(this, function0);
    }

    public void debug(Function0<String> function0) {
        Logging.debug$(this, function0);
    }

    public void info(Function0<String> function0) {
        Logging.info$(this, function0);
    }

    public void info(Function0<String> function0, Throwable th) {
        Logging.info$(this, function0, th);
    }

    public void warn(Function0<String> function0) {
        Logging.warn$(this, function0);
    }

    public void warn(Function0<String> function0, Throwable th) {
        Logging.warn$(this, function0, th);
    }

    public void error(Function0<String> function0, Throwable th) {
        Logging.error$(this, function0, th);
    }

    public void error(Function0<String> function0) {
        Logging.error$(this, function0);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor$] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$0 ? logger$lzycompute() : this.logger;
    }

    public long org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$ENGINECONN_LASTUPDATE_TIMEOUT() {
        return this.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$ENGINECONN_LASTUPDATE_TIMEOUT;
    }

    public void addEngineExecutorStatusMonitor(final Map<EngineConnTaskInfo, CodeExecTaskExecutor> map) {
        Utils$.MODULE$.defaultScheduler().scheduleWithFixedDelay(new Runnable(map) { // from class: org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor$$anon$1
            private final Map engineConnExecutorCache$1;

            @Override // java.lang.Runnable
            public void run() {
                Utils$.MODULE$.tryAndWarn(() -> {
                    long currentTimeMillis = System.currentTimeMillis();
                    HashMap<ServiceInstance, ArrayBuffer<CodeExecTaskExecutor>> hashMap = new HashMap<>();
                    ArrayBuffer arrayBuffer = new ArrayBuffer();
                    arrayBuffer.appendAll((TraversableOnce) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(this.engineConnExecutorCache$1.values()).asScala());
                    ((ResizableArray) arrayBuffer.filter(codeExecTaskExecutor -> {
                        return BoxesRunTime.boxToBoolean($anonfun$run$2(currentTimeMillis, codeExecTaskExecutor));
                    })).foreach(codeExecTaskExecutor2 -> {
                        $anonfun$run$3(hashMap, codeExecTaskExecutor2);
                        return BoxedUnit.UNIT;
                    });
                    if (hashMap.nonEmpty()) {
                        EngineConnMonitor$.MODULE$.logger().info("There are {} unActivity engineConn.", BoxesRunTime.boxToInteger(hashMap.size()));
                        if (hashMap.size() > GovernanceConstant$.MODULE$.REQUEST_ENGINE_STATUS_BATCH_LIMIT()) {
                            EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle(hashMap, (List) JavaConverters$.MODULE$.seqAsJavaListConverter(hashMap.keys().toList()).asJava());
                        } else {
                            ArrayList arrayList = new ArrayList();
                            hashMap.keys().foreach(serviceInstance -> {
                                $anonfun$run$5(arrayList, hashMap, serviceInstance);
                                return BoxedUnit.UNIT;
                            });
                            if (!arrayList.isEmpty()) {
                                EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle(hashMap, arrayList);
                                arrayList.clear();
                            }
                        }
                    }
                    if (System.currentTimeMillis() - currentTimeMillis >= ((TimeType) ComputationOrchestratorConf$.MODULE$.ENGINECONN_ACTIVITY_MONITOR_INTERVAL().getValue()).toLong()) {
                        EngineConnMonitor$.MODULE$.logger().warn("Query engines status costs longer time than query task interval, you should increase interval.");
                    }
                }, EngineConnMonitor$.MODULE$.logger());
            }

            public static final /* synthetic */ boolean $anonfun$run$2(long j, CodeExecTaskExecutor codeExecTaskExecutor) {
                return j - codeExecTaskExecutor.getEngineConnExecutor().getLastUpdateTime() > EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$ENGINECONN_LASTUPDATE_TIMEOUT();
            }

            public static final /* synthetic */ void $anonfun$run$3(HashMap hashMap, CodeExecTaskExecutor codeExecTaskExecutor) {
                ((ArrayBuffer) hashMap.getOrElseUpdate(codeExecTaskExecutor.getEngineConnExecutor().getServiceInstance(), () -> {
                    return new ArrayBuffer();
                })).append(Predef$.MODULE$.wrapRefArray(new CodeExecTaskExecutor[]{codeExecTaskExecutor}));
            }

            public static final /* synthetic */ void $anonfun$run$5(ArrayList arrayList, HashMap hashMap, ServiceInstance serviceInstance) {
                arrayList.add(serviceInstance);
                if (arrayList.size() >= GovernanceConstant$.MODULE$.REQUEST_ENGINE_STATUS_BATCH_LIMIT()) {
                    EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle(hashMap, arrayList);
                    arrayList.clear();
                }
            }

            {
                this.engineConnExecutorCache$1 = map;
            }
        }, 10000L, ((TimeType) ComputationOrchestratorConf$.MODULE$.ENGINECONN_ACTIVITY_MONITOR_INTERVAL().getValue()).toLong(), TimeUnit.MILLISECONDS);
        logger().info("Entrance engineStatusMonitor inited.");
    }

    public void org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle(HashMap<ServiceInstance, ArrayBuffer<CodeExecTaskExecutor>> hashMap, List<ServiceInstance> list) {
        RequestEngineStatusBatch requestEngineStatusBatch = new RequestEngineStatusBatch(list);
        Utils$.MODULE$.tryAndError(() -> {
            Object ask = Sender$.MODULE$.getSender((String) GovernanceCommonConf$.MODULE$.MANAGER_SERVICE_NAME().getValue()).ask(requestEngineStatusBatch);
            if (!(ask instanceof ResponseEngineStatusBatch)) {
                MODULE$.logger().warn("Invalid response. request : {}", new Object[]{BDPJettyServerHelper$.MODULE$.gson().toJson(requestEngineStatusBatch)});
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
            ResponseEngineStatusBatch responseEngineStatusBatch = (ResponseEngineStatusBatch) ask;
            if (responseEngineStatusBatch.msg() != null) {
                MODULE$.logger().info("ResponseEngineStatusBatch msg: {}", new Object[]{responseEngineStatusBatch.msg()});
            }
            if (responseEngineStatusBatch.engineStatus().size() != requestEngineStatusBatch.engineList().size()) {
                MODULE$.logger().warn("ResponseEngineStatusBatch engines size: {} is not equal request: {}.", BoxesRunTime.boxToInteger(responseEngineStatusBatch.engineStatus().size()), BoxesRunTime.boxToInteger(requestEngineStatusBatch.engineList().size()));
                ArrayBuffer arrayBuffer = new ArrayBuffer();
                ((IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(requestEngineStatusBatch.engineList()).asScala()).foreach(serviceInstance -> {
                    if (responseEngineStatusBatch.engineStatus().containsKey(serviceInstance)) {
                        return BoxedUnit.UNIT;
                    }
                    responseEngineStatusBatch.engineStatus().put(serviceInstance, NodeExistStatus.Unknown);
                    return arrayBuffer.$plus$eq(serviceInstance);
                });
                MODULE$.logger().warn("These engine instances cannot be found in manager : {}", new Object[]{((TraversableOnce) arrayBuffer.map(serviceInstance2 -> {
                    return serviceInstance2.getInstance();
                }, ArrayBuffer$.MODULE$.canBuildFrom())).mkString(",")});
            }
            ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(responseEngineStatusBatch.engineStatus()).asScala()).foreach(tuple2 -> {
                $anonfun$queryEngineStatusAndHandle$4(hashMap, tuple2);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }, logger());
    }

    private void dealWithEngineStatus(Tuple2<ServiceInstance, NodeExistStatus> tuple2, HashMap<ServiceInstance, ArrayBuffer<CodeExecTaskExecutor>> hashMap) {
        NodeExistStatus nodeExistStatus = (NodeExistStatus) tuple2._2();
        if (NodeExistStatus.UnExist.equals(nodeExistStatus)) {
            logger().warn("Engine {} is Failed, now go to clear its task.", new Object[]{tuple2._1()});
            killTask(hashMap.get(tuple2._1()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(NodeExistStatus.Exist.equals(nodeExistStatus) ? true : NodeExistStatus.Unknown.equals(nodeExistStatus))) {
                throw new MatchError(nodeExistStatus);
            }
            if (((ArrayBuffer) package$.MODULE$.toJavaMap(hashMap).getOrDefault(tuple2._1(), null)) != null) {
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }
    }

    private void killTask(Option<ArrayBuffer<CodeExecTaskExecutor>> option) {
        if (option.isEmpty()) {
            logger().error("executor is not Defined");
        } else {
            ((ArrayBuffer) option.get()).foreach(codeExecTaskExecutor -> {
                $anonfun$killTask$1(codeExecTaskExecutor);
                return BoxedUnit.UNIT;
            });
        }
    }

    private void updateExecutorActivityTime(ServiceInstance serviceInstance, HashMap<ServiceInstance, ArrayBuffer<CodeExecTaskExecutor>> hashMap) {
        if (serviceInstance != null) {
            ArrayBuffer arrayBuffer = (ArrayBuffer) package$.MODULE$.toJavaMap(hashMap).getOrDefault(serviceInstance, null);
            if (arrayBuffer != null) {
                arrayBuffer.foreach(codeExecTaskExecutor -> {
                    $anonfun$updateExecutorActivityTime$1(serviceInstance, codeExecTaskExecutor);
                    return BoxedUnit.UNIT;
                });
            } else {
                logger().warn("EngineConnExecutor {} cannot be found in engineConnExecutorCache", new Object[]{serviceInstance.toString()});
            }
        }
    }

    public static final /* synthetic */ void $anonfun$queryEngineStatusAndHandle$4(HashMap hashMap, Tuple2 tuple2) {
        MODULE$.dealWithEngineStatus(tuple2, hashMap);
    }

    public static final /* synthetic */ void $anonfun$dealWithEngineStatus$2(Tuple2 tuple2, HashMap hashMap, Throwable th) {
        MODULE$.logger().error(new StringBuilder(56).append("Failed to get status of engineConn : ").append(tuple2._1()).append(", now end the job. ").toString(), th);
        MODULE$.killTask(hashMap.get(tuple2._1()));
    }

    public static final /* synthetic */ void $anonfun$killTask$1(CodeExecTaskExecutor codeExecTaskExecutor) {
        ExecTask execTask = codeExecTaskExecutor.getExecTask();
        Utils$.MODULE$.tryAndError(() -> {
            MODULE$.logger().warn(new StringBuilder(56).append("Will kill task ").append(execTask.getIDInfo()).append(" because the engine ").append(codeExecTaskExecutor.getEngineConnExecutor().getServiceInstance().toString()).append(" quited unexpectedly.").toString());
            execTask.getPhysicalContext().broadcastSyncEvent(new EngineQuitedUnexpectedlyEvent(execTask, codeExecTaskExecutor.getEngineConnExecutor().getServiceInstance().toString()));
        }, MODULE$.logger());
    }

    public static final /* synthetic */ void $anonfun$updateExecutorActivityTime$1(ServiceInstance serviceInstance, CodeExecTaskExecutor codeExecTaskExecutor) {
        if (codeExecTaskExecutor.getEngineConnExecutor().getServiceInstance().equals(serviceInstance)) {
            codeExecTaskExecutor.getEngineConnExecutor().updateLastUpdateTime();
        }
    }

    private EngineConnMonitor$() {
        MODULE$ = this;
        Logging.$init$(this);
        this.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$ENGINECONN_LASTUPDATE_TIMEOUT = ((TimeType) ComputationOrchestratorConf$.MODULE$.ENGINECONN_LASTUPDATE_TIMEOUT().getValue()).toLong();
    }
}
