package datahub.spark;

import com.google.common.base.Splitter;
import datahub.shaded.org.slf4j.Logger;
import datahub.shaded.org.slf4j.LoggerFactory;
import datahub.shaded.software.amazon.awssdk.core.internal.useragent.UserAgentConstant;
import datahub.spark.consumer.impl.CoalesceJobsEmitter;
import datahub.spark.consumer.impl.McpEmitter;
import datahub.spark.model.AppEndEvent;
import datahub.spark.model.AppStartEvent;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.SQLQueryExecEndEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.SparkDataset;
import datahub.spark2.shaded.typesafe.config.Config;
import datahub.spark2.shaded.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.Generated;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.apache.spark.util.JsonProtocol;
import org.json4s.jackson.JsonMethods$;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;

/**
 * Spark listener that tracks applications and SQL executions and publishes lineage events to DataHub.
 *
 * <p>Per-application state is kept in {@link ConcurrentHashMap}s keyed by Spark application id:
 * <ul>
 *   <li>the {@link AppStartEvent};</li>
 *   <li>in-flight {@link SQLQueryExecStartEvent}s;</li>
 *   <li>the {@link McpEmitter};</li>
 *   <li>the parsed DataHub {@link Config}.</li>
 * </ul>
 *
 * <p>Each SQL execution start is turned into a {@link SQLQueryExecStartEvent} carrying {@link DatasetLineage}.
 * The matching execution end produces a {@link SQLQueryExecEndEvent}. Events go to the application's emitter and
 * to any consumers listed under {@value #CONSUMER_TYPE_KEY}.
 *
 * <p>Exceptions raised while handling listener callbacks are logged and not rethrown.
 */
