package org.apache.kylin.engine.spark.job;

import java.lang.invoke.SerializedLambda;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.metadata.FunctionDesc;
import org.apache.kylin.engine.spark.metadata.MetadataConverter;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.common.SparkQueryTest;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.udaf.PreciseCountDistinct;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/SparkCubingJobTest.class */
public class SparkCubingJobTest extends LocalWithSparkSessionTest {
    private static final Logger logger = LoggerFactory.getLogger(SparkCubingJobTest.class);
    private static StructType OUT_SCHEMA = null;
    private KylinConfig kylinConfig;
    protected CubeManager cubeMgr;
    protected ExecutableManager execMgr;

    @Override // org.apache.kylin.engine.spark.LocalWithSparkSessionTest
    public void setup() throws SchedulerException {
        super.setup();
        ss.sparkContext().setLogLevel("WARN");
        this.kylinConfig = KylinConfig.getInstanceFromEnv();
        this.kylinConfig.setProperty("kylin.source.provider.0", "org.apache.kylin.engine.spark.source.HiveSource");
        this.cubeMgr = CubeManager.getInstance(this.kylinConfig);
        this.execMgr = ExecutableManager.getInstance(this.kylinConfig);
    }

    @Override // org.apache.kylin.engine.spark.LocalWithSparkSessionTest
    public void after() {
        super.after();
    }

    @Test
    public void testBuildJob() throws Exception {
        cleanupSegments("ci_inner_join_cube");
        CubeInstance cube = this.cubeMgr.getCube("ci_inner_join_cube");
        ExecutableManager executableManager = ExecutableManager.getInstance(this.kylinConfig);
        long dateToLong = dateToLong("2012-06-01");
        long dateToLong2 = dateToLong("2013-07-01");
        CubeSegment appendSegment = this.cubeMgr.appendSegment(cube, new SegmentRange.TSRange(0L, Long.valueOf(dateToLong)));
        NSparkCubingJob create = NSparkCubingJob.create(Sets.newHashSet(new CubeSegment[]{appendSegment}), "ADMIN");
        executableManager.addJob(create);
        Assert.assertEquals(ExecutableState.SUCCEED, wait((AbstractExecutable) create));
        queryTest(appendSegment);
        snapshotTest(appendSegment);
        NSparkCubingJob create2 = NSparkCubingJob.create(Sets.newHashSet(new CubeSegment[]{this.cubeMgr.appendSegment(cube, new SegmentRange.TSRange(Long.valueOf(dateToLong), Long.valueOf(dateToLong2)))}), "ADMIN");
        executableManager.addJob(create2);
        Assert.assertEquals(ExecutableState.SUCCEED, wait((AbstractExecutable) create2));
        Assert.assertEquals(2L, this.cubeMgr.reloadCube("ci_inner_join_cube").getSegments().size());
    }

    @Test
    public void testBuildTwoSegmentsAndMerge() throws Exception {
        cleanupSegments("ci_inner_join_cube");
        CubeInstance cube = this.cubeMgr.getCube("ci_inner_join_cube");
        ExecutableManager executableManager = ExecutableManager.getInstance(this.kylinConfig);
        long dateToLong = dateToLong("2010-01-01");
        long dateToLong2 = dateToLong("2012-01-01");
        long dateToLong3 = dateToLong("2014-01-01");
        NSparkCubingJob create = NSparkCubingJob.create(Sets.newHashSet(new CubeSegment[]{this.cubeMgr.appendSegment(cube, new SegmentRange.TSRange(Long.valueOf(dateToLong), Long.valueOf(dateToLong2)))}), "ADMIN");
        executableManager.addJob(create);
        Assert.assertEquals(ExecutableState.SUCCEED, wait((AbstractExecutable) create));
        NSparkCubingJob create2 = NSparkCubingJob.create(Sets.newHashSet(new CubeSegment[]{this.cubeMgr.appendSegment(cube, new SegmentRange.TSRange(Long.valueOf(dateToLong2), Long.valueOf(dateToLong3)))}), "ADMIN");
        executableManager.addJob(create2);
        Assert.assertEquals(ExecutableState.SUCCEED, wait((AbstractExecutable) create2));
        NSparkMergingJob merge = NSparkMergingJob.merge(this.cubeMgr.mergeSegments(this.cubeMgr.reloadCube("ci_inner_join_cube"), new SegmentRange.TSRange(Long.valueOf(dateToLong), Long.valueOf(dateToLong3)), (SegmentRange) null, true), "ADMIN");
        executableManager.addJob(merge);
        Assert.assertEquals(ExecutableState.SUCCEED, wait((AbstractExecutable) merge));
        Iterator it = this.cubeMgr.reloadCube("ci_inner_join_cube").getSegments().iterator();
        while (it.hasNext()) {
            queryTest((CubeSegment) it.next());
        }
    }

    public void snapshotTest(CubeSegment cubeSegment) {
        Assert.assertEquals(5L, this.cubeMgr.reloadCube(cubeSegment.getCubeInstance().getName()).getSegmentById(cubeSegment.getUuid()).getSnapshots().size());
    }

