package io.kyligence.kap.newten.clickhouse;

import com.google.common.collect.ImmutableList;
import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import lombok.Generated;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.engine.spark.utils.RichOption;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.catalyst.plans.logical.Filter;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Sort;
import org.apache.spark.sql.execution.datasources.jdbc.ClickHouseDialect$;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.ShardOptions$;
import org.apache.spark.sql.execution.datasources.v2.PostV2ScanRelationPushDown$;
import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$;
import org.apache.spark.sql.execution.datasources.v2.jdbc.ShardJDBCTableCatalog;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.jdbc.JdbcDialects$;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MetadataBuilder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.JdbcDatabaseContainer;
import scala.collection.JavaConverters;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:io/kyligence/kap/newten/clickhouse/ClickHouseV2QueryTest.class */
public class ClickHouseV2QueryTest extends NLocalWithSparkSessionTest {

    @Generated
    private static final Logger log;
    private static final String table = "default.shard_table";
    static final /* synthetic */ boolean $assertionsDisabled;

    @BeforeClass
    public static void beforeClass() {
        JdbcDialects$.MODULE$.registerDialect(ClickHouseDialect$.MODULE$);
        NLocalWithSparkSessionTest.ensureSparkConf();
        ClickHouseUtils.InjectNewPushDownRule(sparkConf);
        NLocalWithSparkSessionTest.beforeClass();
        Assert.assertTrue(SparderEnv.getSparkSession().sessionState().optimizer().preCBORules().contains(V2ScanRelationPushDown$.MODULE$));
        Assert.assertTrue(SparderEnv.getSparkSession().sessionState().optimizer().preCBORules().contains(PostV2ScanRelationPushDown$.MODULE$));
    }

    @AfterClass
    public static void afterClass() {
        NLocalWithSparkSessionTest.afterClass();
        JdbcDialects$.MODULE$.unregisterDialect(ClickHouseDialect$.MODULE$);
    }

    private static void setupCatalog(JdbcDatabaseContainer<?> jdbcDatabaseContainer, String str) {
        SQLConf conf = SparderEnv.getSparkSession().sessionState().conf();
        conf.setConfString(SQLConf.ANSI_ENABLED().key(), "true");
        conf.setConfString(str, ShardJDBCTableCatalog.class.getCanonicalName());
        conf.setConfString(str + ".url", jdbcDatabaseContainer.getJdbcUrl());
        conf.setConfString(str + ".driver", jdbcDatabaseContainer.getDriverClassName());
        conf.setConfString(str + ".pushDownLimit", "true");
        conf.setConfString(str + ".pushDownAggregate", "true");
        conf.setConfString(str + ".pushDownOffset", "true");
    }

    private void executeAndCheck(Dataset<Row> dataset, List<Row> list, int i) {
        Assert.assertEquals(list, dataset.collectAsList());
        Assert.assertEquals(i, ClickHouseUtils.findJDBCScan(dataset.queryExecution().optimizedPlan()).relation().parts().length);
    }

    private void checkFiltersRemoved(Dataset dataset) {
        Optional map = new RichOption(dataset.queryExecution().optimizedPlan().find(new AbstractFunction1<LogicalPlan, Object>() { // from class: io.kyligence.kap.newten.clickhouse.ClickHouseV2QueryTest.1
            public Object apply(LogicalPlan logicalPlan) {
                return Boolean.valueOf(logicalPlan instanceof Filter);
            }
        })).toOptional().map(logicalPlan -> {
            return (Filter) logicalPlan;
        });
        if (!$assertionsDisabled && !Optional.empty().equals(map)) {
            throw new AssertionError();
        }
    }

