package org.apache.kylin.newten;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TempMetadataBuilder;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector;
import org.apache.kylin.engine.spark.filter.ParquetBloomFilter;
import org.apache.kylin.engine.spark.filter.QueryFiltersCollector;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.junit.TimeZoneTestRunner;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.sparkproject.guava.collect.Sets;

/**
 * Integration test for Parquet bloom-filter support. It covers three things:
 * <ul>
 *   <li>building bloom filters for configured columns;</li>
 *   <li>collecting per-project query filter statistics into a filters file;</li>
 *   <li>the per-query metrics kept by {@link BloomFilterSkipCollector}.</li>
 * </ul>
 */
@RunWith(TimeZoneTestRunner.class)
public class BloomFilterTest extends NLocalWithSparkSessionTest implements AdaptiveSparkPlanHelper {

    /** Dataflow id from the {@code ut_meta/bloomfilter} test metadata that this test builds and queries. */
    private static final String DATAFLOW_ID = "c41390c5-b93d-4db3-b167-029874b85a2c";

    /** Layout of {@link #DATAFLOW_ID} that is built and queried. */
    private static final long LAYOUT_ID = 20000000001L;

    /** Upper bound on how long to wait for the asynchronous filter collector to write its file. */
    private static final long COLLECT_TIMEOUT_SECONDS = 120L;

    private NDataflowManager dfMgr;

    /**
     * Replaces any running Spark session with a small local one and registers it with
     * {@link SparderEnv}. The new session uses 2 cores, the Java serializer, an in-memory
     * catalog and a single shuffle partition.
     */
    @BeforeClass
    public static void initSpark() {
        if (Shell.MAC) {
            // NOTE(review): presumably lets snappy-java find its native library on macOS — confirm
            overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
        }
        if (ss != null && !ss.sparkContext().isStopped()) {
            ss.stop();
        }
        sparkConf = new SparkConf().setAppName(RandomUtil.randomUUIDStr()).setMaster("local[2]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory");
        sparkConf.set("spark.sql.shuffle.partitions", "1");
        sparkConf.set("spark.memory.fraction", "0.1");
        sparkConf.set(StaticSQLConf.WAREHOUSE_PATH().key(), TempMetadataBuilder.TEMP_TEST_METADATA + "/spark-warehouse");
        ss = SparkSession.builder().config(sparkConf).getOrCreate();
        SparderEnv.setSparkSession(ss);
    }

