package org.apache.kylin.query.pushdown;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.query.pushdown.SparkSubmitter;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.class */
public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase {
    SparkSession ss;

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        this.ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
        SparderEnv.setSparkSession(this.ss);
        this.ss.read().schema(new StructType().add("TRANS_ID", DataTypes.LongType, false).add("ORDER_ID", DataTypes.LongType, false).add("CAL_DT", DataTypes.DateType, false).add("LSTG_FORMAT_NAME", DataTypes.StringType, false).add("LEAF_CATEG_ID", DataTypes.LongType, false).add("LSTG_SITE_ID", DataTypes.IntegerType, false).add("SLR_SEGMENT_CD", DataTypes.FloatType, false).add("SELLER_ID", DataTypes.LongType, false).add("PRICE", DataTypes.createDecimalType(19, 4), false).add("ITEM_COUNT", DataTypes.DoubleType, false).add("TEST_COUNT_DISTINCT_BITMAP", DataTypes.StringType, false)).csv("../../examples/test_case_data/localmeta/data/DEFAULT.TEST_KYLIN_FACT.csv").createOrReplaceTempView("TEST_KYLIN_FACT");
        this.ss.read().schema(new StructType().add("col1", DataTypes.createDecimalType(12, 5), true).add("col2", DataTypes.createDecimalType(12, 6), true)).csv("../../examples/test_case_data/localmeta/data/DEFAULT.TEST_DECIMAL.csv").createOrReplaceTempView("TEST_DECIMAL");
    }

    @After
    public void after() throws Exception {
        this.ss.stop();
        cleanupTestMetadata();
    }

    @Test
    public void testDecimalCase() throws SQLException {
        PushDownRunnerSparkImpl pushDownRunnerSparkImpl = new PushDownRunnerSparkImpl();
        pushDownRunnerSparkImpl.init((KylinConfig) null, "tpch");
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        pushDownRunnerSparkImpl.executeQuery("select sum(coalesce(col1 + col2 + 1.75, col1)) as test from TEST_DECIMAL", newArrayList, newArrayList2, "tpch");
        Assert.assertEquals(1L, newArrayList.size());
        Assert.assertEquals("549.986000", ((List) newArrayList.get(0)).get(0));
        Assert.assertEquals(1L, newArrayList2.size());
        Assert.assertEquals("test", ((SelectedColumnMeta) newArrayList2.get(0)).getName());
        Assert.assertEquals("HIVE", pushDownRunnerSparkImpl.getName());
    }

    @Test
    public void testCast() {
        PushDownRunnerSparkImpl pushDownRunnerSparkImpl = new PushDownRunnerSparkImpl();
        pushDownRunnerSparkImpl.init((KylinConfig) null, "tpch");
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        ArrayList arrayList = new ArrayList();
        arrayList.add("SELECT cast(ORDER_ID as integer) FROM TEST_KYLIN_FACT limit 10");
        arrayList.add("SELECT cast(LSTG_SITE_ID as long) FROM TEST_KYLIN_FACT limit 10");
        arrayList.add("SELECT cast(LSTG_SITE_ID as short) FROM TEST_KYLIN_FACT limit 10");
        arrayList.add("SELECT CAST(ORDER_ID AS VARCHAR) FROM TEST_KYLIN_FACT limit 10");
        arrayList.add("SELECT CAST(ORDER_ID AS char) FROM TEST_KYLIN_FACT limit 10");
        arrayList.add("select SELLER_ID,ITEM_COUNT,sum(price)\nfrom (\nSELECT SELLER_ID, ITEM_COUNT,price\n\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS varchar), '-'), CAST(month(CAST(CAL_DT AS date)) AS varchar)) AS prt_mth\nFROM TEST_KYLIN_FACT) \ngroup by SELLER_ID,ITEM_COUNT,price limit 10");
        arrayList.add("select SELLER_ID,ITEM_COUNT,sum(price)\nfrom (\nSELECT SELLER_ID, ITEM_COUNT,price\n\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS char), '-'), CAST(month(CAST(CAL_DT AS date)) AS varchar)) AS prt_mth\nFROM TEST_KYLIN_FACT) \ngroup by SELLER_ID,ITEM_COUNT,price limit 10");
        arrayList.forEach(str -> {
            newArrayList.clear();
            try {
                pushDownRunnerSparkImpl.executeQuery(str, newArrayList, newArrayList2, "tpch");
            } catch (SQLException e) {
            }
            Assert.assertEquals(10L, newArrayList.size());
        });
    }

    @Test
    public void testPushDownRunnerSpark() throws SQLException {
        PushDownRunnerSparkImpl pushDownRunnerSparkImpl = new PushDownRunnerSparkImpl();
        pushDownRunnerSparkImpl.init((KylinConfig) null, "tpch");
        pushDownRunnerSparkImpl.executeQuery("select * from TEST_KYLIN_FACT", Lists.newArrayList(), Lists.newArrayList(), "tpch");
        Assert.assertEquals(10000L, r0.size());
        Assert.assertEquals(11L, r0.size());
        Assert.assertEquals("HIVE", pushDownRunnerSparkImpl.getName());
    }

    @Test
    public void testPushDownRunnerSparkWithDotColumn() throws SQLException {
        PushDownRunnerSparkImpl pushDownRunnerSparkImpl = new PushDownRunnerSparkImpl();
        pushDownRunnerSparkImpl.init((KylinConfig) null, "tpch");
        pushDownRunnerSparkImpl.executeQuery("select TEST_KYLIN_FACT.price as `TEST_KYLIN_FACT.price` from TEST_KYLIN_FACT", Lists.newArrayList(), Lists.newArrayList(), "tpch");
        Assert.assertEquals(10000L, r0.size());
        Assert.assertEquals(1L, r0.size());
        Assert.assertEquals("HIVE", pushDownRunnerSparkImpl.getName());
    }

    @Test
    public void testSelectTwoSameExpr() throws SQLException {
        PushDownRunnerSparkImpl pushDownRunnerSparkImpl = new PushDownRunnerSparkImpl();
        pushDownRunnerSparkImpl.init((KylinConfig) null, "tpch");
        pushDownRunnerSparkImpl.executeQuery("select sum(price), sum(price) from TEST_KYLIN_FACT", Lists.newArrayList(), Lists.newArrayList(), "tpch");
        Assert.assertEquals(1L, r0.size());
        Assert.assertEquals(2L, r0.size());
        Assert.assertEquals("HIVE", pushDownRunnerSparkImpl.getName());
    }

    @Test
    public void testCaseSensitiveOnAlias() throws SQLException {
        PushDownRunnerSparkImpl pushDownRunnerSparkImpl = new PushDownRunnerSparkImpl();
        pushDownRunnerSparkImpl.init((KylinConfig) null, "tpch");
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        pushDownRunnerSparkImpl.executeQuery("SELECT cast(ORDER_ID as integer) as OrderId FROM TEST_KYLIN_FACT limit 10", newArrayList, newArrayList2, "tpch");
        Assert.assertEquals(((SelectedColumnMeta) newArrayList2.get(0)).getName(), "OrderId");
    }

    @Test
    public void testSparkSubmitter() throws Exception {
        SparkSubmitter.OverriddenSparkSession overrideSparkSession = SparkSubmitter.getInstance().overrideSparkSession(this.ss);
        Throwable th = null;
        try {
            PushdownResponse submitPushDownTask = SparkSubmitter.getInstance().submitPushDownTask("select order_id from test_kylin_fact limit 1", "tpch");
            Assert.assertEquals(1L, submitPushDownTask.getColumns().size());
            Assert.assertEquals(1L, submitPushDownTask.getSize());
            if (overrideSparkSession != null) {
                if (0 == 0) {
                    overrideSparkSession.close();
                    return;
                }
                try {
                    overrideSparkSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (overrideSparkSession != null) {
                if (0 != 0) {
                    try {
                        overrideSparkSession.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    overrideSparkSession.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAsyncQueryWriteParquet() {
        QueryContext current = QueryContext.current();
        current.getQueryTagInfo().setAsyncQuery(true);
        current.getQueryTagInfo().setFileFormat("parquet");
        current.getQueryTagInfo().setFileEncode("utf-8");
        SparkSqlClient.executeSql(this.ss, "select * from TEST_KYLIN_FACT", UUID.randomUUID(), "tpch");
    }

    @Test
    public void testAutoSetShufflePartitionConcurrency() throws Exception {
        SparkSubmitter.OverriddenSparkSession overrideSparkSession = SparkSubmitter.getInstance().overrideSparkSession(this.ss);
        Throwable th = null;
        try {
            this.ss.sql("DROP TABLE if exists mypartitionedtable");
            this.ss.sql("CREATE TABLE mypartitionedtable (col1 INT, col2 INT) USING PARQUET PARTITIONED BY (col3 STRING)");
            this.ss.sql("INSERT INTO mypartitionedtable PARTITION (col3='partitionvalue') SELECT TRANS_ID, ORDER_ID FROM test_kylin_fact");
            this.ss.sql("REFRESH TABLE mypartitionedtable");
            KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
            instanceFromEnv.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "0");
            PushdownResponse submitPushDownTask = SparkSubmitter.getInstance().submitPushDownTask("select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
            Assert.assertEquals(1L, submitPushDownTask.getColumns().size());
            Assert.assertEquals(1L, submitPushDownTask.getSize());
            instanceFromEnv.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "30");
            PushdownResponse submitPushDownTask2 = SparkSubmitter.getInstance().submitPushDownTask("select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
            Assert.assertEquals(1L, submitPushDownTask2.getColumns().size());
            Assert.assertEquals(1L, submitPushDownTask2.getSize());
            this.ss.sql("DROP TABLE mypartitionedtable");
            if (overrideSparkSession != null) {
                if (0 == 0) {
                    overrideSparkSession.close();
                    return;
                }
                try {
                    overrideSparkSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (overrideSparkSession != null) {
                if (0 != 0) {
                    try {
                        overrideSparkSession.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    overrideSparkSession.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAutoSetShufflePartition() throws Exception {
        SparkSubmitter.OverriddenSparkSession overrideSparkSession = SparkSubmitter.getInstance().overrideSparkSession(this.ss);
        Throwable th = null;
        try {
            this.ss.sql("DROP TABLE if exists mypartitionedtable");
            this.ss.sql("CREATE TABLE mypartitionedtable (col1 INT, col2 INT) USING PARQUET PARTITIONED BY (col3 STRING)");
            this.ss.sql("INSERT INTO mypartitionedtable PARTITION (col3='partitionvalue') SELECT TRANS_ID, ORDER_ID FROM test_kylin_fact");
            this.ss.sql("REFRESH TABLE mypartitionedtable");
            KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
            instanceFromEnv.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "false");
            PushdownResponse submitPushDownTask = SparkSubmitter.getInstance().submitPushDownTask("select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
            Assert.assertEquals(1L, submitPushDownTask.getColumns().size());
            Assert.assertEquals(1L, submitPushDownTask.getSize());
            instanceFromEnv.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "true");
            PushdownResponse submitPushDownTask2 = SparkSubmitter.getInstance().submitPushDownTask("select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
            Assert.assertEquals(1L, submitPushDownTask2.getColumns().size());
            Assert.assertEquals(1L, submitPushDownTask2.getSize());
            this.ss.sql("DROP TABLE mypartitionedtable");
            if (overrideSparkSession != null) {
                if (0 == 0) {
                    overrideSparkSession.close();
                    return;
                }
                try {
                    overrideSparkSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (overrideSparkSession != null) {
                if (0 != 0) {
                    try {
                        overrideSparkSession.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    overrideSparkSession.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAutoSetShufflePartitionOff() throws Exception {
        SparkSubmitter.OverriddenSparkSession overrideSparkSession = SparkSubmitter.getInstance().overrideSparkSession(this.ss);
        Throwable th = null;
        try {
            KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
            instanceFromEnv.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "false");
            PushdownResponse submitPushDownTask = SparkSubmitter.getInstance().submitPushDownTask("select count(order_id) from test_kylin_fact limit 1", "tpch");
            Assert.assertEquals(1L, submitPushDownTask.getColumns().size());
            Assert.assertEquals(1L, submitPushDownTask.getSize());
            instanceFromEnv.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true");
            if (overrideSparkSession != null) {
                if (0 == 0) {
                    overrideSparkSession.close();
                    return;
                }
                try {
                    overrideSparkSession.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (overrideSparkSession != null) {
                if (0 != 0) {
                    try {
                        overrideSparkSession.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    overrideSparkSession.close();
                }
            }
            throw th3;
        }
    }
}
