package org.apache.kylin.engine.spark.job;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableParams;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.handler.AbstractJobHandler;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.job.manager.JobManager;
import org.apache.kylin.job.model.JobParam;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.LayoutPartition;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/JobManagerTest.class */
public class JobManagerTest extends NLocalFileMetadataTestCase {
    private static final String PROJECT = "default";
    private static JobManager jobManager;

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/engine/spark/job/JobManagerTest$Functions.class */
    public interface Functions {
        void process();
    }

    @Before
    public void setup() throws Exception {
        createTestMetadata(new String[0]);
        jobManager = JobManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        SparkJobFactoryUtils.initJobFactory();
    }

    private void assertExeption(Functions functions, String str) {
        try {
            functions.process();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            Assert.assertEquals(str, e.getMessage());
        }
    }

    @Test
    public void testPartitionBuildJob() {
        NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        NDataflow dataflow = nDataflowManager.getDataflow("b780e4e4-69af-449e-b09f-05c90dfa04b6");
        JobParam jobParam = new JobParam();
        jobParam.withTargetSegments(Sets.newHashSet(new String[]{"73570f31-05a5-448f-973c-44209830dd01"}));
        jobParam.setModel("b780e4e4-69af-449e-b09f-05c90dfa04b6");
        jobParam.setOwner("ADMIN");
        jobParam.setProject(PROJECT);
        assertExeption(() -> {
            jobParam.setTargetPartitions(Sets.newHashSet());
            jobManager.buildPartitionJob(jobParam);
        }, ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY.getMsg(new Object[0]));
        assertExeption(() -> {
            jobParam.setTargetPartitions(Sets.newHashSet(new Long[]{7L}));
            jobManager.buildPartitionJob(jobParam);
        }, ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_ABANDON.getMsg(new Object[0]));
        long maxBucketId = dataflow.getSegment("73570f31-05a5-448f-973c-44209830dd01").getMaxBucketId();
        jobParam.setTargetPartitions(Sets.newHashSet(new Long[]{9L}));
        jobManager.buildPartitionJob(jobParam);
        List<AbstractExecutable> runningExecutables = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(runningExecutables.size(), 1L);
        Assert.assertEquals(ExecutableParams.getBuckets(runningExecutables.get(0).getParam("buckets")).size(), 15L);
        Assert.assertEquals(nDataflowManager.getDataflow("b780e4e4-69af-449e-b09f-05c90dfa04b6").getSegment("73570f31-05a5-448f-973c-44209830dd01").getMaxBucketId() - maxBucketId, 15L);
        NIndexPlanManager nIndexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        IndexPlan indexPlan = nIndexPlanManager.getIndexPlan(jobParam.getModel());
        indexPlan.getAllLayouts();
        Assert.assertEquals(runningExecutables.get(0).getLayoutIds().size(), 15L);
        NDataSegment generateSegmentForMultiPartition = generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", Lists.newArrayList(new String[]{"usa", "cn"}), "2010-01-01", "2010-02-01", SegmentStatusEnum.READY);
        generateSegmentForMultiPartition.getMultiPartitions().forEach(segmentPartition -> {
            segmentPartition.setStatus(PartitionStatusEnum.NEW);
        });
        ArrayList newArrayList = Lists.newArrayList(new NDataSegment[]{generateSegmentForMultiPartition});
        NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(dataflow.getUuid());
        nDataflowUpdate.setToUpdateSegs((NDataSegment[]) newArrayList.toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate);
        nIndexPlanManager.updateIndexPlan(indexPlan.getUuid(), indexPlan2 -> {
            indexPlan2.removeLayouts(Sets.newHashSet(new Long[]{1L}), false, true);
        });
        JobParam jobParam2 = new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition.getId()}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", Sets.newHashSet(new Long[]{7L}), (Set) null);
        jobParam2.setProject(PROJECT);
        jobManager.buildPartitionJob(jobParam2);
        List<AbstractExecutable> runningExecutables2 = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(2L, runningExecutables2.size());
        Assert.assertEquals(14L, nIndexPlanManager.getIndexPlan("b780e4e4-69af-449e-b09f-05c90dfa04b6").getAllLayouts().size());
        Assert.assertEquals(14L, runningExecutables2.get(1).getLayoutIds().size());
        nIndexPlanManager.updateIndexPlan("b780e4e4-69af-449e-b09f-05c90dfa04b6", indexPlan3 -> {
            IndexEntity indexEntity = new IndexEntity();
            indexEntity.setDimensions(Lists.newArrayList(new Integer[]{1, 3}));
            indexEntity.setId(20000000000L);
            LayoutEntity layoutEntity = new LayoutEntity();
            layoutEntity.setId(20000000001L);
            layoutEntity.setColOrder(Lists.newArrayList(new Integer[]{1, 3}));
            indexEntity.setLayouts(Arrays.asList(layoutEntity));
            List allIndexes = indexPlan3.getAllIndexes();
            allIndexes.add(indexEntity);
            indexPlan3.setIndexes(allIndexes);
        });
        JobParam jobParam3 = new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition.getId()}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", Sets.newHashSet(new Long[]{8L}), (Set) null);
        jobManager.buildPartitionJob(jobParam3);
        List<AbstractExecutable> runningExecutables3 = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(3L, runningExecutables3.size());
        Assert.assertEquals(15L, nIndexPlanManager.getIndexPlan("b780e4e4-69af-449e-b09f-05c90dfa04b6").getAllLayouts().size());
        Assert.assertEquals(14L, runningExecutables3.get(1).getLayoutIds().size());
        Assert.assertEquals(14L, runningExecutables3.get(2).getLayoutIds().size());
        checkConcurrent(jobParam3);
    }

    @Test
    public void testPartitionJobNoIndex() {
        NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT).getDataflow("b780e4e4-69af-449e-b09f-05c90dfa04b6");
        JobParam jobParam = new JobParam();
        jobParam.withTargetSegments(Sets.newHashSet(new String[]{"73570f31-05a5-448f-973c-44209830dd01"}));
        jobParam.setModel("b780e4e4-69af-449e-b09f-05c90dfa04b6");
        jobParam.setOwner("ADMIN");
        jobParam.setProject(PROJECT);
        NIndexPlanManager nIndexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        UnitOfWork.doInTransactionWithRetry(() -> {
            return nIndexPlanManager.updateIndexPlan("b780e4e4-69af-449e-b09f-05c90dfa04b6", indexPlan -> {
                indexPlan.removeLayouts(indexPlan.getAllLayoutIds(false), true, true);
            });
        }, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        assertExeption(() -> {
            jobParam.setTargetPartitions(Sets.newHashSet(new Long[]{9L}));
            jobManager.buildPartitionJob(jobParam);
        }, ErrorCodeServer.JOB_CREATE_CHECK_INDEX_FAIL.getMsg(new Object[0]));
    }

    @Test
    public void testIndexBuildJob() {
        NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        ArrayList newArrayList = Lists.newArrayList(new String[]{"usa", "cn"});
        jobManager.addRelatedIndexJob(new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", newArrayList, "2010-01-01", "2010-02-01", SegmentStatusEnum.READY).getId()}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", (Set) null, (Set) null));
        List<AbstractExecutable> runningExecutables = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(1L, runningExecutables.size());
        Assert.assertEquals(15L, runningExecutables.get(0).getLayoutIds().size());
        Assert.assertEquals(2L, runningExecutables.get(0).getTargetPartitions().size());
        JobParam jobParam = new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", newArrayList, "2010-02-01", "2010-03-01", SegmentStatusEnum.READY).getId()}), Sets.newHashSet(new Long[]{1L}), "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", (Set) null, (Set) null);
        jobManager.addRelatedIndexJob(jobParam);
        List<AbstractExecutable> runningExecutables2 = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(2L, runningExecutables2.size());
        Assert.assertEquals(1L, runningExecutables2.get(1).getLayoutIds().size());
        checkConcurrent(jobParam);
        try {
            jobManager.addRelatedIndexJob(new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", newArrayList, "2010-03-01", "2010-04-01", SegmentStatusEnum.NEW).getId()}), Sets.newHashSet(new Long[]{1L}), "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", (Set) null, (Set) null));
            Assert.fail();
        } catch (Exception e) {
            Assert.assertEquals(e.getCause().getMessage(), "No segment is ready in this job.");
        }
        JobParam jobParam2 = new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", Lists.newArrayList(), "2010-04-01", "2010-05-01", SegmentStatusEnum.READY).getId()}), Sets.newHashSet(new Long[]{1L}), "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", (Set) null, (Set) null);
        assertExeption(() -> {
            jobManager.addRelatedIndexJob(jobParam2);
        }, ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY.getMsg(new Object[0]));
    }

    @Test
    public void testSegmentBuildJob() {
        NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT).getDataflow("b780e4e4-69af-449e-b09f-05c90dfa04b6");
        NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        ArrayList newArrayList = Lists.newArrayList(new String[]{"usa", "cn"});
        jobManager.addSegmentJob(new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", newArrayList, "2010-01-01", "2010-02-01", SegmentStatusEnum.READY).getId()}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", Sets.newHashSet(new Long[]{7L}), (Set) null));
        List<AbstractExecutable> runningExecutables = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(1L, runningExecutables.size());
        Assert.assertEquals(1L, runningExecutables.get(0).getTargetPartitions().size());
        Assert.assertEquals(15L, runningExecutables.get(0).getLayoutIds().size());
        JobParam jobParam = new JobParam(Sets.newHashSet(new String[]{generateSegmentForMultiPartition("b780e4e4-69af-449e-b09f-05c90dfa04b6", newArrayList, "2010-02-01", "2010-03-01", SegmentStatusEnum.READY).getId()}), Sets.newHashSet(new Long[]{1L}), "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", Sets.newHashSet(new Long[]{7L}), (Set) null);
        jobManager.addSegmentJob(jobParam);
        List<AbstractExecutable> runningExecutables2 = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(2L, runningExecutables2.size());
        Assert.assertEquals(1L, runningExecutables2.get(1).getTargetPartitions().size());
        Assert.assertEquals(1L, runningExecutables2.get(1).getLayoutIds().size());
        checkConcurrent(jobParam);
    }

    @Test
    public void testSegmentRefreshJob() {
        NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
        generateTableIndex("b780e4e4-69af-449e-b09f-05c90dfa04b6", 20000000001L);
        jobManager.refreshSegmentJob(new JobParam(Sets.newHashSet(new String[]{"73570f31-05a5-448f-973c-44209830dd01"}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", (Set) null, (Set) null), false);
        List<AbstractExecutable> runningExecutables = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(1L, runningExecutables.size());
        Assert.assertEquals(15L, runningExecutables.get(0).getLayoutIds().size());
        JobParam jobParam = new JobParam(Sets.newHashSet(new String[]{"0db919f3-1359-496c-aab5-b6f3951adc0e"}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", (Set) null, (Set) null);
        jobManager.refreshSegmentJob(jobParam, true);
        List<AbstractExecutable> runningExecutables2 = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(2L, runningExecutables2.size());
        Assert.assertEquals(16L, runningExecutables2.get(1).getLayoutIds().size());
        Assert.assertEquals(1L, runningExecutables2.get(0).getTargetPartitions().size());
        Assert.assertEquals(2L, runningExecutables2.get(1).getTargetPartitions().size());
        checkConcurrent(jobParam);
    }

    @Test
    public void testPartitionRefreshJob() {
        generateTableIndex("b780e4e4-69af-449e-b09f-05c90dfa04b6", 20000000001L);
        HashSet newHashSet = Sets.newHashSet(new Long[]{7L});
        jobManager.refreshSegmentJob(new JobParam(Sets.newHashSet(new String[]{"73570f31-05a5-448f-973c-44209830dd01"}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", newHashSet, (Set) null), false);
        List<AbstractExecutable> runningExecutables = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(1L, runningExecutables.size());
        Assert.assertEquals(15L, runningExecutables.get(0).getLayoutIds().size());
        JobParam jobParam = new JobParam(Sets.newHashSet(new String[]{"0db919f3-1359-496c-aab5-b6f3951adc0e"}), (Set) null, "b780e4e4-69af-449e-b09f-05c90dfa04b6", "ADMIN", newHashSet, (Set) null);
        jobManager.refreshSegmentJob(jobParam, true);
        List<AbstractExecutable> runningExecutables2 = getRunningExecutables(PROJECT, "b780e4e4-69af-449e-b09f-05c90dfa04b6");
        Assert.assertEquals(2L, runningExecutables2.size());
        Assert.assertEquals(16L, runningExecutables2.get(1).getLayoutIds().size());
        Assert.assertEquals(1L, runningExecutables2.get(0).getTargetPartitions().size());
        Assert.assertEquals(1L, runningExecutables2.get(1).getTargetPartitions().size());
        checkConcurrent(jobParam);
    }

    @Test
    public void testQuotaLimitReached() {
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(PROJECT);
        nDefaultScheduler.init(new JobEngineConfig(getTestConfig()));
        nDefaultScheduler.getContext().setReachQuotaLimit(true);
        Assert.assertThrows(KylinException.class, () -> {
            try {
                JobManager.checkStorageQuota(PROJECT);
            } finally {
                nDefaultScheduler.forceShutdown();
                nDefaultScheduler.getContext().setReachQuotaLimit(false);
            }
        });
    }

    @Test
    public void testAddJob_throwsException() {
        KylinConfig kylinConfig = (KylinConfig) Mockito.mock(KylinConfig.class);
        ReflectionTestUtils.setField(jobManager, "config", kylinConfig);
        Mockito.when(Boolean.valueOf(kylinConfig.isJobNode())).thenReturn(false);
        Mockito.when(Boolean.valueOf(kylinConfig.isUTEnv())).thenReturn(false);
        try {
            jobManager.addJob((JobParam) null, (AbstractJobHandler) null);
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            Assert.assertEquals(ErrorCodeServer.JOB_CREATE_ABANDON.getCodeMsg(new Object[0]), e.toString());
        }
    }

    public void checkConcurrent(JobParam jobParam) {
        assertExeption(() -> {
            jobManager.addSegmentJob(jobParam);
        }, ErrorCodeServer.JOB_CREATE_CHECK_FAIL.getMsg(new Object[0]));
    }

    private List<AbstractExecutable> getRunningExecutables(String str, String str2) {
        List<AbstractExecutable> runningExecutables = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), str).getRunningExecutables(str, str2);
        runningExecutables.sort(Comparator.comparing((v0) -> {
            return v0.getCreateTime();
        }));
        return runningExecutables;
    }

    private void generateTableIndex(String str, long j) {
        NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT).updateIndexPlan(str, indexPlan -> {
            IndexEntity indexEntity = new IndexEntity();
            indexEntity.setDimensions(Lists.newArrayList(new Integer[]{1, 3}));
            indexEntity.setId(j);
            LayoutEntity layoutEntity = new LayoutEntity();
            layoutEntity.setId(j);
            layoutEntity.setColOrder(Lists.newArrayList(new Integer[]{1, 3}));
            indexEntity.setLayouts(Arrays.asList(layoutEntity));
            List allIndexes = indexPlan.getAllIndexes();
            allIndexes.add(indexEntity);
            indexPlan.setIndexes(allIndexes);
        });
    }

    private NDataSegment generateSegmentForMultiPartition(String str, List<String> list, String str2, String str3, SegmentStatusEnum segmentStatusEnum) {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        ArrayList newArrayList = Lists.newArrayList();
        list.forEach(str4 -> {
            newArrayList.add(new String[]{str4});
        });
        NDataSegment appendSegment = nDataflowManager.appendSegment(nDataflowManager.getDataflow(str), new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(SegmentRange.dateToLong(str2).longValue()), Long.valueOf(SegmentRange.dateToLong(str3).longValue())), segmentStatusEnum, newArrayList);
        appendSegment.getMultiPartitions().forEach(segmentPartition -> {
            segmentPartition.setStatus(PartitionStatusEnum.READY);
        });
        return appendSegment;
    }

    private NDataLayout generateLayoutForMultiPartition(String str, String str2, List<String> list, long j) {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataModel dataModelDesc = NDataModelManager.getInstance(getTestConfig(), PROJECT).getDataModelDesc(str);
        NDataflow dataflow = nDataflowManager.getDataflow(str);
        ArrayList newArrayList = Lists.newArrayList();
        list.forEach(str3 -> {
            newArrayList.add(new String[]{str3});
        });
        Set partitionIdsByValues = dataModelDesc.getMultiPartitionDesc().getPartitionIdsByValues(newArrayList);
        NDataLayout newDataLayout = NDataLayout.newDataLayout(dataflow, str2, j);
        partitionIdsByValues.forEach(l -> {
            newDataLayout.getMultiPartition().add(new LayoutPartition(l.longValue()));
        });
        return newDataLayout;
    }
}
