package ai.databand.spark;

import ai.databand.DbndPropertyNames;
import ai.databand.DbndWrapper;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.Pair;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.hive.execution.HiveTableScanExec;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.QueryExecutionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import shadow.com.fasterxml.jackson.annotation.JsonProperty;

/* loaded from: input_file:ai/databand/spark/DbndSparkQueryExecutionListener.class */
public class DbndSparkQueryExecutionListener implements QueryExecutionListener {
    private static final Logger LOG = LoggerFactory.getLogger(DbndSparkQueryExecutionListener.class);
    private final DbndWrapper dbnd;
    private final DatasetOperationPreview operationPreview;
    private final boolean isHiveEnabled;

    public DbndSparkQueryExecutionListener(DbndWrapper dbndWrapper) {
        this.dbnd = dbndWrapper;
        this.operationPreview = new DatasetOperationPreview();
        try {
            Class.forName("org.apache.spark.sql.hive.execution.HiveTableScanExec", false, getClass().getClassLoader());
            Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable", false, getClass().getClassLoader());
            this.isHiveEnabled = true;
        } catch (ClassNotFoundException e) {
            this.isHiveEnabled = false;
        }
    }

    public DbndSparkQueryExecutionListener() {
        this(DbndWrapper.instance());
    }

    public void onSuccess(String str, QueryExecution queryExecution, long j) {
        SparkPlan executedPlan = queryExecution.executedPlan();
        if (isAdaptivePlan(executedPlan)) {
            executedPlan = extractFinalFromAdaptive(executedPlan).orElse(executedPlan);
        }
        if (executedPlan instanceof DataWritingCommandExec) {
            DataWritingCommandExec dataWritingCommandExec = (DataWritingCommandExec) executedPlan;
            if (dataWritingCommandExec.cmd() instanceof InsertIntoHadoopFsRelationCommand) {
                InsertIntoHadoopFsRelationCommand cmd = dataWritingCommandExec.cmd();
                log(exctractPath(cmd.outputPath().toString()), DatasetOperationType.WRITE, cmd.query().schema(), ((SQLMetric) cmd.metrics().get("numOutputRows").get()).value());
            }
            if (this.isHiveEnabled && (dataWritingCommandExec.cmd() instanceof InsertIntoHiveTable)) {
                try {
                    InsertIntoHiveTable cmd2 = dataWritingCommandExec.cmd();
                    log(cmd2.table().identifier().table(), DatasetOperationType.WRITE, cmd2.query().schema(), ((SQLMetric) cmd2.metrics().get("numOutputRows").get()).value());
                } catch (Exception e) {
                    LOG.error("Unable to extract dataset information from InsertIntoHiveTable", e);
                }
            }
        }
        if (executedPlan instanceof WholeStageCodegenExec) {
            if (isDbndPlan(queryExecution)) {
                LOG.warn("dbnd sdk Execution plan will not be reported");
                return;
            }
            Iterator<SparkPlan> it = getAllChildren(executedPlan).iterator();
            while (it.hasNext()) {
                FileSourceScanExec fileSourceScanExec = (SparkPlan) it.next();
                if (fileSourceScanExec instanceof FileSourceScanExec) {
                    FileSourceScanExec fileSourceScanExec2 = fileSourceScanExec;
                    log(exctractPath((String) fileSourceScanExec2.metadata().get("Location").get()), DatasetOperationType.READ, fileSourceScanExec2.schema(), ((SQLMetric) fileSourceScanExec2.metrics().get("numOutputRows").get()).value());
                }
                if (this.isHiveEnabled && (fileSourceScanExec instanceof HiveTableScanExec)) {
                    try {
                        HiveTableScanExec hiveTableScanExec = (HiveTableScanExec) fileSourceScanExec;
                        log(((URI) hiveTableScanExec.relation().tableMeta().storage().locationUri().get()).toString(), DatasetOperationType.READ, hiveTableScanExec.schema(), ((SQLMetric) hiveTableScanExec.metrics().get("numOutputRows").get()).value());
                    } catch (Exception e2) {
                        LOG.error("Unable to extract dataset information from HiveTableScanExec", e2);
                    }
                }
            }
        }
    }

    private boolean isDbndPlan(QueryExecution queryExecution) {
        String planVerboseString;
        return (queryExecution.analyzed() == null || queryExecution.analyzed().children().isEmpty() || (planVerboseString = getPlanVerboseString((LogicalPlan) queryExecution.analyzed().children().apply(0))) == null || !planVerboseString.contains(DbndPropertyNames.DBND_INTERNAL_ALIAS)) ? false : true;
    }

    protected boolean isAdaptivePlan(Object obj) {
        return obj.getClass().getName().contains("AdaptiveSparkPlanExec");
    }

    protected String getPlanVerboseString(LogicalPlan logicalPlan) {
        try {
            return logicalPlan.verboseString();
        } catch (NoSuchMethodError e) {
            try {
                return QueryPlan.class.getDeclaredMethod("verboseString", Integer.TYPE).invoke(logicalPlan, 1).toString();
            } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e2) {
                LOG.error("Unable to identify whenever spark query was triggered by log_dataset_op");
                return null;
            }
        }
    }

    protected Optional<SparkPlan> extractFinalFromAdaptive(Object obj) {
        try {
            Field declaredField = Class.forName("org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec").getDeclaredField("currentPhysicalPlan");
            declaredField.setAccessible(true);
            return Optional.of((SparkPlan) declaredField.get(obj));
        } catch (ClassNotFoundException | IllegalAccessException | NoSuchFieldException e) {
            LOG.error("Unable to extract final plan from the adaptive one using reflection. Dataset operation won't be logged.");
            return Optional.empty();
        }
    }

    protected void log(String str, DatasetOperationType datasetOperationType, StructType structType, long j) {
        Pair<String, List<Long>> extractSchema = this.operationPreview.extractSchema(structType, j);
        this.dbnd.logDatasetOperation(str, datasetOperationType, DatasetOperationStatus.OK, JsonProperty.USE_DEFAULT_NAME, extractSchema.right(), extractSchema.left(), true, this.operationPreview.extractColumnStats(structType, j));
    }

    protected String exctractPath(String str) {
        if (str.contains("InMemoryFileIndex")) {
            String replace = str.replace("InMemoryFileIndex[", JsonProperty.USE_DEFAULT_NAME);
            str = replace.substring(0, replace.length() - 1);
        }
        return str;
    }

    protected List<SparkPlan> getAllChildren(SparkPlan sparkPlan) {
        ArrayList arrayList = new ArrayList();
        LinkedList linkedList = new LinkedList();
        linkedList.add(sparkPlan);
        while (!linkedList.isEmpty()) {
            SparkPlan sparkPlan2 = (SparkPlan) linkedList.pop();
            arrayList.add(sparkPlan2);
            linkedList.addAll((List) JavaConverters.seqAsJavaListConverter(sparkPlan2.children()).asJava());
        }
        return arrayList;
    }

    public void onFailure(String str, QueryExecution queryExecution, Exception exc) {
    }
}
