package org.apache.kylin.engine.spark.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.ArrayList;
import java.util.List;
import org.apache.kylin.cluster.AvailableResource;
import org.apache.kylin.cluster.ResourceInfo;
import org.apache.kylin.cluster.YarnClusterManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.conf.rule.ExecutorInstancesRule;
import org.apache.spark.conf.rule.YarnConfRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kylin/engine/spark/utils/SparkConfHelperTest.class */
/**
 * Tests for {@link SparkConfHelper} and the configuration rules exercised here
 * ({@link ExecutorInstancesRule} and {@link YarnConfRule}).
 *
 * <p>Each test runs against local test metadata, and a Mockito mock of
 * {@link YarnClusterManager} stands in for the cluster manager.
 */
public class SparkConfHelperTest extends NLocalFileMetadataTestCase {
    // Helper option keys.
    private static final String SOURCE_TABLE_SIZE = "source_table_size";
    private static final String LAYOUT_SIZE = "layout_size";
    private static final String REQUIRED_CORES = "required_cores";
    private static final String COUNT_DISTINCT = "count_distinct";

    // Spark configuration keys.
    private static final String YARN_QUEUE = "spark.yarn.queue";
    private static final String EXECUTOR_MEMORY = "spark.executor.memory";
    private static final String EXECUTOR_OVERHEAD = "spark.executor.memoryOverhead";
    private static final String EXECUTOR_CORES = "spark.executor.cores";
    private static final String EXECUTOR_INSTANCES = "spark.executor.instances";
    private static final String SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions";
    private static final String CORES_MAX = "spark.cores.max";

    // Kylin properties overridden by some tests.
    private static final String BASE_EXECUTOR_INSTANCE = "kylin.engine.base-executor-instance";
    private static final String ENV_CHANNEL = "kylin.env.channel";

    /** Name of the queue that the mocked cluster manager is stubbed for. */
    private static final String DEFAULT_QUEUE = "default";

    /** Shared cluster manager mock; JUnit creates a fresh test instance (and mock) per test method. */
    private final YarnClusterManager clusterManager = Mockito.mock(YarnClusterManager.class);

    /** Pairs a Spark configuration key with the value a test expects it to hold. */
    private static final class CompareTuple {
        private final String expectedValue;
        private final String key;

        /**
         * @param expectedValue value the configuration is expected to contain
         * @param key           Spark configuration key to look up
         */
        CompareTuple(String expectedValue, String key) {
            this.expectedValue = expectedValue;
            this.key = key;
        }
    }

    /**
     * Creates the test metadata, initialises the build environment and stubs the
     * resource lookup for the {@code default} queue on the shared mock.
     */
    @Before
    public void setup() throws Exception {
        createTestMetadata(new String[0]);
        KylinBuildEnv.getOrCreate(KylinConfig.getInstanceFromEnv());
        Mockito.when(clusterManager.fetchQueueAvailableResource(DEFAULT_QUEUE))
                .thenReturn(new AvailableResource(new ResourceInfo(100, 100), new ResourceInfo(60480, 100)));
    }

    /** Removes the test metadata created in {@link #setup()}. */
    @After
    public void cleanUp() {
        cleanupTestMetadata();
    }

    /**
     * Checks the generated configuration for a {@code 1b} source table, first as-is
     * and then again after {@code count_distinct} has been switched on.
     */
    @Test
    public void testOneGradeWhenLessTHan1GB() throws JsonProcessingException {
        KylinConfig.getInstanceFromEnv().setProperty("kylin.engine.spark.build-conf-extra-rules",
                "org.apache.spark.conf.rule.YarnConfRule");
        SparkConfHelper helper = newHelper(clusterManager, "1b", "10");
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);
        List<CompareTuple> expected = expectedConf("1GB", "1", "512MB", "5", "2");
        compareConf(expected, sparkConf);

