package ai.databand.spark;

import ai.databand.DbndAppLog;
import ai.databand.DbndPropertyNames;
import ai.databand.DbndWrapper;
import ai.databand.parameters.DatasetOperationPreview;
import ai.databand.schema.DatasetOperationStatus;
import ai.databand.schema.DatasetOperationType;
import ai.databand.schema.LogDataset;
import ai.databand.schema.Pair;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.command.DataWritingCommandExec;
import org.apache.spark.sql.execution.datasources.HadoopFsRelation;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.hive.execution.HiveTableScanExec;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.QueryExecutionListener;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Map;
import shadow.okhttp3.HttpUrl;

/* loaded from: input_file:ai/databand/spark/DbndSparkQueryExecutionListener.class */
public class DbndSparkQueryExecutionListener implements QueryExecutionListener {
    private static final DbndAppLog LOG = new DbndAppLog(LoggerFactory.getLogger(DbndSparkQueryExecutionListener.class));

    /**
     * Matches strings that contain a bracketed section and captures the text between the first
     * {@code '['} and the last {@code ']'}. Compiled once because {@link Pattern} is immutable.
     */
    private static final Pattern DATASET = Pattern.compile(".*\\[(.*)\\].*");

    /** Wrapper every detected dataset operation is reported to. */
    private final DbndWrapper dbnd;

    /** Produces the schema and column statistics attached to each reported operation. */
    private final DatasetOperationPreview operationPreview;

    /** {@code true} when both Hive execution plan classes could be located at construction time. */
    private final boolean isHiveEnabled;

    /**
     * Creates a listener that reports dataset operations through the given wrapper.
     *
     * <p>The constructor probes for the Hive plan classes without initializing them, so that
     * Hive-specific branches can be skipped when those classes are not on the classpath.
     *
     * @param dbndWrapper wrapper used to submit dataset operations
     */
    public DbndSparkQueryExecutionListener(DbndWrapper dbndWrapper) {
        this.dbnd = dbndWrapper;
        this.operationPreview = new DatasetOperationPreview();

        // Resolve the flag into a local first: a final field may be assigned only once,
        // so it cannot be set in both the try and the catch branch.
        boolean hiveAvailable;
        try {
            ClassLoader loader = getClass().getClassLoader();
            Class.forName("org.apache.spark.sql.hive.execution.HiveTableScanExec", false, loader);
            Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable", false, loader);
            hiveAvailable = true;
        } catch (ClassNotFoundException | LinkageError ignored) {
            // Hive support is absent or only partially present: Hive plans are simply not inspected.
            hiveAvailable = false;
        }
        this.isHiveEnabled = hiveAvailable;

        // The listener is usable with or without Hive, so the message is logged in both cases.
        LOG.jvmInfo("Succesfully constructed Databand QueryExecutionListener instance. Selected Spark events with dataset operations will be submitted to the Databand service.", new Object[0]);
    }

    /**
     * Creates a listener that reports through the wrapper returned by
     * {@link DbndWrapper#instance()}.
     */
    public DbndSparkQueryExecutionListener() {
        this(DbndWrapper.instance());
    }

