package org.apache.kylin.engine.spark.utils;

import org.apache.spark.internal.Logging;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.trees.TreeNode;
import org.apache.spark.sql.execution.BinaryExecNode;
import org.apache.spark.sql.execution.LeafExecNode;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.UnaryExecNode;
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper;
import org.apache.spark.sql.execution.adaptive.QueryStageExec;
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec;
import org.apache.spark.sql.execution.aggregate.HashAggregateExec;
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec;
import org.apache.spark.sql.execution.aggregate.SortAggregateExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec$;
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec;
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec;
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec;
import org.apache.spark.sql.execution.joins.SortMergeJoinExec;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec;
import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;

/* compiled from: JobMetricsUtils.scala */
/* loaded from: input_file:org/apache/kylin/engine/spark/utils/JobMetricsUtils$.class */
/**
 * Java rendering of the Scala singleton object {@code JobMetricsUtils}
 * (the single instance is published through {@link #MODULE$}).
 *
 * <p>The class offers two services:
 * <ul>
 *   <li>walking a Spark physical plan and copying {@code numOutputRows} metrics of selected
 *       operators into a {@code JobMetrics} holder ({@link #collectMetrics(String)},
 *       {@link #collectOutputRows(SparkPlan)});</li>
 *   <li>installing / removing a {@link SparkListener} that stores the {@link QueryExecution}
 *       carried by {@code PostQueryExecutionForKylin} events in {@code QueryExecutionCache}
 *       ({@link #registerListener(SparkSession)}, {@link #unRegisterListener(SparkSession)}).</li>
 * </ul>
 *
 * <p>All {@code find}/{@code foreach}/{@code collect...} and {@code log...} methods are plain
 * forwarders to the static trait implementations of {@link AdaptiveSparkPlanHelper} and
 * {@link Logging}.
 */
public final class JobMetricsUtils$ implements Logging, AdaptiveSparkPlanHelper {
    /** Name of the SQL metric that is read from every inspected plan node. */
    private static final String NUM_OUTPUT_ROWS = "numOutputRows";

    /** The singleton instance; assigned by the private constructor during class initialisation. */
    public static JobMetricsUtils$ MODULE$;

    /** Aggregate operator classes whose output row count is used as the cuboid row count. */
    private final List<Class<? extends BaseAggregateExec>> aggs;

    /** Join operator classes whose output row count is used as the source row count. */
    private final List<Class<? extends BinaryExecNode>> joins;

    /** Listener created by the most recent {@link #registerListener(SparkSession)} call, or {@code null}. */
    private SparkListener sparkListener;

    /** Backing field of the {@link Logging} trait's cached logger. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        // Instantiating the object publishes it through MODULE$ (see the constructor).
        new JobMetricsUtils$();
    }

    // ------------------------------------------------------------------
    // AdaptiveSparkPlanHelper forwarders
    // ------------------------------------------------------------------

    /** Forwards to {@code AdaptiveSparkPlanHelper.find$}. */
    public Option<SparkPlan> find(SparkPlan sparkPlan, Function1<SparkPlan, Object> function1) {
        return AdaptiveSparkPlanHelper.find$(this, sparkPlan, function1);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.foreach$}. */
    public void foreach(SparkPlan sparkPlan, Function1<SparkPlan, BoxedUnit> function1) {
        AdaptiveSparkPlanHelper.foreach$(this, sparkPlan, function1);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.foreachUp$}. */
    public void foreachUp(SparkPlan sparkPlan, Function1<SparkPlan, BoxedUnit> function1) {
        AdaptiveSparkPlanHelper.foreachUp$(this, sparkPlan, function1);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.mapPlans$}. */
    public <A> Seq<A> mapPlans(SparkPlan sparkPlan, Function1<SparkPlan, A> function1) {
        return AdaptiveSparkPlanHelper.mapPlans$(this, sparkPlan, function1);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.flatMap$}. */
    public <A> Seq<A> flatMap(SparkPlan sparkPlan, Function1<SparkPlan, TraversableOnce<A>> function1) {
        return AdaptiveSparkPlanHelper.flatMap$(this, sparkPlan, function1);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.collect$}. */
    public <B> Seq<B> collect(SparkPlan sparkPlan, PartialFunction<SparkPlan, B> partialFunction) {
        return AdaptiveSparkPlanHelper.collect$(this, sparkPlan, partialFunction);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.collectLeaves$}. */
    public Seq<SparkPlan> collectLeaves(SparkPlan sparkPlan) {
        return AdaptiveSparkPlanHelper.collectLeaves$(this, sparkPlan);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.collectFirst$}. */
    public <B> Option<B> collectFirst(SparkPlan sparkPlan, PartialFunction<SparkPlan, B> partialFunction) {
        return AdaptiveSparkPlanHelper.collectFirst$(this, sparkPlan, partialFunction);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.collectWithSubqueries$}. */
    public <B> Seq<B> collectWithSubqueries(SparkPlan sparkPlan, PartialFunction<SparkPlan, B> partialFunction) {
        return AdaptiveSparkPlanHelper.collectWithSubqueries$(this, sparkPlan, partialFunction);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.subqueriesAll$}. */
    public Seq<SparkPlan> subqueriesAll(SparkPlan sparkPlan) {
        return AdaptiveSparkPlanHelper.subqueriesAll$(this, sparkPlan);
    }