    /**
     * Prepares the test environment. It enables bloom filter building and filter collection,
     * loads the bloomfilter test metadata, starts the job scheduler and starts the query-filter
     * collector.
     */
    @Before
    public void setup() throws Exception {
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        overwriteSystemProp("kylin.bloom.collect-filter.enabled", "true");
        overwriteSystemProp("kylin.bloom.build.enabled", "true");
        overwriteSystemProp("kylin.query.filter.collect-interval", "10");
        createTestMetadata(new String[]{"src/test/resources/ut_meta/bloomfilter"});
        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
        // Index builds below need a running scheduler; fail fast instead of hanging later.
        if (!scheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        QueryFiltersCollector.initScheduler();
        dfMgr = NDataflowManager.getInstance(getTestConfig(), getProject());
    }

    /** Stops the schedulers started in {@link #setup()} and removes the test metadata. */
    @After
    public void after() throws Exception {
        NDefaultScheduler.destroyInstance();
        QueryFiltersCollector.destoryScheduler();
        cleanupTestMetadata();
    }

    public String getProject() {
        return "bloomfilter";
    }

    @Test
    public void testBuildBloomFilter() throws Exception {
        Path projectFiltersFile = QueryFiltersCollector.getProjectFiltersFile(QueryFiltersCollector.SERVER_HOST,
                getProject());
        FileSystem fileSystem = HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
        // Start clean so a filters file left by an earlier run cannot satisfy the waits below.
        if (fileSystem.exists(projectFiltersFile)) {
            fileSystem.delete(projectFiltersFile, true);
        }

        NDataflow dataflow = dfMgr.getDataflow(DATAFLOW_ID);
        LayoutEntity layout = dataflow.getIndexPlan().getLayoutEntity(LAYOUT_ID);
        Assert.assertNotNull(layout);
        populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());

        // 1) Build with explicitly configured bloom filter column ids.
        overwriteSystemProp("kylin.bloom.build.column-ids", "0#1");
        buildLayout(layout);
        if (ParquetBloomFilter.isLoaded()) {
            Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("0"));
            Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("1"));
        }

        // 2) Run a filtered query and wait for the collector to persist the filter statistics.
        List<Pair<String, String>> queries = new ArrayList<>();
        queries.add(Pair.newPair("bloomfilter",
                "select * from SSB.P_LINEORDER where LO_CUSTKEY in (13,8) and LO_SHIPPRIOTITY = 0 "));
        ExecAndComp.execAndCompare(queries, getProject(), ExecAndComp.CompareLevel.NONE, "inner");
        Awaitility.await().atMost(COLLECT_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            try {
                return fileSystem.exists(projectFiltersFile);
            } catch (Exception ignored) {
                // Treat filesystem errors as "not yet"; the await timeout bounds the polling.
                return false;
            }
        });
        Map<?, ?> modelFilters = readModelFilters(fileSystem, projectFiltersFile);
        Assert.assertNotNull("no filter statistics recorded for " + DATAFLOW_ID, modelFilters);
        // NOTE(review): keys "8"/"9" presumably are the column ids of the two filtered columns — confirm
        Assert.assertTrue(modelFilters.containsKey("8"));
        Assert.assertTrue(modelFilters.containsKey("9"));
        long initialHits = ((Number) modelFilters.get("8")).longValue();
        Assert.assertTrue(fileSystem.exists(projectFiltersFile));

        // 3) The list now holds both queries, so both run again.
        //    The recorded count for key "8" must grow.
        queries.add(Pair.newPair("bloomfilter", "select * from SSB.P_LINEORDER where LO_CUSTKEY in (13,8)"));
        ExecAndComp.execAndCompare(queries, getProject(), ExecAndComp.CompareLevel.NONE, "inner");
        Awaitility.await().atMost(COLLECT_TIMEOUT_SECONDS, TimeUnit.SECONDS).until(() -> {
            try {
                Object hits = readModelFilters(fileSystem, projectFiltersFile).get("8");
                return ((Number) hits).longValue() > initialHits;
            } catch (Exception ignored) {
                // The file may be mid-rewrite or not yet updated; keep polling until the timeout.
                return false;
            }
        });

        // 4) Rebuild with no explicit column ids and at most one bloom column.
        //    Column "8" must be selected and "9" must not.
        overwriteSystemProp("kylin.bloom.build.column-ids", "");
        overwriteSystemProp("kylin.bloom.build.column.max-size", "1");
        ParquetBloomFilter.resetParquetBloomFilter();
        buildLayout(layout);
        if (ParquetBloomFilter.isLoaded()) {
            Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("8"));
            Assert.assertFalse(ParquetBloomFilter.getBuildBloomColumns().contains("9"));
        }
        ExecAndComp.execAndCompare(queries, getProject(), ExecAndComp.CompareLevel.SAME, "inner");
        testBloomFilterSkipCollector();
    }

    /** Builds {@code layout} of {@link #DATAFLOW_ID} over {@code createInfinite()} segment range, append mode. */
    private void buildLayout(LayoutEntity layout) throws Exception {
        indexDataConstructor.buildIndex(DATAFLOW_ID, SegmentRange.TimePartitionedSegmentRange.createInfinite(),
                Sets.newHashSet(layout), true);
    }

    /**
     * Parses the project filters file as JSON.
     *
     * @return the entry stored under {@link #DATAFLOW_ID}, or {@code null} if there is none
     */
    private static Map<?, ?> readModelFilters(FileSystem fileSystem, Path filtersFile) throws Exception {
        Map<?, ?> filters = JsonUtil.readValue(HadoopUtil.readStringFromHdfs(fileSystem, filtersFile), Map.class);
        return (Map<?, ?>) filters.get(DATAFLOW_ID);
    }

    /**
     * Checks the {@link BloomFilterSkipCollector} metrics.
     * <ul>
     *   <li>Two samples for one query id are summed per counter.</li>
     *   <li>{@code logAndCleanStatus} clears all counters for that id.</li>
     *   <li>{@code logCounter} stays at or below 100 after many calls.</li>
     * </ul>
     */
    private void testBloomFilterSkipCollector() {
        // Argument order: total blocks, skipped blocks, skipped rows, footer read time, footer read number.
        BloomFilterSkipCollector.addQueryMetrics("query-id1", 3L, 2L, 20L, 100L, 1L);
        BloomFilterSkipCollector.addQueryMetrics("query-id1", 1L, 1L, 10L, 100L, 1L);
        Assert.assertEquals(4L, ((AtomicLong) BloomFilterSkipCollector.queryTotalBloomBlocks.getIfPresent("query-id1")).get());
        Assert.assertEquals(3L, ((AtomicLong) BloomFilterSkipCollector.querySkipBloomBlocks.getIfPresent("query-id1")).get());
        Assert.assertEquals(30L, ((AtomicLong) BloomFilterSkipCollector.querySkipBloomRows.getIfPresent("query-id1")).get());
        Assert.assertEquals(200L, ((AtomicLong) BloomFilterSkipCollector.queryFooterReadTime.getIfPresent("query-id1")).get());
        Assert.assertEquals(2L, ((AtomicLong) BloomFilterSkipCollector.queryFooterReadNumber.getIfPresent("query-id1")).get());

        BloomFilterSkipCollector.logAndCleanStatus("query-id1");
        // An id with no recorded metrics must be accepted as well.
        BloomFilterSkipCollector.logAndCleanStatus("query-id2");
        Assert.assertNull(BloomFilterSkipCollector.queryTotalBloomBlocks.getIfPresent("query-id1"));
        Assert.assertNull(BloomFilterSkipCollector.querySkipBloomBlocks.getIfPresent("query-id1"));
        Assert.assertNull(BloomFilterSkipCollector.querySkipBloomRows.getIfPresent("query-id1"));
        Assert.assertNull(BloomFilterSkipCollector.queryFooterReadTime.getIfPresent("query-id1"));
        Assert.assertNull(BloomFilterSkipCollector.queryFooterReadNumber.getIfPresent("query-id1"));

        // NOTE(review): the counter is expected to be bounded/reset internally — confirm the 100 limit
        for (int i = 0; i < 200; i++) {
            BloomFilterSkipCollector.logAndCleanStatus("query-id2");
        }
        Assert.assertTrue(BloomFilterSkipCollector.logCounter.get() <= 100);
    }
}
