package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.model.SegmentRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.sparkproject.guava.collect.Sets;

@Ignore("for test spark job on yarn")
/* loaded from: input_file:org/apache/kylin/engine/spark/job/NSparkCubingJobOnYarnTest.class */
public class NSparkCubingJobOnYarnTest extends NLocalFileMetadataTestCase {
    @Before
    public void setup() throws Exception {
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        createTestMetadata(new String[0]);
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance("default");
        nDefaultScheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
        overwriteSystemProp("kylin.hadoop.conf.dir", "../examples/test_case_data/sandbox");
        overwriteSystemProp("SPARK_HOME", "../../build/spark");
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
    }

    @After
    public void after() throws Exception {
        NDefaultScheduler.destroyInstance();
        cleanupTestMetadata();
    }

    @Test
    public void testSparkJobOnYarn() throws IOException, InterruptedException {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        instanceFromEnv.setProperty("kylin.env.hdfs-working-dir", "hdfs://sandbox/kylin");
        instanceFromEnv.setProperty("kylin.env", "DEV");
        instanceFromEnv.setProperty("kylin.engine.spark.job-jar", "../assembly/target/ke-assembly-4.0.0-SNAPSHOT-job.jar");
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(instanceFromEnv, "default");
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(instanceFromEnv, "default");
        NDataflow dataflow = nDataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(dataflow.getUuid());
        nDataflowUpdate.setToRemoveSegs((NDataSegment[]) dataflow.getSegments().toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate);
        NDataSegment appendSegment = nDataflowManager.appendSegment(dataflow, new SegmentRange.TimePartitionedSegmentRange(0L, SegmentRange.dateToLong("2012-06-01")));
        List allLayouts = dataflow.getIndexPlan().getAllLayouts();
        NSparkCubingJob create = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{appendSegment}), Sets.newLinkedHashSet(allLayouts), "ADMIN", (Set) null);
        nExecutableManager.addJob(create);
        Assert.assertEquals(ExecutableState.SUCCEED, wait(create));
        NSparkCubingJob create2 = NSparkCubingJob.create(Sets.newHashSet(new NDataSegment[]{nDataflowManager.appendSegment(nDataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa"), new SegmentRange.TimePartitionedSegmentRange(SegmentRange.dateToLong("2012-06-01"), SegmentRange.dateToLong("2013-06-01")))}), Sets.newLinkedHashSet(allLayouts), "ADMIN", (Set) null);
        nExecutableManager.addJob(create2);
        Assert.assertEquals(ExecutableState.SUCCEED, wait(create2));
        NSparkMergingJob merge = NSparkMergingJob.merge(nDataflowManager.mergeSegments(nDataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa"), new SegmentRange.TimePartitionedSegmentRange(0L, SegmentRange.dateToLong("2013-06-01")), false), Sets.newLinkedHashSet(allLayouts), "ADMIN", RandomUtil.randomUUIDStr());
        nExecutableManager.addJob(merge);
        Assert.assertEquals(ExecutableState.SUCCEED, wait(merge));
    }

    private ExecutableState wait(AbstractExecutable abstractExecutable) throws InterruptedException {
        ExecutableState status;
        do {
            Thread.sleep(500L);
            status = abstractExecutable.getStatus();
        } while (status.isProgressing());
        return status;
    }
}