    private void checkSortRemoved(Dataset dataset, boolean z) {
        Optional map = new RichOption(dataset.queryExecution().optimizedPlan().find(new AbstractFunction1<LogicalPlan, Object>() { // from class: io.kyligence.kap.newten.clickhouse.ClickHouseV2QueryTest.2
            public Object apply(LogicalPlan logicalPlan) {
                return Boolean.valueOf(logicalPlan instanceof Sort);
            }
        })).toOptional().map(logicalPlan -> {
            return (Sort) logicalPlan;
        });
        if (z) {
            if (!$assertionsDisabled && map.isPresent()) {
                throw new AssertionError();
            }
        } else if (!$assertionsDisabled && !map.isPresent()) {
            throw new AssertionError();
        }
    }

    private void testFilter(String str, int i) {
        Dataset sql = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2, n3 from %s.%s where i1 > 5 and n4 < 2 order by i1", str, table));
        checkFiltersRemoved(sql);
        checkSortRemoved(sql, false);
        ClickHouseUtils.checkPushedInfo(sql, "PushedFilters: [i1 IS NOT NULL, n4 IS NOT NULL, i1 > 5, n4 < 2], ");
        BigDecimal javaBigDecimal = Decimal.apply(new BigDecimal(-18.22d), 19, 4).toJavaBigDecimal();
        ImmutableList of = ImmutableList.of(RowFactory.create(new Object[]{"2", 6, 7L, javaBigDecimal}), RowFactory.create(new Object[]{"4", 7, 3L, javaBigDecimal}));
        executeAndCheck(sql, of, i);
        Dataset sql2 = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2, n3 from %s.%s where i1 > 5 and cast(n3 as double) < 2 order by i1", str, table));
        checkFiltersRemoved(sql2);
        checkSortRemoved(sql2, false);
        ClickHouseUtils.checkPushedInfo(sql2, "PushedFilters: [i1 IS NOT NULL, n3 IS NOT NULL, i1 > 5, n3 < 2.0000], ");
        executeAndCheck(sql2, of, i);
        Dataset sql3 = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2, n3 from %s.%s where i1 > 5 and cast(str_date4 as date) > date'2021-01-06' order by i1", str, table));
        checkFiltersRemoved(sql3);
        checkSortRemoved(sql3, false);
        ClickHouseUtils.checkPushedInfo(sql3, "PushedFilters: [i1 IS NOT NULL, str_date4 IS NOT NULL, i1 > 5, CAST(str_date4 AS date) > 18633], ");
        executeAndCheck(sql3, ImmutableList.of(RowFactory.create(new Object[]{"2", 6, 7L, javaBigDecimal})), i);
    }

    private void testLimit(String str, int i) {
        Dataset<Row> sql = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2 from %s.%s where i1 > 6 limit 1", str, table));
        ClickHouseUtils.checkPushedInfo(sql, "PushedFilters: [i1 IS NOT NULL, i1 > 6], ", "PushedLimit: LIMIT 1, ");
        executeAndCheck(sql, ImmutableList.of(RowFactory.create(new Object[]{"4", 7, 3L})), i);
    }

    private void testOffset(String str, int i) {
        Dataset<Row> sql = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2 from %s.%s where i1 > 6 offset 1", str, table));
        ClickHouseUtils.checkPushedInfo(sql, i == 1 ? new String[]{"PushedFilters: [i1 IS NOT NULL, i1 > 6],", "PushedOffset: OFFSET 1, "} : new String[]{"PushedFilters: [i1 IS NOT NULL, i1 > 6],"});
        executeAndCheck(sql, ImmutableList.of(), i);
        Dataset<Row> sql2 = ss.sql(String.format(Locale.ROOT, "select sum(i2) from %s.%s where i1 > 6 group by i1 limit 1 offset 1", str, table));
        ClickHouseUtils.checkPushedInfo(sql2, i == 1 ? new String[]{"PushedAggregates: [SUM(i2)], ", "PushedFilters: [i1 IS NOT NULL, i1 > 6], ", "PushedGroupByExpressions: [i1], ", "PushedLimit: LIMIT 2", "PushedOffset: OFFSET 1,"} : new String[]{"PushedAggregates: [SUM(i2)], ", "PushedFilters: [i1 IS NOT NULL, i1 > 6], ", "PushedGroupByExpressions: [i1], "});
        executeAndCheck(sql2, ImmutableList.of(), i);
    }

