package org.apache.kylin.engine.spark.job.stage.build;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.IndexDataConstructor;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.engine.spark.job.NSparkCubingJob;
import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.datasource.storage.StorageStore;
import org.apache.spark.sql.datasource.storage.StorageStoreFactory;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest.class */
public class GenerateFlatTableWithSparkSessionTest extends NLocalWithSparkSessionTest {
    /**
     * Dimension columns keyed by column id. The insertion order of this map is the column
     * order of the flat table written by {@code prepareFlatTableParquet}.
     */
    private static final LinkedHashMap<Long, ColumnStruct> DIM_SCHEMAS = new LinkedHashMap<>();
    /** Measure columns keyed by column id; consulted when an id is not found in {@link #DIM_SCHEMAS}. */
    private static final LinkedHashMap<Long, ColumnStruct> MEASURE_SCHEMAS = new LinkedHashMap<>();
    /** Id used to look up both the index plan and the dataflow under test. */
    private static final String MODEL_ID = "3ec47efc-573a-9304-4405-8e05ae184322";
    /** Id of the segment (within the dataflow above) that the tests build against. */
    private static final String SEGMENT_ID = "b2f206e1-7a15-c94a-20f5-f608d550ead6";
    /** Id of the layout each test submits for building and inspects afterwards. */
    private static final long LAYOUT_ID_TO_BUILD = 20001;
    /** Test configuration, assigned in {@code setUp()}. */
    private KylinConfig config;
    /** Index plan manager for the test project, assigned in {@code setUp()}. */
    private NIndexPlanManager indexPlanManager;
    /** Dataflow manager for the test project, assigned in {@code setUp()}. */
    private NDataflowManager dataflowManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTableWithSparkSessionTest$ColumnStruct.class */
    /**
     * Immutable description of one column used when building Spark schemas in this test:
     * a numeric column id, a human-readable name, and the Spark SQL data type.
     */
    public static class ColumnStruct {
        /** Column id; its string form is used as the Spark field name. */
        private final Long rowId;
        /** Human-readable column name. */
        private final String rowName;
        /** Spark SQL type of the column. */
        private final DataType type;

        /**
         * @param l        column id
         * @param str      column name
         * @param dataType Spark SQL type of the column
         */
        @Generated
        public ColumnStruct(Long l, String str, DataType dataType) {
            this.rowId = l;
            this.rowName = str;
            this.type = dataType;
        }

        /** @return the column id */
        @Generated
        public Long getRowId() {
            return this.rowId;
        }

        /** @return the column name */
        @Generated
        public String getRowName() {
            return this.rowName;
        }

        /** @return the Spark SQL type of the column */
        @Generated
        public DataType getType() {
            return this.type;
        }
    }

    /**
     * Returns the name of the project used by this test.
     *
     * @return the constant project name {@code "index_build_test"}
     */
    @Override // org.apache.kylin.engine.spark.NLocalWithSparkSessionTest
    public String getProject() {
        return "index_build_test";
    }

    /** Runs once before all tests; delegates to the class-level setup of {@link NLocalWithSparkSessionTest}. */
    @BeforeAll
    public static void beforeClass() {
        NLocalWithSparkSessionTest.beforeClass();
    }

    /** Runs once after all tests; delegates to the class-level teardown of {@link NLocalWithSparkSessionTest}. */
    @AfterAll
    public static void afterClass() {
        NLocalWithSparkSessionTest.afterClass();
    }

    /**
     * Per-test setup: runs the parent setup, overrides the system properties this test
     * relies on, recreates and initialises the job scheduler for the test project, and
     * finally resolves the config and metadata managers used by the test methods.
     *
     * @throws Exception if the parent setup fails
     * @throws RuntimeException if the scheduler reports it has not started after init
     */
    @Override // org.apache.kylin.engine.spark.NLocalWithSparkSessionTest
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();

        // System properties overridden for the duration of the test.
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        overwriteSystemProp("kylin.engine.persist-flattable-threshold", "0");
        overwriteSystemProp("kylin.engine.persist-flatview", "true");

