package org.apache.kylin.query.pushdown;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.cache.kylin.KylinCacheFileSystem;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.fileseg.FileSegments;
import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.query.StructField;
import org.apache.kylin.query.mask.QueryResultMasks;
import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache$;
import org.apache.kylin.query.runtime.plan.ResultPlan$;
import org.apache.kylin.query.util.QueryInterruptChecker;
import org.apache.kylin.query.util.SparkJobTrace;
import org.apache.kylin.query.util.SparkJobTrace$;
import org.apache.kylin.softaffinity.SoftAffinityManager$;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparderEnv$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.QueryMetricUtils$;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils$;
import org.apache.spark.sql.util.SparderTypeUtil$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.WrappedArray;
import scala.concurrent.duration.Duration$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkSqlClient.scala */
/* loaded from: input_file:org/apache/kylin/query/pushdown/SparkSqlClient$.class */
public final class SparkSqlClient$ {
    public static SparkSqlClient$ MODULE$;
    private final String DEFAULT_DB;
    private final String SHUFFLE_PARTITION;
    private final Logger logger;

    static {
        new SparkSqlClient$();
    }

    public String DEFAULT_DB() {
        return this.DEFAULT_DB;
    }

    public String SHUFFLE_PARTITION() {
        return this.SHUFFLE_PARTITION;
    }

    public Logger logger() {
        return this.logger;
    }

    public Pair<List<List<String>>, List<StructField>> executeSql(SparkSession sparkSession, String str, UUID uuid, String str2) {
        Tuple3<Iterable<List<String>>, Object, List<StructField>> executeSqlToIterable = executeSqlToIterable(sparkSession, str, uuid, str2);
        return Pair.newPair(ImmutableList.copyOf((Iterable) executeSqlToIterable._1()), executeSqlToIterable._3());
    }

    public Tuple3<Iterable<List<String>>, Object, List<StructField>> executeSqlToIterable(SparkSession sparkSession, String str, UUID uuid, String str2) {
        sparkSession.sparkContext().setLocalProperty("spark.scheduler.pool", "query_pushdown");
        HadoopUtil.setCurrentConfiguration(sparkSession.sparkContext().hadoopConfiguration());
        sparkSession.sparkContext().setLocalProperty(QueryToExecutionIDCache$.MODULE$.KYLIN_QUERY_ID_KEY(), QueryContext.current().getQueryId());
        logger().info("Start to run sql with SparkSQL...");
        try {
            KylinCacheFileSystem.processAcceptCacheTimeInHintStr(QueryContext.current().getFirstHintStr());
            sparkSession.sessionState().conf().setLocalProperty(DEFAULT_DB(), StringUtils.isNotBlank(str2) ? NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getDefaultDatabase(str2) : null);
            Dataset sql = sparkSession.sql(str);
            if (NProjectManager.getProjectConfig(str2).isPrintQueryPlanEnabled()) {
                logger().info(sql.queryExecution().logical().toString());
            }
            Dataset<Row> maskResult = QueryResultMasks.maskResult(sql);
            logger().info("SparkSQL returned result DataFrame");
            QueryContext.current().record("to_spark_plan");
            autoSetShufflePartitions(maskResult);
            QueryContext.current().record("auto_set_parts");
            Tuple3<Iterable<List<String>>, Object, List<StructField>> tuple3 = FileSegments.isSyncFileSegSql(str) ? new Tuple3<>(ImmutableList.of(ImmutableList.of(Integer.toString(maskResult.rdd().getNumPartitions()))), BoxesRunTime.boxToInteger(1), ImmutableList.of(new StructField("CNT", -5, "BIGINT", 0, 0, false))) : dfToList(sparkSession, str, maskResult);
            QueryContext.current().record("collect_result");
            SoftAffinityManager$.MODULE$.logAuditAsks();
            return tuple3;
        } finally {
            sparkSession.sessionState().conf().setLocalProperty(DEFAULT_DB(), (String) null);
            KylinCacheFileSystem.clearAcceptCacheTimeLocally();
        }
    }

