/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.jobs.StreamingDFMergeJob;
import org.apache.kylin.streaming.jobs.SyncMerger;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Tests for {@link SyncMerger#run} driven by a {@link StreamingDFMergeJob}.
 *
 * <p>Each test works against the test metadata created in {@link #setUp()} and
 * removed in {@link #tearDown()}, using the dataflow looked up by the
 * {@code stream_merge1} model alias in the {@code streaming_test} project.
 */
public class SyncMergerTest
extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String MODEL_ALIAS = "stream_merge1";

    // Not referenced by the tests in this class; kept because it is a public JUnit rule.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** Creates the test metadata used by every test in this class. */
    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    /** Removes the test metadata created in {@link #setUp()}. */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /**
     * Runs a merge with a local {@link SparkSession} and checks that the dataflow
     * ends up with exactly one segment and that the after-merge segment is
     * {@link SegmentStatusEnum#READY}.
     */
    @Test
    public void testRunSuccessful() {
        KylinConfig testConfig = getTestConfig();
        NDataflowManager mgr = NDataflowManager.getInstance(testConfig, PROJECT);
        NDataflow df = mgr.getDataflowByModelAlias(MODEL_ALIAS);
        SparkSession ss = SparkSession.builder().master("local").getOrCreate();
        try {
            MergeJobEntry mergeJobEntry = createMergeJobEntry(mgr, df, ss, PROJECT);
            NDataSegment afterMergeSeg = mergeJobEntry.afterMergeSegment();
            SyncMerger syncMerge = new SyncMerger(mergeJobEntry);
            StreamingDFMergeJob merger = new StreamingDFMergeJob();
            syncMerge.run(merger);

            // Re-read the dataflow so the assertions see the state after the merge.
            df = mgr.getDataflow(df.getId());
            Assert.assertEquals(1L, df.getSegments().size());
            Assert.assertEquals(SegmentStatusEnum.READY, df.getSegment(afterMergeSeg.getId()).getStatus());
        } finally {
            // Stop the session even when the merge or an assertion fails, so a
            // failing test does not leave a live SparkSession behind.
            ss.stop();
        }
    }

    /**
     * Builds the merge job entry with a {@code null} Spark session and expects
     * {@link SyncMerger#run} to fail with a {@link KylinException}.
     */
    @Test
    public void testRunFailed() {
        KylinConfig testConfig = getTestConfig();
        NDataflowManager mgr = NDataflowManager.getInstance(testConfig, PROJECT);
        NDataflow df = mgr.getDataflowByModelAlias(MODEL_ALIAS);
        // NOTE(review): the null session is presumably what makes the run fail - confirm.
        MergeJobEntry mergeJobEntry = createMergeJobEntry(mgr, df, null, PROJECT);
        SyncMerger syncMerge = new SyncMerger(mergeJobEntry);
        StreamingDFMergeJob merger = new StreamingDFMergeJob();
        try {
            syncMerge.run(merger);
            // AssertionError is not an Exception, so it is not swallowed by the catch below.
            Assert.fail("Expected SyncMerger.run to throw a KylinException");
        } catch (Exception e) {
            Assert.assertTrue("Expected KylinException but got " + e.getClass().getName(),
                    e instanceof KylinException);
        }
    }
}

