package org.apache.kylin.engine.spark.job;

import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.engine.spark.ExecutableUtils;
import org.apache.kylin.engine.spark.IndexDataConstructor;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
import org.apache.kylin.engine.spark.merger.AfterBuildResourceMerger;
import org.apache.kylin.engine.spark.storage.ParquetStorage;
import org.apache.kylin.job.dao.JobStatistics;
import org.apache.kylin.job.dao.JobStatisticsManager;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.cuboid.NCuboidLayoutChooser;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTreeFactory;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.storage.IStorage;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.util.MetadataTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.Join;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.sparkproject.guava.collect.Sets;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkCubingJobTest.class */
public class NSparkCubingJobTest extends NLocalWithSparkSessionTest {
    private KylinConfig config;

    /* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkCubingJobTest$MockParquetStorage.class */
    /**
     * {@link ParquetStorage} variant for tests. Reads are passed through to the parent unchanged;
     * every write first asserts that the optimized logical plan of the dataset contains no
     * {@link Join} node and only then delegates to the parent.
     */
    public static class MockParquetStorage extends ParquetStorage {
        /** Delegates directly to {@code ParquetStorage.getFrom}. */
        public Dataset<Row> getFrom(String str, SparkSession sparkSession) {
            return super.getFrom(str, sparkSession);
        }

        /** Asserts the optimized plan is join-free, then delegates to {@code ParquetStorage.saveTo}. */
        public void saveTo(String str, Dataset<Row> dataset, SparkSession sparkSession) {
            // Predicate handed to the plan's find(): matches any Join node.
            AbstractFunction1<LogicalPlan, Object> isJoinNode = new AbstractFunction1<LogicalPlan, Object>() {
                public Object apply(LogicalPlan planNode) {
                    return Boolean.valueOf(planNode instanceof Join);
                }
            };
            boolean planContainsJoin = dataset.queryExecution().optimizedPlan().find(isJoinNode).isDefined();
            Assert.assertFalse(planContainsJoin);
            super.saveTo(str, dataset, sparkSession);
        }
    }

    /* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkCubingJobTest$MockupStorageEngine.class */
    /**
     * Storage engine stub for tests. It never creates queries, and it hands out a
     * {@code MockParquetStorage} when asked to adapt to the Spark cubing storage interface.
     */
    public static class MockupStorageEngine implements IStorage {
        /** Always returns {@code null}; querying is not supported by this stub. */
        public IStorageQuery createQuery(IRealization iRealization) {
            return null;
        }

        /**
         * Returns a new {@code MockParquetStorage} when {@code cls} is the
         * {@code NSparkCubingEngine.NSparkCubingStorage} interface.
         *
         * @param cls the build-engine interface the caller wants
         * @return a freshly created {@code MockParquetStorage}, cast to {@code I}
         * @throws RuntimeException if the cubing storage interface cannot be loaded, or if
         *                          {@code cls} is any other type
         */
        @SuppressWarnings("unchecked")
        public <I> I adaptToBuildEngine(Class<I> cls) {
            Class<?> cubingStorageInterface;
            try {
                cubingStorageInterface = Class
                        .forName("org.apache.kylin.engine.spark.NSparkCubingEngine$NSparkCubingStorage");
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
            if (cls != cubingStorageInterface) {
                throw new RuntimeException("Cannot adapt to " + cls);
            }
            // The mock lives in package org.apache.kylin.engine.spark.job, so it has to be named by
            // its fully qualified binary name; the bare "NSparkCubingJobTest$MockParquetStorage"
            // cannot be resolved by a name-based class lookup.
            // NOTE(review): assumes ClassUtil.newInstance loads the class by name - confirm.
            return (I) ClassUtil
                    .newInstance("org.apache.kylin.engine.spark.job.NSparkCubingJobTest$MockParquetStorage");
        }
    }

    /**
     * Prepares each test: lowers Spark logging to ERROR, overrides three system properties,
     * re-creates and initialises the scheduler for the test project, and caches the test config.
     */
    @Before
    public void setup() {
        ss.sparkContext().setLogLevel("ERROR");

        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        overwriteSystemProp("kylin.engine.persist-flattable-threshold", "0");
        overwriteSystemProp("kylin.engine.persist-flatview", "true");

        // Drop any scheduler instance that is still around before creating a new one.
        NDefaultScheduler.destroyInstance();
        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
        scheduler.init(new JobEngineConfig(getTestConfig()));
        boolean schedulerRunning = scheduler.hasStarted();
        if (!schedulerRunning) {
            throw new RuntimeException("scheduler has not been started");
        }

        config = getTestConfig();
    }

    /** Tears down each test: destroys the scheduler instance, then cleans up the test metadata. */
    @After
    public void after() {
        // Scheduler goes first, metadata cleanup second.
        NDefaultScheduler.destroyInstance();
        cleanupTestMetadata();
    }

    /**
     * Reads two small CSV files through Spark, checks their row counts, and checks that their
     * union has the combined row count.
     *
     * <p>The temporary files are removed in a {@code finally} block so that a failing assertion or
     * a Spark error does not leave them behind.
     *
     * @throws IOException if a temporary file cannot be created or written
     */
    @Test
    public void testMergeBasics() throws IOException {
        File firstCsv = File.createTempFile("tmp1", ".csv");
        File secondCsv = null;
        try {
            FileUtils.writeStringToFile(firstCsv, "0,1,3,1000\n0,2,2,1000", Charset.defaultCharset());
            Dataset<Row> firstRows = ss.read().csv(firstCsv.getAbsolutePath());
            Assert.assertEquals(2L, firstRows.count());
            firstRows.show();

            secondCsv = File.createTempFile("tmp2", ".csv");
            FileUtils.writeStringToFile(secondCsv, "0,1,2,2000", Charset.defaultCharset());
            Dataset<Row> secondRows = ss.read().csv(secondCsv.getAbsolutePath());
            Assert.assertEquals(1L, secondRows.count());
            secondRows.show();

            Dataset<Row> merged = secondRows.union(firstRows);
            Assert.assertEquals(3L, merged.count());
            merged.show();
        } finally {
            // deleteQuietly accepts null, so this is safe even when the second file was never created.
            FileUtils.deleteQuietly(firstCsv);
            FileUtils.deleteQuietly(secondCsv);
        }
    }

