package org.apache.kylin.query.runtime.plan;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.gluten.utils.QueryPlanSelector$;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.BigQueryException;
import org.apache.kylin.common.exception.NewQueryRefuseException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.utils.LogEx;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
import org.apache.kylin.metadata.state.QueryShareStateManager;
import org.apache.kylin.query.engine.RelColumnMetaDataExtractor;
import org.apache.kylin.query.engine.exec.ExecuteResult;
import org.apache.kylin.query.pushdown.SparkSqlClient$;
import org.apache.kylin.query.relnode.ContextUtil;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.query.util.QueryInterruptChecker;
import org.apache.kylin.query.util.SparkJobTrace;
import org.apache.kylin.query.util.SparkJobTrace$;
import org.apache.kylin.query.util.SparkQueryJobManager$;
import org.apache.poi.xssf.usermodel.XSSFRow;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparderEnv$;
import org.apache.spark.sql.execution.CollectLimitExec;
import org.apache.spark.sql.execution.ColumnarToRowExec;
import org.apache.spark.sql.execution.InputAdapter;
import org.apache.spark.sql.execution.KylinFileSourceScanExec;
import org.apache.spark.sql.execution.LocalLimitExec;
import org.apache.spark.sql.execution.ProjectExec;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.gluten.KylinFileSourceScanExecTransformer;
import org.apache.spark.sql.hive.QueryMetricUtils$;
import org.apache.spark.sql.util.SparderTypeUtil$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.convert.ImplicitConversions$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ResultPlan.scala */
/* loaded from: input_file:org/apache/kylin/query/runtime/plan/ResultPlan$.class */
public final class ResultPlan$ implements LogEx {
    public static ResultPlan$ MODULE$;
    private final long PARTITION_SPLIT_BYTES;
    private final String SPARK_SCHEDULER_POOL;
    private final String QUOTE_CHAR;
    private final String END_OF_LINE_SYMBOLS;
    private final int CHECK_WRITE_SIZE;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new ResultPlan$();
    }

    public <U> U logTime(String str, boolean z, Function0<U> function0) {
        return (U) LogEx.logTime$(this, str, z, function0);
    }

    public <U> boolean logTime$default$2() {
        return LogEx.logTime$default$2$(this);
    }

    public void logInfoIf(Function0<Object> function0, Function0<String> function02) {
        LogEx.logInfoIf$(this, function0, function02);
    }

    public void logWarningIf(Function0<Object> function0, Function0<String> function02) {
        LogEx.logWarningIf$(this, function0, function02);
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public long PARTITION_SPLIT_BYTES() {
        return this.PARTITION_SPLIT_BYTES;
    }

    public String SPARK_SCHEDULER_POOL() {
        return this.SPARK_SCHEDULER_POOL;
    }

    public String QUOTE_CHAR() {
        return this.QUOTE_CHAR;
    }

    public String END_OF_LINE_SYMBOLS() {
        return this.END_OF_LINE_SYMBOLS;
    }

    public int CHECK_WRITE_SIZE() {
        return this.CHECK_WRITE_SIZE;
    }

    private Tuple2<Iterable<List<String>>, Object> collectInternal(Dataset<Row> dataset, RelDataType relDataType) {
        return (Tuple2) logTime("collectInternal", true, () -> {
            String name = Thread.currentThread().getName();
            SparkContext sparkContext = SparderEnv$.MODULE$.getSparkSession().sparkContext();
            KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
            int sparkSqlShufflePartitions = instanceFromEnv.getSparkSqlShufflePartitions() != -1 ? instanceFromEnv.getSparkSqlShufflePartitions() : (int) Math.min((QueryContext.current().getMetrics().getSourceScanBytes() / MODULE$.PARTITION_SPLIT_BYTES()) + 1, SparderEnv$.MODULE$.getTotalCore());
            QueryContext.current().setShufflePartitions(sparkSqlShufflePartitions);
            MODULE$.logInfo(() -> {
                return new StringBuilder(63).append("partitions num are: ").append(sparkSqlShufflePartitions).append(",").append(" total scan bytes are: ").append(QueryContext.current().getMetrics().getSourceScanBytes()).append(",").append(" total cores are: ").append(SparderEnv$.MODULE$.getTotalCore()).toString();
            });
            String queryId = QueryContext.current().getQueryId();
            sparkContext.setLocalProperty(QueryToExecutionIDCache$.MODULE$.KYLIN_QUERY_ID_KEY(), queryId);
            sparkContext.setJobGroup(name, QueryContext.current().getMetrics().getCorrectedSql(), true);
            try {
                try {
                    long autoBroadcastJoinThreshold = SparderEnv$.MODULE$.getSparkSession().sessionState().conf().autoBroadcastJoinThreshold();
                    SparkPlan executedPlan = dataset.queryExecution().executedPlan();
                    LongRef create = LongRef.create(QueryContext.current().getMetrics().getAccumSourceScanRows());
                    if (KapConfig.getInstanceFromEnv().isQueryLimitEnabled() && KapConfig.getInstanceFromEnv().isApplyLimitInfoToSourceScanRowsEnabled()) {
                        AtomicLong atomicLong = new AtomicLong(0L);
                        MODULE$.extractEachStageLimitRows(executedPlan, -1, atomicLong);
                        create.elem = atomicLong.get();
                        MODULE$.logDebug(() -> {
                            return new StringBuilder(47).append("Spark executed plan is \n ").append(executedPlan).append("; \n accumRowsCounter: ").append(atomicLong).toString();
                        });
                    }
                    MODULE$.logInfo(() -> {
                        return new StringBuilder(46).append("autoBroadcastJoinThreshold: [before:").append(autoBroadcastJoinThreshold).append(", ").append("after: ").append(SparderEnv$.MODULE$.getSparkSession().sessionState().conf().autoBroadcastJoinThreshold()).append("]").toString();
                    });
                    sparkContext.setLocalProperty("source_scan_rows", Long.toString(QueryContext.current().getMetrics().getSourceScanRows()));
                    MODULE$.logDebug(() -> {
                        return new StringBuilder(20).append("source_scan_rows is ").append(Long.toString(QueryContext.current().getMetrics().getSourceScanRows())).toString();
                    });
                    long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold();
                    String queryFairSchedulerPool = MODULE$.getQueryFairSchedulerPool(sparkContext.getConf(), QueryContext.current(), bigQueryThreshold, create.elem, sparkSqlShufflePartitions);
                    sparkContext.setLocalProperty(MODULE$.SPARK_SCHEDULER_POOL(), queryFairSchedulerPool);
                    MODULE$.logDebug(() -> {
                        return new StringBuilder(24).append("Total source scan rows: ").append(create.elem).toString();
                    });
                    MODULE$.ifRefuseQuery(create.elem, bigQueryThreshold, (List) JavaConverters$.MODULE$.seqAsJavaListConverter(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(new Long[]{new Long(create.elem)})).toList()).asJava(), QueryContext.current().isIfBigQuery());
                    QueryContext.current().record("executed_plan");
                    QueryContext.currentTrace().endLastSpan();
                    SparkJobTrace sparkJobTrace = new SparkJobTrace(name, QueryContext.currentTrace(), QueryContext.current().getQueryId(), sparkContext, SparkJobTrace$.MODULE$.$lessinit$greater$default$5());
                    Tuple2 collectToIterator = NProjectManager.getProjectConfig(QueryContext.current().getProject()).isQueryUseIterableCollectApi() ? dataset.collectToIterator() : dataset.toIterator();
                    Iterator<Row> it = (Iterator) collectToIterator._1();
                    int _2$mcI$sp = collectToIterator._2$mcI$sp();
                    if (instanceFromEnv.isQuerySparkJobTraceEnabled()) {
                        sparkJobTrace.jobFinished();
                    }
                    QueryContext.current().record("collect_result");
                    Tuple2<List<Long>, List<Long>> collectScanMetrics = QueryMetricUtils$.MODULE$.collectScanMetrics(dataset.queryExecution().executedPlan());
                    if (collectScanMetrics == null) {
                        throw new MatchError(collectScanMetrics);
                    }
                    Tuple2 tuple2 = new Tuple2((List) collectScanMetrics._1(), (List) collectScanMetrics._2());
                    List list = (List) tuple2._1();
                    List list2 = (List) tuple2._2();
                    Tuple3<Long, Long, Long> collectTaskRelatedMetrics = QueryMetricUtils$.MODULE$.collectTaskRelatedMetrics(name, sparkContext);
                    if (collectTaskRelatedMetrics == null) {
                        throw new MatchError(collectTaskRelatedMetrics);
                    }
                    Tuple3 tuple3 = new Tuple3((Long) collectTaskRelatedMetrics._1(), (Long) collectTaskRelatedMetrics._2(), (Long) collectTaskRelatedMetrics._3());
                    Long l = (Long) tuple3._1();
                    Long l2 = (Long) tuple3._2();
                    Long l3 = (Long) tuple3._3();
                    QueryContext.current().getMetrics().setScanRows(list);
                    QueryContext.current().getMetrics().setScanBytes(list2);
                    QueryContext.current().getMetrics().setQueryJobCount(Predef$.MODULE$.Long2long(l));
                    QueryContext.current().getMetrics().setQueryStageCount(Predef$.MODULE$.Long2long(l2));
                    QueryContext.current().getMetrics().setQueryTaskCount(Predef$.MODULE$.Long2long(l3));
                    MODULE$.logInfo(() -> {
                        return new StringBuilder(144).append("Actual total scan count: ").append(list).append(", ").append("file scan row count: ").append(QueryContext.current().getMetrics().getAccumSourceScanRows()).append(", ").append("may apply limit row count: ").append(create.elem).append(", Big query threshold: ").append(bigQueryThreshold).append(", Allocate pool: ").append(queryFairSchedulerPool).append(", ").append("Is Vip: ").append(QueryContext.current().getQueryTagInfo().isHighPriorityQuery()).append(", ").append("Is TableIndex: ").append(QueryContext.current().getQueryTagInfo().isTableIndex()).toString();
                    });
                    return new Tuple2(MODULE$.readResultRow(it, (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(relDataType.getFieldList()).asScala()), BoxesRunTime.boxToInteger(_2$mcI$sp));
                } catch (Throwable th) {
                    if (th instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                        sparkContext.cancelJobGroup(name);
                        QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.", "Current step: Collecting dataset for sparder.");
                    }
                    throw th;
                }
            } finally {
                QueryContext.current().setExecutionID(QueryToExecutionIDCache$.MODULE$.getQueryExecutionID(queryId));
            }
        });
    }

    private void ifRefuseQuery(long j, long j2, List<Long> list, boolean z) {
        if (!QueryShareStateManager.isShareStateSwitchEnabled() || j < j2 || (!SparkQueryJobManager$.MODULE$.isNewBigQueryRefuse() && !KapConfig.getInstanceFromEnv().isBigQueryLimitEnable())) {
            if (z) {
                QueryContext.current().getMetrics().setScanRows(list);
                QueryContext.current().setBigQuery(false);
                throw new BigQueryException("This query is non bigquery.");
            }
            return;
        }
        if (!z) {
            QueryContext.current().getQueryTagInfo().setRefused(true);
            throw new NewQueryRefuseException(new StringBuilder(125).append("Refuse new big query, sum of source_scan_rows is ").append(j).append(", refuse query threshold is ").append(j2).append(". Current step: Collecting dataset for sparder. ").toString());
        }
        QueryContext.current().setBigQuery(true);
        QueryContext.current().getMetrics().setScanRows(list);
        throw new BigQueryException("This query is bigquery.");
    }

    public Iterable<List<String>> readResultRow(Iterator<Row> it, Buffer<RelDataTypeField> buffer) {
        return () -> {
            return new Iterator<List<String>>(it, buffer) { // from class: org.apache.kylin.query.runtime.plan.ResultPlan$$anon$1
                private final Iterator resultRows$1;
                private final Buffer resultTypes$1;

                @Override // java.util.Iterator
                public void remove() {
                    super.remove();
                }

                @Override // java.util.Iterator
                public void forEachRemaining(Consumer<? super List<String>> consumer) {
                    super.forEachRemaining(consumer);
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.resultRows$1.hasNext();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public List<String> next() {
                    Row row = (Row) this.resultRows$1.next();
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    return (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) ((TraversableLike) row.toSeq().zip(this.resultTypes$1, Seq$.MODULE$.canBuildFrom())).map(tuple2 -> {
                        if (tuple2 == null) {
                            throw new MatchError(tuple2);
                        }
                        return SparderTypeUtil$.MODULE$.convertToStringWithCalciteType(tuple2._1(), ((RelDataTypeField) tuple2._2()).getType(), SparderTypeUtil$.MODULE$.convertToStringWithCalciteType$default$3());
                    }, Seq$.MODULE$.canBuildFrom())).asJava();
                }

                {
                    this.resultRows$1 = it;
                    this.resultTypes$1 = buffer;
                }
            };
        };
    }

    private String getNormalizedExplain(Dataset<Row> dataset) {
        return dataset.queryExecution().executedPlan().toString().replaceAll("#\\d+", "#x");
    }

    public String getQueryFairSchedulerPool(SparkConf sparkConf, QueryContext queryContext, long j, long j2, int i) {
        String str = "heavy_tasks";
        if (queryContext.getQueryTagInfo().isHighPriorityQuery()) {
            str = "vip_tasks";
        } else if (queryContext.getQueryTagInfo().isTableIndex()) {
            str = "extreme_heavy_tasks";
        } else if (KapConfig.getInstanceFromEnv().isQueryLimitEnabled() && SparderEnv$.MODULE$.isSparkExecutorResourceLimited(sparkConf)) {
            if (j2 < j) {
                str = "lightweight_tasks";
            }
        } else if (i < SparderEnv$.MODULE$.getTotalCore()) {
            str = "lightweight_tasks";
        }
        return str;
    }

    public void extractEachStageLimitRows(SparkPlan sparkPlan, int i, AtomicLong atomicLong) {
        BoxedUnit boxedUnit;
        if (sparkPlan instanceof KylinFileSourceScanExec) {
            long sourceScanRows = ((KylinFileSourceScanExec) sparkPlan).getSourceScanRows();
            long min = i > 0 ? Math.min(i, sourceScanRows) : sourceScanRows;
            atomicLong.addAndGet(min);
            logDebug(() -> {
                return new StringBuilder(75).append("Apply limit to source scan, sourceScanRows: ").append(sourceScanRows).append(", ").append("stageLimit: ").append(i).append(", finalScanRows: ").append(min).toString();
            });
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        if (sparkPlan instanceof KylinFileSourceScanExecTransformer) {
            long sourceScanRows2 = ((KylinFileSourceScanExecTransformer) sparkPlan).getSourceScanRows();
            long min2 = i > 0 ? Math.min(i, sourceScanRows2) : sourceScanRows2;
            atomicLong.addAndGet(min2);
            logDebug(() -> {
                return new StringBuilder(75).append("Apply limit to source scan, sourceScanRows: ").append(sourceScanRows2).append(", ").append("stageLimit: ").append(i).append(", finalScanRows: ").append(min2).toString();
            });
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        IntRef create = IntRef.create(i);
        if (sparkPlan instanceof LocalLimitExec) {
            create.elem = ((LocalLimitExec) sparkPlan).limit();
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        } else if (sparkPlan instanceof CollectLimitExec) {
            create.elem = ((CollectLimitExec) sparkPlan).limit();
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        } else if ((sparkPlan instanceof ProjectExec) || (sparkPlan instanceof ColumnarToRowExec) || (sparkPlan instanceof InputAdapter) || (sparkPlan instanceof WholeStageCodegenExec)) {
            boxedUnit = BoxedUnit.UNIT;
        } else {
            create.elem = -1;
            boxedUnit = BoxedUnit.UNIT;
        }
        sparkPlan.children().foreach(sparkPlan2 -> {
            $anonfun$extractEachStageLimitRows$3(create, atomicLong, sparkPlan2);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
    }

    public <U> U withScope(Dataset<Row> dataset, Function0<U> function0) {
        HadoopUtil.setCurrentConfiguration(dataset.sparkSession().sparkContext().hadoopConfiguration());
        try {
            return (U) function0.apply();
        } finally {
            dataset.sparkSession().sparkContext().setLocalProperty(SPARK_SCHEDULER_POOL(), (String) null);
            dataset.sparkSession().sessionState().conf().setLocalProperty("spark.sql.shuffle.partitions", (String) null);
            SparderEnv$.MODULE$.setDF(dataset);
            HadoopUtil.setCurrentConfiguration((Configuration) null);
        }
    }

    public ExecuteResult getResult(Dataset<Row> dataset, RelDataType relDataType) {
        return (ExecuteResult) withScope(dataset, () -> {
            if (!ContextUtil.getNativeRealizations().isEmpty() && !KylinConfig.getInstanceFromEnv().queryIndexUseGluten()) {
                dataset.sparkSession().sparkContext().setLocalProperty(QueryPlanSelector$.MODULE$.GLUTEN_ENABLE_FOR_THREAD_KEY(), "false");
            }
            QueryContext.QueryTagInfo queryTagInfo = QueryContext.current().getQueryTagInfo();
            if (queryTagInfo.isAsyncQuery()) {
                MODULE$.saveAsyncQueryResult(dataset, queryTagInfo.getFileFormat(), queryTagInfo.getFileEncode(), relDataType);
            }
            Tuple2<Iterable<List<String>>, Object> tuple2 = (!Predef$.MODULE$.Boolean2boolean(SparderEnv$.MODULE$.needCompute()) || QueryContext.current().getQueryTagInfo().isAsyncQuery()) ? new Tuple2<>(new LinkedList(), BoxesRunTime.boxToInteger(0)) : MODULE$.collectInternal(dataset, relDataType);
            return new ExecuteResult((Iterable) tuple2._1(), tuple2._2$mcI$sp());
        });
    }

    public ExecuteResult completeResultForMdx(Dataset<Row> dataset, RelDataType relDataType) {
        SparderEnv$.MODULE$.setDF(dataset.toDF((Seq) ((Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(RelColumnMetaDataExtractor.getColumnMetadata(relDataType)).asScala()).map(structField -> {
            return structField.getName();
        }, Buffer$.MODULE$.canBuildFrom())));
        return new ExecuteResult(new LinkedList(), 0);
    }

    public Dataset<Row> wrapAlias(Dataset<Row> dataset, RelDataType relDataType) {
        Dataset<Row> df = dataset.toDF((Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(relDataType.getFieldList()).asScala()).map(relDataTypeField -> {
            return relDataTypeField.getName();
        }, Buffer$.MODULE$.canBuildFrom()));
        logInfo(() -> {
            return new StringBuilder(15).append("Wrap ALIAS ").append(dataset.schema().treeString()).append(" TO ").append(df.schema().treeString()).toString();
        });
        return df;
    }

    public void saveAsyncQueryResult(Dataset<Row> dataset, String str, String str2, RelDataType relDataType) {
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        SparderEnv$.MODULE$.setDF(dataset);
        String sb = new StringBuilder(1).append(KapConfig.getInstanceFromEnv().getAsyncResultBaseDir(QueryContext.current().getProject())).append("/").append(QueryContext.current().getQueryId()).toString();
        String randomUUIDStr = RandomUtil.randomUUIDStr();
        String name = Thread.currentThread().getName();
        SparkContext sparkContext = SparderEnv$.MODULE$.getSparkSession().sparkContext();
        sparkContext.setJobGroup(name, QueryContext.current().getMetrics().getCorrectedSql(), true);
        if (instanceFromEnv.isQueryLimitEnabled() && SparderEnv$.MODULE$.isSparkExecutorResourceLimited(sparkContext.getConf())) {
            sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL(), "async_query_tasks");
        }
        dataset.sparkSession().sparkContext().setLocalProperty(QueryToExecutionIDCache$.MODULE$.KYLIN_QUERY_EXECUTION_ID(), randomUUIDStr);
        QueryContext.currentTrace().endLastSpan();
        SparkJobTrace sparkJobTrace = new SparkJobTrace(name, QueryContext.currentTrace(), QueryContext.current().getQueryId(), sparkContext, SparkJobTrace$.MODULE$.$lessinit$greater$default$5());
        String queryId = QueryContext.current().getQueryId();
        boolean isIncludeHeader = QueryContext.current().getQueryTagInfo().isIncludeHeader();
        if ("json".equals(str)) {
            String[] columns = dataset.columns();
            List columnNames = QueryContext.current().getColumnNames();
            ObjectRef create = ObjectRef.create(dataset);
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), columnNames.size()).foreach$mVc$sp(i -> {
                create.elem = ((Dataset) create.elem).withColumnRenamed(columns[i], (String) columnNames.get(i));
            });
            ((Dataset) create.elem).write().option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZ").option("encoding", str2).option("charset", "utf-8").mode(SaveMode.Append).json(sb);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if ("parquet".equals(str)) {
            SQLContext sqlContext = SparderEnv$.MODULE$.getSparkSession().sqlContext();
            sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true");
            if (relDataType != null) {
                normalizeSchema(wrapAlias(dataset, relDataType)).write().mode(SaveMode.Overwrite).option("encoding", str2).option("charset", "utf-8").parquet(sb);
            } else {
                normalizeSchema(dataset).write().mode(SaveMode.Overwrite).option("encoding", str2).option("charset", "utf-8").parquet(sb);
            }
            sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false");
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else if ("csv".equals(str)) {
            processCsv(dataset, str, relDataType, sb, queryId, isIncludeHeader);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else if ("xlsx".equals(str)) {
            processXlsx(dataset, str, relDataType, sb, queryId, isIncludeHeader);
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        } else {
            normalizeSchema(dataset).write().option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZ").option("encoding", str2).option("charset", "utf-8").mode(SaveMode.Append).parquet(sb);
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        }
        AsyncQueryUtil.createSuccessFlagWithContent(QueryContext.current().getProject(), QueryContext.current().getQueryId(), QueryContext.current().isOutOfSegmentRange() ? new AsyncQueryUtil.SuccessFileContent(ErrorCodeServer.ASYNC_QUERY_OUT_OF_DATA_RANGE.getErrorCode().getCode()) : null);
        if (instanceFromEnv.isQuerySparkJobTraceEnabled()) {
            sparkJobTrace.jobFinished();
        }
        if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
            return;
        }
        QueryExecution queryExecution = QueryToExecutionIDCache$.MODULE$.getQueryExecution(randomUUIDStr);
        Tuple2<List<Long>, List<Long>> collectScanMetrics = QueryMetricUtils$.MODULE$.collectScanMetrics(queryExecution.executedPlan());
        if (collectScanMetrics == null) {
            throw new MatchError(collectScanMetrics);
        }
        Tuple2 tuple2 = new Tuple2((List) collectScanMetrics._1(), (List) collectScanMetrics._2());
        List list = (List) tuple2._1();
        List list2 = (List) tuple2._2();
        Tuple3<Long, Long, Long> collectTaskRelatedMetrics = QueryMetricUtils$.MODULE$.collectTaskRelatedMetrics(name, sparkContext);
        if (collectTaskRelatedMetrics == null) {
            throw new MatchError(collectTaskRelatedMetrics);
        }
        Tuple3 tuple3 = new Tuple3((Long) collectTaskRelatedMetrics._1(), (Long) collectTaskRelatedMetrics._2(), (Long) collectTaskRelatedMetrics._3());
        Long l = (Long) tuple3._1();
        Long l2 = (Long) tuple3._2();
        Long l3 = (Long) tuple3._3();
        logInfo(() -> {
            return new StringBuilder(27).append("scanRows is ").append(list).append(", scanBytes is ").append(list2).toString();
        });
        QueryContext.current().getMetrics().setScanRows(list);
        QueryContext.current().getMetrics().setScanBytes(list2);
        QueryContext.current().getMetrics().setQueryJobCount(Predef$.MODULE$.Long2long(l));
        QueryContext.current().getMetrics().setQueryStageCount(Predef$.MODULE$.Long2long(l2));
        QueryContext.current().getMetrics().setQueryTaskCount(Predef$.MODULE$.Long2long(l3));
        setResultRowCount(queryExecution.executedPlan());
    }

    public void setResultRowCount(SparkPlan sparkPlan) {
        if (QueryContext.current().getMetrics().getResultRowCount() == 0) {
            QueryContext.current().getMetrics().setResultRowCount(BoxesRunTime.unboxToLong(sparkPlan.metrics().get("numOutputRows").map(sQLMetric -> {
                return BoxesRunTime.boxToLong(sQLMetric.value());
            }).getOrElse(() -> {
                return 0L;
            })));
        }
    }

    public void processCsv(Dataset<Row> dataset, String str, RelDataType relDataType, String str2, String str3, boolean z) {
        File createTmpFile = createTmpFile(str3, str);
        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(new FileOutputStream(createTmpFile), StandardCharsets.UTF_8);
        if (z) {
            processCsvHeader(outputStreamWriter, relDataType);
        }
        Tuple2 iterator = dataset.toIterator();
        if (iterator == null) {
            throw new MatchError(iterator);
        }
        Tuple2 tuple2 = new Tuple2((Iterator) iterator._1(), BoxesRunTime.boxToInteger(iterator._2$mcI$sp()));
        Iterator<Row> it = (Iterator) tuple2._1();
        int _2$mcI$sp = tuple2._2$mcI$sp();
        asyncQueryIteratorWriteCsv(it, outputStreamWriter, relDataType);
        uploadAsyncQueryResult(createTmpFile, str2, str3, str);
        setResultRowCount(_2$mcI$sp);
    }

    public void processXlsx(Dataset<Row> dataset, String str, RelDataType relDataType, String str2, String str3, boolean z) {
        File createTmpFile = createTmpFile(str3, str);
        FileOutputStream fileOutputStream = new FileOutputStream(createTmpFile);
        XSSFWorkbook xSSFWorkbook = new XSSFWorkbook();
        XSSFSheet createSheet = xSSFWorkbook.createSheet("query_result");
        IntRef create = IntRef.create(0);
        if (z) {
            processXlsxHeader(createSheet, relDataType);
            create.elem++;
        }
        Tuple2 iterator = dataset.toIterator();
        if (iterator == null) {
            throw new MatchError(iterator);
        }
        Tuple2 tuple2 = new Tuple2((Iterator) iterator._1(), BoxesRunTime.boxToInteger(iterator._2$mcI$sp()));
        Iterator it = (Iterator) tuple2._1();
        int _2$mcI$sp = tuple2._2$mcI$sp();
        ImplicitConversions$.MODULE$.iterator$u0020asScala(it).foreach(row -> {
            $anonfun$processXlsx$1(createSheet, create, row);
            return BoxedUnit.UNIT;
        });
        xSSFWorkbook.write(fileOutputStream);
        uploadAsyncQueryResult(createTmpFile, str2, str3, str);
        setResultRowCount(_2$mcI$sp);
    }

    private void setResultRowCount(int i) {
        if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
            return;
        }
        QueryContext.current().getMetrics().setResultRowCount(i);
    }

    public void processCsvHeader(OutputStreamWriter outputStreamWriter, RelDataType relDataType) {
        String separator = QueryContext.current().getQueryTagInfo().getSeparator();
        if (relDataType == null) {
            outputStreamWriter.write(new StringBuilder(0).append(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(QueryContext.current().getColumnNames()).asScala()).mkString(separator)).append(END_OF_LINE_SYMBOLS()).toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            StringBuilder stringBuilder = new StringBuilder();
            ((IterableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(relDataType.getFieldList()).asScala()).map(relDataTypeField -> {
                return relDataTypeField.getName();
            }, Buffer$.MODULE$.canBuildFrom())).foreach(str -> {
                return stringBuilder.append(new StringBuilder(0).append(separator).append(str).toString());
            });
            stringBuilder.deleteCharAt(0);
            outputStreamWriter.write(new StringBuilder(0).append(stringBuilder.toString()).append(END_OF_LINE_SYMBOLS()).toString());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        outputStreamWriter.flush();
    }

    public void processXlsxHeader(XSSFSheet xSSFSheet, RelDataType relDataType) {
        XSSFRow createRow = xSSFSheet.createRow(0);
        if (relDataType != null) {
            ((IterableLike) ((Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(relDataType.getFieldList()).asScala()).map(relDataTypeField -> {
                return relDataTypeField.getName();
            }, Buffer$.MODULE$.canBuildFrom())).zipWithIndex(Buffer$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
                $anonfun$processXlsxHeader$3(createRow, tuple2);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            ((IterableLike) ((IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(QueryContext.current().getColumnNames()).asScala()).zipWithIndex(Buffer$.MODULE$.canBuildFrom())).foreach(tuple22 -> {
                $anonfun$processXlsxHeader$1(createRow, tuple22);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public File createTmpFile(String str, String str2) {
        File file = new File(new StringBuilder(0).append(str).append(str2).toString());
        file.createNewFile();
        return file;
    }

    public void uploadAsyncQueryResult(File file, String str, String str2, String str3) {
        AsyncQueryUtil.getFileSystem().copyFromLocalFile(true, true, new Path(file.getAbsolutePath()), new Path(new StringBuilder(2).append(str).append("/").append(str2).append(".").append(str3).toString()));
        if (file.exists()) {
            file.delete();
        }
    }

    public void asyncQueryIteratorWriteCsv(Iterator<Row> it, OutputStreamWriter outputStreamWriter, RelDataType relDataType) {
        IntRef create = IntRef.create(0);
        String separator = QueryContext.current().getQueryTagInfo().getSeparator();
        (relDataType != null ? readResultRow(it, (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(relDataType.getFieldList()).asScala()) : SparkSqlClient$.MODULE$.readPushDownResultRow(it, false)).forEach(list -> {
            create.elem++;
            StringBuilder stringBuilder = new StringBuilder();
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), list.size()).foreach(obj -> {
                return $anonfun$asyncQueryIteratorWriteCsv$2(list, stringBuilder, separator, BoxesRunTime.unboxToInt(obj));
            });
            stringBuilder.append(MODULE$.END_OF_LINE_SYMBOLS());
            outputStreamWriter.write(stringBuilder.toString());
            if (create.elem % MODULE$.CHECK_WRITE_SIZE() == 0) {
                outputStreamWriter.flush();
            }
        });
        outputStreamWriter.flush();
    }

    public String encodeCell(String str, String str2) {
        String str3 = str;
        boolean z = str3.contains(str2) || str3.contains("\r") || str3.contains("\n");
        if (str3.contains(QUOTE_CHAR())) {
            z = true;
            str3 = str3.replace(QUOTE_CHAR(), new StringBuilder(0).append(QUOTE_CHAR()).append(QUOTE_CHAR()).toString());
        }
        return z ? new StringBuilder(0).append(QUOTE_CHAR()).append(str3).append(QUOTE_CHAR()).toString() : str3;
    }

    public Seq<String> normalize(Seq<String> seq) {
        return (Seq) seq.map(str -> {
            return str.replace(" ", "_").replace(",", "_").replace(";", "_").replace("{", "_").replace("}", "_").replace("(", "_").replace(")", "_").replace("\\n", "_").replace("\\t", "_").replace("=", "_");
        }, Seq$.MODULE$.canBuildFrom());
    }

    public Dataset<Row> normalizeSchema(Dataset<Row> dataset) {
        return dataset.toDF(normalize(Predef$.MODULE$.wrapRefArray(dataset.columns())));
    }

    public static final /* synthetic */ void $anonfun$extractEachStageLimitRows$3(IntRef intRef, AtomicLong atomicLong, SparkPlan sparkPlan) {
        MODULE$.extractEachStageLimitRows(sparkPlan, intRef.elem, atomicLong);
    }

    public static final /* synthetic */ void $anonfun$processXlsx$2(XSSFRow xSSFRow, Tuple2 tuple2) {
        xSSFRow.createCell(tuple2._2$mcI$sp()).setCellValue(tuple2._1().toString());
    }

    public static final /* synthetic */ void $anonfun$processXlsx$1(XSSFSheet xSSFSheet, IntRef intRef, Row row) {
        XSSFRow createRow = xSSFSheet.createRow(intRef.elem);
        ((IterableLike) row.toSeq().zipWithIndex(Seq$.MODULE$.canBuildFrom())).foreach(tuple2 -> {
            $anonfun$processXlsx$2(createRow, tuple2);
            return BoxedUnit.UNIT;
        });
        intRef.elem++;
    }

    public static final /* synthetic */ void $anonfun$processXlsxHeader$1(XSSFRow xSSFRow, Tuple2 tuple2) {
        xSSFRow.createCell(tuple2._2$mcI$sp()).setCellValue((String) tuple2._1());
    }

    public static final /* synthetic */ void $anonfun$processXlsxHeader$3(XSSFRow xSSFRow, Tuple2 tuple2) {
        xSSFRow.createCell(tuple2._2$mcI$sp()).setCellValue((String) tuple2._1());
    }

    public static final /* synthetic */ StringBuilder $anonfun$asyncQueryIteratorWriteCsv$2(List list, StringBuilder stringBuilder, String str, int i) {
        String str2 = list.get(i) == null ? "" : (String) list.get(i);
        if (i > 0) {
            stringBuilder.append(str);
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        return stringBuilder.append(MODULE$.encodeCell(str2, str));
    }

    private ResultPlan$() {
        MODULE$ = this;
        Logging.$init$(this);
        LogEx.$init$(this);
        this.PARTITION_SPLIT_BYTES = KylinConfig.getInstanceFromEnv().getQueryPartitionSplitSizeMB() * 1024 * 1024;
        this.SPARK_SCHEDULER_POOL = "spark.scheduler.pool";
        this.QUOTE_CHAR = "\"";
        this.END_OF_LINE_SYMBOLS = "\n";
        this.CHECK_WRITE_SIZE = 1000;
    }
}