    private void autoSetShufflePartitions(final Dataset<Row> dataset) {
        final KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        int i = new StringOps(Predef$.MODULE$.augmentString(dataset.sparkSession().sessionState().conf().getConfString(SHUFFLE_PARTITION()))).toInt();
        if ((instanceFromEnv.isAutoSetPushDownPartitions() && ResourceDetectUtils$.MODULE$.checkPartitionFilter(dataset.queryExecution().sparkPlan())) ? false : true) {
            logger().info(new StringBuilder(33).append("Skip auto set ").append(SHUFFLE_PARTITION()).append(", use origin value ").append(i).toString());
            return;
        }
        int autoSetPushDownPartitionsForced = instanceFromEnv.autoSetPushDownPartitionsForced();
        if (autoSetPushDownPartitionsForced > 0) {
            dataset.sparkSession().sessionState().conf().setLocalProperty(SHUFFLE_PARTITION(), Integer.toString(autoSetPushDownPartitionsForced));
            QueryContext.current().setShufflePartitions(autoSetPushDownPartitionsForced);
            logger().info(new StringBuilder(51).append("Auto force set forced spark.sql.shuffle.partitions ").append(autoSetPushDownPartitionsForced).toString());
            return;
        }
        final boolean isConcurrencyFetchDataSourceSize = instanceFromEnv.isConcurrencyFetchDataSourceSize();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        final int autoShufflePartitionTimeOut = instanceFromEnv.getAutoShufflePartitionTimeOut();
        try {
            try {
                newSingleThreadExecutor.submit(new Callable<BoxedUnit>(instanceFromEnv, dataset, isConcurrencyFetchDataSourceSize, autoShufflePartitionTimeOut) { // from class: org.apache.kylin.query.pushdown.SparkSqlClient$$anon$1
                    private final KylinConfig config$1;
                    private final Dataset df$1;
                    private final boolean isConcurrency$1;
                    private final int timeOut$1;

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public void call() {
                        int baseShufflePartitionSize = this.config$1.getBaseShufflePartitionSize();
                        Seq paths = ResourceDetectUtils$.MODULE$.getPaths(this.df$1.queryExecution().sparkPlan(), ResourceDetectUtils$.MODULE$.getPaths$default$2());
                        String sb = this.isConcurrency$1 ? new StringBuilder(1).append(ResourceDetectUtils$.MODULE$.getResourceSizeWithTimeoutByConcurrency(this.config$1, Duration$.MODULE$.apply(this.timeOut$1, TimeUnit.SECONDS), SparderEnv$.MODULE$.getHadoopConfiguration(), paths)).append("b").toString() : new StringBuilder(1).append(ResourceDetectUtils$.MODULE$.getResourceSizBySerial(this.config$1, SparderEnv$.MODULE$.getHadoopConfiguration(), paths)).append("b").toString();
                        String l = Long.toString(Math.max(1L, JavaUtils.byteStringAsMb(sb) / baseShufflePartitionSize));
                        this.df$1.sparkSession().sessionState().conf().setLocalProperty(SparkSqlClient$.MODULE$.SHUFFLE_PARTITION(), l);
                        QueryContext.current().setShufflePartitions(new StringOps(Predef$.MODULE$.augmentString(l)).toInt());
                        SparkSqlClient$.MODULE$.logger().info(new StringBuilder(48).append("Auto set ").append(SparkSqlClient$.MODULE$.SHUFFLE_PARTITION()).append(" ").append(l).append(", ").append("sourceTableSize ").append(sb).append(", basePartitionSize ").append(baseShufflePartitionSize).toString());
                    }

                    @Override // java.util.concurrent.Callable
                    public /* bridge */ /* synthetic */ BoxedUnit call() {
                        call();
                        return BoxedUnit.UNIT;
                    }

                    {
                        this.config$1 = instanceFromEnv;
                        this.df$1 = dataset;
                        this.isConcurrency$1 = isConcurrencyFetchDataSourceSize;
                        this.timeOut$1 = autoShufflePartitionTimeOut;
                    }
                }).get(autoShufflePartitionTimeOut, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                int i2 = new StringOps(Predef$.MODULE$.augmentString(dataset.sparkSession().sessionState().conf().getConfString(SHUFFLE_PARTITION()))).toInt() * instanceFromEnv.getAutoShufflePartitionMultiple();
                dataset.sparkSession().sessionState().conf().setLocalProperty(SHUFFLE_PARTITION(), Integer.toString(i2));
                QueryContext.current().setShufflePartitions(i2);
                logger().info(new StringBuilder(43).append("Auto set shuffle partitions timeout. set ").append(SHUFFLE_PARTITION()).append(" ").append(i2).append(".").toString());
            } catch (Exception e2) {
                logger().error(new StringBuilder(17).append("Auto set ").append(SHUFFLE_PARTITION()).append(" failed.").toString(), e2);
                throw e2;
            }
        } finally {
            newSingleThreadExecutor.shutdownNow();
        }
    }

    public Tuple3<Iterable<List<String>>, Object, List<StructField>> dfToList(SparkSession sparkSession, String str, Dataset<Row> dataset) {
        Tuple3<Iterable<List<String>>, Object, List<StructField>> tuple3;
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        String name = Thread.currentThread().getName();
        sparkSession.sparkContext().setJobGroup(name, new StringBuilder(11).append("Push down: ").append(str).toString(), true);
        try {
            try {
                QueryContext.QueryTagInfo queryTagInfo = QueryContext.current().getQueryTagInfo();
                if (queryTagInfo.isAsyncQuery()) {
                    List list = (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) dataset.schema().map(structField -> {
                        return SparderTypeUtil$.MODULE$.convertSparkFieldToJavaField(structField);
                    }, Seq$.MODULE$.canBuildFrom())).asJava();
                    QueryContext.current().setColumnNames((List) JavaConverters$.MODULE$.bufferAsJavaListConverter((Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(list).asScala()).map(structField2 -> {
                        return structField2.getName();
                    }, Buffer$.MODULE$.canBuildFrom())).asJava());
                    ResultPlan$.MODULE$.saveAsyncQueryResult(dataset, queryTagInfo.getFileFormat(), queryTagInfo.getFileEncode(), null);
                    tuple3 = new Tuple3<>(Lists.newArrayList(), BoxesRunTime.boxToInteger(0), list);
                } else {
                    if (!QueryContext.current().isExplainSql()) {
                        QueryContext.currentTrace().endLastSpan();
                        SparkJobTrace sparkJobTrace = new SparkJobTrace(name, QueryContext.currentTrace(), QueryContext.current().getQueryId(), SparderEnv$.MODULE$.getSparkSession().sparkContext(), SparkJobTrace$.MODULE$.$lessinit$greater$default$5());
                        NProjectManager.getProjectConfig(QueryContext.current().getProject()).isQueryUseIterableCollectApi();
                        Tuple2 collectToIterator = NProjectManager.getProjectConfig(QueryContext.current().getProject()).isQueryUseIterableCollectApi() ? dataset.collectToIterator() : dataset.toIterator();
                        Iterator<Row> it = (Iterator) collectToIterator._1();
                        int _2$mcI$sp = collectToIterator._2$mcI$sp();
                        if (instanceFromEnv.isQuerySparkJobTraceEnabled()) {
                            sparkJobTrace.jobFinished();
                        }
                        List list2 = (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) dataset.schema().map(structField3 -> {
                            return SparderTypeUtil$.MODULE$.convertSparkFieldToJavaField(structField3);
                        }, Seq$.MODULE$.canBuildFrom())).asJava();
                        Tuple2<List<Long>, List<Long>> collectScanMetrics = QueryMetricUtils$.MODULE$.collectScanMetrics(dataset.queryExecution().executedPlan());
                        if (collectScanMetrics == null) {
                            throw new MatchError(collectScanMetrics);
                        }
                        Tuple2 tuple2 = new Tuple2((List) collectScanMetrics._1(), (List) collectScanMetrics._2());
                        List list3 = (List) tuple2._1();
                        List list4 = (List) tuple2._2();
                        Tuple3<Long, Long, Long> collectTaskRelatedMetrics = QueryMetricUtils$.MODULE$.collectTaskRelatedMetrics(name, sparkSession.sparkContext());
                        if (collectTaskRelatedMetrics == null) {
                            throw new MatchError(collectTaskRelatedMetrics);
                        }
                        Tuple3 tuple32 = new Tuple3((Long) collectTaskRelatedMetrics._1(), (Long) collectTaskRelatedMetrics._2(), (Long) collectTaskRelatedMetrics._3());
                        Long l = (Long) tuple32._1();
                        Long l2 = (Long) tuple32._2();
                        Long l3 = (Long) tuple32._3();
                        QueryContext.current().getMetrics().setScanRows(list3);
                        QueryContext.current().getMetrics().setScanBytes(list4);
                        QueryContext.current().getMetrics().setQueryJobCount(Predef$.MODULE$.Long2long(l));
                        QueryContext.current().getMetrics().setQueryStageCount(Predef$.MODULE$.Long2long(l2));
                        QueryContext.current().getMetrics().setQueryTaskCount(Predef$.MODULE$.Long2long(l3));
                        return new Tuple3<>(readPushDownResultRow(it, true), BoxesRunTime.boxToInteger(_2$mcI$sp), list2);
                    }
                    List list5 = (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) dataset.schema().map(structField4 -> {
                        return SparderTypeUtil$.MODULE$.convertSparkFieldToJavaField(structField4);
                    }, Seq$.MODULE$.canBuildFrom())).asJava();
                    QueryContext.current().getQueryPlan().setSparkPlan(dataset.queryExecution().analyzed().toString());
                    tuple3 = new Tuple3<>(Lists.newArrayList(), BoxesRunTime.boxToInteger(0), list5);
                }
                QueryContext.current().setExecutionID(QueryToExecutionIDCache$.MODULE$.getQueryExecutionID(QueryContext.current().getQueryId()));
                dataset.sparkSession().sessionState().conf().setLocalProperty(SHUFFLE_PARTITION(), (String) null);
                HadoopUtil.setCurrentConfiguration((Configuration) null);
                return tuple3;
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    sparkSession.sparkContext().cancelJobGroup(name);
                    QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.", "Current step: Collecting dataset of push-down.");
                }
                throw th;
            }
        } finally {
            QueryContext.current().setExecutionID(QueryToExecutionIDCache$.MODULE$.getQueryExecutionID(QueryContext.current().getQueryId()));
            dataset.sparkSession().sessionState().conf().setLocalProperty(SHUFFLE_PARTITION(), (String) null);
            HadoopUtil.setCurrentConfiguration((Configuration) null);
        }
    }

    public Iterable<List<String>> readPushDownResultRow(Iterator<Row> it, boolean z) {
        return () -> {
            return new Iterator<List<String>>(it, z) { // from class: org.apache.kylin.query.pushdown.SparkSqlClient$$anon$2
                private final int checkInterruptSize = 1000;
                private int readRowSize = 0;
                private final Iterator resultRows$1;
                private final boolean checkInterrupt$1;

                @Override // java.util.Iterator
                public void remove() {
                    super.remove();
                }

                @Override // java.util.Iterator
                public void forEachRemaining(Consumer<? super List<String>> consumer) {
                    super.forEachRemaining(consumer);
                }

                private int checkInterruptSize() {
                    return this.checkInterruptSize;
                }

                private int readRowSize() {
                    return this.readRowSize;
                }

                private void readRowSize_$eq(int i) {
                    this.readRowSize = i;
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.resultRows$1.hasNext();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public List<String> next() {
                    Row row = (Row) this.resultRows$1.next();
                    readRowSize_$eq(readRowSize() + 1);
                    if (this.checkInterrupt$1 && readRowSize() % checkInterruptSize() == 0) {
                        QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.", "Current step: Collecting dataset of push-down.");
                    }
                    return (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) row.toSeq().map(obj -> {
                        return SparkSqlClient$.MODULE$.org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString(obj, SparkSqlClient$.MODULE$.org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString$default$2());
                    }, Seq$.MODULE$.canBuildFrom())).asJava();
                }

                {
                    this.resultRows$1 = it;
                    this.checkInterrupt$1 = z;
                }
            };
        };
    }

    public String org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString(Object obj, boolean z) {
        String obj2;
        if (obj == null) {
            obj2 = null;
        } else if (obj instanceof Timestamp) {
            obj2 = DateFormat.castTimestampToString(((Timestamp) obj).getTime());
        } else if (obj instanceof String) {
            String str = (String) obj;
            obj2 = z ? new StringBuilder(2).append("\"").append(str).append("\"").toString() : str;
        } else if (obj instanceof WrappedArray) {
            obj2 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.genericArrayOps(((WrappedArray) obj).array()).map(obj3 -> {
                return MODULE$.org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString(obj3, true);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).mkString("[", ",", "]");
        } else if (obj instanceof WrappedArray.ofRef) {
            obj2 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(((WrappedArray.ofRef) obj).array())).map(obj4 -> {
                return MODULE$.org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString(obj4, true);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).mkString("[", ",", "]");
        } else if (obj instanceof Map) {
            obj2 = ((TraversableOnce) ((Map) obj).map(tuple2 -> {
                return new StringBuilder(1).append(MODULE$.org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString(tuple2._1(), true)).append(":").append(MODULE$.org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString(tuple2._2(), true)).toString();
            }, Iterable$.MODULE$.canBuildFrom())).mkString("{", ",", "}");
        } else if (obj instanceof byte[]) {
            obj2 = new String((byte[]) obj, StandardCharsets.UTF_8);
        } else if (obj instanceof BigDecimal) {
            obj2 = SparderTypeUtil$.MODULE$.adjustDecimal((BigDecimal) obj);
        } else {
            if (!(obj instanceof Object)) {
                throw new MatchError(obj);
            }
            obj2 = obj.toString();
        }
        return obj2;
    }

    public boolean org$apache$kylin$query$pushdown$SparkSqlClient$$rawValueToString$default$2() {
        return false;
    }

    private SparkSqlClient$() {
        MODULE$ = this;
        this.DEFAULT_DB = "spark.sql.default.database";
        this.SHUFFLE_PARTITION = "spark.sql.shuffle.partitions";
        this.logger = LoggerFactory.getLogger(SparkSqlClient.class);
    }
}