    private void queryTest(CubeSegment cubeSegment) {
        for (LayoutEntity layoutEntity : MetadataConverter.extractEntityList2JavaList(cubeSegment.getCubeInstance())) {
            Dataset from = ((NSparkCubingEngine.NSparkCubingStorage) StorageFactory.createEngineAdapter(new IStorageAware() { // from class: org.apache.kylin.engine.spark.job.SparkCubingJobTest.1
                public int getStorageType() {
                    return 4;
                }
            }, NSparkCubingEngine.NSparkCubingStorage.class)).getFrom(PathManager.getParquetStoragePath(cubeSegment.getConfig(), cubeSegment.getCubeInstance().getName(), cubeSegment.getName(), cubeSegment.getStorageLocationIdentifier(), String.valueOf(layoutEntity.getId())), ss);
            HashSet hashSet = new HashSet();
            Set keySet = layoutEntity.getOrderedDimensions().keySet();
            for (Map.Entry entry : layoutEntity.getOrderedMeasures().entrySet()) {
                String dataType = ((FunctionDesc) entry.getValue()).returnType().dataType();
                if (!dataType.equals("hllc") && !dataType.equals("topn") && !dataType.equals("percentile")) {
                    hashSet.add(entry.getKey());
                }
            }
            Dataset<Row> sort = from.select(NSparkCubingUtil.getColumns(keySet, hashSet)).sort(NSparkCubingUtil.getColumns(keySet));
            System.out.println("Query cuboid ------------ " + layoutEntity.getId());
            Dataset<Row> dsConvertToOriginal = dsConvertToOriginal(sort, layoutEntity);
            dsConvertToOriginal.show(10);
            Dataset<Row> initFlatTable = initFlatTable(cubeSegment);
            if (!layoutEntity.isTableIndex()) {
                initFlatTable = CuboidAggregator.agg(ss, initFlatTable, layoutEntity.getOrderedDimensions().keySet(), layoutEntity.getOrderedMeasures(), (SpanningTree) null, true);
            }
            Dataset sort2 = initFlatTable.select(NSparkCubingUtil.getColumns(keySet, hashSet)).sort(NSparkCubingUtil.getColumns(keySet));
            System.out.println("Spark sql ------------ ");
            sort2.show(10);
            Assert.assertEquals(dsConvertToOriginal.count(), sort2.count());
            Assert.assertNull(SparkQueryTest.checkAnswer(dsConvertToOriginal, sort2, false));
        }
    }

    private Dataset<Row> dsConvertToOriginal(Dataset<Row> dataset, LayoutEntity layoutEntity) {
        for (Map.Entry entry : layoutEntity.getOrderedMeasures().entrySet()) {
            FunctionDesc functionDesc = (FunctionDesc) entry.getValue();
            if (functionDesc != null) {
                String[] columns = dataset.columns();
                if ("bitmap".equals(functionDesc.returnType().dataType())) {
                    int intValue = convertOutSchema(dataset, ((Integer) entry.getKey()).toString(), DataTypes.LongType).intValue();
                    PreciseCountDistinct preciseCountDistinct = new PreciseCountDistinct((Expression) null);
                    dataset = dataset.map(row -> {
                        Object[] objArr = new Object[row.size()];
                        for (int i = 0; i < columns.length; i++) {
                            if (i == intValue) {
                                objArr[i] = Long.valueOf(preciseCountDistinct.deserialize((byte[]) row.get(i)).getLongCardinality());
                            } else {
                                objArr[i] = row.get(i);
                            }
                        }
                        return RowFactory.create(objArr);
                    }, RowEncoder.apply(OUT_SCHEMA));
                }
            }
        }
        return dataset;
    }

    private Integer convertOutSchema(Dataset<Row> dataset, String str, DataType dataType) {
        StructField[] fields = dataset.schema().fields();
        String[] columns = dataset.columns();
        int i = 0;
        StructField[] structFieldArr = new StructField[fields.length];
        for (int i2 = 0; i2 < fields.length; i2++) {
            if (columns[i2].equalsIgnoreCase(str)) {
                i = i2;
                StructField structField = fields[i2];
                structFieldArr[i2] = new StructField(structField.name(), dataType, false, structField.metadata());
            } else {
                structFieldArr[i2] = fields[i2];
            }
        }
        OUT_SCHEMA = new StructType(structFieldArr);
        return Integer.valueOf(i);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -630455859:
                if (implMethodName.equals("lambda$dsConvertToOriginal$6e688ba5$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/kylin/engine/spark/job/SparkCubingJobTest") && serializedLambda.getImplMethodSignature().equals("([Ljava/lang/String;ILorg/apache/spark/sql/udaf/PreciseCountDistinct;Lorg/apache/spark/sql/Row;)Lorg/apache/spark/sql/Row;")) {
                    String[] strArr = (String[]) serializedLambda.getCapturedArg(0);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    PreciseCountDistinct preciseCountDistinct = (PreciseCountDistinct) serializedLambda.getCapturedArg(2);
                    return row -> {
                        Object[] objArr = new Object[row.size()];
                        for (int i = 0; i < strArr.length; i++) {
                            if (i == intValue) {
                                objArr[i] = Long.valueOf(preciseCountDistinct.deserialize((byte[]) row.get(i)).getLongCardinality());
                            } else {
                                objArr[i] = row.get(i);
                            }
                        }
                        return RowFactory.create(objArr);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
