package org.apache.kylin.newten;

import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.query.runtime.plan.ResultPlan;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/kylin/newten/ExtractLimitInfoTest.class */
/**
 * Checks the row total that {@link ResultPlan#extractEachStageLimitRows} accumulates for the
 * executed Spark plans of several queries that contain a {@code LIMIT 10} clause.
 *
 * <p>The test runs against the {@code join_opt} test metadata and a local Spark session that is
 * rebuilt once per class in {@link #initSpark()}.
 */
public class ExtractLimitInfoTest extends NLocalWithSparkSessionTest {

    /** Project used by every query and build in this test. */
    private static final String PROJECT = "join_opt";

    /** Metadata overlay loaded before each test. */
    private static final String METADATA_DIR = "src/test/resources/ut_meta/join_opt";

    /**
     * Stops any Spark session that is still running and replaces it with a fresh local session,
     * which is then registered with {@link SparderEnv}.
     */
    @BeforeClass
    public static void initSpark() {
        if (Shell.MAC) {
            overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
        }
        if (ss != null && !ss.sparkContext().isStopped()) {
            ss.stop();
        }
        sparkConf = new SparkConf().setAppName(UUID.randomUUID().toString()).setMaster("local[4]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory");
        sparkConf.set("spark.sql.shuffle.partitions", "1");
        sparkConf.set("spark.memory.fraction", "0.1");
        sparkConf.set("spark.shuffle.detectCorrupt", "false");
        sparkConf.set("spark.sql.crossJoin.enabled", "true");
        // NOTE(review): adaptive execution is off and the broadcast threshold is 1 byte,
        // presumably to keep the physical plan shape stable for the assertions below - confirm.
        sparkConf.set("spark.sql.adaptive.enabled", "false");
        sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1");
        ss = SparkSession.builder().config(sparkConf).getOrCreate();
        SparderEnv.setSparkSession(ss);
    }

    /**
     * Loads the test metadata and initialises the job scheduler for the project.
     *
     * @throws IllegalStateException if the scheduler reports that it has not started
     * @throws Exception if metadata creation or scheduler initialisation fails
     */
    @Before
    public void setup() throws Exception {
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        createTestMetadata(new String[]{METADATA_DIR});
        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
        if (!scheduler.hasStarted()) {
            throw new IllegalStateException("scheduler has not been started");
        }
    }

    /**
     * Tears down the scheduler and removes the test metadata.
     *
     * @throws Exception if either teardown step fails
     */
    @After
    public void after() throws Exception {
        try {
            NDefaultScheduler.destroyInstance();
        } finally {
            // Always clean the metadata, even when scheduler teardown fails, so that a failure
            // here does not leak state into the tests that run afterwards.
            cleanupTestMetadata();
        }
    }

    /**
     * Builds the model, loads the CSV data into the Spark session, and asserts the accumulated
     * limit row count for a join query, a plain aggregate query and a nested aggregate query.
     *
     * @throws Exception if the build or any of the queries fails
     */
    @Test
    public void testExtractLimitInfo() throws Exception {
        overwriteSystemProp("kylin.storage.columnar.shard-rowcount", "100");
        fullBuild("8c670664-8d05-466a-802f-83c023b56c77");
        populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());

        // Left join of two aggregated sub-queries with an outer LIMIT 10.
        // NOTE(review): 10010 is the value the original test expects; the breakdown of that
        // total across plan stages is not visible from this file.
        String joinWithLimit = "select * from (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME) lside left join (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME) rside on lside.TRANS_ID = rside.TRANS_ID limit 10";
        Assert.assertEquals(10010L, extractLimitRows(joinWithLimit));

        // Single aggregate with LIMIT 10.
        String aggregateWithLimit = "select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME limit 10";
        Assert.assertEquals(10L, extractLimitRows(aggregateWithLimit));

        // LIMIT 10 inside a sub-query that is wrapped by count(*).
        String countOverLimit = "select count(*) from (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME limit 10)";
        Assert.assertEquals(10L, extractLimitRows(countOverLimit));
    }

    /**
     * Plans {@code sql} and returns the total that {@link ResultPlan#extractEachStageLimitRows}
     * accumulates for its executed plan, starting from a zeroed counter.
     *
     * @param sql the query to plan
     * @return the accumulated row count
     * @throws SQLException if the query cannot be planned
     */
    private long extractLimitRows(String sql) throws SQLException {
        AtomicLong accumulatedRows = new AtomicLong();
        ResultPlan.extractEachStageLimitRows(getSparkExecutedPlan(sql), -1, accumulatedRows);
        return accumulatedRows.get();
    }

    /**
     * Runs {@code str} through {@link ExecAndComp#queryModel} for this project and returns the
     * executed physical plan of the result.
     *
     * @param str the SQL text to plan
     * @return the executed Spark plan
     * @throws SQLException if the query fails
     */
    private SparkPlan getSparkExecutedPlan(String str) throws SQLException {
        return ExecAndComp.queryModel(getProject(), str).queryExecution().executedPlan();
    }

    /**
     * Returns the name of the project this test runs against.
     *
     * @return the project name, {@code "join_opt"}
     */
    public String getProject() {
        return PROJECT;
    }
}