        // Drop any existing scheduler instance before creating one for this project.
        NDefaultScheduler.destroyInstance();
        final NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
        scheduler.init(new JobEngineConfig(getTestConfig()));
        final boolean started = scheduler.hasStarted();
        if (!started) {
            throw new RuntimeException("scheduler has not been started");
        }

        config = getTestConfig();
        indexPlanManager = NIndexPlanManager.getInstance(config, getProject());
        dataflowManager = NDataflowManager.getInstance(config, getProject());
    }

    /**
     * Per-test teardown: runs the parent teardown and then destroys the scheduler instance.
     * The scheduler is destroyed in a {@code finally} block so that a failure in the parent
     * teardown does not leave a scheduler behind for the next test.
     *
     * @throws Exception if the parent teardown fails
     */
    @Override // org.apache.kylin.engine.spark.NLocalWithSparkSessionTest
    @AfterEach
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            NDefaultScheduler.destroyInstance();
        }
    }

    /**
     * Enables the data-count check and gathers the objects the count-check tests need.
     *
     * @return a four-element array, in order: the {@link IndexPlan} for {@code MODEL_ID},
     *         the {@link NDataflow} for {@code MODEL_ID}, the {@link NDataSegment} for
     *         {@code SEGMENT_ID}, and an unmodifiable {@code Set<LayoutEntity>} resolved
     *         from {@code LAYOUT_ID_TO_BUILD}
     */
    private Object[] initDataCountCheckTest() {
        overwriteSystemProp("kylin.build.data-count-check-enabled", "true");

        final IndexPlan plan = this.indexPlanManager.getIndexPlan(MODEL_ID);
        final NDataflow flow = this.dataflowManager.getDataflow(MODEL_ID);
        final NDataSegment seg = flow.getSegment(SEGMENT_ID);
        final Set<LayoutEntity> layoutsToBuild = Collections
                .unmodifiableSet(NSparkCubingUtil.toLayouts(plan, Sets.newHashSet(LAYOUT_ID_TO_BUILD)));

        return new Object[] { plan, flow, seg, layoutsToBuild };
    }

    /**
     * With the complete flat table and complete pre-existing layouts on disk, the build job
     * must succeed and the built layout must carry no abnormal type.
     */
    @Test
    void testCheckDataCount_Good() throws Exception {
        final Object[] fixture = initDataCountCheckTest();
        final IndexPlan plan = (IndexPlan) fixture[0];
        final NDataflow flow = (NDataflow) fixture[1];
        final NDataSegment seg = (NDataSegment) fixture[2];
        @SuppressWarnings("unchecked") // slot 3 is filled with a Set<LayoutEntity> by initDataCountCheckTest
        final Set<LayoutEntity> layoutsToBuild = (Set<LayoutEntity>) fixture[3];

        // No rows dropped anywhere.
        prepareFlatTableParquet(flow, seg, false);
        prepareLayoutsParquet(plan, seg, false, false, false);

        final ExecutableState finalState = executeJob(seg, layoutsToBuild);
        Assertions.assertEquals(ExecutableState.SUCCEED, finalState);

        final NDataSegment segmentAfterBuild = this.dataflowManager.getDataflow(MODEL_ID).getSegment(SEGMENT_ID);
        final NDataLayout built = segmentAfterBuild.getLayout(LAYOUT_ID_TO_BUILD, true);
        Assertions.assertNotNull(built);
        Assertions.assertNull(built.getAbnormalType());
    }

    /**
     * Removes the given layouts from the index plan (and does not write parquet for them),
     * then verifies that the build job still succeeds and the built layout carries no
     * abnormal type.
     *
     * @param str comma-separated ids of the layouts to remove before the build
     */
    @ValueSource(strings = {"1,10001", "20000000001,20000010001", "1,10001,20000000001,20000010001"})
    @ParameterizedTest
    void testCheckDataCount_Good_WithNonExistingLayouts(String str) throws Exception {
        overwriteSystemProp("kylin.build.data-count-check-enabled", "true");

        final Set<Long> removedLayoutIds = Arrays.stream(str.split(","))
                .map(Long::parseLong)
                .collect(Collectors.toSet());

        // Fetch the plan and resolve the layouts to build before the removal below.
        final IndexPlan planBeforeRemoval = this.indexPlanManager.getIndexPlan(MODEL_ID);
        final Set<LayoutEntity> layoutsToBuild = Collections.unmodifiableSet(
                NSparkCubingUtil.toLayouts(planBeforeRemoval, Sets.newHashSet(LAYOUT_ID_TO_BUILD)));

        this.indexPlanManager.updateIndexPlan(MODEL_ID, copyForWrite -> {
            copyForWrite.removeLayouts(removedLayoutIds, true, true);
        });
        this.dataflowManager.reloadAll();

        final NDataflow flow = this.dataflowManager.getDataflow(MODEL_ID);
        final NDataSegment seg = flow.getSegment(SEGMENT_ID);
        prepareFlatTableParquet(flow, seg, false);
        prepareLayoutsParquet(planBeforeRemoval, seg, false, false, false, removedLayoutIds);

        final ExecutableState finalState = executeJob(seg, layoutsToBuild);
        Assertions.assertEquals(ExecutableState.SUCCEED, finalState);

        final NDataSegment segmentAfterBuild = this.dataflowManager.getDataflow(MODEL_ID).getSegment(SEGMENT_ID);
        final NDataLayout built = segmentAfterBuild.getLayout(LAYOUT_ID_TO_BUILD, true);
        Assertions.assertNotNull(built);
        Assertions.assertNull(built.getAbnormalType());
    }

    /**
     * With {@code kylin.build.allow-non-strict-count-check} set to {@code true} and the
     * pre-existing layouts prepared with the third "drop row" flag on, the build job must
     * succeed and the built layout must carry no abnormal type.
     */
    @Test
    void testCheckDataCount_Good_WithNonStrictCountCheckEnabled() throws Exception {
        final Object[] fixture = initDataCountCheckTest();
        final IndexPlan plan = (IndexPlan) fixture[0];
        final NDataflow flow = (NDataflow) fixture[1];
        final NDataSegment seg = (NDataSegment) fixture[2];
        @SuppressWarnings("unchecked") // slot 3 is filled with a Set<LayoutEntity> by initDataCountCheckTest
        final Set<LayoutEntity> layoutsToBuild = (Set<LayoutEntity>) fixture[3];

        overwriteSystemProp("kylin.build.allow-non-strict-count-check", "true");

        prepareFlatTableParquet(flow, seg, false);
        // Only the third flag is set: layouts 20000000001 and 20000010001 each miss one row.
        prepareLayoutsParquet(plan, seg, false, false, true);

        final ExecutableState finalState = executeJob(seg, layoutsToBuild);
        Assertions.assertEquals(ExecutableState.SUCCEED, finalState);

        final NDataSegment segmentAfterBuild = this.dataflowManager.getDataflow(MODEL_ID).getSegment(SEGMENT_ID);
        final NDataLayout built = segmentAfterBuild.getLayout(LAYOUT_ID_TO_BUILD, true);
        Assertions.assertNotNull(built);
        Assertions.assertNull(built.getAbnormalType());
    }

    /**
     * Each CSV row switches on exactly one of the three "drop row" flags passed to
     * {@code prepareLayoutsParquet}. The job itself is still expected to finish with
     * {@code SUCCEED}, but the built layout must be marked {@code DATA_INCONSISTENT}.
     *
     * @param z  first flag forwarded to {@code prepareLayoutsParquet}
     * @param z2 second flag forwarded to {@code prepareLayoutsParquet}
     * @param z3 third flag forwarded to {@code prepareLayoutsParquet}
     */
    @ParameterizedTest
    @CsvSource({"true, false, false", "false, true, false", "false, false, true"})
    void testCheckDataCount_Failed(boolean z, boolean z2, boolean z3) throws Exception {
        final Object[] fixture = initDataCountCheckTest();
        final IndexPlan plan = (IndexPlan) fixture[0];
        final NDataflow flow = (NDataflow) fixture[1];
        final NDataSegment seg = (NDataSegment) fixture[2];
        @SuppressWarnings("unchecked") // slot 3 is filled with a Set<LayoutEntity> by initDataCountCheckTest
        final Set<LayoutEntity> layoutsToBuild = (Set<LayoutEntity>) fixture[3];

        prepareFlatTableParquet(flow, seg, false);
        prepareLayoutsParquet(plan, seg, z, z2, z3);

        final ExecutableState finalState = executeJob(seg, layoutsToBuild);
        Assertions.assertEquals(ExecutableState.SUCCEED, finalState);

        final NDataSegment segmentAfterBuild = this.dataflowManager.getDataflow(MODEL_ID).getSegment(SEGMENT_ID);
        final NDataLayout built = segmentAfterBuild.getLayout(LAYOUT_ID_TO_BUILD, true);
        Assertions.assertNotNull(built);
        Assertions.assertEquals(NDataLayout.AbnormalType.DATA_INCONSISTENT, built.getAbnormalType());
    }

    /**
     * Writes the flat table for the given segment as parquet, overwriting any existing data.
     * The schema has one nullable field per entry of {@code DIM_SCHEMAS}, in that map's
     * iteration order, each named after the column id.
     *
     * @param nDataflow    dataflow whose id is used to resolve the flat table directory
     * @param nDataSegment segment whose id is used to resolve the flat table directory
     * @param z            when {@code true}, the sixth row is omitted, so five rows are written
     */
    private void prepareFlatTableParquet(NDataflow nDataflow, NDataSegment nDataSegment, boolean z) {
        List<StructField> fields = new ArrayList<>(DIM_SCHEMAS.size());
        for (ColumnStruct column : DIM_SCHEMAS.values()) {
            fields.add(DataTypes.createStructField(String.valueOf(column.getRowId()), column.getType(), true));
        }
        StructType schema = DataTypes.createStructType(fields);

        // Values follow the schema order: columns 0, 13, 16, 22, 24.
        List<Row> rows = new ArrayList<>();
        rows.add(RowFactory.create(1L, 5L, 1L, "Customer#000000001", 1L));
        rows.add(RowFactory.create(2L, 15L, 1L, "Customer#000000001", 1L));
        rows.add(RowFactory.create(3L, 5L, 2L, "Customer#000000002", 2L));
        rows.add(RowFactory.create(4L, 15L, 3L, "Customer#000000003", 3L));
        rows.add(RowFactory.create(5L, 5L, 5L, "Customer#000000005", 5L));
        if (!z) {
            rows.add(RowFactory.create(6L, 15L, 5L, "Customer#000000005", 5L));
        }

        Dataset<Row> flatTable = ss.createDataFrame(rows, schema);
        ss.sessionState().conf().setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true");
        String flatTableDir = this.config.getFlatTableDir(getProject(), nDataflow.getId(), nDataSegment.getId())
                .toString();
        flatTable.write().mode(SaveMode.Overwrite).parquet(flatTableDir);
    }

    /**
     * Convenience overload: prepares the layouts' parquet without skipping any layout
     * (passes {@code null} as the set of layout ids to skip).
     */
    private void prepareLayoutsParquet(IndexPlan indexPlan, NDataSegment nDataSegment, boolean z, boolean z2, boolean z3) {
        prepareLayoutsParquet(indexPlan, nDataSegment, z, z2, z3, null);
    }

    /**
     * Writes parquet data for the four layouts 1, 10001, 20000000001 and 20000010001 of the
     * given segment, optionally leaving out one row from some of them and optionally
     * skipping whole layouts.
     *
     * @param indexPlan    plan used to look up each {@link LayoutEntity} by id
     * @param nDataSegment segment whose storage path the data is written under
     * @param z            when {@code true}, the sixth row of layout 1 is omitted
     * @param z2           when {@code true}, the sixth row of layout 20000000001 is omitted
     * @param z3           when {@code true}, the sixth row is omitted from both layout
     *                     20000000001 and layout 20000010001
     * @param set          ids of layouts that must not be written; {@code null} or empty
     *                     means every layout is written
     */
    private void prepareLayoutsParquet(IndexPlan indexPlan, NDataSegment nDataSegment, boolean z, boolean z2, boolean z3, Set<Long> set) {
        StorageStore store = StorageStoreFactory.create(1);

        // Layout 1: columns 0, 13, 16, 24, 22 plus measure 100000.
        List<Row> layout1Rows = new ArrayList<>();
        layout1Rows.add(RowFactory.create(1L, 5L, 1L, 1L, "Customer#000000001", 1L));
        layout1Rows.add(RowFactory.create(2L, 15L, 1L, 1L, "Customer#000000001", 1L));
        layout1Rows.add(RowFactory.create(3L, 5L, 2L, 2L, "Customer#000000002", 1L));
        layout1Rows.add(RowFactory.create(4L, 15L, 3L, 3L, "Customer#000000003", 1L));
        layout1Rows.add(RowFactory.create(5L, 5L, 5L, 5L, "Customer#000000005", 1L));
        if (!z) {
            layout1Rows.add(RowFactory.create(6L, 15L, 5L, 5L, "Customer#000000005", 1L));
        }
        writeLayoutParquet(store, indexPlan, nDataSegment, 1L, "0 13 16 24 22 100000", layout1Rows, set);

        // Layout 10001: columns 24, 22 plus measures 100000 and 100001; never loses rows.
        List<Row> layout10001Rows = new ArrayList<>();
        layout10001Rows.add(RowFactory.create(1L, "Customer#000000001", 2L, 20L));
        layout10001Rows.add(RowFactory.create(2L, "Customer#000000002", 1L, 5L));
        layout10001Rows.add(RowFactory.create(3L, "Customer#000000003", 1L, 15L));
        layout10001Rows.add(RowFactory.create(5L, "Customer#000000005", 2L, 20L));
        writeLayoutParquet(store, indexPlan, nDataSegment, 10001L, "24 22 100000 100001", layout10001Rows, set);

        // Layout 20000000001: columns 0, 13, 16, 24, 22.
        List<Row> layout20000000001Rows = new ArrayList<>();
        layout20000000001Rows.add(RowFactory.create(1L, 5L, 1L, 1L, "Customer#000000001"));
        layout20000000001Rows.add(RowFactory.create(2L, 15L, 1L, 1L, "Customer#000000001"));
        layout20000000001Rows.add(RowFactory.create(3L, 5L, 2L, 2L, "Customer#000000002"));
        layout20000000001Rows.add(RowFactory.create(4L, 15L, 3L, 3L, "Customer#000000003"));
        layout20000000001Rows.add(RowFactory.create(5L, 5L, 5L, 5L, "Customer#000000005"));
        if (!z2 && !z3) {
            layout20000000001Rows.add(RowFactory.create(6L, 15L, 5L, 5L, "Customer#000000005"));
        }
        writeLayoutParquet(store, indexPlan, nDataSegment, 20000000001L, "0 13 16 24 22", layout20000000001Rows, set);

        // Layout 20000010001: columns 0, 13.
        List<Row> layout20000010001Rows = new ArrayList<>();
        layout20000010001Rows.add(RowFactory.create(1L, 5L));
        layout20000010001Rows.add(RowFactory.create(2L, 15L));
        layout20000010001Rows.add(RowFactory.create(3L, 5L));
        layout20000010001Rows.add(RowFactory.create(4L, 15L));
        layout20000010001Rows.add(RowFactory.create(5L, 5L));
        if (!z3) {
            layout20000010001Rows.add(RowFactory.create(6L, 15L));
        }
        writeLayoutParquet(store, indexPlan, nDataSegment, 20000010001L, "0 13", layout20000010001Rows, set);
    }

    /**
     * Saves {@code rows} as the data of one layout, unless that layout id is listed in
     * {@code skippedLayoutIds}.
     *
     * @param store            storage store used to persist the data
     * @param indexPlan        plan used to look up the {@link LayoutEntity}
     * @param segment          segment whose storage path the data is written under
     * @param layoutId         id of the layout to write
     * @param columnIds        space-separated column ids defining the schema, in row order
     * @param rows             rows to write; each row must match the schema
     * @param skippedLayoutIds ids of layouts that must not be written; may be {@code null}
     */
    private void writeLayoutParquet(StorageStore store, IndexPlan indexPlan, NDataSegment segment, long layoutId,
            String columnIds, List<Row> rows, Set<Long> skippedLayoutIds) {
        if (!CollectionUtils.isEmpty(skippedLayoutIds) && skippedLayoutIds.contains(layoutId)) {
            return;
        }
        List<StructField> fields = new ArrayList<>();
        for (ColumnStruct column : toColumnStructs(columnIds)) {
            fields.add(DataTypes.createStructField(String.valueOf(column.getRowId()), column.getType(), true));
        }
        Dataset<Row> data = ss.createDataFrame(rows, DataTypes.createStructType(fields));
        LayoutEntity layout = indexPlan.getLayoutEntity(layoutId);
        Path storagePath = new Path(NSparkCubingUtil.getStoragePath(segment, layout.getId()));
        store.save(layout, storagePath, KapConfig.wrap(this.config), data);
    }

    /**
     * Resolves a space-separated list of column ids to their {@link ColumnStruct}s, looking
     * each id up in {@code DIM_SCHEMAS} first and in {@code MEASURE_SCHEMAS} second. The
     * result preserves the order of the ids in {@code str}.
     *
     * @param str space-separated numeric column ids, e.g. {@code "0 13 100000"}
     * @return the matching column descriptions, one per id, in input order
     * @throws NumberFormatException    if a token is not a valid {@code long}
     * @throws IllegalArgumentException if an id is registered in neither schema map
     */
    private List<ColumnStruct> toColumnStructs(String str) {
        return Arrays.stream(str.split(" ")).map(token -> {
            Long columnId = Long.valueOf(token);
            ColumnStruct column = DIM_SCHEMAS.get(columnId);
            if (column == null) {
                column = MEASURE_SCHEMAS.get(columnId);
            }
            if (column == null) {
                // Fail here with context instead of leaking a null into the schema-building loops.
                throw new IllegalArgumentException("Unknown column id " + token + " in column list '" + str + "'");
            }
            return column;
        }).collect(Collectors.toList());
    }

    /**
     * Creates a cubing job for the single given segment and the given layouts with
     * submitter {@code "ADMIN"}, adds it to the project's executable manager, and returns
     * the state produced by {@code IndexDataConstructor.wait}.
     *
     * @param nDataSegment segment to build
     * @param set          layouts to build
     * @return the state returned by {@code IndexDataConstructor.wait} for the job
     * @throws InterruptedException propagated from {@code IndexDataConstructor.wait}
     */
    private ExecutableState executeJob(NDataSegment nDataSegment, Set<LayoutEntity> set) throws InterruptedException {
        final NExecutableManager jobManager = NExecutableManager.getInstance(this.config, getProject());
        final Set<NDataSegment> segments = Sets.newHashSet(nDataSegment);
        final NSparkCubingJob cubingJob = NSparkCubingJob.create(segments, set, "ADMIN", (Set) null);
        jobManager.addJob(cubingJob);
        return IndexDataConstructor.wait((AbstractExecutable) cubingJob);
    }

    // Populates the schema registries. Insertion order matters for DIM_SCHEMAS because it
    // is iterated to build the flat table schema.
    static {
        registerColumn(DIM_SCHEMAS, 0L, "LO_ORDERKEY", DataTypes.LongType);
        registerColumn(DIM_SCHEMAS, 13L, "LO_QUANTITY", DataTypes.LongType);
        registerColumn(DIM_SCHEMAS, 16L, "LO_CUSTKEY", DataTypes.LongType);
        registerColumn(DIM_SCHEMAS, 22L, "C_NAME", DataTypes.StringType);
        registerColumn(DIM_SCHEMAS, 24L, "C_CUSTKEY", DataTypes.LongType);
        registerColumn(MEASURE_SCHEMAS, 100000L, "COUNT_ALL", DataTypes.LongType);
        registerColumn(MEASURE_SCHEMAS, 100001L, "SUM_LINEORDER_LO_QUANTITY", DataTypes.LongType);
    }

    /** Adds a {@link ColumnStruct} with the given id, name and type to {@code registry}, keyed by id. */
    private static void registerColumn(LinkedHashMap<Long, ColumnStruct> registry, long id, String name, DataType type) {
        registry.put(id, new ColumnStruct(id, name, type));
    }
}