    /**
     * Inspects the executed plan of a successful query and submits the dataset reads and writes
     * found in it.
     *
     * <p>An adaptive root plan is first replaced by its final plan when that can be extracted.
     * Only three kinds of root plan are processed: {@code DataWritingCommandExec},
     * {@code WholeStageCodegenExec} (skipped when the query was produced by explicit Databand
     * DataFrame tracking) and {@code ResultQueryStageExec}, which is matched by class name.
     *
     * @param str name of the function that triggered the execution, used only for logging
     * @param queryExecution the finished query execution
     * @param j not used by this listener
     */
    public void onSuccess(String str, QueryExecution queryExecution, long j) {
        SparkPlan rootPlan = queryExecution.executedPlan();
        LOG.verbose("[{}] Processing event from function \"{}()\" and execution plan class: {}. Executed root plan: {}", rootPlan.hashCode(), str, rootPlan.getClass().getName(), rootPlan);

        if (isAdaptivePlan(rootPlan)) {
            // Fall back to the adaptive wrapper itself when the final plan cannot be extracted.
            rootPlan = extractFinalFromAdaptive(rootPlan).orElse(rootPlan);
            LOG.verbose("[{}] Extracted final root plan from adaptive plan. Final executed root plan: {}", rootPlan.hashCode(), rootPlan);
        }

        final int planId = rootPlan.hashCode();
        final String planClassName = rootPlan.getClass().getName();
        boolean processed = false;

        if (rootPlan instanceof DataWritingCommandExec) {
            LOG.verbose("[{}] rootPlan is instanceof DataWritingCommandExec", planId);
            processed = submitReadWriteOps(rootPlan);
        } else if (rootPlan instanceof WholeStageCodegenExec) {
            LOG.verbose("[{}] rootPlan is instanceof WholeStageCodegenExec", planId);
            if (isDbndPlan(queryExecution)) {
                LOG.warn("[{}] Explicit Databand SDK DataFrame tracking will not be reported by JVM", planId);
                return;
            }
            processed = submitReadWriteOps(rootPlan);
        } else if ("org.apache.spark.sql.execution.adaptive.ResultQueryStageExec".equals(planClassName)) {
            // Compared by name so that the class does not have to exist at compile time.
            LOG.verbose("[{}] rootPlan is instanceof ResultQueryStageExec", planId);
            processed = submitReadWriteOps(rootPlan);
        }

        if (!processed) {
            LOG.verbose("[{}] Spark event was not processed because root execution plan class {} and all its child plans are not supported", planId, planClassName);
            return;
        }
        LOG.verbose("[{}] Spark event was processed succesfully, root execution plan class: {}", planId, planClassName);
    }

    /**
     * Walks the given plan and every plan reachable from it and submits a dataset operation for
     * each supported node: file source scans, Photon JSON file scans, Hive table scans and data
     * writing commands.
     *
     * <p>A failure while inspecting one node is logged and does not stop the remaining nodes
     * from being processed.
     *
     * @param sparkPlan root plan to inspect
     * @return {@code true} if at least one dataset operation was submitted
     */
    protected boolean submitReadWriteOps(SparkPlan sparkPlan) {
        List<SparkPlan> plans = getAllChildren(sparkPlan);
        int rootId = sparkPlan.hashCode();
        LOG.verbose("[{}] {} children plans detected.", rootId, plans.size());

        boolean submitted = false;
        // The loop variable stays typed as SparkPlan: Hive classes may be missing at runtime
        // and must only be referenced behind the isHiveEnabled guard.
        for (SparkPlan plan : plans) {
            int planId = plan.hashCode();
            if (plan instanceof FileSourceScanExec) {
                LOG.verbose("[{}][{}] Supported FileSourceScanExec child plan type detected", rootId, planId);
                if (submitFileSourceScan(rootId, (FileSourceScanExec) plan)) {
                    submitted = true;
                }
            } else if (plan.getClass().getName().equals("com.databricks.photon.PhotonJsonFileScanExec")) {
                LOG.verbose("[{}][{}] Supported PhotonJsonFileScanExec child plan type detected", rootId, planId);
                if (submitPhotonJsonFileScan(rootId, plan)) {
                    submitted = true;
                }
            } else if (this.isHiveEnabled && (plan instanceof HiveTableScanExec)) {
                LOG.verbose("[{}][{}] Supported HiveTableScanExec child plan type detected", rootId, planId);
                if (submitHiveTableScan(rootId, plan)) {
                    submitted = true;
                }
            } else if (plan instanceof DataWritingCommandExec) {
                LOG.verbose("[{}][{}] Supported DataWritingCommandExec child plan type detected", rootId, planId);
                if (submitDataWritingCommand(rootId, (DataWritingCommandExec) plan)) {
                    submitted = true;
                }
            } else {
                LOG.verbose("[{}][{}] Unsupported children plan: {}", rootId, planId, plan.getClass().getName());
            }
        }
        return submitted;
    }

    /** Submits a READ operation for a file source scan; returns whether it was submitted. */
    private boolean submitFileSourceScan(int rootId, FileSourceScanExec scan) {
        try {
            StructType schema = scan.schema();
            if (schema.isEmpty() && scan.relation() != null) {
                // The scan's own schema is empty, so use the schema of the underlying relation.
                schema = scan.relation().schema();
            }
            String path = extractPath((String) scan.metadata().get("Location").get());
            long rows = ((SQLMetric) scan.metrics().get("numOutputRows").get()).value();
            log(path, DatasetOperationType.READ, schema, rows, false);
            return true;
        } catch (Exception e) {
            LOG.error("[{}][{}] Unable to extract dataset information from FileSourceScanExec - {}", rootId, scan.hashCode(), e);
            return false;
        }
    }