        cleanSparkConfHelper(helper);
        helper.setConf(COUNT_DISTINCT, "true");
        generateAndApply(helper, sparkConf);
        expected.set(0, new CompareTuple("1GB", EXECUTOR_MEMORY));
        expected.set(1, new CompareTuple("5", EXECUTOR_CORES));
        expected.set(2, new CompareTuple("1GB", EXECUTOR_OVERHEAD));
        compareConf(expected, sparkConf);
    }

    /**
     * Checks the generated configuration for an {@code 8589934592b} source table,
     * before and after {@code count_distinct} is switched on.
     */
    @Test
    public void testTwoGradeWhenLessTHan10GB() throws JsonProcessingException {
        SparkConfHelper helper = newHelper(clusterManager, "8589934592b", "10");
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);
        List<CompareTuple> expected = expectedConf("4GB", "5", "1GB", "5", "256");
        compareConf(expected, sparkConf);

        helper.setConf(COUNT_DISTINCT, "true");
        cleanSparkConfHelper(helper);
        generateAndApply(helper, sparkConf);
        expected.set(0, new CompareTuple("10GB", EXECUTOR_MEMORY));
        expected.set(2, new CompareTuple("2GB", EXECUTOR_OVERHEAD));
        compareConf(expected, sparkConf);
    }

    /**
     * Checks the generated configuration for a {@code 53687091200b} source table,
     * before and after {@code count_distinct} is switched on.
     */
    @Test
    public void testThreeGradeWhenLessHan100GB() throws JsonProcessingException {
        SparkConfHelper helper = newHelper(clusterManager, "53687091200b", "10");
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);
        List<CompareTuple> expected = expectedConf("10GB", "5", "2GB", "5", "1600");
        compareConf(expected, sparkConf);

        helper.setConf(COUNT_DISTINCT, "true");
        cleanSparkConfHelper(helper);
        generateAndApply(helper, sparkConf);
        expected.set(0, new CompareTuple("16GB", EXECUTOR_MEMORY));
        expected.set(2, new CompareTuple("4GB", EXECUTOR_OVERHEAD));
        compareConf(expected, sparkConf);
    }

    /**
     * Checks the generated configuration for a {@code 107374182400b} source table,
     * before and after {@code count_distinct} is switched on.
     */
    @Test
    public void testFourGradeWhenGreaterThanOrEqual100GB() throws JsonProcessingException {
        SparkConfHelper helper = newHelper(clusterManager, "107374182400b", "10");
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);
        List<CompareTuple> expected = expectedConf("16GB", "5", "4GB", "5", "3200");
        compareConf(expected, sparkConf);

        helper.setConf(COUNT_DISTINCT, "true");
        cleanSparkConfHelper(helper);
        generateAndApply(helper, sparkConf);
        expected.set(0, new CompareTuple("20GB", EXECUTOR_MEMORY));
        expected.set(2, new CompareTuple("6GB", EXECUTOR_OVERHEAD));
        compareConf(expected, sparkConf);
    }

    /**
     * Checks that executor memory, overhead, cores and shuffle partitions set explicitly on
     * the helper are kept, and that {@code spark.cores.max} is expected once
     * {@code kylin.env.channel} is overridden to {@code cloud}.
     */
    @Test
    public void testUserDefinedSparkConf() throws JsonProcessingException {
        // A dedicated mock, because this test needs a different resource stub than setup() installs.
        YarnClusterManager localManager = Mockito.mock(YarnClusterManager.class);
        Mockito.when(localManager.fetchQueueAvailableResource(DEFAULT_QUEUE))
                .thenReturn(new AvailableResource(new ResourceInfo(10240, 100), new ResourceInfo(60480, 100)));
        overwriteSystemProp(BASE_EXECUTOR_INSTANCE, "1");

        SparkConfHelper helper = newHelper(localManager, "1b", "500");
        helper.setConf(EXECUTOR_MEMORY, "4GB");
        helper.setConf(EXECUTOR_OVERHEAD, "512MB");
        helper.setConf(EXECUTOR_CORES, "3");
        helper.setConf(SHUFFLE_PARTITIONS, "10");
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);
        List<CompareTuple> expected = expectedConf("4GB", "3", "512MB", "2", "10");
        compareConf(expected, sparkConf);

        overwriteSystemProp(ENV_CHANNEL, "cloud");
        generateAndApply(helper, sparkConf);
        expected.add(new CompareTuple("6", CORES_MAX));
        compareConf(expected, sparkConf);
    }

    /**
     * Checks that, with {@code kylin.env.channel} overridden to {@code cloud}, all explicitly set
     * values (including executor instances and {@code spark.cores.max}) are kept.
     */
    @Test
    public void testUserDefinedSparkConf_Cloud() throws JsonProcessingException {
        overwriteSystemProp(ENV_CHANNEL, "cloud");
        SparkConfHelper helper = new SparkConfHelper();
        helper.setClusterManager(clusterManager);
        helper.setConf(EXECUTOR_MEMORY, "4GB");
        helper.setConf(EXECUTOR_OVERHEAD, "512MB");
        helper.setConf(EXECUTOR_CORES, "3");
        helper.setConf(SHUFFLE_PARTITIONS, "10");
        helper.setConf(EXECUTOR_INSTANCES, "5");
        helper.setConf(CORES_MAX, "15");
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);

        List<CompareTuple> expected = expectedConf("4GB", "3", "512MB", "5", "10");
        expected.add(new CompareTuple("15", CORES_MAX));
        compareConf(expected, sparkConf);
    }

    /**
     * Asserts that {@code sparkConf} holds the expected value for every tuple.
     *
     * @param list      expected key/value pairs
     * @param sparkConf configuration to inspect
     */
    private void compareConf(List<CompareTuple> list, SparkConf sparkConf) {
        for (CompareTuple tuple : list) {
            // Name the key in the message so a failure shows which setting is wrong.
            Assert.assertEquals("Unexpected value for " + tuple.key, tuple.expectedValue, sparkConf.get(tuple.key));
        }
    }

    /** Checks the instance count returned by the rule for a range of layout sizes. */
    @Test
    public void testComputeExecutorInstanceSize() {
        ExecutorInstancesRule rule = new ExecutorInstancesRule();
        Assert.assertEquals(5L, rule.calculateExecutorInstanceSizeByLayoutSize(10));
        Assert.assertEquals(10L, rule.calculateExecutorInstanceSizeByLayoutSize(100));
        Assert.assertEquals(10L, rule.calculateExecutorInstanceSizeByLayoutSize(300));
        Assert.assertEquals(15L, rule.calculateExecutorInstanceSizeByLayoutSize(600));
        Assert.assertEquals(20L, rule.calculateExecutorInstanceSizeByLayoutSize(1000));
        Assert.assertEquals(20L, rule.calculateExecutorInstanceSizeByLayoutSize(1200));
    }

    /**
     * Applies {@link ExecutorInstancesRule} under several combinations of stubbed queue
     * resources, helper options and system properties, checking
     * {@code spark.executor.instances} after each application.
     */
    @Test
    public void testExecutorInstancesRule() {
        ExecutorInstancesRule rule = new ExecutorInstancesRule();
        SparkConfHelper helper = new SparkConfHelper();
        helper.setClusterManager(clusterManager);

        resetSparkConfHelper(helper);
        Mockito.when(clusterManager.fetchQueueAvailableResource(DEFAULT_QUEUE))
                .thenReturn(new AvailableResource(new ResourceInfo(100, 100), new ResourceInfo(60480, 100)));
        assertInstancesAfterApply("5", rule, helper);

        resetSparkConfHelper(helper);
        Mockito.when(clusterManager.fetchQueueAvailableResource(DEFAULT_QUEUE))
                .thenReturn(new AvailableResource(new ResourceInfo(20480, 100), new ResourceInfo(60480, 100)));
        assertInstancesAfterApply("13", rule, helper);

        resetSparkConfHelper(helper);
        helper.setOption(LAYOUT_SIZE, "200");
        helper.setOption(REQUIRED_CORES, "9");
        assertInstancesAfterApply("10", rule, helper);

        resetSparkConfHelper(helper);
        helper.setOption(LAYOUT_SIZE, "200");
        assertInstancesAfterApply("13", rule, helper);

        resetSparkConfHelper(helper);
        overwriteSystemProp(BASE_EXECUTOR_INSTANCE, "30");
        try {
            assertInstancesAfterApply("30", rule, helper);
        } finally {
            // Restore even when the assertion fails so later steps are not affected.
            restoreSystemProp(BASE_EXECUTOR_INSTANCE);
        }

        resetSparkConfHelper(helper);
        Mockito.when(clusterManager.fetchQueueAvailableResource(DEFAULT_QUEUE))
                .thenReturn(new AvailableResource(new ResourceInfo(60480, 100), new ResourceInfo(60480, 100)));
        helper.setOption(LAYOUT_SIZE, "1200");
        assertInstancesAfterApply("20", rule, helper);

        resetSparkConfHelper(helper);
        helper.setOption(LAYOUT_SIZE, "-1");
        helper.setOption(REQUIRED_CORES, "4");
        assertInstancesAfterApply("5", rule, helper);

        resetSparkConfHelper(helper);
        helper.setConf(EXECUTOR_INSTANCES, "10");
        assertInstancesAfterApply("10", rule, helper);
    }

    /**
     * Puts the helper back into the baseline state used by {@link #testExecutorInstancesRule()}.
     *
     * <p>{@code required_cores} is written once with its effective baseline value
     * {@code "14"}; this assumes {@code setOption} simply overwrites an earlier value.
     */
    private void resetSparkConfHelper(SparkConfHelper sparkConfHelper) {
        sparkConfHelper.setOption(SOURCE_TABLE_SIZE, "1b");
        sparkConfHelper.setOption(LAYOUT_SIZE, "10");
        sparkConfHelper.setOption(REQUIRED_CORES, "14");
        sparkConfHelper.setOption(EXECUTOR_INSTANCES, (String) null);
        sparkConfHelper.setConf(YARN_QUEUE, DEFAULT_QUEUE);
        sparkConfHelper.setConf(EXECUTOR_MEMORY, "1GB");
        sparkConfHelper.setConf(EXECUTOR_INSTANCES, (String) null);
        sparkConfHelper.setConf(EXECUTOR_OVERHEAD, "512MB");
    }

    /** Nulls out executor memory, memory overhead and cores on the helper. */
    private void cleanSparkConfHelper(SparkConfHelper sparkConfHelper) {
        sparkConfHelper.setConf(EXECUTOR_MEMORY, (String) null);
        sparkConfHelper.setConf(EXECUTOR_OVERHEAD, (String) null);
        sparkConfHelper.setConf(EXECUTOR_CORES, (String) null);
    }

    /**
     * Generates a configuration with negative resources stubbed on the shared mock and expects
     * {@code "5"} executor instances. Note that {@code setClusterManager} is not called on this
     * helper.
     */
    @Test
    public void testExecutorInstancesRuleWhenError() {
        SparkConfHelper helper = new SparkConfHelper();
        Mockito.when(clusterManager.fetchQueueAvailableResource(DEFAULT_QUEUE))
                .thenReturn(new AvailableResource(new ResourceInfo(-60480, -100), new ResourceInfo(-60480, -100)));
        helper.generateSparkConf();
        Assert.assertEquals("5", helper.getConf(EXECUTOR_INSTANCES));
    }

    /**
     * Applies {@link YarnConfRule} with no queue set, with a queue contained in the stubbed
     * queue list ({@code dw}) and with one that is not ({@code test}), checking the resulting
     * {@code spark.yarn.queue} on both the helper and the applied {@link SparkConf}.
     */
    @Test
    public void testYarnConfRule() throws JsonProcessingException {
        SparkConfHelper helper = new SparkConfHelper();
        helper.setClusterManager(clusterManager);
        YarnConfRule rule = new YarnConfRule();
        SparkConf sparkConf;

        rule.apply(helper);
        Assert.assertEquals(DEFAULT_QUEUE, helper.getConf(YARN_QUEUE));
        sparkConf = new SparkConf();
        helper.applySparkConf(sparkConf);
        Assert.assertEquals(DEFAULT_QUEUE, sparkConf.get(YARN_QUEUE));

        Mockito.when(clusterManager.listQueueNames()).thenReturn(Lists.newArrayList(DEFAULT_QUEUE, "dw"));
        helper.setConf(YARN_QUEUE, "dw");
        rule.apply(helper);
        Assert.assertEquals("dw", helper.getConf(YARN_QUEUE));
        helper.applySparkConf(sparkConf);
        Assert.assertEquals("dw", sparkConf.get(YARN_QUEUE));

        helper.setConf(YARN_QUEUE, "test");
        rule.apply(helper);
        Assert.assertEquals(DEFAULT_QUEUE, helper.getConf(YARN_QUEUE));
        helper.applySparkConf(sparkConf);
        Assert.assertEquals(DEFAULT_QUEUE, sparkConf.get(YARN_QUEUE));
    }

    /**
     * Stubs the maximum resource allocation as {@code ResourceInfo(1024, 100)} and expects the
     * generated executor memory to be {@code 920MB} for a {@code 1073741824000b} source table.
     */
    @Test
    public void testAutoSetSparkExecutorMemory() throws JsonProcessingException {
        SparkConfHelper helper = new SparkConfHelper();
        Mockito.when(clusterManager.fetchMaximumResourceAllocation()).thenReturn(new ResourceInfo(1024, 100));
        helper.setClusterManager(clusterManager);
        helper.setOption(SOURCE_TABLE_SIZE, "1073741824000b");
        helper.setConf(YARN_QUEUE, DEFAULT_QUEUE);
        SparkConf sparkConf = new SparkConf();
        generateAndApply(helper, sparkConf);
        Assert.assertEquals("920MB", sparkConf.get(EXECUTOR_MEMORY));
    }

    /**
     * Builds a helper wired to {@code manager} with the given source table size and layout size,
     * {@code required_cores} set to {@code "1"} and the queue set to {@code default}.
     */
    private static SparkConfHelper newHelper(YarnClusterManager manager, String sourceTableSize, String layoutSize) {
        SparkConfHelper helper = new SparkConfHelper();
        helper.setClusterManager(manager);
        helper.setOption(SOURCE_TABLE_SIZE, sourceTableSize);
        helper.setOption(LAYOUT_SIZE, layoutSize);
        helper.setOption(REQUIRED_CORES, "1");
        helper.setConf(YARN_QUEUE, DEFAULT_QUEUE);
        return helper;
    }

    /** Runs {@code generateSparkConf()} on the helper and then applies the result to {@code sparkConf}. */
    private static void generateAndApply(SparkConfHelper helper, SparkConf sparkConf) throws JsonProcessingException {
        helper.generateSparkConf();
        helper.applySparkConf(sparkConf);
    }

    /**
     * Builds a mutable expectation list in the fixed order: executor memory, executor cores,
     * memory overhead, executor instances, shuffle partitions.
     */
    private static List<CompareTuple> expectedConf(String memory, String cores, String overhead, String instances,
            String shufflePartitions) {
        return Lists.newArrayList(
                new CompareTuple(memory, EXECUTOR_MEMORY),
                new CompareTuple(cores, EXECUTOR_CORES),
                new CompareTuple(overhead, EXECUTOR_OVERHEAD),
                new CompareTuple(instances, EXECUTOR_INSTANCES),
                new CompareTuple(shufflePartitions, SHUFFLE_PARTITIONS));
    }

    /** Applies {@code rule} to {@code helper} and asserts the resulting {@code spark.executor.instances}. */
    private static void assertInstancesAfterApply(String expected, ExecutorInstancesRule rule, SparkConfHelper helper) {
        rule.apply(helper);
        Assert.assertEquals(expected, helper.getConf(EXECUTOR_INSTANCES));
    }
}