public class DatahubSparkListener extends SparkListener {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DatahubSparkListener.class);

    /** Spark conf key holding a comma-separated list of additional lineage consumer types. */
    public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
    public static final String DATAHUB_EMITTER = "mcpEmitter";
    public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
    /** DataHub config path whose sub-config is attached to the {@link AppStartEvent}. */
    public static final String PIPELINE_KEY = "metadata.pipeline";
    /** DataHub config path whose value, when present, prefixes the pipeline name as {@code <instance>.<appName>}. */
    public static final String PIPELINE_PLATFORM_INSTANCE_KEY = "metadata.pipeline.platformInstance";
    /** DataHub config flag selecting {@link CoalesceJobsEmitter} instead of a plain {@link McpEmitter}. */
    public static final String COALESCE_KEY = "coalesce_jobs";

    /** Start event of each running application, keyed by application id. */
    private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
    /** SQL execution start events awaiting their end event, keyed by application id, then execution id. */
    private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
    /** DataHub emitter of each application, keyed by application id. */
    private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
    /** Parsed DataHub config of each application; its presence is also the "already set up" marker. */
    private final Map<String, Config> appConfig = new ConcurrentHashMap<>();

    /**
     * Extracts dataset lineage for one SQL execution start and publishes it.
     *
     * <p>The first output dataset found in the plan becomes the lineage target. Every dataset found while walking
     * the plan (and the plans returned by {@code innerChildren()}) is added as a source.
     */
    public class SqlStartTask {
        private final SparkListenerSQLExecutionStart sqlStart;
        private final SparkContext ctx;
        private final LogicalPlan plan;

        /**
         * @param sparkListenerSQLExecutionStart the SQL start event; may be null ({@link #run()} then skips)
         * @param logicalPlan the plan to analyse; may be null ({@link #run()} then skips)
         * @param sparkContext the owning context; may be null ({@link #run()} then skips)
         */
        public SqlStartTask(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart, LogicalPlan logicalPlan, SparkContext sparkContext) {
            this.sqlStart = sparkListenerSQLExecutionStart;
            this.plan = logicalPlan;
            this.ctx = sparkContext;
            // Serializing the event and the plan to JSON is costly; only do it when the message will be logged.
            if (DatahubSparkListener.log.isDebugEnabled()) {
                DatahubSparkListener.log.debug("SqlStartTask with parameters: sqlStart: {}, plan: {}, ctx: {}",
                        sparkListenerSQLExecutionStart != null
                                ? JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(sparkListenerSQLExecutionStart))
                                : null,
                        logicalPlan != null ? logicalPlan.toJSON() : null,
                        sparkContext);
            }
        }

        /**
         * Records the execution start, builds its lineage, and publishes the resulting start event.
         *
         * <p>The run is skipped, with a log message, when:
         * <ul>
         *   <li>the context, its conf, the start event or the plan is missing;</li>
         *   <li>the application has no SQL tracking map (not set up, or already ended);</li>
         *   <li>no output dataset is found.</li>
         * </ul>
         */
        public void run() {
            if (this.ctx == null) {
                DatahubSparkListener.log.error("Context is null skipping run");
                return;
            }
            if (this.ctx.conf() == null) {
                DatahubSparkListener.log.error("Context does not have config. Skipping run");
                return;
            }
            if (this.sqlStart == null) {
                DatahubSparkListener.log.error("sqlStart is null skipping run");
                return;
            }
            if (this.plan == null) {
                DatahubSparkListener.log.error("Logical plan is null skipping run");
                return;
            }
            String appId = this.ctx.applicationId();
            Map<Long, SQLQueryExecStartEvent> sqlDetails = DatahubSparkListener.this.appSqlDetails.get(appId);
            if (sqlDetails == null) {
                // Happens when the application was never set up or its end event already cleared its state.
                DatahubSparkListener.log.error("No SQL execution tracking for appId " + appId + ". Skipping run");
                return;
            }
            String master = this.ctx.conf().get("spark.master");
            String pipelineName = DatahubSparkListener.this.getPipelineName(this.ctx);
            final long executionId = this.sqlStart.executionId();

            // Record the start immediately, without lineage, so the matching end event finds it.
            // That end event only emits when lineage is present.
            sqlDetails.put(executionId, new SQLQueryExecStartEvent(master, pipelineName, appId, this.sqlStart.time(), executionId, null));
            DatahubSparkListener.log.debug("PLAN for execution id: {}:{}\n", pipelineName, executionId);
            DatahubSparkListener.log.debug("{}", this.plan);

            Optional<? extends Collection<SparkDataset>> outputs = DatasetExtractor.asDataset(this.plan, this.ctx, true);
            if (!outputs.isPresent() || outputs.get().isEmpty()) {
                DatahubSparkListener.log.debug("Skipping execution as no output dataset present for execution id: " + appId + ":" + executionId);
                return;
            }
            // Only the first output dataset is used as the lineage target.
            final DatasetLineage lineage = new DatasetLineage(this.sqlStart.description(), this.plan.toString(), outputs.get().iterator().next());

            // innerChildren() is typed as a collection of generic query plans, not necessarily LogicalPlan.
            // Keep the elements untyped here and only walk the ones that are LogicalPlans.
            final List<Object> innerChildren = new ArrayList<>();
            this.plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
                public Void apply(LogicalPlan node) {
                    DatahubSparkListener.log.debug("CHILD {}\n{}\n-------------\n", node.getClass(), node);
                    SqlStartTask.this.addSources(node, lineage);
                    innerChildren.addAll(JavaConversions.asJavaCollection(node.innerChildren()));
                    return null;
                }

                public boolean isDefinedAt(LogicalPlan node) {
                    return true;
                }
            });
            for (Object child : innerChildren) {
                if (child instanceof LogicalPlan) {
                    ((LogicalPlan) child).collect(new AbstractPartialFunction<LogicalPlan, Void>() {
                        public Void apply(LogicalPlan node) {
                            DatahubSparkListener.log.debug("INNER CHILD {}\n{}\n-------------\n", node.getClass(), node);
                            Optional<? extends Collection<SparkDataset>> sources = SqlStartTask.this.addSources(node, lineage);
                            sources.ifPresent(found -> DatahubSparkListener.log.debug("source added for {}/{}: {}",
                                    SqlStartTask.this.ctx.appName(), executionId, found));
                            return null;
                        }

                        public boolean isDefinedAt(LogicalPlan node) {
                            return true;
                        }
                    });
                }
            }

            SQLQueryExecStartEvent startEvent = new SQLQueryExecStartEvent(master, pipelineName, appId, this.sqlStart.time(), executionId, lineage);
            sqlDetails.put(executionId, startEvent);
            McpEmitter emitter = DatahubSparkListener.this.appEmitters.get(appId);
            if (emitter != null) {
                emitter.accept((LineageEvent) startEvent);
            }
            DatahubSparkListener.this.consumers().forEach(consumer -> consumer.accept(startEvent));
            DatahubSparkListener.log.debug("LINEAGE \n{}\n", lineage);
            DatahubSparkListener.log.debug("Parsed execution id {}:{}", this.ctx.appName(), executionId);
        }

        /**
         * Adds every dataset {@link DatasetExtractor} finds for {@code node} as a source of {@code lineage}.
         *
         * @return the datasets found (empty when none)
         */
        private Optional<? extends Collection<SparkDataset>> addSources(LogicalPlan node, DatasetLineage lineage) {
            Optional<? extends Collection<SparkDataset>> sources = DatasetExtractor.asDataset(node, this.ctx, false);
            sources.ifPresent(found -> found.forEach(dataset -> lineage.addSource(dataset)));
            return sources;
        }
    }

    public DatahubSparkListener() {
        log.info("DatahubSparkListener initialised.");
    }

    /** Sets up per-application state (config, emitter, start event) for the current Spark context, if any. */
    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        try {
            log.info("Application started: {}", sparkListenerApplicationStart);
            LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
                public Void apply(SparkContext sparkContext) {
                    DatahubSparkListener.this.checkOrCreateApplicationSetup(sparkContext);
                    return null;
                }
            });
            super.onApplicationStart(sparkListenerApplicationStart);
        } catch (Exception e) {
            log.error("Failed to handle application start event", e);
        }
    }

    /** Emits the {@link AppEndEvent}, closes the emitter and consumers, and drops per-application state. */
    public void onApplicationEnd(final SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        try {
            LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
                public Void apply(SparkContext sparkContext) {
                    DatahubSparkListener.this.finishApplication(sparkContext, sparkListenerApplicationEnd);
                    return null;
                }
            });
            super.onApplicationEnd(sparkListenerApplicationEnd);
        } catch (Exception e) {
            log.error("Failed to handle application end event", e);
        }
    }

    /** Body of {@link #onApplicationEnd} for one resolved Spark context. */
    private void finishApplication(SparkContext sparkContext, SparkListenerApplicationEnd endEvent) {
        String appId = sparkContext.applicationId();
        log.info("Application ended : {} {}", sparkContext.appName(), appId);
        AppStartEvent startEvent = this.appDetails.remove(appId);
        this.appSqlDetails.remove(appId);
        if (startEvent == null) {
            log.error("Application end event received, but start event missing for appId " + appId);
            return;
        }
        AppEndEvent appEndEvent = new AppEndEvent(LineageUtils.getMaster(sparkContext), getPipelineName(sparkContext), appId, endEvent.time(), startEvent);
        // Unregister the emitter before using it, so a failing accept/close cannot leave a stale emitter behind.
        McpEmitter emitter = this.appEmitters.remove(appId);
        if (emitter != null) {
            emitter.accept((LineageEvent) appEndEvent);
            try {
                emitter.close();
            } catch (Exception e) {
                log.warn("Failed to close underlying emitter", e);
            }
        }
        consumers().forEach(consumer -> {
            consumer.accept(appEndEvent);
            try {
                consumer.close();
            } catch (IOException e) {
                log.warn("Failed to close lineage consumer", (Throwable) e);
            }
        });
    }

    /** Dispatches SQL execution start and end events; all other events are ignored. */
    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        try {
            if (sparkListenerEvent instanceof SparkListenerSQLExecutionStart) {
                SparkListenerSQLExecutionStart sqlStart = (SparkListenerSQLExecutionStart) sparkListenerEvent;
                log.debug("SQL Exec start event with id {}", sqlStart.executionId());
                processExecution(sqlStart);
            } else if (sparkListenerEvent instanceof SparkListenerSQLExecutionEnd) {
                SparkListenerSQLExecutionEnd sqlEnd = (SparkListenerSQLExecutionEnd) sparkListenerEvent;
                log.debug("SQL Exec end event with id {}", sqlEnd.executionId());
                processExecutionEnd(sqlEnd);
            }
        } catch (Exception e) {
            log.error("Failed to handle spark listener event", e);
        }
    }

    /** Emits a {@link SQLQueryExecEndEvent} for a previously recorded SQL execution that carried lineage. */
    public void processExecutionEnd(final SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
            public Void apply(SparkContext sparkContext) {
                DatahubSparkListener.this.emitExecutionEnd(sparkContext, sparkListenerSQLExecutionEnd);
                return null;
            }
        });
    }

    /** Body of {@link #processExecutionEnd} for one resolved Spark context. */
    private void emitExecutionEnd(SparkContext sparkContext, SparkListenerSQLExecutionEnd sqlEnd) {
        String appId = sparkContext.applicationId();
        long executionId = sqlEnd.executionId();
        Map<Long, SQLQueryExecStartEvent> sqlDetails = this.appSqlDetails.get(appId);
        SQLQueryExecStartEvent startEvent = sqlDetails == null ? null : sqlDetails.remove(executionId);
        if (startEvent == null) {
            log.error("Execution end event received, but start event missing for appId/sql exec Id " + appId + ":" + executionId);
            return;
        }
        // Executions without an output dataset were recorded without lineage; nothing to emit for them.
        if (startEvent.getDatasetLineage() == null) {
            return;
        }
        // NOTE(review): this passes sparkContext.appName(), whereas start events use getPipelineName(), which may
        // add a platform-instance prefix. Confirm whether the end event should use the same name.
        SQLQueryExecEndEvent endEvent = new SQLQueryExecEndEvent(LineageUtils.getMaster(sparkContext), sparkContext.appName(), appId, sqlEnd.time(), executionId, startEvent);
        McpEmitter emitter = this.appEmitters.get(appId);
        if (emitter != null) {
            emitter.accept((LineageEvent) endEvent);
        }
    }

    /**
     * Creates the per-application state on first sight of an application id.
     *
     * <p>The state is the parsed config, the SQL tracking maps and the emitter. The method then publishes the
     * {@link AppStartEvent}. It is synchronized so concurrent callers cannot both create the state.
     */
    private synchronized void checkOrCreateApplicationSetup(SparkContext sparkContext) {
        String applicationId = sparkContext.applicationId();
        if (this.appConfig.get(applicationId) != null) {
            return;
        }
        Config config = LineageUtils.parseSparkConfig();
        this.appConfig.put(applicationId, config);
        Config pipelineConfig = config.hasPath(PIPELINE_KEY) ? config.getConfig(PIPELINE_KEY) : ConfigFactory.empty();
        AppStartEvent appStartEvent = new AppStartEvent(LineageUtils.getMaster(sparkContext), getPipelineName(sparkContext),
                applicationId, sparkContext.startTime(), sparkContext.sparkUser(), pipelineConfig);
        // Register the tracking state before emitting.
        // Otherwise a failing emit would leave the app marked as set up but without SQL tracking.
        this.appDetails.put(applicationId, appStartEvent);
        this.appSqlDetails.put(applicationId, new ConcurrentHashMap<>());
        McpEmitter emitter = this.appEmitters.computeIfAbsent(applicationId, id ->
                (config.hasPath(COALESCE_KEY) && config.getBoolean(COALESCE_KEY))
                        ? new CoalesceJobsEmitter(config)
                        : new McpEmitter(config));
        emitter.accept((LineageEvent) appStartEvent);
        consumers().forEach(consumer -> consumer.accept(appStartEvent));
    }

    /**
     * Returns the pipeline name for an application.
     *
     * <p>The name is {@code appName()}. When {@value #PIPELINE_PLATFORM_INSTANCE_KEY} is configured, it is
     * prefixed as {@code <instance>.<appName>}.
     */
    private String getPipelineName(SparkContext sparkContext) {
        Config config = this.appConfig.computeIfAbsent(sparkContext.applicationId(), id -> LineageUtils.parseSparkConfig());
        // NOTE(review): DATABRICKS_CLUSTER_KEY previously fed a "<cluster>_<appId>" value that was never used.
        // The name has always been derived from appName(); kept as-is so existing pipeline names do not change.
        String name = sparkContext.appName();
        if (config.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
            name = config.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
        }
        return name;
    }

    /** Resolves the query execution for a SQL start event and runs lineage extraction on its optimized plan. */
    private void processExecution(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        QueryExecution queryExecution = SQLExecution.getQueryExecution(sparkListenerSQLExecutionStart.executionId());
        if (queryExecution == null) {
            log.error("Skipping processing for sql exec Id " + sparkListenerSQLExecutionStart.executionId()
                    + " as Query execution context could not be read from current spark state");
            return;
        }
        LogicalPlan optimizedPlan = queryExecution.optimizedPlan();
        SparkContext sparkContext = queryExecution.sparkSession().sparkContext();
        checkOrCreateApplicationSetup(sparkContext);
        new SqlStartTask(sparkListenerSQLExecutionStart, optimizedPlan, sparkContext).run();
    }

    /**
     * Builds the additional lineage consumers named in {@value #CONSUMER_TYPE_KEY}.
     *
     * <p>The value is comma-separated. Blank entries and types {@link LineageUtils#getConsumer} returns null for are
     * skipped.
     *
     * @return the consumers; empty when the key is unset or no Spark environment is available
     */
    private List<LineageConsumer> consumers() {
        SparkEnv env = SparkEnv.get();
        if (env == null) {
            return Collections.emptyList();
        }
        SparkConf conf = env.conf();
        if (!conf.contains(CONSUMER_TYPE_KEY)) {
            return Collections.emptyList();
        }
        List<LineageConsumer> result = new ArrayList<>();
        for (String type : Splitter.on(",").trimResults().omitEmptyStrings().split(conf.get(CONSUMER_TYPE_KEY))) {
            LineageConsumer consumer = LineageUtils.getConsumer(type);
            if (consumer != null) {
                result.add(consumer);
            }
        }
        return result;
    }
}