    /**
     * Submits a READ operation for a Photon JSON file scan; returns whether it was submitted.
     * The plan is accessed purely through reflection because its class is matched by name only.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private boolean submitPhotonJsonFileScan(int rootId, SparkPlan scan) {
        try {
            Class<?> scanClass = scan.getClass();
            StructType schema = (StructType) scanClass.getMethod("schema").invoke(scan);
            if (schema.isEmpty()) {
                HadoopFsRelation relation = (HadoopFsRelation) scanClass.getMethod("relation").invoke(scan);
                if (relation != null) {
                    schema = relation.schema();
                }
            }
            String path = extractPath((String) ((Map) scanClass.getMethod("metadata").invoke(scan)).get("Location").get());
            long rows = ((SQLMetric) ((Map) scanClass.getMethod("metrics").invoke(scan)).get("numOutputRows").get()).value();
            log(path, DatasetOperationType.READ, schema, rows, false);
            return true;
        } catch (Exception e) {
            LOG.error("[{}][{}] Unable to extract dataset information from PhotonJsonFileScanExec - {}", rootId, scan.hashCode(), e);
            return false;
        }
    }

    /**
     * Submits a READ operation for a Hive table scan; returns whether it was submitted.
     * Must only be called after the caller has verified that Hive classes are available.
     */
    private boolean submitHiveTableScan(int rootId, SparkPlan plan) {
        try {
            HiveTableScanExec scan = (HiveTableScanExec) plan;
            String location = ((URI) scan.relation().tableMeta().storage().locationUri().get()).toString();
            StructType schema = scan.relation().schema();
            long rows = ((SQLMetric) scan.metrics().get("numOutputRows").get()).value();
            log(location, DatasetOperationType.READ, schema, rows, true);
            return true;
        } catch (Exception e) {
            LOG.error("[{}][{}] Unable to extract dataset information from HiveTableScanExec - {}", rootId, plan.hashCode(), e);
            return false;
        }
    }

    /**
     * Submits a WRITE operation for a data writing command when its command is an insert into a
     * Hadoop file system relation or (with Hive available) into a Hive table; returns whether
     * anything was submitted.
     */
    private boolean submitDataWritingCommand(int rootId, DataWritingCommandExec writeExec) {
        int writeId = writeExec.hashCode();
        boolean submitted = false;

        if (writeExec.cmd() instanceof InsertIntoHadoopFsRelationCommand) {
            LOG.verbose("[{}][{}] writeplan is instanceof InsertIntoHadoopFsRelationCommand", rootId, writeId);
            try {
                InsertIntoHadoopFsRelationCommand fsInsert = (InsertIntoHadoopFsRelationCommand) writeExec.cmd();
                String path = extractPath(fsInsert.outputPath().toString());
                StructType schema = fsInsert.query().schema();
                long rows = ((SQLMetric) fsInsert.metrics().get("numOutputRows").get()).value();
                log(path, DatasetOperationType.WRITE, schema, rows, false);
                submitted = true;
            } catch (Exception e) {
                LOG.error("[{}][{}] Unable to extract dataset information from InsertIntoHadoopFsRelationCommand - {}", rootId, writeId, e);
            }
        }

        if (this.isHiveEnabled && (writeExec.cmd() instanceof InsertIntoHiveTable)) {
            LOG.verbose("[{}][{}] writeplan is instanceof InsertIntoHiveTable", rootId, writeId);
            try {
                InsertIntoHiveTable hiveInsert = (InsertIntoHiveTable) writeExec.cmd();
                String table = hiveInsert.table().identifier().table();
                StructType schema = hiveInsert.query().schema();
                long rows = ((SQLMetric) hiveInsert.metrics().get("numOutputRows").get()).value();
                log(table, DatasetOperationType.WRITE, schema, rows, true);
                submitted = true;
            } catch (Exception e) {
                LOG.error("[{}][{}] Unable to extract dataset information from InsertIntoHiveTable - {}", rootId, writeId, e);
            }
        }
        return submitted;
    }