    private void testTop(String str, int i) {
        Dataset sql = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2 from %s.%s where i1 > 1 order by i1 limit 1", str, table));
        checkSortRemoved(sql, i == 1);
        ClickHouseUtils.checkPushedInfo(sql, "PushedFilters: [i1 IS NOT NULL, i1 > 1], ", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 1, ");
        ImmutableList of = ImmutableList.of(RowFactory.create(new Object[]{"3", 2, 3L}));
        executeAndCheck(sql, of, i);
        Dataset sql2 = ss.sql(String.format(Locale.ROOT, "select s2, i1 as my_i1, i2 from %s.%s where i1 > 1 order by my_i1 limit 1", str, table));
        checkSortRemoved(sql2, i == 1);
        ClickHouseUtils.checkPushedInfo(sql2, "PushedFilters: [i1 IS NOT NULL, i1 > 1], ", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 1, ");
        executeAndCheck(sql2, of, i);
        Dataset sql3 = ss.sql(String.format(Locale.ROOT, "select sum(i2) from %s.%s where i1 > 1 group by i1 order by i1 limit 1", str, table));
        checkSortRemoved(sql3, i == 1);
        ClickHouseUtils.checkPushedInfo(sql3, i == 1 ? new String[]{"PushedAggregates: [SUM(i2)],", "PushedFilters: [i1 IS NOT NULL, i1 > 1],", "PushedGroupByExpressions: [i1],", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 1"} : new String[]{"PushedAggregates: [SUM(i2)],", "PushedFilters: [i1 IS NOT NULL, i1 > 1],", "PushedGroupByExpressions: [i1],"});
        executeAndCheck(sql3, ImmutableList.of(RowFactory.create(new Object[]{3})), i);
    }

    private void testPaging(String str, int i) {
        ClickHouseUtils.checkPushedInfo(ss.sql(String.format(Locale.ROOT, "select s2, i1, i2 from %s.%s where i1 > 1 limit 1 offset 1", str, table)), "PushedFilters: [i1 IS NOT NULL, i1 > 1], ", "PushedLimit: LIMIT 2, ");
        Dataset sql = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2 from %s.%s where i1 > 1 order by i1 limit 1 offset 1", str, table));
        checkSortRemoved(sql, i == 1);
        ClickHouseUtils.checkPushedInfo(sql, "PushedFilters: [i1 IS NOT NULL, i1 > 1], ", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 2, ");
        ImmutableList of = ImmutableList.of(RowFactory.create(new Object[]{"3", 3, 4L}));
        executeAndCheck(sql, of, i);
        Dataset sql2 = ss.sql(String.format(Locale.ROOT, "select s2, i1 as my_i1, i2 from %s.%s where i1 > 1 order by my_i1 limit 1 offset 1", str, table));
        checkSortRemoved(sql2, i == 1);
        ClickHouseUtils.checkPushedInfo(sql2, "PushedFilters: [i1 IS NOT NULL, i1 > 1], ", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 2, ");
        executeAndCheck(sql2, of, i);
        Dataset sql3 = ss.sql(String.format(Locale.ROOT, "select s2, i1, i2 from %s.%s order by i1 limit 2 offset 1", str, table));
        checkSortRemoved(sql3, i == 1);
        ClickHouseUtils.checkPushedInfo(sql3, "PushedFilters: [], ", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 3, ");
        ImmutableList of2 = ImmutableList.of(RowFactory.create(new Object[]{"3", 2, 3L}), RowFactory.create(new Object[]{"3", 3, 4L}));
        executeAndCheck(sql3, of2, i);
        Dataset sql4 = ss.sql(String.format(Locale.ROOT, "select s2, i1 as my_i1, i2 from %s.%s order by my_i1 limit 2 offset 1", str, table));
        checkSortRemoved(sql4, i == 1);
        ClickHouseUtils.checkPushedInfo(sql4, "PushedFilters: [], ", "PushedTopN: ORDER BY [i1 ASC NULLS FIRST] LIMIT 3, ");
        executeAndCheck(sql4, of2, i);
    }

    private void testAggregate(String str, int i) {
        Dataset sql = ss.sql(String.format(Locale.ROOT, "select s2, sum(i1), sum(i2), count(i1), count(*) from %s.%s group by s2 order by s2", str, table));
        checkSortRemoved(sql, false);
        ClickHouseUtils.checkAggregateRemoved(sql, i == 1);
        ClickHouseUtils.checkPushedInfo(sql, "PushedAggregates: [SUM(i1), SUM(i2), COUNT(i1), COUNT(*)], ", "PushedFilters: [], ", "PushedGroupByExpressions: [s2], ");
        executeAndCheck(sql, ImmutableList.of(RowFactory.create(new Object[]{"2", 12, 15L, 3L, 3L}), RowFactory.create(new Object[]{"3", 9, 12L, 3L, 3L}), RowFactory.create(new Object[]{"4", 7, 3L, 1L, 1L})), i);
        Dataset sql2 = ss.sql("select s2, COUNT(CASE WHEN i1 > 1 AND i1 < 3 THEN i1 ELSE 0 END), COUNT(CASE WHEN i1 >= 1 OR i1 <= 3 THEN i1 ELSE 0 END), MAX(CASE WHEN NOT(i1 > 1) AND NOT(i1 < 3) THEN i1 ELSE 0 END), MAX(CASE WHEN NOT(i1 != 0) OR NOT(i1 < 1) THEN i1 ELSE 0 END), MIN(CASE WHEN NOT(i1 > 1 OR i1 IS NULL) THEN i1 ELSE 0 END), SUM(CASE WHEN NOT(i1 > 1 AND i1 IS NOT NULL) THEN i1 ELSE 0 END), SUM(CASE WHEN i1 > 2 THEN 2 WHEN i1 > 1 THEN 1 END), AVG(CASE WHEN NOT(i1 > 1 OR i1 IS NOT NULL) THEN i1 ELSE 0 END)" + String.format(Locale.ROOT, " from %s.%s group by s2 order by s2", str, table));
        checkSortRemoved(sql2, false);
        ClickHouseUtils.checkAggregateRemoved(sql2, i == 1);
        ClickHouseUtils.checkPushedInfo(sql2, "PushedAggregates: [COUNT(CASE WHEN (i1 > 1) AND (i1 < 3) THEN i1 ELSE 0 END), COUNT(CASE WHEN (i1 >= 1) OR (i1 <= 3..., ", "PushedFilters: [], ", "PushedGroupByExpressions: [s2], ");
        executeAndCheck(sql2, ImmutableList.of(RowFactory.create(new Object[]{"2", 3, 3, 0, 6, 0, 1, 4, 0}), RowFactory.create(new Object[]{"3", 3, 3, 0, 4, 0, 0, 5, 0}), RowFactory.create(new Object[]{"4", 1, 1, 0, 7, 0, 0, 2, 0})), i);
        Dataset sql3 = ss.sql(String.format(Locale.ROOT, "select s2, sum(distinct i1), count(distinct i1), avg(distinct i1) from %s.%s group by s2 order by s2", str, table));
        checkSortRemoved(sql3, false);
        ClickHouseUtils.checkAggregateRemoved(sql3, i == 1);
        ClickHouseUtils.checkPushedInfo(sql3, i == 1 ? new String[]{"PushedAggregates: [SUM(DISTINCT i1), COUNT(DISTINCT i1), AVG(DISTINCT i1)], ", "PushedFilters: [], ", "PushedGroupByExpressions: [s2], "} : new String[]{"PushedFilters: [], "});
        executeAndCheck(sql3, ImmutableList.of(RowFactory.create(new Object[]{"2", 12, 3, Double.valueOf(4.0d)}), RowFactory.create(new Object[]{"3", 9, 3, Double.valueOf(3.0d)}), RowFactory.create(new Object[]{"4", 7, 1, Double.valueOf(7.0d)})), i);
    }

    @Test
    public void testOnSingleShard() throws Exception {
        Assert.assertTrue(((Boolean) ClickHouseUtils.prepare1Instance(true, (jdbcDatabaseContainer, connection) -> {
            setupCatalog(jdbcDatabaseContainer, "spark.sql.catalog.testOnSingleShard");
            testFilter("testOnSingleShard", 1);
            testLimit("testOnSingleShard", 1);
            testTop("testOnSingleShard", 1);
            testPaging("testOnSingleShard", 1);
            testAggregate("testOnSingleShard", 1);
            testOffset("testOnSingleShard", 1);
            return true;
        })).booleanValue());
    }

    @Test
    public void testOnMultipleShard() throws Exception {
        Assert.assertTrue(((Boolean) ClickHouseUtils.prepare2Instances(true, (jdbcDatabaseContainer, connection, jdbcDatabaseContainer2, connection2) -> {
            ImmutableList of = ImmutableList.of(jdbcDatabaseContainer.getJdbcUrl(), jdbcDatabaseContainer2.getJdbcUrl());
            String buildSharding = ShardOptions$.MODULE$.buildSharding(JavaConverters.asScalaBuffer(of));
            setupCatalog(jdbcDatabaseContainer, "spark.sql.catalog.testOnMultipleShard");
            SparderEnv.getSparkSession().sessionState().conf().setConfString("spark.sql.catalog.testOnMultipleShard." + ShardOptions$.MODULE$.SHARD_URLS(), buildSharding);
            SparderEnv.getSparkSession().sessionState().conf().setConfString("spark.sql.catalog.testOnMultipleShard." + JDBCOptions.JDBC_NUM_PARTITIONS(), String.valueOf(of.size()));
            testFilter("testOnMultipleShard", of.size());
            testLimit("testOnMultipleShard", of.size());
            testTop("testOnMultipleShard", of.size());
            testPaging("testOnMultipleShard", of.size());
            testAggregate("testOnMultipleShard", of.size());
            testOffset("testOnMultipleShard", of.size());
            return true;
        })).booleanValue());
    }

    @Test
    public void testMultipleShardWithDataFrame() throws Exception {
        ClickHouseUtils.prepare2Instances(true, (jdbcDatabaseContainer, connection, jdbcDatabaseContainer2, connection2) -> {
            String buildSharding = ShardOptions$.MODULE$.buildSharding(JavaConverters.asScalaBuffer(ImmutableList.of(jdbcDatabaseContainer.getJdbcUrl(), jdbcDatabaseContainer2.getJdbcUrl())));
            setupCatalog(jdbcDatabaseContainer, "spark.sql.catalog.testMultipleShardWithDataFrame");
            DataFrameReader dataFrameReader = new DataFrameReader(SparderEnv.getSparkSession());
            dataFrameReader.option(ShardOptions$.MODULE$.SHARD_URLS(), buildSharding);
            dataFrameReader.option(JDBCOptions.JDBC_NUM_PARTITIONS(), r0.size());
            Assert.assertEquals(7L, dataFrameReader.table("testMultipleShardWithDataFrame.default.shard_table").count());
            return true;
        });
    }

    @Test
    public void testClickhouseType() {
        Assert.assertNotNull(JdbcDialects.get("jdbc:clickhouse").getCatalystType(2003, "Array(_numeric)", 0, new MetadataBuilder().putString("name", "test_column").putLong("scale", -127L)));
    }

    static {
        $assertionsDisabled = !ClickHouseV2QueryTest.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ClickHouseV2QueryTest.class);
    }
}
