package org.apache.kylin.engine.spark;

import java.io.File;
import java.io.Serializable;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import lombok.Generated;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.util.Shell;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TempMetadataBuilder;
import org.apache.kylin.engine.spark.job.NSparkMergingJob;
import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.UnmodifiableIterator;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.collect.Sets;

/* loaded from: input_file:org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.class */
public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase implements Serializable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(NLocalWithSparkSessionTest.class);
    private static final String CSV_TABLE_DIR = TempMetadataBuilder.TEMP_TEST_METADATA + "/data/%s.csv";
    protected static final String KYLIN_SQL_BASE_DIR = "../kylin-it/src/test/resources/query";
    protected static SparkConf sparkConf;
    protected static SparkSession ss;
    private TestingServer zkTestServer;
    protected IndexDataConstructor indexDataConstructor;

    protected static void ensureSparkConf() {
        if (sparkConf == null) {
            sparkConf = new SparkConf().setAppName(RandomUtil.randomUUIDStr()).setMaster("local[4]");
        }
    }

    @BeforeClass
    public static void beforeClass() {
        if (Shell.MAC) {
            overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
        }
        ensureSparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory");
        sparkConf.set("spark.sql.shuffle.partitions", "1");
        sparkConf.set("spark.memory.fraction", "0.1");
        sparkConf.set("spark.shuffle.detectCorrupt", "false");
        sparkConf.set("spark.sql.crossJoin.enabled", "true");
        sparkConf.set(StaticSQLConf.WAREHOUSE_PATH().key(), TempMetadataBuilder.TEMP_TEST_METADATA + "/spark-warehouse");
        sparkConf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY");
        sparkConf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY");
        sparkConf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED");
        sparkConf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED");
        sparkConf.set("spark.sql.legacy.timeParserPolicy", "LEGACY");
        sparkConf.set("spark.sql.parquet.mergeSchema", "true");
        sparkConf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true");
        sparkConf.set("spark.sql.broadcastTimeout", "900");
        sparkConf.set("spark.sql.globalTempDatabase", "KYLIN_LOGICAL_VIEW");
        if (sparkConf.getOption("spark.sql.extensions").isEmpty()) {
            sparkConf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension");
        } else {
            sparkConf.set("spark.sql.extensions", sparkConf.get("spark.sql.extensions") + ", io.delta.sql.DeltaSparkSessionExtension");
        }
        sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");
        ss = SparkSession.builder().withExtensions(sparkSessionExtensions -> {
            sparkSessionExtensions.injectOptimizerRule(sparkSession -> {
                return new ConvertInnerJoinToSemiJoin();
            });
            return null;
        }).config(sparkConf).getOrCreate();
        SparderEnv.setSparkSession(ss);
    }

    @AfterClass
    public static void afterClass() {
        if (ss != null) {
            ss.close();
        }
        FileUtils.deleteQuietly(new File("../kap-it/metastore_db"));
    }

    @Before
    public void setUp() throws Exception {
        overwriteSystemProp("calcite.keep-in-clause", "true");
        overwriteSystemProp("kylin.build.resource.consecutive-idle-state-num", "1");
        overwriteSystemProp("kylin.build.resource.state-check-interval-seconds", "1s");
        overwriteSystemProp("kylin.engine.spark.build-job-progress-reporter", "org.apache.kylin.engine.spark.job.MockJobProgressReport");
        createTestMetadata(new String[0]);
        SparkJobFactoryUtils.initJobFactory();
        for (int i = 0; i < 100; i++) {
            try {
                this.zkTestServer = new TestingServer(RandomUtil.nextInt(7100, 65530), true);
                break;
            } catch (BindException e) {
                log.warn(e.getMessage());
            }
        }
        overwriteSystemProp("kylin.env.zookeeper-connect-string", this.zkTestServer.getConnectString());
        overwriteSystemProp("kylin.source.provider.9", "org.apache.kylin.engine.spark.mockup.CsvSource");
        this.indexDataConstructor = new IndexDataConstructor(getProject());
    }

    @After
    public void tearDown() throws Exception {
        cleanupTestMetadata();
        if (this.zkTestServer != null) {
            this.zkTestServer.close();
        }
    }

    public String getProject() {
        return "default";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void init() throws Exception {
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        overwriteSystemProp("calcite.keep-in-clause", "true");
        createTestMetadata(new String[0]);
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(getProject());
        nDefaultScheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
    }

    public static void populateSSWithCSVData(KylinConfig kylinConfig, String str, SparkSession sparkSession) {
        ProjectInstance project = NProjectManager.getInstance(kylinConfig).getProject(str);
        Preconditions.checkArgument(project != null);
        UnmodifiableIterator it = project.getTables().iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            if (!"DEFAULT.STREAMING_TABLE".equals(str2) && !"DEFAULT.TEST_SNAPSHOT_TABLE".equals(str2) && !str2.contains(kylinConfig.getDDLLogicalViewDB())) {
                TableDesc tableDesc = NTableMetadataManager.getInstance(kylinConfig, str).getTableDesc(str2);
                ColumnDesc[] columns = tableDesc.getColumns();
                StructType structType = new StructType();
                for (ColumnDesc columnDesc : columns) {
                    structType = structType.add(columnDesc.getName(), convertType(columnDesc.getType()), false);
                }
                sparkSession.read().schema(structType).csv(String.format(Locale.ROOT, CSV_TABLE_DIR, str2)).createOrReplaceTempView(tableDesc.getName());
            }
        }
    }

    private static DataType convertType(org.apache.kylin.metadata.datatype.DataType dataType) {
        if (dataType.isTimeFamily()) {
            return DataTypes.TimestampType;
        }
        if (dataType.isDateTimeFamily()) {
            return DataTypes.DateType;
        }
        if (!dataType.isIntegerFamily()) {
            if (!dataType.isNumberFamily()) {
                if (dataType.isStringFamily()) {
                    return DataTypes.StringType;
                }
                if (dataType.isBoolean()) {
                    return DataTypes.BooleanType;
                }
                throw new IllegalArgumentException("KAP data type: " + dataType + " can not be converted to spark's type.");
            }
            String name = dataType.getName();
            boolean z = -1;
            switch (name.hashCode()) {
                case -1325958191:
                    if (name.equals("double")) {
                        z = true;
                        break;
                    }
                    break;
                case 97526364:
                    if (name.equals("float")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return DataTypes.FloatType;
                case true:
                    return DataTypes.DoubleType;
                default:
                    return (dataType.getPrecision() == -1 || dataType.getScale() == -1) ? DataTypes.createDecimalType(19, 4) : DataTypes.createDecimalType(dataType.getPrecision(), dataType.getScale());
            }
        }
        String name2 = dataType.getName();
        boolean z2 = -1;
        switch (name2.hashCode()) {
            case -1312398097:
                if (name2.equals("tinyint")) {
                    z2 = false;
                    break;
                }
                break;
            case -606531192:
                if (name2.equals("smallint")) {
                    z2 = true;
                    break;
                }
                break;
            case 3237413:
                if (name2.equals("int4")) {
                    z2 = 3;
                    break;
                }
                break;
            case 1958052158:
                if (name2.equals("integer")) {
                    z2 = 2;
                    break;
                }
                break;
        }
        switch (z2) {
            case false:
                return DataTypes.ByteType;
            case true:
                return DataTypes.ShortType;
            case true:
            case true:
                return DataTypes.IntegerType;
            default:
                return DataTypes.LongType;
        }
    }

    protected void fullBuild(String str) throws Exception {
        this.indexDataConstructor.buildDataflow(str);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v43, types: [java.util.List] */
    public void buildMultiSegs(String str, long... jArr) throws Exception {
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(str);
        ArrayList arrayList = new ArrayList();
        IndexPlan indexPlan = dataflow.getIndexPlan();
        if (jArr.length == 0) {
            arrayList = indexPlan.getAllLayouts();
        } else {
            for (long j : jArr) {
                arrayList.add(indexPlan.getLayoutEntity(Long.valueOf(j)));
            }
        }
        this.indexDataConstructor.buildIndex(str, new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(SegmentRange.dateToLong("2009-01-01 00:00:00").longValue()), Long.valueOf(SegmentRange.dateToLong("2011-01-01 00:00:00").longValue())), Sets.newLinkedHashSet(arrayList), true);
        this.indexDataConstructor.buildIndex(str, new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(SegmentRange.dateToLong("2011-01-01 00:00:00").longValue()), Long.valueOf(SegmentRange.dateToLong("2013-01-01 00:00:00").longValue())), Sets.newLinkedHashSet(arrayList), true);
        this.indexDataConstructor.buildIndex(str, new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(SegmentRange.dateToLong("2013-01-01 00:00:00").longValue()), Long.valueOf(SegmentRange.dateToLong("2015-01-01 00:00:00").longValue())), Sets.newLinkedHashSet(arrayList), true);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v21, types: [java.util.List] */
    public void buildMultiSegAndMerge(String str, long... jArr) throws Exception {
        buildMultiSegs(str, jArr);
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(str);
        ArrayList arrayList = new ArrayList();
        IndexPlan indexPlan = dataflow.getIndexPlan();
        if (jArr.length == 0) {
            arrayList = indexPlan.getAllLayouts();
        } else {
            for (long j : jArr) {
                arrayList.add(indexPlan.getLayoutEntity(Long.valueOf(j)));
            }
        }
        mergeSegments(str, Sets.newLinkedHashSet(arrayList));
    }

    public void mergeSegments(String str, Set<LayoutEntity> set) throws Exception {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), getProject());
        NSparkMergingJob merge = NSparkMergingJob.merge(nDataflowManager.mergeSegments(nDataflowManager.getDataflow(str), new SegmentRange.TimePartitionedSegmentRange(SegmentRange.dateToLong("2011-01-01 00:00:00"), SegmentRange.dateToLong("2015-01-01 00:00:00")), false), Sets.newLinkedHashSet(set), "ADMIN", RandomUtil.randomUUIDStr());
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(getTestConfig(), getProject());
        nExecutableManager.addJob(merge);
        if (!Objects.equals(IndexDataConstructor.wait((AbstractExecutable) merge), ExecutableState.SUCCEED)) {
            throw new IllegalStateException(IndexDataConstructor.firstFailedJobErrorMessage(nExecutableManager, merge));
        }
        new AfterMergeOrRefreshResourceMerger(getTestConfig(), getProject()).merge(merge.getSparkMergingStep());
    }
}