    /** Forwards to {@code AdaptiveSparkPlanHelper.stripAQEPlan$}. */
    public SparkPlan stripAQEPlan(SparkPlan sparkPlan) {
        return AdaptiveSparkPlanHelper.stripAQEPlan$(this, sparkPlan);
    }

    // ------------------------------------------------------------------
    // Logging forwarders
    // ------------------------------------------------------------------

    /** Forwards to {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Forwards to {@code Logging.log$}. */
    public Logger log() {
        return Logging.log$(this);
    }

    /** Forwards to {@code Logging.logInfo$}; the message is supplied lazily by {@code function0}. */
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    /** Forwards to {@code Logging.logDebug$}; the message is supplied lazily by {@code function0}. */
    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    /** Forwards to {@code Logging.logTrace$}; the message is supplied lazily by {@code function0}. */
    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    /** Forwards to {@code Logging.logWarning$}; the message is supplied lazily by {@code function0}. */
    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    /** Forwards to {@code Logging.logError$}; the message is supplied lazily by {@code function0}. */
    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    /** Forwards to the {@code Logging.logInfo$} overload that also takes a {@link Throwable}. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logDebug$} overload that also takes a {@link Throwable}. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logTrace$} overload that also takes a {@link Throwable}. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logWarning$} overload that also takes a {@link Throwable}. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    /** Forwards to the {@code Logging.logError$} overload that also takes a {@link Throwable}. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    /** Forwards to {@code Logging.isTraceEnabled$}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /** Forwards to the one-argument {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    /** Forwards to the two-argument {@code Logging.initializeLogIfNecessary$}. */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    /** Forwards to {@code Logging.initializeLogIfNecessary$default$2$} (default of the second argument). */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Forwards to {@code Logging.initializeForcefully$}. */
    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the logger field owned by the {@link Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger field owned by the {@link Logging} trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ------------------------------------------------------------------
    // State accessors
    // ------------------------------------------------------------------

    /** @return the aggregate operator classes recognised by {@link #collectOutputRows(SparkPlan)} */
    private List<Class<? extends BaseAggregateExec>> aggs() {
        return this.aggs;
    }

    /** @return the join operator classes recognised by {@link #collectOutputRows(SparkPlan)} */
    private List<Class<? extends BinaryExecNode>> joins() {
        return this.joins;
    }

    /** @return the listener stored by the last {@link #registerListener(SparkSession)} call, or {@code null} */
    public SparkListener sparkListener() {
        return this.sparkListener;
    }

    /** Replaces the stored listener reference; does not touch any Spark context. */
    public void sparkListener_$eq(SparkListener sparkListener) {
        this.sparkListener = sparkListener;
    }

    // ------------------------------------------------------------------
    // Metrics collection
    // ------------------------------------------------------------------

    /**
     * Looks up the {@link QueryExecution} cached under {@code str} and collects row metrics
     * from its executed plan.
     *
     * @param str key passed to {@code QueryExecutionCache.getQueryExecution}
     * @return the metrics produced by {@link #collectOutputRows(SparkPlan)}, or a freshly
     *         constructed {@code JobMetrics} when no query execution is cached for the key
     *         (an error is logged in that case)
     */
    public JobMetrics collectMetrics(String str) {
        // Created up-front to keep the original evaluation order (holder first, lookup second).
        JobMetrics emptyMetrics = new JobMetrics();
        QueryExecution queryExecution = QueryExecutionCache$.MODULE$.getQueryExecution(str);
        if (queryExecution == null) {
            logError(() -> "Collect output rows failed.");
            return emptyMetrics;
        }
        JobMetrics collected = collectOutputRows(queryExecution.executedPlan());
        logInfo(() -> "Collect output rows successfully. " + collected);
        return collected;
    }

    /**
     * Visits every node of {@code sparkPlan} (after {@link #stripAQEPlan(SparkPlan)}) and records
     * the source and cuboid row counts; see
     * {@link #$anonfun$collectOutputRows$1(BooleanRef, JobMetrics, BooleanRef, BooleanRef, SparkPlan)}
     * for the per-node rules.
     *
     * <p>When no node supplied a cuboid row count, it is set to the source row count.
     *
     * @param sparkPlan physical plan to inspect
     * @return a new {@code JobMetrics} populated from the plan
     * @throws IllegalArgumentException via {@code Predef.require} if the cuboid row count is
     *         reported as undefined although a writing-command or aggregate node recorded it
     */
    public JobMetrics collectOutputRows(SparkPlan sparkPlan) {
        JobMetrics jobMetrics = new JobMetrics();
        BooleanRef aggregateSeen = BooleanRef.create(false);
        BooleanRef joinSeen = BooleanRef.create(false);
        BooleanRef writeCommandSeen = BooleanRef.create(false);
        foreach(stripAQEPlan(sparkPlan), planNode -> {
            $anonfun$collectOutputRows$1(writeCommandSeen, jobMetrics, aggregateSeen, joinSeen, planNode);
            return BoxedUnit.UNIT;
        });
        if (!jobMetrics.isDefinedAt(Metrics$.MODULE$.CUBOID_ROWS_CNT())) {
            // Sanity checks: neither branch that sets the cuboid count may have run.
            Predef$.MODULE$.require(!writeCommandSeen.elem);
            Predef$.MODULE$.require(!aggregateSeen.elem);
            jobMetrics.setMetrics(Metrics$.MODULE$.CUBOID_ROWS_CNT(),
                    jobMetrics.getMetrics(Metrics$.MODULE$.SOURCE_ROWS_CNT()));
            logInfo(() -> "Set " + Metrics$.MODULE$.CUBOID_ROWS_CNT()
                    + " to " + Metrics$.MODULE$.SOURCE_ROWS_CNT()
                    + ", " + jobMetrics.getMetrics(Metrics$.MODULE$.SOURCE_ROWS_CNT()));
        }
        return jobMetrics;
    }

    // ------------------------------------------------------------------
    // Listener management
    // ------------------------------------------------------------------

    /**
     * Creates a listener that caches the query execution of every
     * {@code PostQueryExecutionForKylin} event, stores it in this object and adds it to the
     * Spark context of {@code sparkSession}.
     *
     * <p>An event is cached only when its execution-id property is not the empty string and
     * its query execution is non-null.
     *
     * <p>NOTE(review): a previously stored listener is overwritten without being removed from
     * its Spark context, so it can no longer be unregistered through this object - confirm
     * callers never register twice.
     *
     * @param sparkSession session whose Spark context receives the listener
     */
    public void registerListener(SparkSession sparkSession) {
        sparkListener_$eq(new SparkListener() {
            public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
                if (!(sparkListenerEvent instanceof PostQueryExecutionForKylin)) {
                    return;
                }
                PostQueryExecutionForKylin event = (PostQueryExecutionForKylin) sparkListenerEvent;
                // Falls back to "" when the property is absent.
                String executionId = event.localProperties()
                        .getProperty(QueryExecutionCache$.MODULE$.N_EXECUTION_ID_KEY(), "");
                // Mirrors Scala's `executionId != ""`: a null id counts as "not empty".
                if (!"".equals(executionId) && event.queryExecution() != null) {
                    QueryExecutionCache$.MODULE$.setQueryExecution(executionId, event.queryExecution());
                }
            }
        });
        sparkSession.sparkContext().addSparkListener(sparkListener());
    }

