package org.apache.kylin.engine.spark.job;

import com.clearspring.analytics.util.Lists;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.IndexDataConstructor;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.job.JobBucket;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.SparderTypeUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkMergingJobTest.class */
/**
 * Tests for {@link NSparkMergingJob}: merging two multi-partition segments end to end through the
 * job scheduler, and cancelling a merging job.
 */
public class NSparkMergingJobTest extends NLocalWithSparkSessionTest {
    /** Dataflow whose segments are merged by {@link #testMultiPartitionMergeSegments()}. */
    private static final String DATAFLOW_ID = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
    /** Ids of the two source segments that receive fake layout data before the merge. */
    private static final String FIRST_SEGMENT_ID = "d2edf0c5-5eb2-4968-9ad5-09efbf659324";
    private static final String SECOND_SEGMENT_ID = "ff839b0b-2c23-4420-b332-0df70e36c343";

    private KylinConfig config;

    /**
     * Lowers Spark log verbosity, overrides the job/engine system properties used by these tests
     * and (re)creates the default scheduler for the test project.
     *
     * @throws RuntimeException if the scheduler reports that it has not started after {@code init}
     */
    @Before
    public void setup() {
        ss.sparkContext().setLogLevel("ERROR");
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        overwriteSystemProp("kylin.engine.persist-flattable-threshold", "0");
        overwriteSystemProp("kylin.engine.persist-flatview", "true");

        // Drop any scheduler instance left behind by a previous test before creating a new one.
        NDefaultScheduler.destroyInstance();
        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
        scheduler.init(new JobEngineConfig(getTestConfig()));
        if (!scheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        this.config = getTestConfig();
    }

    /** Destroys the scheduler created in {@link #setup()} and cleans up the test metadata. */
    @After
    public void after() {
        NDefaultScheduler.destroyInstance();
        cleanupTestMetadata();
    }

    /**
     * Merges the two source segments of {@link #DATAFLOW_ID} with one job bucket per
     * (layout, partition) pair, waits for the job to succeed, applies the resource merger and
     * checks that the merged segment has as many multi-partitions as the first source segment.
     */
    @Test
    public void testMultiPartitionMergeSegments() throws Exception {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(this.config, "default");
        NDataflow dataflow = dataflowManager.getDataflow(DATAFLOW_ID);
        NDataSegment firstSegment = dataflow.getSegment(FIRST_SEGMENT_ID);
        NDataSegment secondSegment = dataflow.getSegment(SECOND_SEGMENT_ID);
        fakeEmptyPartitionLayoutData(firstSegment);
        fakeEmptyPartitionLayoutData(secondSegment);

        // Layouts and partition ids are both taken from the first segment only.
        List<LayoutEntity> layouts = firstSegment.getSegDetails().getLayouts().stream()
                .map(dataLayout -> dataLayout.getLayout())
                .collect(Collectors.toList());
        Set<Long> partitionIds = firstSegment.getAllPartitionIds();

        NDataSegment mergedSegment = dataflowManager.mergeSegments(dataflow,
                new SegmentRange.TimePartitionedSegmentRange(SegmentRange.dateToLong("2020-11-03"),
                        SegmentRange.dateToLong("2020-11-05")),
                false);
        String mergedSegmentId = mergedSegment.getId();

        // One bucket per (layout, partition) pair; bucket ids are handed out sequentially from 1.
        AtomicLong bucketIdSequence = new AtomicLong(0L);
        Set<JobBucket> buckets = layouts.stream()
                .flatMap(layout -> partitionIds.stream()
                        .map(partitionId -> new JobBucket(mergedSegmentId, layout.getId(),
                                bucketIdSequence.incrementAndGet(), partitionId.longValue())))
                .collect(Collectors.toSet());

        NSparkMergingJob mergingJob = NSparkMergingJob.merge(mergedSegment, Sets.newLinkedHashSet(layouts), "ADMIN",
                RandomUtil.randomUUIDStr(), Sets.newHashSet(partitionIds), buckets);
        NExecutableManager.getInstance(this.config, getProject()).addJob(mergingJob);
        Assert.assertEquals(ExecutableState.SUCCEED, IndexDataConstructor.wait((AbstractExecutable) mergingJob));

        new AfterMergeOrRefreshResourceMerger(this.config, getProject()).merge(mergingJob.getSparkMergingStep());

        // Re-read the dataflow so the assertion sees the state written by the resource merger.
        int mergedPartitionCount = dataflowManager.getDataflow(DATAFLOW_ID).getSegment(mergedSegmentId)
                .getMultiPartitions().size();
        // NOTE(review): this assertion appeared twice, byte-identical; one copy is kept. The second
        // may have been intended to compare against secondSegment - confirm before adding it.
        Assert.assertEquals(firstSegment.getMultiPartitions().size(), mergedPartitionCount);
    }

    /**
     * Writes an empty parquet dataset for every layout partition of the given segment, at the path
     * returned by {@code NSparkCubingUtil.getStoragePath}, overwriting whatever is already there.
     *
     * @param nDataSegment segment whose layout partitions receive the empty datasets
     */
    private void fakeEmptyPartitionLayoutData(NDataSegment nDataSegment) {
        LinkedHashMap<String, DataType> partitionColumnTypes = Maps.newLinkedHashMap();
        // NOTE(review): this callback is empty, so partitionColumnTypes is never populated here.
        // Presumably it was meant to register the multi-partition columns - confirm.
        nDataSegment.getModel().getMultiPartitionDesc().getColumnRefs().forEach(partitionColumn -> {
        });

        nDataSegment.getSegDetails().getLayouts().forEach(dataLayout -> {
            LayoutEntity layout = dataLayout.getLayout();
            LinkedHashMap<String, DataType> columnTypes = Maps.newLinkedHashMap();
            columnTypes.putAll(partitionColumnTypes);
            // NOTE(review): both callbacks below are empty as well, so with the code as it stands
            // the schema built from columnTypes has no fields - confirm whether that is intended.
            layout.getOrderedDimensions().forEach((dimensionId, dimension) -> {
            });
            layout.getOrderedMeasures().forEach((measureId, measure) -> {
            });

            StructType schema = toSchema(columnTypes);
            dataLayout.getMultiPartition().forEach(layoutPartition -> ss
                    .createDataFrame(Lists.newArrayList(), schema)
                    .write()
                    .mode(SaveMode.Overwrite)
                    .parquet(NSparkCubingUtil.getStoragePath(nDataSegment, Long.valueOf(dataLayout.getLayoutId()),
                            Long.valueOf(layoutPartition.getBucketId()))));
        });
    }

    /**
     * Builds a Spark schema with one field per map entry, in the map's iteration order, converting
     * each type with {@code SparderTypeUtil.toSparkType(type, false)}.
     *
     * @param columnTypes field name to Kylin data type
     * @return the resulting schema; it has no fields when {@code columnTypes} is empty
     */
    private static StructType toSchema(Map<String, DataType> columnTypes) {
        StructType schema = new StructType();
        for (Map.Entry<String, DataType> column : columnTypes.entrySet()) {
            schema = schema.add(column.getKey(), SparderTypeUtil.toSparkType(column.getValue(), false));
        }
        return schema;
    }

    /**
     * Calls {@code cancelJob()} twice: once before the step has any parameters and once after
     * {@code segmentIds} and {@code dataflowId} are set. The test has no assertions; it passes as
     * long as neither call throws.
     */
    @Test
    public void testCancelJob() {
        NSparkMergingJob mergingJob = new NSparkMergingJob();
        mergingJob.setProject("default");
        NSparkMergingStep mergingStep = new NSparkMergingStep();
        mergingStep.setProject("default");
        mergingJob.addTask(mergingStep);

        // First cancel: the step carries no segment or dataflow parameters yet.
        mergingJob.cancelJob();

        mergingStep.setParam("segmentIds", "");
        mergingStep.setParam("dataflowId", "89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        // Second cancel: parameters are present, with an empty segment id list.
        mergingJob.cancelJob();
    }
}
