/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.orchestrator.computation.monitor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.common.utils.Logging;
import org.apache.linkis.common.utils.Utils$;
import org.apache.linkis.governance.common.conf.GovernanceCommonConf$;
import org.apache.linkis.governance.common.entity.NodeExistStatus;
import org.apache.linkis.governance.common.protocol.engineconn.RequestEngineStatusBatch;
import org.apache.linkis.governance.common.protocol.engineconn.ResponseEngineStatusBatch;
import org.apache.linkis.governance.common.utils.GovernanceConstant$;
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus;
import org.apache.linkis.manager.common.protocol.node.RequestNodeStatus;
import org.apache.linkis.manager.common.protocol.node.ResponseNodeStatus;
import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf$;
import org.apache.linkis.orchestrator.computation.execute.CodeExecTaskExecutor;
import org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor$;
import org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor$$anon$1$;
import org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor$$anon$1$$anonfun$run$1$;
import org.apache.linkis.orchestrator.computation.monitor.EngineConnMonitor$$anonfun$org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$;
import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor;
import org.apache.linkis.rpc.Sender$;
import org.apache.linkis.server.BDPJettyServerHelper$;
import org.apache.linkis.server.package$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashMap;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

public final class EngineConnMonitor$
implements Logging {
    // Singleton instance; assigned by the private constructor of this class.
    public static final EngineConnMonitor$ MODULE$;
    // Logger instance; set on first use by logger$lzycompute().
    private final Logger logger;
    // Becomes true once `logger` has been initialized; logger() reads it without taking the lock.
    private volatile boolean bitmap$0;

    // Instantiates the singleton; the constructor itself stores the new object in MODULE$.
    static {
        new EngineConnMonitor$();
    }

    /**
     * Slow path of {@link #logger()}: initializes the logger while holding this object's monitor.
     * The initialization flag is re-checked inside the lock so the logger is created at most once.
     *
     * @return the logger held in the {@code logger} field
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Already initialized by the time the lock was acquired.
                return this.logger;
            }
            this.logger = Logging.class.logger((Logging)this);
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns the logger, creating it on first access.
     *
     * @return the value of the {@code logger} field when the initialization flag is set,
     *         otherwise the result of {@code logger$lzycompute()}
     */
    public Logger logger() {
        if (this.bitmap$0) {
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Forwards {@code message} to the {@code Logging} trait's {@code trace} implementation,
     * with this object as the receiver.
     *
     * @param message function producing the text to log
     */
    public void trace(Function0<String> message) {
        Logging self = this;
        Logging.class.trace(self, message);
    }

    /**
     * Forwards {@code message} to the {@code Logging} trait's {@code debug} implementation,
     * with this object as the receiver.
     *
     * @param message function producing the text to log
     */
    public void debug(Function0<String> message) {
        Logging self = this;
        Logging.class.debug(self, message);
    }

    /**
     * Forwards {@code message} to the {@code Logging} trait's single-argument {@code info}
     * implementation, with this object as the receiver.
     *
     * @param message function producing the text to log
     */
    public void info(Function0<String> message) {
        Logging self = this;
        Logging.class.info(self, message);
    }

    /**
     * Forwards {@code message} and {@code t} to the {@code Logging} trait's two-argument
     * {@code info} implementation, with this object as the receiver.
     *
     * @param message function producing the text to log
     * @param t       throwable passed along with the message
     */
    public void info(Function0<String> message, Throwable t) {
        Logging self = this;
        Logging.class.info(self, message, t);
    }

    /**
     * Forwards {@code message} to the {@code Logging} trait's single-argument {@code warn}
     * implementation, with this object as the receiver.
     *
     * @param message function producing the text to log
     */
    public void warn(Function0<String> message) {
        Logging self = this;
        Logging.class.warn(self, message);
    }

    /**
     * Forwards {@code message} and {@code t} to the {@code Logging} trait's two-argument
     * {@code warn} implementation, with this object as the receiver.
     *
     * @param message function producing the text to log
     * @param t       throwable passed along with the message
     */
    public void warn(Function0<String> message, Throwable t) {
        Logging self = this;
        Logging.class.warn(self, message, t);
    }

    /**
     * Forwards {@code message} and {@code t} to the {@code Logging} trait's two-argument
     * {@code error} implementation, with this object as the receiver.
     *
     * @param message function producing the text to log
     * @param t       throwable passed along with the message
     */
    public void error(Function0<String> message, Throwable t) {
        Logging self = this;
        Logging.class.error(self, message, t);
    }

    /**
     * Forwards {@code message} to the {@code Logging} trait's single-argument {@code error}
     * implementation, with this object as the receiver.
     *
     * @param message function producing the text to log
     */
    public void error(Function0<String> message) {
        Logging self = this;
        Logging.class.error(self, message);
    }

    /**
     * Registers a recurring monitoring task on {@code Utils$.MODULE$.defaultScheduler()}.
     *
     * <p>Each run walks every executor array in {@code engineConnExecutorCache}, collects the
     * engine-conn executors whose last update time is older than
     * {@code ENGINECONN_LASTUPDATE_TIMEOUT} relative to the start of the run, and passes their
     * service instances to {@code queryEngineStatusAndHandle} in batches of at most
     * {@code REQUEST_ENGINE_STATUS_BATCH_LIMIT} entries. The run body is wrapped in
     * {@code Utils$.tryAndWarn} (presumably so a failing run is logged rather than propagated;
     * the helper is not visible here).
     *
     * @param engineConnExecutorCache map from engine service instance to its task executors;
     *                                read on every run
     * @param endJobByEngineInstance  callback handed on to {@code queryEngineStatusAndHandle}
     */
    public void addEngineExecutorStatusMonitor(HashMap<ServiceInstance, CodeExecTaskExecutor[]> engineConnExecutorCache, Function1<ServiceInstance, BoxedUnit> endJobByEngineInstance) {
        // The two method arguments are captured in the synthetic fields of this Runnable.
        Runnable task = new Runnable(engineConnExecutorCache, endJobByEngineInstance){
            public final HashMap engineConnExecutorCache$1;
            public final Function1 endJobByEngineInstance$1;

            public void run() {
                Utils$.MODULE$.tryAndWarn((Function0)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ anon.1 $outer;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        long endTime;
                        // Used as "now" for the staleness check and to measure how long this run takes.
                        long startTime = System.currentTimeMillis();
                        HashMap engineExecutorCache = this.$outer.engineConnExecutorCache$1;
                        Set<E> unActivityEngines = Collections.synchronizedSet(new HashSet<E>());
                        // Collect every engine-conn executor not updated within ENGINECONN_LASTUPDATE_TIMEOUT.
                        engineExecutorCache.values().foreach((Function1)new Serializable(this, startTime, unActivityEngines){
                            public static final long serialVersionUID = 0L;
                            public final long startTime$1;
                            public final Set unActivityEngines$1;

                            public final void apply(CodeExecTaskExecutor[] taskExecutors) {
                                Predef$.MODULE$.refArrayOps((Object[])taskExecutors).foreach((Function1)new Serializable(this){
                                    public static final long serialVersionUID = 0L;
                                    private final /* synthetic */ anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$2 $outer;

                                    public final Object apply(CodeExecTaskExecutor executor) {
                                        return this.$outer.startTime$1 - executor.getEngineConnExecutor().getLastUpdateTime() > ((TimeType)ComputationOrchestratorConf$.MODULE$.ENGINECONN_LASTUPDATE_TIMEOUT().getValue()).toLong() ? BoxesRunTime.boxToBoolean((boolean)this.$outer.unActivityEngines$1.add(executor.getEngineConnExecutor())) : BoxedUnit.UNIT;
                                    }
                                    {
                                        if ($outer == null) {
                                            throw null;
                                        }
                                        this.$outer = $outer;
                                    }
                                });
                            }
                            {
                                this.startTime$1 = startTime$1;
                                this.unActivityEngines$1 = unActivityEngines$1;
                            }
                        });
                        if (unActivityEngines != null && !unActivityEngines.isEmpty()) {
                            EngineConnMonitor$.MODULE$.logger().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"There are ", " unActivity engines."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)unActivityEngines.size())})));
                            // Batch buffer: queried and cleared each time it reaches the batch limit.
                            ArrayList<ServiceInstance> engineList = new ArrayList<ServiceInstance>();
                            ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(unActivityEngines).asScala()).foreach((Function1)new Serializable(this, engineList){
                                public static final long serialVersionUID = 0L;
                                private final /* synthetic */ anon$1$$anonfun$run$1 $outer;
                                private final ArrayList engineList$1;

                                public final void apply(EngineConnExecutor engine) {
                                    this.engineList$1.add(engine.getServiceInstance());
                                    if (this.engineList$1.size() >= GovernanceConstant$.MODULE$.REQUEST_ENGINE_STATUS_BATCH_LIMIT()) {
                                        EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle((HashMap<ServiceInstance, CodeExecTaskExecutor[]>)this.$outer.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$anon$$anonfun$$$outer().engineConnExecutorCache$1, this.engineList$1, (Function1<ServiceInstance, BoxedUnit>)this.$outer.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$anon$$anonfun$$$outer().endJobByEngineInstance$1);
                                        this.engineList$1.clear();
                                    }
                                }
                                {
                                    if ($outer == null) {
                                        throw null;
                                    }
                                    this.$outer = $outer;
                                    this.engineList$1 = engineList$1;
                                }
                            });
                            // Query whatever is left in the last, partially filled batch.
                            if (!engineList.isEmpty()) {
                                EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle((HashMap<ServiceInstance, CodeExecTaskExecutor[]>)this.$outer.engineConnExecutorCache$1, engineList, (Function1<ServiceInstance, BoxedUnit>)this.$outer.endJobByEngineInstance$1);
                                engineList.clear();
                            }
                        }
                        // Warn when a single run took at least as long as the configured monitor interval.
                        if ((endTime = System.currentTimeMillis()) - startTime >= ((TimeType)ComputationOrchestratorConf$.MODULE$.ENGINECONN_ACTIVITY_MONITOR_INTERVAL().getValue()).toLong()) {
                            EngineConnMonitor$.MODULE$.logger().warn("Query engines status costs longer time than query task interval, you should increase interval.");
                        }
                    }

                    public /* synthetic */ anon.1 org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$anon$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                }, EngineConnMonitor$.MODULE$.logger());
            }
            {
                this.engineConnExecutorCache$1 = engineConnExecutorCache$1;
                this.endJobByEngineInstance$1 = endJobByEngineInstance$1;
            }
        };
        // Initial delay of 10000 ms, then a fixed delay of ENGINECONN_ACTIVITY_MONITOR_INTERVAL ms between runs.
        Utils$.MODULE$.defaultScheduler().scheduleWithFixedDelay(task, 10000L, ((TimeType)ComputationOrchestratorConf$.MODULE$.ENGINECONN_ACTIVITY_MONITOR_INTERVAL().getValue()).toLong(), TimeUnit.MILLISECONDS);
        this.logger().info("Entrance engineStatusMonitor inited.");
    }

    /**
     * Requests the status of a batch of engine instances and processes each returned entry.
     *
     * <p>A {@code RequestEngineStatusBatch} built from {@code engineList} is sent through the
     * sender obtained for {@code MANAGER_SPRING_NAME}. For a {@code ResponseEngineStatusBatch}
     * reply, every entry of its {@code engineStatus} map is passed to
     * {@code dealWithEngineStatus}. When the reply holds a different number of entries than
     * were requested, each requested instance missing from the reply is first inserted into
     * that map with {@code NodeExistStatus.Unknown} and logged. Any other reply type is logged
     * as an invalid response. The whole exchange runs inside {@code Utils$.tryAndError}
     * (presumably logging failures instead of propagating them; the helper is not visible here).
     *
     * @param engineConnExecutorCache map from engine service instance to its task executors;
     *                                passed on to {@code dealWithEngineStatus}
     * @param engineList              service instances whose status is requested
     * @param endJobByEngineInstance  callback passed on to {@code dealWithEngineStatus}
     */
    public void org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle(HashMap<ServiceInstance, CodeExecTaskExecutor[]> engineConnExecutorCache, List<ServiceInstance> engineList, Function1<ServiceInstance, BoxedUnit> endJobByEngineInstance) {
        RequestEngineStatusBatch requestEngineStatus = new RequestEngineStatusBatch(engineList);
        Utils$.MODULE$.tryAndError((Function0)new Serializable(engineConnExecutorCache, endJobByEngineInstance, requestEngineStatus){
            public static final long serialVersionUID = 0L;
            public final HashMap engineConnExecutorCache$2;
            public final Function1 endJobByEngineInstance$2;
            private final RequestEngineStatusBatch requestEngineStatus$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                // Send the batch request via the sender obtained for MANAGER_SPRING_NAME.
                Object object = Sender$.MODULE$.getSender((String)GovernanceCommonConf$.MODULE$.MANAGER_SPRING_NAME().getValue()).ask((Object)this.requestEngineStatus$1);
                if (object instanceof ResponseEngineStatusBatch) {
                    ResponseEngineStatusBatch responseEngineStatusBatch = (ResponseEngineStatusBatch)object;
                    if (responseEngineStatusBatch.msg() != null) {
                        EngineConnMonitor$.MODULE$.logger().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"ResponseEngineStatusBatch msg : ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{responseEngineStatusBatch.msg()})));
                    }
                    // Size mismatch: requested instances absent from the reply are added as Unknown below.
                    if (responseEngineStatusBatch.engineStatus().size() != this.requestEngineStatus$1.engineList().size()) {
                        // NOTE(review): the message below contains typos ("euqal", "requet"); left unchanged because it is runtime text.
                        EngineConnMonitor$.MODULE$.logger().error(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"ResponseEngineStatusBatch engines size : ", " is not euqal requet : ", "."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)responseEngineStatusBatch.engineStatus().size()), BoxesRunTime.boxToInteger((int)this.requestEngineStatus$1.engineList().size())})));
                        ArrayBuffer unKnownEngines = new ArrayBuffer();
                        ((IterableLike)JavaConverters$.MODULE$.asScalaBufferConverter(this.requestEngineStatus$1.engineList()).asScala()).foreach((Function1)new Serializable(this, unKnownEngines, responseEngineStatusBatch){
                            public static final long serialVersionUID = 0L;
                            private final ArrayBuffer unKnownEngines$1;
                            private final ResponseEngineStatusBatch x2$1;

                            public final Object apply(ServiceInstance instance) {
                                BoxedUnit boxedUnit;
                                if (this.x2$1.engineStatus().containsKey(instance)) {
                                    boxedUnit = BoxedUnit.UNIT;
                                } else {
                                    // Mutates the reply's own status map and remembers the instance for the log line.
                                    this.x2$1.engineStatus().put(instance, NodeExistStatus.Unknown);
                                    boxedUnit = this.unKnownEngines$1.$plus$eq((Object)instance);
                                }
                                return boxedUnit;
                            }
                            {
                                this.unKnownEngines$1 = unKnownEngines$1;
                                this.x2$1 = x2$1;
                            }
                        });
                        String instances = ((TraversableOnce)unKnownEngines.map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final String apply(ServiceInstance x$1) {
                                return x$1.getInstance();
                            }
                        }, ArrayBuffer$.MODULE$.canBuildFrom())).mkString(",");
                        EngineConnMonitor$.MODULE$.logger().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"These engine instances cannot be found in manager : ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{instances})));
                    }
                    // Handle every (instance, status) entry, including any Unknown entries added above.
                    ((IterableLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(responseEngineStatusBatch.engineStatus()).asScala()).foreach((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ anonfun$org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$queryEngineStatusAndHandle$1 $outer;

                        public final void apply(Tuple2<ServiceInstance, NodeExistStatus> status) {
                            EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$dealWithEngineStatus(status, (HashMap<ServiceInstance, CodeExecTaskExecutor[]>)this.$outer.engineConnExecutorCache$2, (Function1<ServiceInstance, BoxedUnit>)this.$outer.endJobByEngineInstance$2);
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    });
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    // Reply is not a ResponseEngineStatusBatch: only log the serialized request.
                    EngineConnMonitor$.MODULE$.logger().error(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Invalid response. request : ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BDPJettyServerHelper$.MODULE$.gson().toJson((Object)this.requestEngineStatus$1)})));
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
            }
            {
                this.engineConnExecutorCache$2 = engineConnExecutorCache$2;
                this.endJobByEngineInstance$2 = endJobByEngineInstance$2;
                this.requestEngineStatus$1 = requestEngineStatus$1;
            }
        }, this.logger());
    }

    /**
     * Reacts to the existence status reported for a single engine instance.
     *
     * <ul>
     *   <li>{@code UnExist}: logs a warning and invokes {@code endJobByEngineInstance} for the
     *       instance.</li>
     *   <li>{@code Exist} or {@code Unknown}: if the instance has an entry in
     *       {@code engineConnExecutorCache}, sends a {@code RequestNodeStatus} to the engine
     *       itself. A {@code ResponseNodeStatus} whose node status is completed ends the job;
     *       otherwise the executor's last-update time is refreshed. Any other non-null reply is
     *       only logged. The second function passed to {@code Utils$.tryCatch} logs the
     *       throwable it receives and ends the job (presumably it is invoked for anything
     *       thrown by the first function; the helper is not visible here).</li>
     *   <li>Any other status value, including {@code null}: throws {@code MatchError}.</li>
     * </ul>
     *
     * @param status                  pair of engine service instance and its reported status
     * @param engineConnExecutorCache map from engine service instance to its task executors
     * @param endJobByEngineInstance  callback invoked with the instance when its job should end
     */
    public void org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$dealWithEngineStatus(Tuple2<ServiceInstance, NodeExistStatus> status, HashMap<ServiceInstance, CodeExecTaskExecutor[]> engineConnExecutorCache, Function1<ServiceInstance, BoxedUnit> endJobByEngineInstance) {
        NodeExistStatus nodeExistStatus;
        block4: {
            block3: {
                block2: {
                    nodeExistStatus = (NodeExistStatus)status._2();
                    // Case UnExist: end the job for this instance.
                    if (!NodeExistStatus.UnExist.equals(nodeExistStatus)) break block2;
                    this.logger().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Engine ", " is Failed, now go to clear its task."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{status._1()})));
                    BoxedUnit boxedUnit = (BoxedUnit)endJobByEngineInstance.apply(status._1());
                    break block3;
                }
                // Case Exist or Unknown: both are handled the same way.
                boolean bl = NodeExistStatus.Exist.equals(nodeExistStatus) ? true : NodeExistStatus.Unknown.equals(nodeExistStatus);
                if (!bl) break block4;
                // Instances without a cache entry are skipped without any query.
                CodeExecTaskExecutor[] engineConnExecutor = package$.MODULE$.toJavaMap(engineConnExecutorCache).getOrDefault(status._1(), null);
                BoxedUnit boxedUnit = engineConnExecutor == null ? BoxedUnit.UNIT : (BoxedUnit)Utils$.MODULE$.tryCatch((Function0)new Serializable(status, engineConnExecutorCache, endJobByEngineInstance){
                    public static final long serialVersionUID = 0L;
                    private final Tuple2 status$1;
                    private final HashMap engineConnExecutorCache$3;
                    private final Function1 endJobByEngineInstance$3;

                    public final void apply() {
                        this.apply$mcV$sp();
                    }

                    public void apply$mcV$sp() {
                        Object object;
                        block8: {
                            block7: {
                                RequestNodeStatus requestNodeStatus;
                                block6: {
                                    BoxedUnit boxedUnit;
                                    requestNodeStatus = new RequestNodeStatus();
                                    // Ask the engine instance itself for its node status.
                                    object = Sender$.MODULE$.getSender((ServiceInstance)this.status$1._1()).ask((Object)requestNodeStatus);
                                    if (!(object instanceof ResponseNodeStatus)) break block6;
                                    ResponseNodeStatus responseNodeStatus = (ResponseNodeStatus)object;
                                    if (Predef$.MODULE$.Boolean2boolean(NodeStatus.isCompleted((NodeStatus)responseNodeStatus.getNodeStatus()))) {
                                        // Completed node status: end the job for this instance.
                                        boxedUnit = (BoxedUnit)this.endJobByEngineInstance$3.apply(this.status$1._1());
                                    } else {
                                        if (EngineConnMonitor$.MODULE$.logger().isDebugEnabled()) {
                                            EngineConnMonitor$.MODULE$.logger().debug(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Will update engineConnExecutor(", ") lastupdated time"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.status$1._1()})));
                                        }
                                        // Not completed: refresh the executor's last-update time.
                                        EngineConnMonitor$.MODULE$.org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$updateExecutorActivityTime((ServiceInstance)this.status$1._1(), (HashMap<ServiceInstance, CodeExecTaskExecutor[]>)this.engineConnExecutorCache$3);
                                        boxedUnit = BoxedUnit.UNIT;
                                    }
                                    BoxedUnit boxedUnit2 = boxedUnit;
                                    break block7;
                                }
                                // Any other non-null reply is only logged; a null reply falls through to MatchError.
                                if (!(object instanceof Object)) break block8;
                                Object object2 = object;
                                BoxedUnit boxedUnit = (BoxedUnit)Utils$.MODULE$.tryAndWarn((Function0)new Serializable(this, requestNodeStatus, object2){
                                    public static final long serialVersionUID = 0L;
                                    private final RequestNodeStatus requestNodeStatus$1;
                                    private final Object x3$1;

                                    public final void apply() {
                                        this.apply$mcV$sp();
                                    }

                                    public void apply$mcV$sp() {
                                        EngineConnMonitor$.MODULE$.logger().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unknown response : ", " for request : ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BDPJettyServerHelper$.MODULE$.gson().toJson(this.x3$1), BDPJettyServerHelper$.MODULE$.gson().toJson((Object)this.requestNodeStatus$1)})));
                                    }
                                    {
                                        this.requestNodeStatus$1 = requestNodeStatus$1;
                                        this.x3$1 = x3$1;
                                    }
                                }, EngineConnMonitor$.MODULE$.logger());
                            }
                            return;
                        }
                        throw new MatchError(object);
                    }
                    {
                        this.status$1 = status$1;
                        this.engineConnExecutorCache$3 = engineConnExecutorCache$3;
                        this.endJobByEngineInstance$3 = endJobByEngineInstance$3;
                    }
                }, (Function1)new Serializable(status, endJobByEngineInstance){
                    public static final long serialVersionUID = 0L;
                    private final Tuple2 status$1;
                    private final Function1 endJobByEngineInstance$3;

                    // Handler given to tryCatch: logs the throwable and ends the job for the instance.
                    public final void apply(Throwable x0$1) {
                        Throwable throwable = x0$1;
                        if (throwable != null) {
                            Throwable throwable2 = throwable;
                            EngineConnMonitor$.MODULE$.logger().error(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to get status of engineConn : ", ", now end the job. "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.status$1._1()})), throwable2);
                            BoxedUnit boxedUnit = (BoxedUnit)this.endJobByEngineInstance$3.apply(this.status$1._1());
                            return;
                        }
                        throw new MatchError((Object)throwable);
                    }
                    {
                        this.status$1 = status$1;
                        this.endJobByEngineInstance$3 = endJobByEngineInstance$3;
                    }
                });
            }
            return;
        }
        // Reached for any status other than UnExist, Exist or Unknown (including null).
        throw new MatchError((Object)nodeExistStatus);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Refreshes the last-update time of the cached executor that belongs to {@code serviceInstance}.
     *
     * <p>Does nothing for a {@code null} instance. Otherwise the executor array cached for the
     * instance is filtered down to executors whose engine-conn executor reports an equal service
     * instance; the first match gets {@code updateLastUpdateTime()} called while holding this
     * object's monitor. A warning is logged when the instance has no cache entry or no executor
     * matches.
     *
     * @param serviceInstance         engine instance whose executor should be refreshed; may be null
     * @param engineConnExecutorCache map from engine service instance to its task executors
     */
    public void org$apache$linkis$orchestrator$computation$monitor$EngineConnMonitor$$updateExecutorActivityTime(ServiceInstance serviceInstance, HashMap<ServiceInstance, CodeExecTaskExecutor[]> engineConnExecutorCache) {
        if (serviceInstance != null) {
            CodeExecTaskExecutor[] cachedExecutors = package$.MODULE$.toJavaMap(engineConnExecutorCache).getOrDefault(serviceInstance, null);
            if (cachedExecutors != null) {
                // Keep only the executors bound to exactly this service instance.
                CodeExecTaskExecutor[] matching = (CodeExecTaskExecutor[])Predef$.MODULE$.refArrayOps((Object[])cachedExecutors).filter((Function1)new Serializable(serviceInstance){
                    public static final long serialVersionUID = 0L;
                    private final ServiceInstance serviceInstance$1;

                    public final boolean apply(CodeExecTaskExecutor x$2) {
                        return x$2.getEngineConnExecutor().getServiceInstance().equals((Object)this.serviceInstance$1);
                    }
                    {
                        this.serviceInstance$1 = serviceInstance$1;
                    }
                });
                boolean hasMatch = matching != null && Predef$.MODULE$.refArrayOps((Object[])matching).size() >= 1;
                if (!hasMatch) {
                    this.logger().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"EngineConnExecutor ", " cannot be found in engineConnExecutorCache"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{serviceInstance.toString()})));
                } else {
                    // Only the first matching executor is touched, under this object's monitor.
                    synchronized (this) {
                        ((CodeExecTaskExecutor)Predef$.MODULE$.refArrayOps((Object[])matching).head()).getEngineConnExecutor().updateLastUpdateTime();
                    }
                }
            } else {
                this.logger().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"EngineConnExecutor ", " cannot be found in engineConnExecutorCache"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{serviceInstance.toString()})));
            }
        }
    }

    /**
     * Private constructor: stores this object in {@code MODULE$} and then runs the
     * {@code Logging} trait initializer on it.
     */
    private EngineConnMonitor$() {
        // MODULE$ is assigned before the trait initializer runs.
        MODULE$ = this;
        Logging.class.$init$((Logging)this);
    }
}