    /**
     * Removes the stored listener, if any, from the Spark context of {@code sparkSession}.
     * The stored reference itself is kept.
     *
     * @param sparkSession session whose Spark context should drop the listener
     */
    public void unRegisterListener(SparkSession sparkSession) {
        if (sparkListener() != null) {
            sparkSession.sparkContext().removeSparkListener(sparkListener());
        }
    }

    // ------------------------------------------------------------------
    // Per-node visitor used by collectOutputRows
    // ------------------------------------------------------------------

    /**
     * Body of the plan visitor used by {@link #collectOutputRows(SparkPlan)}. The node kinds
     * are tested in this order and the first match wins:
     * <ol>
     *   <li>{@link DataWritingCommandExec}: unless {@code booleanRef} is already set, its
     *       {@code numOutputRows} becomes the cuboid row count and {@code booleanRef} is set;</li>
     *   <li>{@link UnaryExecNode} whose class is in {@code aggs}: unless {@code booleanRef2} or
     *       {@code booleanRef} is set, its {@code numOutputRows} becomes the cuboid row count
     *       and {@code booleanRef2} is set;</li>
     *   <li>{@link BinaryExecNode} whose class is in {@code joins}: unless {@code booleanRef3}
     *       is set, its {@code numOutputRows} becomes the source row count and
     *       {@code booleanRef3} is set;</li>
     *   <li>{@link LeafExecNode} other than {@link QueryStageExec}/{@link AdaptiveSparkPlanExec}:
     *       unless {@code booleanRef3} is set, its {@code numOutputRows} is added to the source
     *       row count (a stored value of {@code -1} is treated as {@code 0}).</li>
     * </ol>
     *
     * @param booleanRef  flag: a writing command already supplied the cuboid row count
     * @param jobMetrics  holder receiving the metrics
     * @param booleanRef2 flag: an aggregate already supplied the cuboid row count
     * @param booleanRef3 flag: a join already supplied the source row count
     * @param sparkPlan   plan node being visited
     */
    public static final /* synthetic */ void $anonfun$collectOutputRows$1(BooleanRef booleanRef, JobMetrics jobMetrics, BooleanRef booleanRef2, BooleanRef booleanRef3, SparkPlan sparkPlan) {
        if (sparkPlan instanceof DataWritingCommandExec) {
            if (!booleanRef.elem) {
                jobMetrics.setMetrics(Metrics$.MODULE$.CUBOID_ROWS_CNT(), numOutputRows(sparkPlan));
                booleanRef.elem = true;
                MODULE$.logInfo(() -> "Set " + Metrics$.MODULE$.CUBOID_ROWS_CNT()
                        + " to " + DataWritingCommandExec$.MODULE$.getClass().getCanonicalName()
                        + ".metrics.numOutputRows," + numOutputRows(sparkPlan));
            }
            return;
        }
        if (sparkPlan instanceof UnaryExecNode) {
            boolean isAggregate = MODULE$.aggs().contains(sparkPlan.getClass());
            if (isAggregate && !booleanRef2.elem && !booleanRef.elem) {
                booleanRef2.elem = true;
                jobMetrics.setMetrics(Metrics$.MODULE$.CUBOID_ROWS_CNT(), numOutputRows(sparkPlan));
            }
            return;
        }
        if (sparkPlan instanceof BinaryExecNode) {
            boolean isJoin = MODULE$.joins().contains(sparkPlan.getClass());
            if (isJoin && !booleanRef3.elem) {
                jobMetrics.setMetrics(Metrics$.MODULE$.SOURCE_ROWS_CNT(), numOutputRows(sparkPlan));
                booleanRef3.elem = true;
            }
            return;
        }
        if (!(sparkPlan instanceof LeafExecNode)) {
            return;
        }
        if (booleanRef3.elem || (sparkPlan instanceof QueryStageExec) || (sparkPlan instanceof AdaptiveSparkPlanExec)) {
            return;
        }
        // -1 is treated as "nothing accumulated yet".
        long rowsSoFar = jobMetrics.getMetrics(Metrics$.MODULE$.SOURCE_ROWS_CNT()) == -1
                ? 0L
                : jobMetrics.getMetrics(Metrics$.MODULE$.SOURCE_ROWS_CNT());
        MODULE$.logInfo(() -> "plan name : " + sparkPlan.getClass().getName() + " , " + sparkPlan.nodeName());
        jobMetrics.setMetrics(Metrics$.MODULE$.SOURCE_ROWS_CNT(), rowsSoFar + numOutputRows(sparkPlan));
    }

    /** Reads the current value of the {@code numOutputRows} metric of {@code plan}. */
    private static long numOutputRows(SparkPlan plan) {
        return ((SQLMetric) plan.metrics().apply(NUM_OUTPUT_ROWS)).value();
    }

    /**
     * Publishes the instance through {@link #MODULE$}, runs the trait initialisers and builds
     * the operator class lists.
     */
    private JobMetricsUtils$() {
        MODULE$ = this;
        Logging.$init$(this);
        AdaptiveSparkPlanHelper.$init$(this);
        // NOTE(review): `$colon.colon` appears to be the decompiler's spelling of Scala's list
        // cons cell; kept unchanged to stay in sync with the file's imports - confirm.
        this.aggs = new $colon.colon(HashAggregateExec.class, new $colon.colon(SortAggregateExec.class, new $colon.colon(ObjectHashAggregateExec.class, Nil$.MODULE$)));
        this.joins = new $colon.colon(BroadcastHashJoinExec.class, new $colon.colon(ShuffledHashJoinExec.class, new $colon.colon(SortMergeJoinExec.class, new $colon.colon(BroadcastNestedLoopJoinExec.class, new $colon.colon(StreamingSymmetricHashJoinExec.class, Nil$.MODULE$)))));
    }
}
