package org.apache.kylin.newten;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.UUID;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.SortExec;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.exchange.Exchange;
import org.apache.spark.sql.execution.joins.SortMergeJoinExec;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import scala.Option;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:org/apache/kylin/newten/NJoinOptTest.class */
public class NJoinOptTest extends NLocalWithSparkSessionTest {
    @BeforeClass
    public static void initSpark() {
        if (Shell.MAC) {
            overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
        }
        if (ss != null && !ss.sparkContext().isStopped()) {
            ss.stop();
        }
        sparkConf = new SparkConf().setAppName(UUID.randomUUID().toString()).setMaster("local[4]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory");
        sparkConf.set("spark.sql.shuffle.partitions", "1");
        sparkConf.set("spark.memory.fraction", "0.1");
        sparkConf.set("spark.shuffle.detectCorrupt", "false");
        sparkConf.set("spark.sql.crossJoin.enabled", "true");
        sparkConf.set("spark.sql.adaptive.enabled", "false");
        sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1");
        ss = SparkSession.builder().config(sparkConf).getOrCreate();
        SparderEnv.setSparkSession(ss);
    }

    @Before
    public void setup() throws Exception {
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        createTestMetadata(new String[]{"src/test/resources/ut_meta/join_opt"});
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(getProject());
        nDefaultScheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
    }

    @After
    public void after() throws Exception {
        NDefaultScheduler.destroyInstance();
        cleanupTestMetadata();
    }

    @Test
    @Ignore("KE-30387")
    public void testShardJoinInOneSeg() throws Exception {
        overwriteSystemProp("kylin.storage.columnar.shard-rowcount", "100");
        fullBuild("8c670664-8d05-466a-802f-83c023b56c77");
        populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());
        ArrayList arrayList = new ArrayList();
        arrayList.add("select count(*) from TEST_KYLIN_FACT where SELLER_ID in (select SELLER_ID from TEST_KYLIN_FACT group by SELLER_ID)");
        arrayList.add("select count(*) from TEST_KYLIN_FACT where LSTG_FORMAT_NAME in (select LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME)");
        arrayList.add("select count(*) from TEST_KYLIN_FACT t1 join (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME) t2 on t1.TRANS_ID = t2.TRANS_ID and t1.LSTG_FORMAT_NAME = t2.LSTG_FORMAT_NAME");
        ExecAndComp.execAndCompareQueryList(arrayList, getProject(), ExecAndComp.CompareLevel.SAME, "default");
        basicScenario("select count(*) from TEST_KYLIN_FACT where SELLER_ID in (select SELLER_ID from TEST_KYLIN_FACT group by SELLER_ID)");
        testExchangePruningAfterAgg("select count(*) from TEST_KYLIN_FACT where LSTG_FORMAT_NAME in (select LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME)");
        testMultiShards("select count(*) from TEST_KYLIN_FACT t1 join (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME) t2 on t1.TRANS_ID = t2.TRANS_ID and t1.LSTG_FORMAT_NAME = t2.LSTG_FORMAT_NAME");
    }

    private void testMultiShards(String str) throws SQLException {
        assertPlan(str, false, false);
    }

    private void testExchangePruningAfterAgg(String str) throws SQLException {
        assertPlan(str, false, true);
    }

    private void basicScenario(String str) throws SQLException {
        assertPlan(str, false, false);
    }

    @Test
    public void testShardJoinInMultiSeg() throws Exception {
        overwriteSystemProp("kylin.storage.columnar.shard-rowcount", "100");
        buildMultiSegs("8c670664-8d05-466a-802f-83c023b56c77", new long[0]);
        populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());
        ArrayList arrayList = new ArrayList();
        arrayList.add("select count(*) from TEST_KYLIN_FACT where SELLER_ID in (select SELLER_ID from TEST_KYLIN_FACT group by SELLER_ID)");
        ExecAndComp.execAndCompareQueryList(arrayList, getProject(), ExecAndComp.CompareLevel.SAME, "default");
        assertPlan("select count(*) from TEST_KYLIN_FACT where SELLER_ID in (select SELLER_ID from TEST_KYLIN_FACT group by SELLER_ID)", true, true);
    }

    @Test
    @Ignore("KE-30387")
    public void testShardJoinInMultiSegWithFixedShardNum() throws Exception {
        NProjectManager nProjectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
        HashMap hashMap = new HashMap();
        hashMap.put("kylin.engine.shard-num-json", "{\"DEFAULT.TEST_KYLIN_FACT.SELLER_ID\":\"10\",\"DEFAULT.TEST_KYLIN_FACT.LSTG_FORMAT_NAME,DEFAULT.TEST_KYLIN_FACT.TRANS_ID\":\"15\",\"e\":\"300\"}");
        nProjectManager.updateProject(getProject(), projectInstance -> {
            projectInstance.getOverrideKylinProps().putAll(hashMap);
        });
        buildMultiSegs("8c670664-8d05-466a-802f-83c023b56c77", new long[0]);
        populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());
        ArrayList arrayList = new ArrayList();
        arrayList.add("select count(*) from TEST_KYLIN_FACT where SELLER_ID in (select SELLER_ID from TEST_KYLIN_FACT group by SELLER_ID)");
        arrayList.add("select count(*) from TEST_KYLIN_FACT t1 join (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME) t2 on t1.TRANS_ID = t2.TRANS_ID and t1.LSTG_FORMAT_NAME = t2.LSTG_FORMAT_NAME");
        ExecAndComp.execAndCompareQueryList(arrayList, getProject(), ExecAndComp.CompareLevel.SAME, "default");
        assertPlan("select count(*) from TEST_KYLIN_FACT where SELLER_ID in (select SELLER_ID from TEST_KYLIN_FACT group by SELLER_ID)", false, true);
        assertPlan("select count(*) from TEST_KYLIN_FACT t1 join (select TRANS_ID,LSTG_FORMAT_NAME from TEST_KYLIN_FACT group by TRANS_ID,LSTG_FORMAT_NAME) t2 on t1.TRANS_ID = t2.TRANS_ID and t1.LSTG_FORMAT_NAME = t2.LSTG_FORMAT_NAME", false, true);
    }

    private void assertPlan(String str, boolean z, boolean z2) throws SQLException {
        SortMergeJoinExec sortMergeJoinExec = getSortMergeJoinExec(str);
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(findSpecPlan(sortMergeJoinExec, Exchange.class).isDefined()));
        Assert.assertEquals(Boolean.valueOf(z2), Boolean.valueOf(findSpecPlan(sortMergeJoinExec, SortExec.class).isDefined()));
    }

    private SortMergeJoinExec getSortMergeJoinExec(String str) throws SQLException {
        return (SortMergeJoinExec) findSpecPlan(ExecAndComp.queryModel(getProject(), str).queryExecution().executedPlan(), SortMergeJoinExec.class).get();
    }

    private Option<SparkPlan> findSpecPlan(SparkPlan sparkPlan, final Class<?> cls) {
        return sparkPlan.find(new AbstractFunction1<SparkPlan, Object>() { // from class: org.apache.kylin.newten.NJoinOptTest.1
            public Object apply(SparkPlan sparkPlan2) {
                return Boolean.valueOf(cls.isInstance(sparkPlan2));
            }
        });
    }

    public String getProject() {
        return "join_opt";
    }
}
