package org.apache.kylin.engine.spark;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.TempMetadataBuilder;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.spark.builder.CreateFlatTable;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.NSparkCubingJob;
import org.apache.kylin.engine.spark.job.NSparkMergingJob;
import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.metadata.MetadataConverter;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.MockJobLock;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KylinSparkEnv;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.collect.Sets;

/* loaded from: input_file:org/apache/kylin/engine/spark/LocalWithSparkSessionTest.class */
public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase implements Serializable {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(LocalWithSparkSessionTest.class);
    /** Format template for a table's CSV data file; {@code %s} is filled with the table identity. */
    private static final String CSV_TABLE_DIR = "../../examples/test_metadata/data/%s.csv";
    /** Relative path of the query resources; not referenced inside this class (presumably used by subclasses). */
    protected static final String KYLIN_SQL_BASE_DIR = "../../kylin-it/src/test/resources/query";
    /** Values recorded by {@link #overwriteSystemProp} (a {@code null} value means the property was unset). */
    private Map<String, String> systemProp = Maps.newHashMap();
    /** Spark configuration assigned in {@link #beforeClass()}. */
    protected static SparkConf sparkConf;
    /** Spark session assigned in {@link #beforeClass()}. */
    protected static SparkSession ss;

    /**
     * Prepares the environment for a single test: overrides the required system properties,
     * creates the local test metadata and initialises the default job scheduler.
     *
     * @throws SchedulerException if initialising the scheduler fails
     * @throws RuntimeException if the scheduler reports that it has not started after initialisation
     */
    @Before
    public void setup() throws SchedulerException {
        logger.info("Prepare temporary data.");

        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        overwriteSystemProp("calcite.keep-in-clause", "true");
        overwriteSystemProp("kylin.metadata.distributed-lock-impl", "org.apache.kylin.engine.spark.utils.MockedDistributedLock$MockedFactory");

        createTestMetadata();

        final DefaultScheduler scheduler = DefaultScheduler.getInstance();
        final JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
        scheduler.init(engineConfig, new MockJobLock());
        if (scheduler.hasStarted()) {
            return;
        }
        throw new RuntimeException("scheduler has not been started");
    }

    /**
     * Tears down the per-test environment: destroys the scheduler instance, cleans up the test
     * metadata and restores every system property changed through {@link #overwriteSystemProp}.
     *
     * <p>Each step runs in a {@code finally} block so that a failure in an earlier step cannot
     * leave overridden system properties behind for subsequent tests.
     */
    @After
    public void after() {
        try {
            DefaultScheduler.destroyInstance();
        } finally {
            logger.info("Clean up temporary data.");
            try {
                cleanupTestMetadata();
            } finally {
                // Always undo property overrides, even when the metadata cleanup fails.
                restoreAllSystemProp();
            }
        }
    }

    /**
     * Sets a system property and remembers its original value so that
     * {@link #restoreAllSystemProp()} can put it back.
     *
     * <p>The original value is recorded only the first time a key is overridden. Recording on
     * every call would replace the true original with an earlier override, and the restore step
     * would then reinstate the wrong value.
     *
     * @param str  name of the system property
     * @param str2 new value for the property
     */
    protected void overwriteSystemProp(String str, String str2) {
        // containsKey rather than putIfAbsent: a recorded null (property originally unset)
        // must not be treated as "nothing recorded yet".
        if (!this.systemProp.containsKey(str)) {
            this.systemProp.put(str, System.getProperty(str));
        }
        System.setProperty(str, str2);
    }

    /**
     * Creates the shared local Spark configuration and session once per test class, registers the
     * session with {@link KylinSparkEnv} and {@link UdfManager}, and sets the Spark log level to WARN.
     */
    @BeforeClass
    public static void beforeClass() {
        if (Shell.MAC) {
            System.setProperty("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
        }

        sparkConf = new SparkConf()
                .setAppName(UUID.randomUUID().toString())
                .setMaster("local[4]")
                .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
                .set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory")
                .set("spark.sql.shuffle.partitions", "1")
                .set("spark.memory.fraction", "0.1")
                .set("spark.shuffle.detectCorrupt", "false")
                .set("spark.sql.crossJoin.enabled", "true");

        final SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
        ss = session;
        KylinSparkEnv.setSparkSession(session);
        UdfManager.create(session);
        session.sparkContext().setLogLevel("WARN");
    }

    /**
     * Installs freshly prepared local temp metadata as the Kylin config for the test, unless the
     * {@code noBuild} system property is set to {@code true} (case-insensitive).
     */
    private void createTestMetadata() {
        final boolean skipBuild = Boolean.parseBoolean(System.getProperty("noBuild", "false"));
        if (!skipBuild) {
            KylinConfig.setKylinConfigForLocalTest(TempMetadataBuilder.prepareNLocalTempMetadata());
        }
    }

    /**
     * Installs local temp metadata prepared from the given argument as the Kylin config for the test.
     *
     * @param str value forwarded to {@code TempMetadataBuilder.prepareNLocalTempMetadata(false, str)}
     */
    protected void createTestMetadata(String str) {
        KylinConfig.setKylinConfigForLocalTest(
                TempMetadataBuilder.prepareNLocalTempMetadata(false, str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Polls the executable every 500 ms until its status is no longer progressing.
     * The first status check happens only after an initial sleep.
     *
     * @param abstractExecutable the job to wait for
     * @return the first status observed that is not progressing
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    public ExecutableState wait(AbstractExecutable abstractExecutable) throws InterruptedException {
        while (true) {
            Thread.sleep(500L);
            final ExecutableState current = abstractExecutable.getStatus();
            if (!current.isProgressing()) {
                return current;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drops every segment of the named cube.
     *
     * @param str name of the cube whose segments are removed
     * @throws IOException declared by the cube update call
     */
    public void cleanupSegments(String str) throws IOException {
        final CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        final CubeInstance target = manager.getCube(str);
        manager.updateCubeDropSegments(target, target.getSegments());
    }

    /**
     * Appends a segment for the given range to the named cube, submits a Spark cubing job for it
     * and waits for the job to stop progressing.
     *
     * <p>Before submission it asserts that the job's distributed metadata URL uses the
     * {@code hdfs} scheme and that its {@code path} parameter lies under the HDFS working
     * directory. After the job ends it checks the job's temporary path via
     * {@link #checkJobTmpPathDeleted}.
     *
     * @param str     name of the cube to build
     * @param tSRange time range of the new segment (callers in this class may pass {@code null})
     * @return the final state of the cubing job
     * @throws Exception if any step of the build fails
     */
    public ExecutableState buildCuboid(String str, SegmentRange.TSRange tSRange) throws Exception {
        final KylinConfig config = KylinConfig.getInstanceFromEnv();
        final CubeManager cubeMgr = CubeManager.getInstance(config);
        final CubeInstance targetCube = cubeMgr.getCube(str);
        final ExecutableManager jobMgr = ExecutableManager.getInstance(config);
        // Result intentionally unused; the call is kept exactly as in the original sequence.
        DataModelManager.getInstance(config).getModels();

        final CubeSegment newSegment = cubeMgr.appendSegment(targetCube, tSRange);
        final NSparkCubingJob cubingJob = NSparkCubingJob.create(Sets.newHashSet(new CubeSegment[]{newSegment}), "ADMIN");

        final StorageURL distMetaUrl = StorageURL.valueOf(cubingJob.getSparkCubingStep().getDistMetaUrl());
        Assert.assertEquals("hdfs", distMetaUrl.getScheme());
        Assert.assertTrue(distMetaUrl.getParameter("path").startsWith(config.getHdfsWorkingDirectory()));

        jobMgr.addJob(cubingJob);
        final ExecutableState finalState = wait((AbstractExecutable) cubingJob);
        checkJobTmpPathDeleted(config, cubingJob);
        return finalState;
    }

    /**
     * Merges the segments of the named cube that fall in the given time range with a Spark
     * merging job, waits for the job to stop progressing, then asserts that the parquet storage
     * path of every merged source segment no longer exists and checks the job's temporary path.
     *
     * @param str name of the cube
     * @param j   start of the merge time range
     * @param j2  end of the merge time range
     * @param z   flag forwarded as the last argument of {@code CubeManager.mergeSegments}
     * @return the final state of the merging job
     * @throws Exception if any step of the merge fails
     */
    protected ExecutableState mergeSegments(String str, long j, long j2, boolean z) throws Exception {
        final KylinConfig config = KylinConfig.getInstanceFromEnv();
        final CubeManager cubeMgr = CubeManager.getInstance(config);
        final ExecutableManager jobMgr = ExecutableManager.getInstance(config);
        final CubeInstance cube = cubeMgr.reloadCube(str);

        final SegmentRange.TSRange range = new SegmentRange.TSRange(Long.valueOf(j), Long.valueOf(j2));
        final CubeSegment merged = cubeMgr.mergeSegments(cube, range, (SegmentRange) null, z);
        final NSparkMergingJob mergingJob = NSparkMergingJob.merge(merged, "ADMIN");
        jobMgr.addJob(mergingJob);
        final ExecutableState finalState = wait((AbstractExecutable) mergingJob);

        for (CubeSegment source : cube.getMergingSegments(merged)) {
            final String storagePath = PathManager.getSegmentParquetStoragePath(cube, source.getName(), source.getStorageLocationIdentifier());
            final Path parquetDir = new Path(HadoopUtil.makeURI(storagePath));
            Assert.assertFalse(HadoopUtil.getFileSystem(storagePath).exists(parquetDir));
        }

        checkJobTmpPathDeleted(config, mergingJob);
        return finalState;
    }

    /**
     * Drops all segments of the named cube and rebuilds it with a {@code null} segment range,
     * asserting that the build succeeds. Requires the HDFS working directory to start with
     * {@code file:}.
     *
     * @param str name of the cube to rebuild
     * @throws Exception if cleanup or the build fails
     */
    protected void fullBuildCube(String str) throws Exception {
        final String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
        Assert.assertTrue(workingDir.startsWith("file:"));

        cleanupSegments(str);

        final ExecutableState outcome = buildCuboid(str, null);
        Assert.assertEquals(ExecutableState.SUCCEED, outcome);
    }

    /**
     * Puts every recorded system property back to its saved value (clearing those saved as
     * {@code null}) and then forgets all recorded values.
     */
    protected void restoreAllSystemProp() {
        for (Map.Entry<String, String> saved : this.systemProp.entrySet()) {
            final String key = saved.getKey();
            final String previous = saved.getValue();
            if (previous != null) {
                logger.trace("restore {}", key);
                System.setProperty(key, previous);
            } else {
                logger.trace("Clear {}", key);
                System.clearProperty(key);
            }
        }
        this.systemProp.clear();
    }

    /**
     * Registers a temp view in the Spark session for every table of the project that has a CSV
     * data file, using a schema derived from the table's column metadata. The table
     * {@code DEFAULT.STREAMING_TABLE} and tables without a CSV file are skipped. Finally the
     * session's table listing is written to the debug log.
     *
     * @param kylinConfig  config used to look up the project and table metadata
     * @param str          name of the project; must exist
     * @param sparkSession session in which the temp views are created
     * @throws IllegalArgumentException if the project cannot be found
     */
    protected static void populateSSWithCSVData(KylinConfig kylinConfig, String str, SparkSession sparkSession) {
        logger.debug("Prepare Spark data.");
        final ProjectInstance projectInstance = ProjectManager.getInstance(kylinConfig).getProject(str);
        Preconditions.checkArgument(projectInstance != null);

        for (String tableIdentity : projectInstance.getTables()) {
            if ("DEFAULT.STREAMING_TABLE".equals(tableIdentity)) {
                continue;
            }
            final String csvPath = String.format(Locale.ROOT, CSV_TABLE_DIR, tableIdentity);
            if (!new File(csvPath).exists()) {
                continue;
            }

            final TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableIdentity, str);
            StructType schema = new StructType();
            for (ColumnDesc column : tableDesc.getColumns()) {
                schema = schema.add(column.getName(), convertType(column.getType()), false);
            }
            sparkSession.read().schema(schema).csv(csvPath).createOrReplaceTempView(tableDesc.getName());
        }

        final String tableListing = sparkSession.sql("show tables").showString(20, 50, false);
        logger.debug(tableListing);
    }

    /**
     * Maps a Kylin data type to the Spark SQL type used for the CSV schema.
     *
     * <p>The checks are order-sensitive and evaluated top to bottom: time family, date-time
     * family, integer family, number family, string family, boolean.
     *
     * @param dataType the Kylin data type
     * @return the corresponding Spark data type
     * @throws IllegalArgumentException if none of the supported families match
     */
    private static DataType convertType(org.apache.kylin.metadata.datatype.DataType dataType) {
        final DataType sparkType;
        if (dataType.isTimeFamily()) {
            sparkType = DataTypes.TimestampType;
        } else if (dataType.isDateTimeFamily()) {
            sparkType = DataTypes.DateType;
        } else if (dataType.isIntegerFamily()) {
            sparkType = DataTypes.LongType;
        } else if (dataType.isNumberFamily()) {
            sparkType = DataTypes.createDecimalType(19, 4);
        } else if (dataType.isStringFamily()) {
            sparkType = DataTypes.StringType;
        } else if (dataType.isBoolean()) {
            sparkType = DataTypes.BooleanType;
        } else {
            throw new IllegalArgumentException("Kylin data type: " + dataType + " can not be converted to spark's type.");
        }
        return sparkType;
    }

    /**
     * Drops all segments of the named cube and builds three consecutive two-year segments
     * covering 2009-01-01 through 2015-01-01.
     *
     * @param str name of the cube
     * @throws Exception if cleanup or any build fails
     */
    public void buildMultiSegs(String str) throws Exception {
        cleanupSegments(str);

        final String[] boundaries = {
            "2009-01-01 00:00:00",
            "2011-01-01 00:00:00",
            "2013-01-01 00:00:00",
            "2015-01-01 00:00:00",
        };
        // Each adjacent pair of boundaries forms one segment range.
        for (int i = 0; i + 1 < boundaries.length; i++) {
            final Long rangeStart = Long.valueOf(dateToLong(boundaries[i]));
            final Long rangeEnd = Long.valueOf(dateToLong(boundaries[i + 1]));
            buildCuboid(str, new SegmentRange.TSRange(rangeStart, rangeEnd));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Generates the flat-table dataset for a cube segment using the shared Spark session.
     *
     * <p>The current test metadata URL is reported through the class logger rather than
     * standard output, consistent with the rest of this class.
     *
     * @param cubeSegment the segment whose flat table is generated
     * @return the dataset produced by {@code CreateFlatTable.generateDataset(false, true)}
     */
    public Dataset<Row> initFlatTable(CubeSegment cubeSegment) {
        logger.info("Test metadata url: {}", getTestConfig().getMetadataUrl());
        final CreateFlatTable flatTable = new CreateFlatTable(
                MetadataConverter.getSegmentInfo(cubeSegment.getCubeInstance(), cubeSegment.getUuid(), cubeSegment.getName(), cubeSegment.getStorageLocationIdentifier()),
                (SpanningTree) null, ss, (NBuildSourceInfo) null);
        return flatTable.generateDataset(false, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Converts a date/time string to epoch milliseconds via {@code DateFormat.stringToMillis}.
     *
     * @param str the date/time text, e.g. {@code "2009-01-01 00:00:00"}
     * @return the parsed value in milliseconds
     */
    public long dateToLong(String str) {
        final long millis = DateFormat.stringToMillis(str);
        return millis;
    }

    /**
     * Returns the project name associated with this test class.
     *
     * @return the constant {@code "default"}
     */
    public String getProject() {
        final String projectName = "default";
        return projectName;
    }

    /**
     * Asserts that no path matching the job id remains under the project's job temp directory.
     *
     * <p>The check stays best-effort with respect to file-system errors: an {@link IOException}
     * does not fail the test, but it is logged with its cause instead of being silently
     * discarded, so a skipped verification is visible in the test output.
     *
     * @param kylinConfig config used to resolve the project's job temp directory
     * @param cubingJob   the finished job; its {@code project} and {@code jobId} params are read
     */
    protected void checkJobTmpPathDeleted(KylinConfig kylinConfig, CubingJob cubingJob) {
        final String project = cubingJob.getParam("project");
        final String jobId = cubingJob.getParam("jobId");
        final Path jobTmpDir = new Path(kylinConfig.getJobTmpDir(project));
        try {
            final int leftoverCount = HadoopUtil.getFilteredPath(jobTmpDir.getFileSystem(HadoopUtil.getCurrentConfiguration()), jobTmpDir, jobId).length;
            Assert.assertEquals("Temporary paths of job " + jobId + " still exist under " + jobTmpDir, 0, leftoverCount);
        } catch (IOException e) {
            logger.warn("Could not verify that the temporary paths of job {} under {} were deleted", jobId, jobTmpDir, e);
        }
    }
}