    /**
     * Builds snapshots for every lookup table of the test dataflow's model and checks that each
     * lookup table then reports a non-null last snapshot path.
     */
    @Test
    public void testBuildSnapshot() throws Exception {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), getProject());
        NDataflow dataflow = dataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");

        SnapshotBuilder snapshotBuilder = new SnapshotBuilder();
        snapshotBuilder.buildSnapshot(ss, getLookTables(dataflow));

        for (TableDesc lookupTable : getLookTables(dataflow)) {
            Assert.assertNotNull(lookupTable.getLastSnapshotPath());
        }
    }

    /**
     * Collects the table descriptors behind the lookup tables of the dataflow's model.
     *
     * @param nDataflow dataflow whose model is inspected
     * @return the table descriptors of all lookup tables, as a set
     */
    private Set<TableDesc> getLookTables(NDataflow nDataflow) {
        return nDataflow.getModel().getLookupTables().stream()
                .map(lookupRef -> lookupRef.getTableDesc())
                .collect(Collectors.toSet());
    }

    /**
     * Builds snapshots while passing two table names as the extra set argument, then checks that
     * every lookup table of the model still ends up with a non-null last snapshot path.
     */
    @Test
    public void testBuildSnapshotIgnored_SnapshotIsNull() throws Exception {
        Set<String> ignoredTables = new HashSet<>(Arrays.asList("DEFAULT.TEST_COUNTRY", "EDW.TEST_CAL_DT"));
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), getProject());
        NDataflow dataflow = dataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");

        SnapshotBuilder snapshotBuilder = new SnapshotBuilder();
        snapshotBuilder.buildSnapshot(ss, dataflow.getModel(), ignoredTables);

        for (TableDesc lookupTable : getLookTables(dataflow)) {
            Assert.assertNotNull(lookupTable.getLastSnapshotPath());
        }
    }

    /**
     * Pre-assigns a mock snapshot path to two tables, builds snapshots with those tables passed
     * as the extra set argument, and checks that (a) the two tables keep the mock path and
     * (b) every lookup table of the model has a non-null last snapshot path.
     *
     * <p>The table-name set is declared as {@code Set<String>}; with a raw {@code HashSet} the
     * element type degrades to {@code Object} and the names cannot be passed to
     * {@code getTableDesc}.
     */
    @Test
    public void testBuildSnapshotIgnored_SnapshotExists() throws Exception {
        Set<String> ignoredTables = new HashSet<>(Arrays.asList("DEFAULT.TEST_COUNTRY", "EDW.TEST_CAL_DT"));
        KylinConfig testConfig = getTestConfig();
        NDataflow dataflow = NDataflowManager.getInstance(testConfig, getProject())
                .getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        final String mockSnapshotPath = "default/table_snapshot/mock";
        NTableMetadataManager tableManager = NTableMetadataManager.getInstance(testConfig, getProject());

        for (String tableName : ignoredTables) {
            tableManager.getTableDesc(tableName).setLastSnapshotPath(mockSnapshotPath);
        }

        new SnapshotBuilder().buildSnapshot(ss, dataflow.getModel(), ignoredTables);

        // Checked per table so that a failure names the offending path.
        for (String tableName : ignoredTables) {
            Assert.assertEquals(mockSnapshotPath, tableManager.getTableDesc(tableName).getLastSnapshotPath());
        }
        for (TableDesc lookupTable : getLookTables(dataflow)) {
            Assert.assertNotNull(lookupTable.getLastSnapshotPath());
        }
    }

    /**
     * Runs two consecutive cubing jobs on a fresh, infinite-range segment of the test dataflow.
     *
     * <p>Round one builds four layouts for which no parent layout can be selected yet, checks the
     * distributed metadata URL of the cubing step, waits for success, checks the job statistics
     * before and after merging the result, and merges with {@code mergeAfterIncrement}. Round two
     * builds four further layouts for which a parent layout can now be selected and merges with
     * {@code mergeAfterCatchup}. Finally the cube rows, the table index content, the segment's
     * last build time and the snapshot row counts are verified.
     *
     * <p>The fetched {@code JobStatistics} is bound to a named local before its count is
     * asserted; the previous code referred to an undefined variable {@code r0}.
     */
    @Test
    public void testBuildJob() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        long testStartMillis = System.currentTimeMillis();
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        Assert.assertTrue(this.config.getHdfsWorkingDirectory().startsWith("file:"));

        cleanupSegments(dataflowManager, dataflowId);
        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
        NDataSegment newSegment = dataflowManager.appendSegment(dataflow,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());

        // Round 1: nothing is built yet, so no layout can be chosen as a build source.
        List<LayoutEntity> firstRound = new ArrayList<>();
        firstRound.add(dataflow.getIndexPlan().getLayoutEntity(20000020001L));
        firstRound.add(dataflow.getIndexPlan().getLayoutEntity(1000001L));
        firstRound.add(dataflow.getIndexPlan().getLayoutEntity(30001L));
        firstRound.add(dataflow.getIndexPlan().getLayoutEntity(10002L));
        for (IndexEntity rootIndex : NSpanningTreeFactory.fromLayouts(firstRound, dataflow.getUuid())
                .getRootIndexEntities()) {
            Assert.assertNull(NCuboidLayoutChooser.selectLayoutForBuild(newSegment, rootIndex));
        }

        NSparkCubingJob firstJob = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{newSegment}),
                Sets.newLinkedHashSet(firstRound), "ADMIN", (Set) null);
        NSparkCubingStep firstStep = firstJob.getSparkCubingStep();
        StorageURL distMetaUrl = StorageURL.valueOf(firstStep.getDistMetaUrl());
        Assert.assertEquals("hdfs", distMetaUrl.getScheme());
        Assert.assertTrue(distMetaUrl.getParameter("path").startsWith(this.config.getHdfsWorkingDirectory()));

        executableManager.addJob(firstJob);
        Assert.assertEquals(ExecutableState.SUCCEED, IndexDataConstructor.wait((AbstractExecutable) firstJob));

        long buildEndTime = firstStep.getEndTime();
        long dayStart = TimeUtil.getDayStart(buildEndTime);
        JobStatisticsManager statsManager = JobStatisticsManager.getInstance(this.config, firstStep.getProject());
        JobStatistics statsBeforeMerge = (JobStatistics) statsManager.getOverallJobStats(dayStart, buildEndTime)
                .getSecond();
        Assert.assertEquals(dayStart, statsBeforeMerge.getDate());
        Assert.assertEquals(1L, statsBeforeMerge.getCount());

        AfterBuildResourceMerger merger = new AfterBuildResourceMerger(this.config, getProject());
        merger.mergeAfterIncrement(dataflow.getUuid(), newSegment.getId(), ExecutableUtils.getLayoutIds(firstStep),
                ExecutableUtils.getRemoteStore(this.config, firstStep));

        // Statistics are fetched again after the merge and must be unchanged.
        JobStatistics statsAfterMerge = (JobStatistics) statsManager.getOverallJobStats(dayStart, buildEndTime)
                .getSecond();
        Assert.assertEquals(dayStart, statsAfterMerge.getDate());
        Assert.assertEquals(1L, statsAfterMerge.getCount());

        // Round 2: a build source must now be selectable for every root index.
        List<LayoutEntity> secondRound = new ArrayList<>();
        secondRound.add(dataflow.getIndexPlan().getLayoutEntity(1L));
        secondRound.add(dataflow.getIndexPlan().getLayoutEntity(20000000001L));
        secondRound.add(dataflow.getIndexPlan().getLayoutEntity(20001L));
        secondRound.add(dataflow.getIndexPlan().getLayoutEntity(10001L));
        NDataflow refreshedDataflow = dataflowManager.getDataflow(dataflowId);
        NDataSegment builtSegment = refreshedDataflow.getSegment(newSegment.getId());
        for (IndexEntity rootIndex : NSpanningTreeFactory.fromLayouts(secondRound, dataflow.getUuid())
                .getRootIndexEntities()) {
            Assert.assertNotNull(NCuboidLayoutChooser.selectLayoutForBuild(builtSegment, rootIndex));
        }

        NSparkCubingJob secondJob = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{builtSegment}),
                Sets.newLinkedHashSet(secondRound), "ADMIN", (Set) null);
        executableManager.addJob(secondJob);
        Assert.assertEquals(ExecutableState.SUCCEED, IndexDataConstructor.wait((AbstractExecutable) secondJob));
        merger.mergeAfterCatchup(refreshedDataflow.getUuid(), Sets.newHashSet(new String[]{builtSegment.getId()}),
                ExecutableUtils.getLayoutIds(secondJob.getSparkCubingStep()),
                ExecutableUtils.getRemoteStore(this.config, secondJob.getSparkCubingStep()), (Set) null);

        validateCube(refreshedDataflow.getSegments().getFirstSegment().getId());
        validateTableIndex(refreshedDataflow.getSegments().getFirstSegment().getId());

        long lastBuildTime = dataflowManager.getDataflow(dataflowId).getSegment(builtSegment.getId())
                .getLastBuildTime();
        Assert.assertTrue(lastBuildTime > testStartMillis);
        getLookTables(dataflow).forEach(tableDesc -> {
            Assert.assertTrue(tableDesc.getSnapshotTotalRows() > 0);
        });
    }

    /**
     * Same flow as the first round of a regular build, but with the model's root fact table
     * registered as an excluded table before the job is created. Verifies the distributed metadata
     * URL, job success, job statistics before and after {@code mergeAfterIncrement}, the segment's
     * last build time and the snapshot row counts.
     *
     * <p>The fetched {@code JobStatistics} is bound to a named local before its count is
     * asserted; the previous code referred to an undefined variable {@code r0}.
     */
    @Test
    public void testBuildJobWithExcludeTable() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        long testStartMillis = System.currentTimeMillis();
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        Assert.assertTrue(this.config.getHdfsWorkingDirectory().startsWith("file:"));

        cleanupSegments(dataflowManager, dataflowId);
        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
        NDataSegment newSegment = dataflowManager.appendSegment(dataflow,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());

        // Nothing is built yet, so no layout can be chosen as a build source.
        List<LayoutEntity> layoutsToBuild = new ArrayList<>();
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(20000020001L));
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(1000001L));
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(30001L));
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(10002L));
        for (IndexEntity rootIndex : NSpanningTreeFactory.fromLayouts(layoutsToBuild, dataflow.getUuid())
                .getRootIndexEntities()) {
            Assert.assertNull(NCuboidLayoutChooser.selectLayoutForBuild(newSegment, rootIndex));
        }

        MetadataTestUtils.mockExcludedTable(getProject(), dataflow.getModel().getRootFactTableName());

        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{newSegment}),
                Sets.newLinkedHashSet(layoutsToBuild), "ADMIN", (Set) null);
        NSparkCubingStep cubingStep = job.getSparkCubingStep();
        StorageURL distMetaUrl = StorageURL.valueOf(cubingStep.getDistMetaUrl());
        Assert.assertEquals("hdfs", distMetaUrl.getScheme());
        Assert.assertTrue(distMetaUrl.getParameter("path").startsWith(this.config.getHdfsWorkingDirectory()));

        executableManager.addJob(job);
        Assert.assertEquals(ExecutableState.SUCCEED, IndexDataConstructor.wait((AbstractExecutable) job));

        long buildEndTime = cubingStep.getEndTime();
        long dayStart = TimeUtil.getDayStart(buildEndTime);
        JobStatisticsManager statsManager = JobStatisticsManager.getInstance(this.config, cubingStep.getProject());
        JobStatistics statsBeforeMerge = (JobStatistics) statsManager.getOverallJobStats(dayStart, buildEndTime)
                .getSecond();
        Assert.assertEquals(dayStart, statsBeforeMerge.getDate());
        Assert.assertEquals(1L, statsBeforeMerge.getCount());

        new AfterBuildResourceMerger(this.config, getProject()).mergeAfterIncrement(dataflow.getUuid(),
                newSegment.getId(), ExecutableUtils.getLayoutIds(cubingStep),
                ExecutableUtils.getRemoteStore(this.config, cubingStep));

        // Statistics are fetched again after the merge and must be unchanged.
        JobStatistics statsAfterMerge = (JobStatistics) statsManager.getOverallJobStats(dayStart, buildEndTime)
                .getSecond();
        Assert.assertEquals(dayStart, statsAfterMerge.getDate());
        Assert.assertEquals(1L, statsAfterMerge.getCount());

        long lastBuildTime = dataflowManager.getDataflow(dataflowId).getSegment(newSegment.getId())
                .getLastBuildTime();
        Assert.assertTrue(lastBuildTime > testStartMillis);
        getLookTables(dataflow).forEach(tableDesc -> {
            Assert.assertTrue(tableDesc.getSnapshotTotalRows() > 0);
        });
    }

    /**
     * Builds only the first layout of index 10000 and the first layout of index 0 over an
     * infinite segment range, after asserting that index 10000 has exactly two layouts.
     */
    @Test
    public void testBuildPartialLayouts() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), getProject());
        cleanupSegments(dataflowManager, dataflowId);

        IndexPlan indexPlan = dataflowManager.getDataflow(dataflowId).getIndexPlan();
        IndexEntity twoLayoutIndex = indexPlan.getIndexEntity(10000L);
        IndexEntity baseIndex = indexPlan.getIndexEntity(0L);
        Assert.assertEquals(2L, twoLayoutIndex.getLayouts().size());

        List<LayoutEntity> partialLayouts = new ArrayList<>();
        partialLayouts.add(twoLayoutIndex.getLayouts().get(0));
        partialLayouts.add(baseIndex.getLayouts().get(0));

        this.indexDataConstructor.buildIndex(dataflowId, SegmentRange.TimePartitionedSegmentRange.createInfinite(),
                Sets.newLinkedHashSet(partialLayouts), true);
    }

    /**
     * Runs a cubing job with the build class overridden to {@code MockedDFBuildJob}, merges the
     * result, and checks that every layout of the first segment reports the value 123 for rows,
     * byte size, file count, source rows and source byte size.
     *
     * <p>Assertions use JUnit's (expected, actual) argument order so that failure messages label
     * the values correctly.
     */
    @Test
    public void testMockedDFBuildJob() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        overwriteSystemProp("kylin.engine.spark.build-class-name",
                "org.apache.kylin.engine.spark.job.MockedDFBuildJob");
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        cleanupSegments(dataflowManager, dataflowId);
        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);

        List<LayoutEntity> layoutsToBuild = new ArrayList<>();
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(20000020001L));
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(1000001L));
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(30001L));
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(10002L));

        NDataSegment newSegment = dataflowManager.appendSegment(dataflow,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());
        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{newSegment}),
                Sets.newLinkedHashSet(layoutsToBuild), "ADMIN", (Set) null);
        NSparkCubingStep cubingStep = job.getSparkCubingStep();
        executableManager.addJob(job);
        Assert.assertEquals(ExecutableState.SUCCEED, IndexDataConstructor.wait((AbstractExecutable) job));

        new AfterBuildResourceMerger(this.config, getProject()).mergeAfterIncrement(dataflow.getUuid(),
                newSegment.getId(), ExecutableUtils.getLayoutIds(cubingStep),
                ExecutableUtils.getRemoteStore(this.config, cubingStep));

        final long expectedMockValue = 123L;
        for (NDataLayout builtLayout : dataflowManager.getDataflow(dataflowId).getSegments().getFirstSegment()
                .getLayoutsMap().values()) {
            Assert.assertEquals(expectedMockValue, builtLayout.getRows());
            Assert.assertEquals(expectedMockValue, builtLayout.getByteSize());
            Assert.assertEquals(expectedMockValue, builtLayout.getFileCount());
            Assert.assertEquals(expectedMockValue, builtLayout.getSourceRows());
            Assert.assertEquals(expectedMockValue, builtLayout.getSourceByteSize());
        }
    }

    /**
     * Runs one mocked cubing job over two READY segments (January and February 2012) for layout
     * 10002, merges with {@code mergeAfterCatchup}, and checks that the dataflow holds two
     * segments whose flat tables are both marked ready.
     */
    @Test
    public void testMockedDFBuildMutipleJob() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        overwriteSystemProp("kylin.engine.spark.build-class-name",
                "org.apache.kylin.engine.spark.job.MockedDFBuildJob");
        overwriteSystemProp("kylin.engine.persist-flattable-enabled", "true");

        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        cleanupSegments(dataflowManager, dataflowId);
        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);

        List<LayoutEntity> layoutsToBuild = new ArrayList<>();
        layoutsToBuild.add(dataflow.getIndexPlan().getLayoutEntity(10002L));

        NDataSegment januarySegment = dataflowManager.appendSegment(dataflow,
                new SegmentRange.TimePartitionedSegmentRange("2012-01-01", "2012-02-01"), SegmentStatusEnum.READY);
        NDataSegment februarySegment = dataflowManager.appendSegment(dataflow,
                new SegmentRange.TimePartitionedSegmentRange("2012-02-01", "2012-03-01"), SegmentStatusEnum.READY);

        NSparkCubingJob job = NSparkCubingJob.create(
                Sets.newHashSet(new NDataSegment[]{januarySegment, februarySegment}),
                Sets.newLinkedHashSet(layoutsToBuild), "ADMIN", (Set) null);
        NSparkCubingStep cubingStep = job.getSparkCubingStep();
        executableManager.addJob(job);
        Assert.assertEquals(ExecutableState.SUCCEED, IndexDataConstructor.wait((AbstractExecutable) job));

        AfterBuildResourceMerger merger = new AfterBuildResourceMerger(this.config, getProject());
        merger.mergeAfterCatchup(dataflow.getUuid(),
                Sets.newHashSet(new String[]{januarySegment.getId(), februarySegment.getId()}),
                Sets.newHashSet(new Long[]{10002L}), ExecutableUtils.getRemoteStore(this.config, cubingStep),
                (Set) null);

        Segments mergedSegments = dataflowManager.getDataflow(dataflowId).getSegments();
        Assert.assertEquals(2L, mergedSegments.size());
        for (int position = 0; position < 2; position++) {
            Assert.assertTrue(((NDataSegment) mergedSegments.get(position)).isFlatTableReady());
        }
    }

    /**
     * Submits a cubing job for a newly appended segment, waits until the job reports RUNNING,
     * discards it inside a transaction, and checks that the dataflow has no segments afterwards.
     */
    @Test
    public void testCancelCubingJob() {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        cleanupSegments(dataflowManager, dataflowId);

        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
        Assert.assertEquals(0L, dataflow.getSegments().size());
        NDataSegment newSegment = dataflowManager.appendSegment(dataflow,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());

        // Pick the layouts at positions 0-3 and 7 of the index plan's layout list.
        List<LayoutEntity> allLayouts = dataflow.getIndexPlan().getAllLayouts();
        List<LayoutEntity> layoutsToBuild = new ArrayList<>();
        for (int position : new int[]{0, 1, 2, 3, 7}) {
            layoutsToBuild.add(allLayouts.get(position));
        }

        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{newSegment}),
                Sets.newLinkedHashSet(layoutsToBuild), "ADMIN", (Set) null);
        executableManager.addJob(job);
        Assert.assertEquals(1L, dataflowManager.getDataflow(dataflowId).getSegments().size());

        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.RUNNING, job.getStatus());
        });

        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject()).discardJob(job.getId());
            return null;
        }, getProject(), 3, -1L, job.getId());

        NDataflow dataflowAfterDiscard = NDataflowManager.getInstance(this.config, getProject())
                .getDataflow(dataflowId);
        Assert.assertEquals(0L, dataflowAfterDiscard.getSegments().size());
    }

    /**
     * Builds two adjacent segments, submits a merge job over them, waits until the merge job
     * reports RUNNING, discards it inside a transaction, and checks that the dataflow still has
     * two segments.
     */
    @Test
    public void testCancelMergingJob() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        cleanupSegments(dataflowManager, dataflowId);

        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
        Assert.assertEquals(0L, dataflow.getSegments().size());
        List<LayoutEntity> allLayouts = dataflow.getIndexPlan().getAllLayouts();

        // First segment: 2011-01-01 .. 2012-06-01.
        long firstStart = SegmentRange.dateToLong("2011-01-01").longValue();
        long firstEnd = SegmentRange.dateToLong("2012-06-01").longValue();
        this.indexDataConstructor.buildIndex(dataflowId,
                new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(firstStart), Long.valueOf(firstEnd)),
                Sets.newLinkedHashSet(allLayouts), true);

        // Second segment: 2012-06-01 .. 2013-01-01.
        long secondStart = SegmentRange.dateToLong("2012-06-01").longValue();
        long secondEnd = SegmentRange.dateToLong("2013-01-01").longValue();
        this.indexDataConstructor.buildIndex(dataflowId,
                new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(secondStart), Long.valueOf(secondEnd)),
                Sets.newLinkedHashSet(allLayouts), true);

        NSparkMergingJob mergeJob = NSparkMergingJob.merge(
                dataflowManager.mergeSegments(dataflowManager.getDataflow(dataflowId),
                        new SegmentRange.TimePartitionedSegmentRange(SegmentRange.dateToLong("2010-01-02"),
                                SegmentRange.dateToLong("2013-01-01")),
                        false),
                Sets.newLinkedHashSet(allLayouts), "ADMIN", RandomUtil.randomUUIDStr());
        executableManager.addJob(mergeJob);

        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.RUNNING, mergeJob.getStatus());
        });

        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject())
                    .discardJob(mergeJob.getId());
            return null;
        }, getProject(), 3, -1L, mergeJob.getId());

        NDataflow dataflowAfterDiscard = NDataflowManager.getInstance(this.config, getProject())
                .getDataflow(dataflowId);
        Assert.assertEquals(2L, dataflowAfterDiscard.getSegments().size());
    }

    /**
     * Runs a cubing job over all layouts of the index plan and checks that the {@code node_info}
     * entry in the job output's extra map equals the configured server address.
     */
    @Test
    public void testGetJobNodeInfo() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());
        Assert.assertTrue(this.config.getHdfsWorkingDirectory().startsWith("file:"));

        cleanupSegments(dataflowManager, dataflowId);
        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
        NDataSegment newSegment = dataflowManager.appendSegment(dataflow,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());

        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{newSegment}),
                Sets.newLinkedHashSet(dataflow.getIndexPlan().getAllLayouts()), "ADMIN", (Set) null);
        executableManager.addJob(job);
        IndexDataConstructor.wait((AbstractExecutable) job);

        Object reportedNode = job.getOutput().getExtra().get("node_info");
        Assert.assertEquals(this.config.getServerAddress(), reportedNode);
    }

    /**
     * Asserts that layouts 1, 10001 and 10002 of the given segment each report 10000 rows.
     *
     * @param str id of the segment to inspect
     */
    private void validateCube(String str) {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NDataSegment segment = dataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegment(str);
        for (long layoutId : new long[]{1L, 10001L, 10002L}) {
            Assert.assertEquals(10000L, segment.getLayout(layoutId).getRows());
        }
    }

    /**
     * Asserts that table-index layout 20000000001 of the given segment reports 10000 rows, then
     * reads the stored layout data back and spot-checks column 1 of the first two and last two rows.
     *
     * @param str id of the segment to inspect
     */
    private void validateTableIndex(String str) {
        final long tableIndexLayoutId = 20000000001L;
        NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
                getProject());
        NDataSegment segment = dataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegment(str);

        LayoutEntity tableIndexLayout = NDataLayout.newDataLayout(segment.getSegDetails(), tableIndexLayoutId)
                .getLayout();
        Assert.assertEquals(10000L, segment.getLayout(tableIndexLayoutId).getRows());

        String storagePath = NSparkCubingUtil.getStoragePath(segment, Long.valueOf(tableIndexLayout.getId()));
        List<Row> storedRows = new ParquetStorage().getFrom(storagePath, ss).collectAsList();

        Assert.assertEquals("Ebay", storedRows.get(0).apply(1).toString());
        Assert.assertEquals("Ebaymotors", storedRows.get(1).apply(1).toString());
        Assert.assertEquals("Ebay", storedRows.get(9998).apply(1).toString());
        Assert.assertEquals("英国", storedRows.get(9999).apply(1).toString());
    }

    /**
     * Renames the model's alias, creates a cubing job for a new segment, and checks that the job's
     * target subject is the model UUID.
     */
    @Test
    public void testNSparkCubingJobUsingModelUuid() {
        final String modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NDataModelManager modelManager = NDataModelManager.getInstance(this.config, getProject());

        NDataModel model = modelManager.getDataModelDesc(modelId);
        model.setAlias("nmodel_basic_alias");
        modelManager.updateDataModelDesc(model);

        cleanupSegments(dataflowManager, modelId);
        NDataflow dataflow = dataflowManager.getDataflow(modelId);
        NDataSegment newSegment = dataflowManager.appendSegment(dataflow,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());
        NSparkCubingJob job = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{newSegment}),
                Sets.newLinkedHashSet(dataflow.getIndexPlan().getAllLayouts()), "ADMIN", (Set) null);

        Assert.assertEquals(model.getUuid(), job.getTargetSubject());
    }

    /**
     * Checks how {@code NSparkExecutable.getConfig()} reflects overrides of
     * {@code spark.locality.wait}: after a project-level override the wrapped config reports "10";
     * after an index-plan-level override (with the dataflow id set on the executable) it reports
     * "20". In both cases the base test config itself has no value for the key.
     */
    @Test
    public void testSparkExecutable_WrapConfig() {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NSparkExecutable executable = new NSparkExecutable();
        executable.setProject("default");

        // Project-level override.
        NProjectManager.getInstance(getTestConfig()).updateProject("default", projectInstance -> {
            projectInstance.getOverrideKylinProps().put("kylin.engine.spark-conf.spark.locality.wait", "10");
        });
        KylinConfig projectLevelConfig = executable.getConfig();
        Assert.assertEquals(getTestConfig(), projectLevelConfig.base());
        Assert.assertNull(getTestConfig().getSparkConfigOverride().get("spark.locality.wait"));
        Assert.assertEquals("10", projectLevelConfig.getSparkConfigOverride().get("spark.locality.wait"));

        // Index-plan-level override.
        executable.setParam("dataflowId", dataflowId);
        NIndexPlanManager.getInstance(getTestConfig(), "default").updateIndexPlan(dataflowId, indexPlan -> {
            indexPlan.getOverrideProps().put("kylin.engine.spark-conf.spark.locality.wait", "20");
        });
        KylinConfig indexPlanLevelConfig = executable.getConfig();
        Assert.assertEquals(getTestConfig(), indexPlanLevelConfig.base());
        Assert.assertNull(getTestConfig().getSparkConfigOverride().get("spark.locality.wait"));
        Assert.assertEquals("20", indexPlanLevelConfig.getSparkConfigOverride().get("spark.locality.wait"));
    }

    /**
     * Round-trips a large set of random layout ids through the {@code layoutIds} parameter of a
     * spied {@code NSparkExecutable} and checks that no id is lost: removing the parsed ids from
     * the original set must leave it empty.
     */
    @Test
    public void testLayoutIdMoreThan10000() {
        NSparkExecutable executableSpy = (NSparkExecutable) Mockito.spy(NSparkExecutable.class);

        // 100000 draws from RandomUtils.nextLong(1, 100000); duplicates collapse in the set.
        HashSet randomLayoutIds = Sets.newHashSet();
        int draws = 0;
        while (draws < 100000) {
            randomLayoutIds.add(Long.valueOf(RandomUtils.nextLong(1L, 100000L)));
            draws++;
        }

        ((NSparkExecutable) Mockito.doReturn(executableSpy.getParams()).when(executableSpy))
                .filterEmptySegments(Mockito.anyMap());
        executableSpy.setParam("layoutIds", NSparkCubingUtil.ids2Str(randomLayoutIds));

        randomLayoutIds.removeAll(NSparkCubingUtil.str2Longs(executableSpy.getParam("layoutIds")));
        Assert.assertEquals(0L, randomLayoutIds.size());
    }

    /**
     * Verifies that {@code filterEmptySegments} reduces the {@code "segmentIds"} param
     * {@code "s1,s2,ef5e0663-feba-4ed2-b71c-21958122bbff"} to just
     * {@code "ef5e0663-feba-4ed2-b71c-21958122bbff"} when the executable's dataflow id is
     * stubbed to {@code 89af4ee2-2cdb-4b07-b39e-4c29856309aa}.
     */
    @Test
    public void testFilterEmptySegments() {
        String project = getProject();
        NSparkExecutable executable = Mockito.spy(NSparkExecutable.class);

        HashMap<String, String> originParams = Maps.newHashMap();
        originParams.put("segmentIds", "s1,s2,ef5e0663-feba-4ed2-b71c-21958122bbff");

        Mockito.doReturn("89af4ee2-2cdb-4b07-b39e-4c29856309aa").when(executable).getDataflowId();
        executable.setProject(project);

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
        // failure message report the right side as "expected".
        Assert.assertEquals("ef5e0663-feba-4ed2-b71c-21958122bbff",
                executable.filterEmptySegments(originParams).get("segmentIds"));
    }

    /**
     * Smoke test: with {@code kylin.storage.provider.20} pointed at {@link MockupStorageEngine},
     * builds every layout of index entities 10000 and 30000 over an infinite segment range.
     * There are no explicit assertions; the test passes if {@code buildIndex} does not throw.
     *
     * @throws Exception propagated from the index build
     */
    @Test
    public void testBuildFromFlatTable() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";

        overwriteSystemProp("kylin.storage.provider.20", MockupStorageEngine.class.getName());
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), getProject());
        cleanupSegments(dataflowManager, dataflowId);

        IndexPlan indexPlan = dataflowManager.getDataflow(dataflowId).getIndexPlan();
        IndexEntity firstIndex = indexPlan.getIndexEntity(10000L);
        IndexEntity secondIndex = indexPlan.getIndexEntity(30000L);

        List<LayoutEntity> layoutsToBuild = new ArrayList<>();
        layoutsToBuild.addAll(firstIndex.getLayouts());
        layoutsToBuild.addAll(secondIndex.getLayouts());

        this.indexDataConstructor.buildIndex(dataflowId, SegmentRange.TimePartitionedSegmentRange.createInfinite(),
                Sets.newLinkedHashSet(layoutsToBuild), true);
    }

    /**
     * Exercises {@code NSparkCubingJob#safetyIfDiscard()} for several jobs on the same dataflow
     * while the first job moves through READY, RUNNING and SUCCEED, and finally checks
     * {@code checkSuicide()} / {@code safetyIfDiscard()} for a running job whose dataflow has
     * been dropped.
     */
    @Test
    public void testSafetyIfDiscard() {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, getProject());
        NExecutableManager executableManager = NExecutableManager.getInstance(this.config, getProject());

        cleanupSegments(dataflowManager, dataflowId);
        NDataflow dataflow = dataflowManager.getDataflow(dataflowId);
        Assert.assertEquals(0L, dataflow.getSegments().size());

        NDataSegment firstSegment = dataflowManager.appendSegment(dataflow,
                new SegmentRange.TimePartitionedSegmentRange(0L, 10L));
        NDataSegment secondSegment = dataflowManager.appendSegment(dataflow,
                new SegmentRange.TimePartitionedSegmentRange(11L, 12L));

        List<LayoutEntity> allLayouts = dataflow.getIndexPlan().getAllLayouts();
        List<LayoutEntity> twoLayouts = new ArrayList<>();
        twoLayouts.add(allLayouts.get(0));
        twoLayouts.add(allLayouts.get(1));

        NSparkCubingJob firstBuild = newAdminCubingJob(firstSegment, twoLayouts, JobTypeEnum.INC_BUILD);
        NSparkCubingJob secondBuild = newAdminCubingJob(secondSegment, twoLayouts, JobTypeEnum.INC_BUILD);
        NSparkCubingJob secondRefresh = newAdminCubingJob(secondSegment, twoLayouts, JobTypeEnum.INDEX_REFRESH);
        executableManager.addJob(firstBuild);
        executableManager.addJob(secondBuild);
        executableManager.addJob(secondRefresh);

        // Both incremental builds READY: each is expected to be safe to discard.
        executableManager.updateJobOutput(firstBuild.getId(), ExecutableState.READY);
        executableManager.updateJobOutput(secondBuild.getId(), ExecutableState.READY);
        Assert.assertTrue(firstBuild.safetyIfDiscard());
        Assert.assertTrue(secondBuild.safetyIfDiscard());

        // A third incremental build on a later segment, then the first build starts RUNNING.
        NDataSegment thirdSegment = dataflowManager.appendSegment(dataflow,
                new SegmentRange.TimePartitionedSegmentRange(20L, 22L));
        NSparkCubingJob thirdBuild = newAdminCubingJob(thirdSegment, twoLayouts, JobTypeEnum.INC_BUILD);
        executableManager.addJob(thirdBuild);
        executableManager.updateJobOutput(firstBuild.getId(), ExecutableState.RUNNING);
        Assert.assertTrue(firstBuild.safetyIfDiscard());
        Assert.assertFalse(secondBuild.safetyIfDiscard());
        Assert.assertTrue(thirdBuild.safetyIfDiscard());

        // First build SUCCEED: the second build is still expected to be unsafe to discard.
        executableManager.updateJobOutput(firstBuild.getId(), ExecutableState.SUCCEED);
        Assert.assertTrue(firstBuild.safetyIfDiscard());
        Assert.assertFalse(secondBuild.safetyIfDiscard());
        Assert.assertTrue(secondRefresh.safetyIfDiscard());

        // Fresh start with a single segment and one layout, then drop the dataflow under a running job.
        cleanupSegments(dataflowManager, dataflowId);
        NDataSegment loneSegment = dataflowManager.appendSegment(dataflowManager.getDataflow(dataflowId),
                new SegmentRange.TimePartitionedSegmentRange(0L, 10L));
        // The layouts are still read from the dataflow instance fetched at the top of the test.
        List<LayoutEntity> layoutsAfterCleanup = dataflow.getIndexPlan().getAllLayouts();
        List<LayoutEntity> oneLayout = new ArrayList<>();
        oneLayout.add(layoutsAfterCleanup.get(0));
        NSparkCubingJob orphanedBuild = newAdminCubingJob(loneSegment, oneLayout, JobTypeEnum.INC_BUILD);
        executableManager.addJob(orphanedBuild);
        executableManager.updateJobOutput(orphanedBuild.getId(), ExecutableState.RUNNING);
        dataflowManager.dropDataflow(dataflowId);
        Assert.assertTrue(orphanedBuild.checkSuicide());
        Assert.assertTrue(orphanedBuild.safetyIfDiscard());
    }

    /**
     * Creates a cubing job submitted by "ADMIN" for a single segment, with a random job id
     * and an empty sixth argument set; the last two arguments are passed as {@code null}.
     *
     * @param segment the only segment the job covers
     * @param layouts layouts to build, kept in list order
     * @param jobType job type passed through to {@code NSparkCubingJob.create}
     * @return the newly created (not yet registered) job
     */
    private NSparkCubingJob newAdminCubingJob(NDataSegment segment, List<LayoutEntity> layouts, JobTypeEnum jobType) {
        // NOTE(review): the raw (Set) casts on null are kept from the original call sites so that
        // overload resolution of NSparkCubingJob.create stays exactly as before.
        return NSparkCubingJob.create(Sets.newHashSet(segment), Sets.newLinkedHashSet(layouts), "ADMIN", jobType,
                RandomUtil.randomUUIDStr(), Sets.newHashSet(), (Set) null, (Set) null);
    }

    /**
     * Currently disabled ({@code @Ignore}) scenario test for pausing and resuming a cubing job
     * that has a break point.
     *
     * <p>Flow: a job building layouts 1000001 and 20000010001 is created with
     * {@code "breakPointLayouts"} set to 1000001 and the "MockResumeBuildJob" build class. The
     * test waits until the cubing step reports itself resumable and layout 20000010001 is
     * visible through the "distMetaUrl" metadata, pauses the job, checks that layout 1000001 is
     * still missing, removes the break points, resumes the job, waits for it, and finally checks
     * that both layouts exist. The root fact table's type is switched to "VIEW" for the first
     * part of the test and restored afterwards (also on failure).
     *
     * @throws Exception any failure raised by the scenario
     */
    @Test
    @Ignore
    public void testResumeBuildCheckPoints() throws Exception {
        String project = getProject();
        KylinConfig testConfig = getTestConfig();
        overwriteSystemProp("kylin.engine.spark.build-class-name", "MockResumeBuildJob");
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(testConfig, project);
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(testConfig, project);
        // Start from a dataflow without segments, then add one infinite-range segment.
        cleanupSegments(nDataflowManager, "89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflow dataflow = nDataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        Assert.assertEquals(0L, dataflow.getSegments().size());
        NDataSegment appendSegment = nDataflowManager.appendSegment(dataflow, SegmentRange.TimePartitionedSegmentRange.createInfinite());
        ArrayList arrayList = new ArrayList();
        arrayList.add(dataflow.getIndexPlan().getLayoutEntity(1000001L));
        arrayList.add(dataflow.getIndexPlan().getLayoutEntity(20000010001L));
        NSparkCubingJob create = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{appendSegment}), Sets.newLinkedHashSet(arrayList), "test_submitter", (Set) null);
        NSparkCubingStep sparkCubingStep = create.getSparkCubingStep();
        // Mark layout 1000001 as the break point layout of the cubing step.
        sparkCubingStep.setParam("breakPointLayouts", String.valueOf(1000001L));
        // createKylinConfig reads the step's "distMetaUrl" metadata, createKylinConfig2 its "outputMetaUrl" metadata.
        KylinConfig createKylinConfig = KylinConfig.createKylinConfig(testConfig);
        createKylinConfig.setMetadataUrl(sparkCubingStep.getParam("distMetaUrl"));
        KylinConfig createKylinConfig2 = KylinConfig.createKylinConfig(testConfig);
        createKylinConfig2.setMetadataUrl(sparkCubingStep.getParam("outputMetaUrl"));
        TableDesc tableDesc = dataflow.getModel().getRootFactTableRef().getTableDesc();
        // Remember the original table type so it can be restored later.
        String tableType = tableDesc.getTableType();
        try {
            tableDesc.setTableType("VIEW");
            NTableMetadataManager.getInstance(testConfig, project).updateTableDesc(tableDesc);
            nExecutableManager.addJob(create);
            // Right after submission the step must not be resumable yet.
            Assert.assertFalse(nExecutableManager.getJobOutput(sparkCubingStep.getId()).isResumable());
            // Poll (first check after 5s, then every 1s, for at most 40s) until the step is resumable
            // and layout 20000010001 shows up in the "distMetaUrl" metadata.
            Awaitility.await().atMost(40L, TimeUnit.SECONDS).pollDelay(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).untilAsserted(() -> {
                // A fresh config copy per attempt; its resource store cache is cleared on both success and failure.
                KylinConfig createKylinConfig3 = KylinConfig.createKylinConfig(createKylinConfig);
                try {
                    Assert.assertTrue(nExecutableManager.getJobOutput(sparkCubingStep.getId()).isResumable());
                    NDataflow dataflow2 = NDataflowManager.getInstance(createKylinConfig3, project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
                    Assert.assertNotNull(dataflow2);
                    Assert.assertFalse(dataflow2.isBroken());
                    Assert.assertEquals(1L, dataflow2.getSegments().size());
                    Assert.assertNotNull(dataflow2.getSegments().getFirstSegment().getLayout(20000010001L));
                    ResourceStore.clearCache(createKylinConfig3);
                } catch (Throwable th) {
                    ResourceStore.clearCache(createKylinConfig3);
                    throw th;
                }
            });
            // Restore the original table type before pausing the job.
            tableDesc.setTableType(tableType);
            NTableMetadataManager.getInstance(testConfig, project).updateTableDesc(tableDesc);
            // Pause the job inside a unit of work.
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(create.getId());
                return null;
            }, project, 3, -1L, create.getId());
            Assert.assertTrue(nExecutableManager.getJobOutput(sparkCubingStep.getId()).isResumable());
            // While paused: flat table, dict and fact view flags are set, layout 20000010001 exists,
            // and the break point layout 1000001 does not exist yet.
            KylinConfig createKylinConfig3 = KylinConfig.createKylinConfig(createKylinConfig);
            NDataflow dataflow2 = NDataflowManager.getInstance(createKylinConfig3, project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
            Assert.assertEquals(1L, dataflow2.getSegments().size());
            NDataSegment firstSegment = dataflow2.getSegments().getFirstSegment();
            Assert.assertTrue(firstSegment.isFlatTableReady());
            Assert.assertTrue(firstSegment.isDictReady());
            Assert.assertTrue(firstSegment.isFactViewReady());
            Assert.assertNotNull(firstSegment.getLayout(20000010001L));
            Assert.assertNull(firstSegment.getLayout(1000001L));
            ResourceStore.clearCache(createKylinConfig3);
            // Remove the break points and resume the job, then wait for it.
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                NExecutableManager nExecutableManager2 = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
                nExecutableManager2.removeBreakPoints(sparkCubingStep.getId());
                nExecutableManager2.resumeJob(create.getId());
                return null;
            }, project, 3, -1L, create.getId());
            IndexDataConstructor.wait((AbstractExecutable) create);
            // After the resumed run: both layouts must exist in the "distMetaUrl" metadata.
            KylinConfig createKylinConfig4 = KylinConfig.createKylinConfig(createKylinConfig);
            NDataflow dataflow3 = NDataflowManager.getInstance(createKylinConfig4, project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
            Assert.assertEquals(1L, dataflow3.getSegments().size());
            NDataSegment firstSegment2 = dataflow3.getSegments().getFirstSegment();
            Assert.assertTrue(firstSegment2.isFlatTableReady());
            Assert.assertTrue(firstSegment2.isDictReady());
            Assert.assertTrue(firstSegment2.isFactViewReady());
            Assert.assertNotNull(firstSegment2.getLayout(20000010001L));
            Assert.assertNotNull(firstSegment2.getLayout(1000001L));
            ResourceStore.clearCache(createKylinConfig4);
            // In the "outputMetaUrl" metadata the three ready flags are expected to be unset.
            NDataSegment firstSegment3 = NDataflowManager.getInstance(createKylinConfig2, project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment();
            Assert.assertFalse(firstSegment3.isFlatTableReady());
            Assert.assertFalse(firstSegment3.isDictReady());
            Assert.assertFalse(firstSegment3.isFactViewReady());
            ResourceStore.clearCache(createKylinConfig);
            ResourceStore.clearCache(createKylinConfig2);
        } catch (Throwable th) {
            // On any failure, put the original table type back before rethrowing.
            tableDesc.setTableType(tableType);
            NTableMetadataManager.getInstance(testConfig, project).updateTableDesc(tableDesc);
            throw th;
        }
    }

    /**
     * Removes every segment currently attached to the given dataflow.
     *
     * @param nDataflowManager manager used to look up and update the dataflow
     * @param str id of the dataflow to clean up
     */
    private void cleanupSegments(NDataflowManager nDataflowManager, String str) {
        NDataflow target = nDataflowManager.getDataflow(str);
        NDataflowUpdate removal = new NDataflowUpdate(target.getUuid());
        // Snapshot the current segments into an array and mark all of them for removal.
        NDataSegment[] existingSegments = target.getSegments().toArray(new NDataSegment[0]);
        removal.setToRemoveSegs(existingSegments);
        nDataflowManager.updateDataflow(removal);
    }
}