    /**
     * Tells whether the verbose string of the first child of the analyzed logical plan contains
     * {@link DbndPropertyNames#DBND_INTERNAL_ALIAS}.
     *
     * @param queryExecution query execution whose analyzed plan is inspected
     * @return {@code false} when there is no analyzed plan, it has no children, or the verbose
     *         string is unavailable or lacks the alias; {@code true} otherwise
     */
    private boolean isDbndPlan(QueryExecution queryExecution) {
        if (queryExecution.analyzed() == null) {
            return false;
        }
        if (queryExecution.analyzed().children().isEmpty()) {
            return false;
        }
        String firstChildDescription = getPlanVerboseString((LogicalPlan) queryExecution.analyzed().children().apply(0));
        if (firstChildDescription == null) {
            return false;
        }
        return firstChildDescription.contains(DbndPropertyNames.DBND_INTERNAL_ALIAS);
    }

    /**
     * Checks whether the runtime class name of the given object contains
     * {@code "AdaptiveSparkPlanExec"}.
     *
     * @param obj object to test; must not be {@code null}
     * @return {@code true} if the class name contains the marker
     */
    protected boolean isAdaptivePlan(Object obj) {
        final String runtimeClassName = obj.getClass().getName();
        return runtimeClassName.indexOf("AdaptiveSparkPlanExec") >= 0;
    }

    /**
     * Returns the verbose string of a logical plan.
     *
     * <p>The no-argument {@code verboseString()} is tried first. When it is missing at runtime
     * ({@link NoSuchMethodError}), the one-{@code int}-argument variant declared on
     * {@link QueryPlan} is invoked reflectively with the argument {@code 1}.
     *
     * @param logicalPlan plan to describe
     * @return the verbose string, or {@code null} when the reflective fallback fails
     */
    protected String getPlanVerboseString(LogicalPlan logicalPlan) {
        try {
            return logicalPlan.verboseString();
        } catch (NoSuchMethodError noArgVariantMissing) {
            return verboseStringViaReflection(logicalPlan);
        }
    }

    /** Reflective fallback for {@code verboseString(int)}; returns {@code null} on failure. */
    private String verboseStringViaReflection(LogicalPlan logicalPlan) {
        try {
            Object rendered = QueryPlan.class.getDeclaredMethod("verboseString", int.class).invoke(logicalPlan, 1);
            return rendered.toString();
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException reflectionFailure) {
            LOG.error("Unable to identify whenever spark query was triggered by log_dataset_op or not", reflectionFailure);
            return null;
        }
    }

    /**
     * Reads the private {@code currentPhysicalPlan} field of an
     * {@code org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec} instance through
     * reflection.
     *
     * <p>Every failure is logged and reported as an empty result: the class or field being
     * absent, the field being inaccessible or {@code null}, or the given plan not being an
     * instance of that class.
     *
     * @param sparkPlan adaptive plan to unwrap
     * @return the final physical plan, or {@link Optional#empty()} when it cannot be obtained
     */
    protected Optional<SparkPlan> extractFinalFromAdaptive(SparkPlan sparkPlan) {
        try {
            Field finalPlanField = Class.forName("org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec").getDeclaredField("currentPhysicalPlan");
            finalPlanField.setAccessible(true);
            // ofNullable: an unset field must produce an empty result, not a NullPointerException.
            return Optional.ofNullable((SparkPlan) finalPlanField.get(sparkPlan));
        } catch (ReflectiveOperationException | RuntimeException e) {
            // RuntimeException covers setAccessible refusals, Field.get on a plan of another
            // class (IllegalArgumentException) and an unexpected field type (ClassCastException).
            LOG.error("[{}] Unable to extract final plan from the adaptive one using reflection. Dataset operation won't be logged - {}.", Integer.valueOf(sparkPlan.hashCode()), e);
            return Optional.empty();
        }
    }

    /**
     * Submits one dataset operation with status {@link DatasetOperationStatus#OK} to the
     * Databand wrapper, together with the schema and column statistics derived from the given
     * Spark schema.
     *
     * @param str dataset path or table name
     * @param datasetOperationType whether the operation is a read or a write
     * @param structType Spark schema of the dataset
     * @param j row count, taken by callers from the {@code numOutputRows} metric
     * @param z flag forwarded to the wrapper; callers in this class pass {@code true} for Hive
     *          tables and {@code false} for file-based datasets
     */
    protected void log(String str, DatasetOperationType datasetOperationType, StructType structType, long j, boolean z) {
        Pair<String, List<Long>> schemaPreview = this.operationPreview.extractSchema(structType, j);
        // A plain empty string replaces the unrelated HTTP-library constant that held the same value.
        // NOTE(review): this argument is presumably the error text of the operation - confirm against DbndWrapper.
        this.dbnd.logDatasetOperation(
            str,
            datasetOperationType,
            DatasetOperationStatus.OK,
            "",
            schemaPreview.right(),
            schemaPreview.left(),
            Boolean.valueOf(z),
            this.operationPreview.extractColumnStats(structType, j),
            LogDataset.OP_SOURCE_SPARK_QUERY_LISTENER);
    }

    /**
     * Extracts the text enclosed in square brackets from a location string.
     *
     * @param str raw location string
     * @return the text between the first {@code '['} and the last {@code ']'} when the whole
     *         string matches the dataset pattern, otherwise {@code str} unchanged
     */
    protected String extractPath(String str) {
        Matcher bracketed = DATASET.matcher(str);
        if (bracketed.matches()) {
            return bracketed.group(1);
        }
        return str;
    }

    /**
     * Collects the given plan and every plan reachable from it, breadth-first.
     *
     * <p>Regular nodes are expanded through {@code children()}. Query stage nodes (shuffle,
     * broadcast, result) are expanded through their {@code plan} accessor, and adaptive plan
     * nodes through their final physical plan, both via reflection.
     *
     * @param sparkPlan root of the traversal
     * @return all visited plans in visiting order, starting with {@code sparkPlan}
     */
    protected List<SparkPlan> getAllChildren(SparkPlan sparkPlan) {
        List<SparkPlan> visited = new ArrayList<>();
        LinkedList<SparkPlan> pending = new LinkedList<>();
        pending.add(sparkPlan);
        // Matched by name so that these classes do not have to exist at compile time.
        List<String> queryStageClassNames = Arrays.asList(
            "org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec",
            "org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec",
            "org.apache.spark.sql.execution.adaptive.ResultQueryStageExec");
        int rootId = sparkPlan.hashCode();

        while (!pending.isEmpty()) {
            SparkPlan current = pending.pop();
            visited.add(current);
            String currentClassName = current.getClass().getName();

            if (queryStageClassNames.contains(currentClassName)) {
                Optional<SparkPlan> stagePlan = extractChildFromSpark3(current);
                if (stagePlan.isPresent()) {
                    pending.add(stagePlan.get());
                } else {
                    LOG.verbose("[{}][{}] No child node for Spark3 node class '{}'", rootId, current.hashCode(), currentClassName);
                }
            } else if (isAdaptivePlan(current)) {
                extractFinalFromAdaptive(current).ifPresent(pending::add);
            } else {
                List<SparkPlan> children = JavaConverters.seqAsJavaListConverter(current.children()).asJava();
                pending.addAll(children);
                if (children.isEmpty()) {
                    LOG.verbose("[{}][{}] No children nodes for node '{}'", rootId, current.hashCode(), currentClassName);
                }
            }
        }
        return visited;
    }

    /**
     * Obtains the plan wrapped by a query stage node by reflectively invoking the no-argument
     * {@code plan} method declared on the node's own class.
     *
     * <p>Every failure is logged and reported as an empty result: a missing or inaccessible
     * method, an exception thrown by the accessor, a {@code null} result or a result of an
     * unexpected type.
     *
     * @param sparkPlan query stage node to unwrap
     * @return the wrapped plan, or {@link Optional#empty()} when it cannot be obtained
     */
    protected Optional<SparkPlan> extractChildFromSpark3(SparkPlan sparkPlan) {
        try {
            // Use the instance's own Class object; reloading it by name could resolve through a
            // different class loader than the one that defined the plan.
            Object wrappedPlan = sparkPlan.getClass().getDeclaredMethod("plan").invoke(sparkPlan);
            return Optional.ofNullable((SparkPlan) wrappedPlan);
        } catch (ReflectiveOperationException | RuntimeException e) {
            LOG.error("[{}] Unable to extract child plan from the spark3 query '{}' using reflection. Dataset operation won't be logged - {}.", Integer.valueOf(sparkPlan.hashCode()), sparkPlan.getClass().getName(), e);
            return Optional.empty();
        }
    }

    /**
     * Callback for failed query executions. The body is empty: this listener reports nothing
     * for failures.
     *
     * @param str not used
     * @param queryExecution not used
     * @param exc not used
     */
    public void onFailure(String str, QueryExecution queryExecution, Exception exc) {
        // No-op: nothing is submitted for failed executions.
    }
}
