package org.apache.kylin.metadata.cube.model;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.junit.TimeZoneTestRunner;
import org.apache.kylin.metadata.cube.CubeTestUtils;
import org.apache.kylin.metadata.model.ManagementType;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

@RunWith(TimeZoneTestRunner.class)
/* loaded from: input_file:org/apache/kylin/metadata/cube/model/NDataflowManagerTest.class */
public class NDataflowManagerTest extends NLocalFileMetadataTestCase {
    // Project name handed to the manager getInstance(...) calls in the tests of this class.
    private String projectDefault = "default";

    // JUnit rule used by tests to declare an expected failure; starts out expecting none.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Prepares the test metadata before each test by delegating to
     * {@code createTestMetadata} with an empty argument array.
     */
    @Before
    public void setUp() throws Exception {
        String[] noArguments = new String[0];
        createTestMetadata(noArguments);
    }

    /**
     * Releases the test metadata after each test by delegating to
     * {@code cleanupTestMetadata}.
     */
    @After
    public void tearDown() throws Exception {
        this.cleanupTestMetadata();
    }

    /**
     * Requests a merge over the range 2010-01-01 .. 2013-01-01 on the "nmodel_basic"
     * dataflow and expects a failure whose message reports that the range contains
     * 0 segments while at least 2 are required.
     */
    @Test
    public void testInvalidMerge() {
        Long rangeStart = SegmentRange.dateToLong("2010-01-01");
        Long rangeEnd = SegmentRange.dateToLong("2013-01-01");
        String expectedMessage = "Range TimePartitionedSegmentRange[" + rangeStart + "," + rangeEnd
                + ") must contain at least 2 segments, but there is 0";
        this.thrown.expectMessage(expectedMessage);

        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), "default");
        NDataflow dataflow = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        SegmentRange.TimePartitionedSegmentRange mergeRange =
                new SegmentRange.TimePartitionedSegmentRange(rangeStart, rangeEnd);
        dataflowManager.mergeSegments(dataflow, mergeRange, false);
    }

    /**
     * Checks that the dataflow obtained from the manager, its first segment, that
     * segment's details and its layout 1 all report {@code isCachedAndShared()},
     * while the same objects reached through {@code copy()} do not.
     */
    @Test
    public void testCached() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow cached = dataflowManager.getDataflowByModelAlias("nmodel_basic");

        // Objects handed out by the manager are flagged as cached and shared.
        NDataSegment cachedFirst = cached.getSegments().getFirstSegment();
        Assert.assertTrue(cached.isCachedAndShared());
        Assert.assertTrue(cachedFirst.isCachedAndShared());
        Assert.assertTrue(cachedFirst.getSegDetails().isCachedAndShared());
        Assert.assertTrue(cachedFirst.getLayout(1L).isCachedAndShared());

        // The copy and everything reachable from it are not.
        NDataflow detached = cached.copy();
        NDataSegment detachedFirst = detached.getSegments().getFirstSegment();
        Assert.assertFalse(detached.isCachedAndShared());
        Assert.assertFalse(detachedFirst.isCachedAndShared());
        Assert.assertFalse(detachedFirst.getSegDetails().isCachedAndShared());
        Assert.assertFalse(detachedFirst.getLayout(1L).isCachedAndShared());
    }

    /**
     * Checks last-status tracking on a dataflow copy: it is null initially, and after
     * setting BROKEN once or twice it still equals the status the copy started with.
     */
    @Test
    public void testDataflowStatus() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow workingCopy = dataflowManager.getDataflowByModelAlias("nmodel_basic").copy();
        RealizationStatusEnum initialStatus = workingCopy.getStatus();
        Assert.assertNull(workingCopy.getLastStatus());

        // Setting BROKEN repeatedly must keep reporting the initial status as the last one.
        for (int round = 0; round < 2; round++) {
            workingCopy.setStatus(RealizationStatusEnum.BROKEN);
            Assert.assertEquals(initialStatus, workingCopy.getLastStatus());
        }
    }

    /**
     * Checks that mutating the dataflow obtained from the manager, or its first
     * segment, raises {@link IllegalStateException}, whereas mutating a copy succeeds.
     */
    @Test
    public void testImmutableCachedObj() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow cached = dataflowManager.getDataflowByModelAlias("nmodel_basic");

        boolean statusChangeRejected = false;
        try {
            cached.setStatus(RealizationStatusEnum.OFFLINE);
        } catch (IllegalStateException expected) {
            statusChangeRejected = true;
        }
        if (!statusChangeRejected) {
            Assert.fail();
        }

        boolean segmentChangeRejected = false;
        try {
            ((NDataSegment) cached.getSegments().get(0)).setCreateTimeUTC(0L);
        } catch (IllegalStateException expected) {
            segmentChangeRejected = true;
        }
        if (!segmentChangeRejected) {
            Assert.fail();
        }

        // The same mutation on a copy must go through without an exception.
        NDataflow detached = cached.copy();
        detached.setStatus(RealizationStatusEnum.OFFLINE);
    }

    /**
     * Exercises create, read, update and drop of a dataflow that is built from a copy
     * of the "nmodel_basic" index plan registered under a freshly generated UUID.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testCRUD() throws IOException {
        KylinConfig testConfig = getTestConfig();
        NDataflowManager dataflowManager = NDataflowManager.getInstance(testConfig, this.projectDefault);
        NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(testConfig, this.projectDefault);
        NProjectManager projectManager = NProjectManager.getInstance(testConfig);
        String dataflowId = RandomUtil.randomUUIDStr();
        // Return value is not used. NOTE(review): confirm whether this lookup is needed at all.
        projectManager.getProject(this.projectDefault);

        IndexPlan indexPlan = indexPlanManager.getIndexPlanByModelAlias("nmodel_basic").copy();
        indexPlan.setUuid(dataflowId);
        CubeTestUtils.createTmpModelAndCube(testConfig, indexPlan);
        int countBefore = dataflowManager.listAllDataflows().size();

        // Create and read back.
        Assert.assertNotNull(dataflowManager.createDataflow(indexPlan, "test_owner"));
        Assert.assertEquals(countBefore + 1, dataflowManager.listAllDataflows().size());
        Assert.assertNotNull(dataflowManager.getDataflow(dataflowId));

        // Update: keep the update object in a local so that this exact object is applied.
        NDataflowUpdate costUpdate = new NDataflowUpdate(dataflowId);
        costUpdate.setCost(1000);
        Assert.assertEquals(1000L, dataflowManager.updateDataflow(costUpdate).getCost());

        // Drop and verify it is gone.
        dataflowManager.dropDataflow(dataflowId);
        Assert.assertEquals(countBefore, dataflowManager.listAllDataflows().size());
        Assert.assertNull(dataflowManager.getDataflow(dataflowId));
    }

    /**
     * Checks that the list of underlying data models returned by the dataflow manager
     * is the same before and after the dependency-health check property is set to "true".
     */
    @Test
    public void testGetAllModels() {
        KylinConfig testConfig = getTestConfig();
        List<?> modelsBefore = NDataflowManager.getInstance(testConfig, this.projectDefault)
                .listUnderliningDataModels();

        // NOTE(review): the key used to read "ylin.model.check-dependency-healthk", which does not
        // carry the "kylin." prefix and so could not take effect; confirm this is the intended name.
        overwriteSystemProp("kylin.model.check-dependency-health", "true");

        List<?> modelsAfter = NDataflowManager.getInstance(testConfig, this.projectDefault)
                .listUnderliningDataModels();
        Assert.assertEquals(modelsBefore, modelsAfter);
    }

    /**
     * Appends an infinite-range segment to "nmodel_basic", asserts that the dataflow
     * then has 2 segments, removes the appended segment through an update and asserts
     * that 1 segment remains.
     */
    @Test
    public void testUpdateSegment() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow original = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataSegment appended = dataflowManager.appendSegment(original,
                SegmentRange.TimePartitionedSegmentRange.createInfinite());

        NDataflow afterAppend = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(2L, afterAppend.getSegments().size());

        NDataflowUpdate removal = new NDataflowUpdate(afterAppend.getUuid());
        removal.setToRemoveSegs(new NDataSegment[]{appended});
        dataflowManager.updateDataflow(removal);

        NDataflow afterRemoval = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(1L, afterRemoval.getSegments().size());
    }

    /**
     * Replaces the segments of "nmodel_basic" with an empty {@code Segments} through
     * the updater callback and asserts that the segment-details manager returns null
     * for every segment taken from the dataflow instance fetched at the start.
     */
    @Test
    public void testUpdateSegmentByDataflowSetSegments() throws IOException {
        KylinConfig testConfig = getTestConfig();
        NDataflowManager dataflowManager = NDataflowManager.getInstance(testConfig, this.projectDefault);
        NDataflow initial = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataSegDetailsManager detailsManager = NDataSegDetailsManager.getInstance(testConfig, this.projectDefault);

        dataflowManager.appendSegment(initial, SegmentRange.TimePartitionedSegmentRange.createInfinite());
        // Segments are read from the instance fetched before the append.
        Segments initialSegments = initial.getSegments();

        NDataflow afterAppend = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(2L, afterAppend.getSegments().size());

        dataflowManager.updateDataflow(afterAppend.getId(), copyForWrite -> copyForWrite.setSegments(new Segments()));

        for (Object segment : initialSegments) {
            Assert.assertNull(detailsManager.getForSegment((NDataSegment) segment));
        }
    }

    /**
     * Clears "nmodel_basic", appends READY segments [0,1) and [1,2), merges the range
     * [0,2) and asserts that the dataflow then holds 3 segments and that the merged
     * segment's range contains both appended ranges.
     */
    @Test
    public void testMergeSegmentsSuccess() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);

        // Remove every existing segment.
        NDataflow initial = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataflowUpdate clearAll = new NDataflowUpdate(initial.getUuid());
        clearAll.setToRemoveSegs((NDataSegment[]) initial.getSegments().toArray(new NDataSegment[0]));
        dataflowManager.updateDataflow(clearAll);
        NDataflow emptied = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(0L, emptied.getSegments().size());

        // First segment: [0, 1), marked READY.
        NDataSegment firstSeg = dataflowManager.appendSegment(emptied,
                new SegmentRange.TimePartitionedSegmentRange(0L, 1L));
        firstSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markFirstReady = new NDataflowUpdate(emptied.getUuid());
        markFirstReady.setToUpdateSegs(new NDataSegment[]{firstSeg});
        dataflowManager.updateDataflow(markFirstReady);
        NDataflow withOne = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(1L, withOne.getSegments().size());

        // Second segment: [1, 2), marked READY.
        NDataSegment secondSeg = dataflowManager.appendSegment(withOne,
                new SegmentRange.TimePartitionedSegmentRange(1L, 2L));
        secondSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markSecondReady = new NDataflowUpdate(withOne.getUuid());
        markSecondReady.setToUpdateSegs(new NDataSegment[]{secondSeg});
        dataflowManager.updateDataflow(markSecondReady);
        NDataflow withTwo = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(2L, withTwo.getSegments().size());

        // Merge [0, 2) and inspect the outcome.
        NDataSegment merged = dataflowManager.mergeSegments(withTwo,
                new SegmentRange.TimePartitionedSegmentRange(0L, 2L), true);
        NDataflow afterMerge = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(3L, afterMerge.getSegments().size());
        Assert.assertTrue(merged.getSegRange().contains(firstSeg.getSegRange()));
        Assert.assertTrue(merged.getSegRange().contains(secondSeg.getSegRange()));
    }

    /**
     * Clears "nmodel_basic", appends two READY Kafka-offset segments, then merges the
     * covering range three times: twice with the same explicit segment id and once
     * with a null id. After each step the segment count is asserted; at the end the
     * explicit id must no longer resolve while the id of the last merge result does.
     */
    @Test
    public void testMergeKafkaSegmentsSuccess() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);

        // Remove every existing segment.
        NDataflow initial = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataflowUpdate clearAll = new NDataflowUpdate(initial.getUuid());
        clearAll.setToRemoveSegs((NDataSegment[]) initial.getSegments().toArray(new NDataSegment[0]));
        dataflowManager.updateDataflow(clearAll);
        NDataflow emptied = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(0L, emptied.getSegments().size());

        // First Kafka segment: range 1..2, partition 0 offsets 100..200.
        NDataSegment firstSeg = dataflowManager.appendSegment(emptied,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 2L,
                        createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L)));
        firstSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markFirstReady = new NDataflowUpdate(emptied.getUuid());
        markFirstReady.setToUpdateSegs(new NDataSegment[]{firstSeg});
        dataflowManager.updateDataflow(markFirstReady);
        NDataflow withOne = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(1L, withOne.getSegments().size());

        // Second Kafka segment: range 2..3, partition 0 offsets 200..300.
        NDataSegment secondSeg = dataflowManager.appendSegment(withOne,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(2L, 3L,
                        createKafkaPartitionOffset(0, 200L), createKafkaPartitionOffset(0, 300L)));
        secondSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markSecondReady = new NDataflowUpdate(withOne.getUuid());
        markSecondReady.setToUpdateSegs(new NDataSegment[]{secondSeg});
        dataflowManager.updateDataflow(markSecondReady);
        NDataflow withTwo = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(2L, withTwo.getSegments().size());

        // First merge, using an explicit segment id.
        String explicitSegId = RandomUtil.randomUUIDStr();
        NDataSegment firstMerge = dataflowManager.mergeSegments(withTwo,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 3L,
                        createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 300L)),
                true, 1, explicitSegId);
        firstMerge.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markFirstMergeReady = new NDataflowUpdate(withTwo.getUuid());
        markFirstMergeReady.setToUpdateSegs(new NDataSegment[]{firstMerge});
        dataflowManager.updateDataflow(markFirstMergeReady);
        NDataflow afterFirstMerge = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(3L, afterFirstMerge.getSegments().size());

        // Second merge over the same range with the same explicit id.
        NDataSegment secondMerge = dataflowManager.mergeSegments(afterFirstMerge,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 3L,
                        createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 300L)),
                true, 1, explicitSegId);
        secondMerge.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markSecondMergeReady = new NDataflowUpdate(afterFirstMerge.getUuid());
        markSecondMergeReady.setToUpdateSegs(new NDataSegment[]{secondMerge});
        dataflowManager.updateDataflow(markSecondMergeReady);
        NDataflow afterSecondMerge = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(3L, afterSecondMerge.getSegments().size());
        Assert.assertTrue(firstMerge.getSegRange().contains(firstSeg.getSegRange()));
        Assert.assertTrue(secondMerge.getSegRange().contains(secondSeg.getSegRange()));

        // Third merge over the same range, this time passing a null segment id.
        NDataSegment thirdMerge = dataflowManager.mergeSegments(afterSecondMerge,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 3L,
                        createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 300L)),
                true, 1, (String) null);
        thirdMerge.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markThirdMergeReady = new NDataflowUpdate(afterSecondMerge.getUuid());
        markThirdMergeReady.setToUpdateSegs(new NDataSegment[]{thirdMerge});
        dataflowManager.updateDataflow(markThirdMergeReady);
        NDataflow afterThirdMerge = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(3L, afterThirdMerge.getSegments().size());
        Assert.assertNull(afterThirdMerge.getSegment(explicitSegId));
        Assert.assertNotNull(afterThirdMerge.getSegment(thirdMerge.getId()));
    }

    /**
     * Clears "nmodel_basic", appends three READY Kafka-offset segments that all use
     * the range 1..2 with partition-0 offsets 100..200, 200..300 and 400..500, then
     * merges offsets 100..300 and asserts that the resulting segment has status NEW.
     */
    @Test
    public void testMergeKafkaSegments() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);

        // Remove every existing segment.
        NDataflow initial = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataflowUpdate clearAll = new NDataflowUpdate(initial.getUuid());
        clearAll.setToRemoveSegs((NDataSegment[]) initial.getSegments().toArray(new NDataSegment[0]));
        dataflowManager.updateDataflow(clearAll);
        NDataflow emptied = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(0L, emptied.getSegments().size());

        // Offsets 100..200.
        NDataSegment firstSeg = dataflowManager.appendSegment(emptied,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 2L,
                        createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L)));
        firstSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markFirstReady = new NDataflowUpdate(emptied.getUuid());
        markFirstReady.setToUpdateSegs(new NDataSegment[]{firstSeg});
        dataflowManager.updateDataflow(markFirstReady);
        NDataflow withOne = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(1L, withOne.getSegments().size());

        // Offsets 200..300.
        NDataSegment secondSeg = dataflowManager.appendSegment(withOne,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 2L,
                        createKafkaPartitionOffset(0, 200L), createKafkaPartitionOffset(0, 300L)));
        secondSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markSecondReady = new NDataflowUpdate(withOne.getUuid());
        markSecondReady.setToUpdateSegs(new NDataSegment[]{secondSeg});
        dataflowManager.updateDataflow(markSecondReady);
        NDataflow withTwo = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(2L, withTwo.getSegments().size());

        // Offsets 400..500.
        NDataSegment thirdSeg = dataflowManager.appendSegment(withTwo,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 2L,
                        createKafkaPartitionOffset(0, 400L), createKafkaPartitionOffset(0, 500L)));
        thirdSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markThirdReady = new NDataflowUpdate(withTwo.getUuid());
        markThirdReady.setToUpdateSegs(new NDataSegment[]{thirdSeg});
        dataflowManager.updateDataflow(markThirdReady);
        NDataflow withThree = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(3L, withThree.getSegments().size());

        // Merge offsets 100..300 under a fresh segment id.
        NDataSegment merged = dataflowManager.mergeSegments(withThree,
                new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 2L,
                        createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 300L)),
                true, 1, RandomUtil.randomUUIDStr());
        Assert.assertEquals(SegmentStatusEnum.NEW, merged.getStatus());
    }

    /**
     * Checks that the dataflow for model alias "nmodel_basic" can be looked up.
     */
    @Test
    public void testGetDataflow() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow dataflow = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertNotNull(dataflow);
    }

    /**
     * Clears "nmodel_basic", appends READY segments [0,1), [1,2) and [5,6), and then
     * checks four merge requests that must each fail with a specific exception type
     * and message fragment.
     */
    @Test
    public void testMergeSegmentsFail() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);

        // Remove every existing segment.
        NDataflow initial = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataflowUpdate clearAll = new NDataflowUpdate(initial.getUuid());
        clearAll.setToRemoveSegs((NDataSegment[]) initial.getSegments().toArray(new NDataSegment[0]));
        dataflowManager.updateDataflow(clearAll);
        NDataflow emptied = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(0L, emptied.getSegments().size());

        // Segment [0, 1).
        NDataSegment firstSeg = dataflowManager.appendSegment(emptied,
                new SegmentRange.TimePartitionedSegmentRange(0L, 1L));
        firstSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markFirstReady = new NDataflowUpdate(emptied.getUuid());
        markFirstReady.setToUpdateSegs(new NDataSegment[]{firstSeg});
        dataflowManager.updateDataflow(markFirstReady);
        NDataflow withOne = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(1L, withOne.getSegments().size());

        // Segment [1, 2).
        NDataSegment secondSeg = dataflowManager.appendSegment(withOne,
                new SegmentRange.TimePartitionedSegmentRange(1L, 2L));
        secondSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markSecondReady = new NDataflowUpdate(withOne.getUuid());
        markSecondReady.setToUpdateSegs(new NDataSegment[]{secondSeg});
        dataflowManager.updateDataflow(markSecondReady);
        NDataflow withTwo = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(2L, withTwo.getSegments().size());

        // Segment [5, 6).
        NDataSegment thirdSeg = dataflowManager.appendSegment(withTwo,
                new SegmentRange.TimePartitionedSegmentRange(5L, 6L));
        thirdSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markThirdReady = new NDataflowUpdate(withTwo.getUuid());
        markThirdReady.setToUpdateSegs(new NDataSegment[]{thirdSeg});
        dataflowManager.updateDataflow(markThirdReady);
        NDataflow withThree = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(3L, withThree.getSegments().size());

        // Non-forced merge of [0, 2): expected to complain about an empty segment.
        try {
            dataflowManager.mergeSegments(withThree, new SegmentRange.TimePartitionedSegmentRange(0L, 2L), false);
            Assert.fail("No exception thrown.");
        } catch (Exception emptySegmentFailure) {
            Assert.assertTrue(emptySegmentFailure instanceof IllegalArgumentException);
            Assert.assertTrue(emptySegmentFailure.getMessage().contains("Empty cube segment found"));
        }

        // Non-forced merge of [0, 6): expected to complain about gaps.
        try {
            dataflowManager.mergeSegments(withThree, new SegmentRange.TimePartitionedSegmentRange(0L, 6L), false);
            Assert.fail("No exception thrown.");
        } catch (Exception gapFailure) {
            Assert.assertTrue(gapFailure instanceof KylinException);
            Assert.assertTrue(gapFailure.getMessage().contains("Can't merge the selected segments, as there are gap(s) in between. Please check and try again."));
        }

        // Attach a layout (first layout of the index plan) to the first segment.
        LayoutEntity firstLayout = (LayoutEntity) withThree.getIndexPlan().getAllLayouts().get(0);
        NDataLayout addedLayout = NDataLayout.newDataLayout(firstSeg.getDataflow(), firstSeg.getId(),
                firstLayout.getId());
        NDataflowUpdate attachLayout = new NDataflowUpdate(withThree.getUuid());
        attachLayout.setToUpdateSegs(new NDataSegment[]{firstSeg});
        attachLayout.setToAddOrUpdateLayouts(new NDataLayout[]{addedLayout});
        dataflowManager.updateDataflow(attachLayout);

        // Forced merge of [0, 1): only one segment falls in the range.
        try {
            dataflowManager.mergeSegments(withThree, new SegmentRange.TimePartitionedSegmentRange(0L, 1L), true);
            Assert.fail("No exception thrown.");
        } catch (Exception tooFewFailure) {
            Assert.assertTrue(tooFewFailure instanceof IllegalArgumentException);
            Assert.assertTrue(tooFewFailure.getMessage().contains("must contain at least 2 segments, but there is 1"));
        }

        // Forced merge of [0, 2): expected to fail the index check.
        try {
            dataflowManager.mergeSegments(withThree, new SegmentRange.TimePartitionedSegmentRange(0L, 2L), true);
            Assert.fail("No exception thrown.");
        } catch (Exception indexCheckFailure) {
            Assert.assertTrue(indexCheckFailure instanceof KylinException);
            Assert.assertTrue(indexCheckFailure.getMessage()
                    .contains(ErrorCodeServer.SEGMENT_MERGE_CHECK_INDEX_ILLEGAL.getMsg(new Object[0])));
        }
    }

    /**
     * Removes layout 10001 from the first segment of "nmodel_basic" (layout count
     * asserted to go from 8 to 7), then adds layouts 10001 and 10002 through an update
     * and asserts that the first segment reports 8 layouts again.
     */
    @Test
    public void testUpdateCuboid() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);

        NDataflow initial = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(8L, initial.getSegments().getFirstSegment().getLayoutsMap().size());

        // Remove layout 10001.
        NDataLayout layoutToRemove = initial.getSegments().getFirstSegment().getLayout(10001L);
        NDataflowUpdate removal = new NDataflowUpdate(initial.getUuid());
        removal.setToRemoveLayouts(new NDataLayout[]{layoutToRemove});
        dataflowManager.updateDataflow(removal);

        NDataflow afterRemoval = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(7L, afterRemoval.getSegments().getFirstSegment().getLayoutsMap().size());

        // Add or update layouts 10001 and 10002 on the first segment.
        NDataSegment targetSegment = afterRemoval.getSegments().getFirstSegment();
        NDataLayout layout10001 = NDataLayout.newDataLayout(afterRemoval, targetSegment.getId(), 10001L);
        NDataLayout layout10002 = NDataLayout.newDataLayout(afterRemoval, targetSegment.getId(), 10002L);
        NDataflowUpdate addition = new NDataflowUpdate(afterRemoval.getUuid());
        addition.setToAddOrUpdateLayouts(new NDataLayout[]{layout10001, layout10002});
        dataflowManager.updateDataflow(addition);

        NDataflow afterAddition = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        Assert.assertEquals(8L, afterAddition.getSegments().getFirstSegment().getLayoutsMap().size());
    }

    /**
     * Clears the dataflow "89af4ee2-2cdb-4b07-b39e-4c29856309aa", appends READY
     * segments for 2010-01-01..2013-01-01 and 2013-01-01..2015-01-01, merges the
     * combined range and asserts that the dataflow then holds 3 segments, the third
     * of which is non-null.
     *
     * @throws Exception declared by the test signature
     */
    @Test
    public void testConcurrentMergeAndMerge() throws Exception {
        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);

        // Remove every existing segment.
        NDataflow initial = dataflowManager.getDataflow(dataflowId);
        NDataflowUpdate clearAll = new NDataflowUpdate(initial.getUuid());
        clearAll.setToRemoveSegs((NDataSegment[]) initial.getSegments().toArray(new NDataSegment[0]));
        dataflowManager.updateDataflow(clearAll);
        NDataflow emptied = dataflowManager.getDataflow(dataflowId);
        Assert.assertEquals(0L, emptied.getSegments().size());

        Long start2010 = SegmentRange.dateToLong("2010-01-01");
        Long start2013 = SegmentRange.dateToLong("2013-01-01");
        Long start2015 = SegmentRange.dateToLong("2015-01-01");

        // Segment 2010-01-01 .. 2013-01-01.
        NDataSegment firstSeg = dataflowManager.appendSegment(emptied,
                new SegmentRange.TimePartitionedSegmentRange(start2010, start2013));
        firstSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markFirstReady = new NDataflowUpdate(emptied.getUuid());
        markFirstReady.setToUpdateSegs(new NDataSegment[]{firstSeg});
        dataflowManager.updateDataflow(markFirstReady);

        // Segment 2013-01-01 .. 2015-01-01.
        NDataflow withOne = dataflowManager.getDataflow(dataflowId);
        NDataSegment secondSeg = dataflowManager.appendSegment(withOne,
                new SegmentRange.TimePartitionedSegmentRange(start2013, start2015));
        secondSeg.setStatus(SegmentStatusEnum.READY);
        NDataflowUpdate markSecondReady = new NDataflowUpdate(emptied.getUuid());
        markSecondReady.setToUpdateSegs(new NDataSegment[]{secondSeg});
        dataflowManager.updateDataflow(markSecondReady);

        // Expected value goes first so that a failure message labels the values correctly.
        NDataflow withTwo = dataflowManager.getDataflow(dataflowId);
        Assert.assertEquals(2L, withTwo.getSegments().size());

        dataflowManager.mergeSegments(withTwo,
                new SegmentRange.TimePartitionedSegmentRange(start2010, start2015), true);
        Assert.assertEquals(3L, dataflowManager.getDataflow(dataflowId).getSegments().size());
        Assert.assertNotNull(dataflowManager.getDataflow(dataflowId).getSegments().get(2));
    }

    /**
     * Currently disabled via {@code @Ignore}.
     *
     * Starts one worker thread per name in {@code strArr}; each worker appends 100
     * one-unit time segments to the dataflow looked up by that name, while a separate
     * background thread idles until the workers are done. The test fails if any thread
     * recorded an exception, and finally asserts that each named dataflow holds 100
     * segments.
     *
     * NOTE(review): the setup loop calls {@code createDataflow} with the same index plan
     * for every name and never passes the name itself, yet later lookups use the names
     * as dataflow ids - confirm how these are meant to line up.
     */
    @Test
    @Ignore
    public void testConcurrency() throws IOException, InterruptedException {
        final KylinConfig testConfig = getTestConfig();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(testConfig, this.projectDefault);
        NIndexPlanManager nIndexPlanManager = NIndexPlanManager.getInstance(testConfig, this.projectDefault);
        NProjectManager nProjectManager = NProjectManager.getInstance(testConfig);
        final String[] strArr = {"df1", "df2", "df3", "df4"};
        final int length = strArr.length;
        // Return value is discarded.
        nProjectManager.getProject(this.projectDefault);
        IndexPlan indexPlanByModelAlias = nIndexPlanManager.getIndexPlanByModelAlias("nmodel_basic");
        // Collects the created dataflows; the list is not read again in this method.
        ArrayList arrayList = new ArrayList();
        // One createDataflow call per name; the loop variable itself is not used.
        for (String str : strArr) {
            arrayList.add(nDataflowManager.createDataflow(indexPlanByModelAlias, "test_owner"));
        }
        // Stop flag for the background thread: it loops while the value is 0.
        final AtomicInteger atomicInteger = new AtomicInteger();
        // Exceptions caught inside any of the threads are collected here.
        final Vector vector = new Vector();
        Thread thread = new Thread() { // from class: org.apache.kylin.metadata.cube.model.NDataflowManagerTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    Random random = new Random();
                    // Picks a random name (unused) and sleeps 1 ms until the stop flag is raised.
                    while (atomicInteger.get() == 0) {
                        String str2 = strArr[random.nextInt(length)];
                        Thread.sleep(1L);
                    }
                } catch (Exception e) {
                    vector.add(e);
                }
            }
        };
        thread.start();
        Thread[] threadArr = new Thread[length];
        for (int i = 0; i < length; i++) {
            final String str2 = strArr[i];
            threadArr[i] = new Thread() { // from class: org.apache.kylin.metadata.cube.model.NDataflowManagerTest.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        Random random = new Random();
                        // Appends segments [0,1), [1,2), ... [99,100) to the dataflow named str2.
                        for (int i2 = 0; i2 < 100; i2++) {
                            NDataflowManager nDataflowManager2 = NDataflowManager.getInstance(testConfig, NDataflowManagerTest.this.projectDefault);
                            nDataflowManager2.appendSegment(nDataflowManager2.getDataflow(str2), new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(i2), Long.valueOf(i2 + 1)));
                            // nextInt(1) can only return 0, so this is always sleep(0).
                            Thread.sleep(random.nextInt(1));
                        }
                    } catch (Exception e) {
                        vector.add(e);
                    }
                }
            };
            threadArr[i].start();
        }
        // Wait for all workers before signalling the background thread to stop.
        for (int i2 = 0; i2 < length; i2++) {
            threadArr[i2].join();
        }
        atomicInteger.incrementAndGet();
        thread.join();
        // Any exception recorded by a thread fails the test (without a message).
        if (!vector.isEmpty()) {
            Assert.fail();
        }
        for (String str3 : strArr) {
            Assert.assertEquals(100L, nDataflowManager.getDataflow(str3).getSegments().size());
        }
    }

    /**
     * Refreshes the first READY/WARNING segment of "nmodel_basic" and asserts that the
     * returned segment covers the same range and has status NEW.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testRefreshSegment() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow dataflow = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        NDataSegment sourceSegment = (NDataSegment) dataflow
                .getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.READY, SegmentStatusEnum.WARNING}).get(0);

        NDataSegment refreshed = dataflowManager.refreshSegment(dataflow, sourceSegment.getSegRange());

        // assertEquals (rather than assertTrue on equals) reports both ranges on failure.
        Assert.assertEquals(sourceSegment.getSegRange(), refreshed.getSegRange());
        Assert.assertEquals(SegmentStatusEnum.NEW, refreshed.getStatus());
    }

    /**
     * Refreshes the first READY/WARNING segment of dataflow
     * "b780e4e4-69af-449e-b09f-05c90dfa04b6" and asserts that the returned segment has
     * the same range, status NEW, and a first multi-partition entry whose source count
     * and storage size are both 0.
     */
    @Test
    public void testRefreshSegmentMultiPartition() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow dataflow = dataflowManager.getDataflow("b780e4e4-69af-449e-b09f-05c90dfa04b6");
        SegmentStatusEnum[] wantedStatuses = new SegmentStatusEnum[]{SegmentStatusEnum.READY, SegmentStatusEnum.WARNING};
        NDataSegment sourceSegment = (NDataSegment) dataflow.getSegments(wantedStatuses).get(0);

        NDataSegment refreshed = dataflowManager.refreshSegment(dataflow, sourceSegment.getSegRange());

        Assert.assertEquals(sourceSegment.getSegRange(), refreshed.getSegRange());
        Assert.assertEquals(SegmentStatusEnum.NEW, refreshed.getStatus());

        SegmentPartition partitionForCount = (SegmentPartition) refreshed.getMultiPartitions().get(0);
        Assert.assertEquals(0L, partitionForCount.getSourceCount());
        SegmentPartition partitionForSize = (SegmentPartition) refreshed.getMultiPartitions().get(0);
        Assert.assertEquals(0L, partitionForSize.getStorageSize());
    }

    /**
     * Looks up a dataflow by id in the default project and checks that the returned entity
     * reports that same value as its uuid.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testGetDataflow2() throws IOException {
        String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.projectDefault).getDataflow(dataflowId);

        // assertEquals shows the expected and actual uuid on failure, unlike assertTrue(equals(...)).
        Assert.assertEquals(dataflowId, dataflow.getUuid());
    }

    /**
     * Verifies the number of holes reported by {@code calculateHoles} for time-partitioned
     * segments.
     *
     * <p>The test switches model {@code 89af4ee2-2cdb-4b07-b39e-4c29856309aa} to
     * {@link ManagementType#MODEL_BASED}, creates a temporary dataflow from a copy of the
     * {@code nmodel_basic} index plan, appends the ranges (0,1), (10,100) and (1000,10000), and
     * then checks the hole count for the stored segments and for two candidate segment lists.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testCalculateHoles() throws IOException {
        KylinConfig config = getTestConfig();
        NDataModelManager.getInstance(config, this.projectDefault).updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", model -> {
            model.setManagementType(ManagementType.MODEL_BASED);
        });
        NDataflowManager dataflowManager = NDataflowManager.getInstance(config, this.projectDefault);
        IndexPlan plan = NIndexPlanManager.getInstance(config, this.projectDefault).getIndexPlanByModelAlias("nmodel_basic").copy();
        plan.setUuid(RandomUtil.randomUUIDStr());
        CubeTestUtils.createTmpModelAndCube(config, plan);

        NDataflow created = dataflowManager.createDataflow(plan, "test_owner");
        dataflowManager.appendSegment(created, new SegmentRange.TimePartitionedSegmentRange(0L, 1L));
        dataflowManager.appendSegment(created, new SegmentRange.TimePartitionedSegmentRange(10L, 100L));
        dataflowManager.appendSegment(created, new SegmentRange.TimePartitionedSegmentRange(1000L, 10000L));

        // Three disjoint ranges: two holes are expected, via both overloads.
        Assert.assertEquals(2L, dataflowManager.calculateHoles(plan.getUuid()).size());
        NDataflow dataflow = dataflowManager.getDataflow(plan.getId());
        Assert.assertEquals(2L, dataflowManager.calculateHoles(plan.getUuid(), dataflow.getSegments()).size());

        // Typed list instead of a raw ArrayList so add/addAll are checked by the compiler.
        List<NDataSegment> candidates = Lists.newArrayList(dataflow.getSegments());

        // A candidate ending at 9 stops short of the segment starting at 10: still two holes expected.
        candidates.add(dataflowManager.newSegment(dataflow, new SegmentRange.TimePartitionedSegmentRange(1L, 9L)));
        Assert.assertEquals(2L, dataflowManager.calculateHoles(plan.getUuid(), candidates).size());

        // A candidate covering 1..10 is expected to leave only one hole.
        candidates.clear();
        candidates.addAll(dataflow.getSegments());
        candidates.add(dataflowManager.newSegment(dataflow, new SegmentRange.TimePartitionedSegmentRange(1L, 10L)));
        Assert.assertEquals(1L, dataflowManager.calculateHoles(plan.getUuid(), candidates).size());
    }

    /**
     * Verifies the number of holes reported by {@code calculateHoles} for Kafka-offset
     * partitioned segments.
     *
     * <p>The test switches model {@code 89af4ee2-2cdb-4b07-b39e-4c29856309aa} to
     * {@link ManagementType#MODEL_BASED}, creates a temporary dataflow from a copy of the
     * {@code nmodel_basic} index plan, appends three streaming segments with the ranges (0,1),
     * (10,100) and (1000,10000), and then checks the hole count for the stored segments and for
     * two candidate segment lists.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testCalculateHolesOfKafkaRange() throws IOException {
        KylinConfig config = getTestConfig();
        NDataModelManager.getInstance(config, this.projectDefault).updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", model -> {
            model.setManagementType(ManagementType.MODEL_BASED);
        });
        NDataflowManager dataflowManager = NDataflowManager.getInstance(config, this.projectDefault);
        IndexPlan plan = NIndexPlanManager.getInstance(config, this.projectDefault).getIndexPlanByModelAlias("nmodel_basic").copy();
        plan.setUuid(RandomUtil.randomUUIDStr());
        CubeTestUtils.createTmpModelAndCube(config, plan);

        NDataflow created = dataflowManager.createDataflow(plan, "test_owner");
        dataflowManager.appendSegmentForStreaming(created, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L)));
        dataflowManager.appendSegmentForStreaming(created, new SegmentRange.KafkaOffsetPartitionedSegmentRange(10L, 100L, createKafkaPartitionOffset(0, 200L), createKafkaPartitionOffset(0, 400L)));
        dataflowManager.appendSegmentForStreaming(created, new SegmentRange.KafkaOffsetPartitionedSegmentRange(1000L, 10000L, createKafkaPartitionOffset(0, 400L), createKafkaPartitionOffset(0, 800L)));

        // Three disjoint ranges: two holes are expected, via both overloads.
        Assert.assertEquals(2L, dataflowManager.calculateHoles(plan.getUuid()).size());
        NDataflow dataflow = dataflowManager.getDataflow(plan.getId());
        Assert.assertEquals(2L, dataflowManager.calculateHoles(plan.getUuid(), dataflow.getSegments()).size());

        // Typed list instead of a raw ArrayList so add/addAll are checked by the compiler.
        List<NDataSegment> candidates = Lists.newArrayList(dataflow.getSegments());

        // A candidate ending at 9 stops short of the segment starting at 10: still two holes expected.
        candidates.add(dataflowManager.newSegment(dataflow, new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 9L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L))));
        Assert.assertEquals(2L, dataflowManager.calculateHoles(plan.getUuid(), candidates).size());

        // A candidate covering 1..10 is expected to leave only one hole.
        candidates.clear();
        candidates.addAll(dataflow.getSegments());
        candidates.add(dataflowManager.newSegment(dataflow, new SegmentRange.KafkaOffsetPartitionedSegmentRange(1L, 10L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L))));
        Assert.assertEquals(1L, dataflowManager.calculateHoles(plan.getUuid(), candidates).size());
    }

    /**
     * Checks {@code appendSegmentForStreaming} when the caller supplies the segment id.
     *
     * <p>After one initial streaming segment, the same Kafka range is appended twice with the same
     * explicit id. Both calls are expected to return a segment carrying that id, and the dataflow
     * is expected to end up with two segments in total.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testAppendSegmentForStreaming() throws IOException {
        KylinConfig config = getTestConfig();
        NDataModelManager.getInstance(config, this.projectDefault).updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", model -> {
            model.setManagementType(ManagementType.MODEL_BASED);
        });
        NDataflowManager dataflowManager = NDataflowManager.getInstance(config, this.projectDefault);
        IndexPlan plan = NIndexPlanManager.getInstance(config, this.projectDefault).getIndexPlanByModelAlias("nmodel_basic").copy();
        plan.setUuid(RandomUtil.randomUUIDStr());
        CubeTestUtils.createTmpModelAndCube(config, plan);

        NDataflow created = dataflowManager.createDataflow(plan, "test_owner");
        dataflowManager.appendSegmentForStreaming(created, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L)));

        SegmentRange.KafkaOffsetPartitionedSegmentRange secondRange = new SegmentRange.KafkaOffsetPartitionedSegmentRange(10L, 100L, createKafkaPartitionOffset(0, 200L), createKafkaPartitionOffset(0, 400L));
        String segmentId = RandomUtil.randomUUIDStr();

        NDataflow dataflow = dataflowManager.getDataflow(created.getId());
        NDataSegment firstAppend = dataflowManager.appendSegmentForStreaming(dataflow, secondRange, segmentId);
        Assert.assertEquals(segmentId, firstAppend.getId());

        // Appending the same range with the same id again, against a freshly loaded dataflow.
        NDataSegment repeatedAppend = dataflowManager.appendSegmentForStreaming(dataflowManager.getDataflow(dataflow.getId()), secondRange, segmentId);
        Assert.assertEquals(segmentId, repeatedAppend.getId());

        // The previous code referenced an undeclared variable here; the dataflow id is what is needed.
        Assert.assertEquals(2L, dataflowManager.getDataflow(dataflow.getId()).getSegments().size());
    }

    /**
     * Exercises {@code appendSegmentForStreaming} with segments that share the range (0,1) but
     * differ in their Kafka partition offsets, asserting after each step the segment count of the
     * dataflow and, where applicable, the end offset stored for partition 0.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testAppendSegmentOfSameRangeForStreaming() throws IOException {
        KylinConfig config = getTestConfig();
        NDataModelManager.getInstance(config, this.projectDefault).updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", model -> {
            model.setManagementType(ManagementType.MODEL_BASED);
        });
        NDataflowManager mgr = NDataflowManager.getInstance(config, this.projectDefault);
        IndexPlan plan = NIndexPlanManager.getInstance(config, this.projectDefault).getIndexPlanByModelAlias("nmodel_basic").copy();
        plan.setUuid(RandomUtil.randomUUIDStr());
        CubeTestUtils.createTmpModelAndCube(config, plan);

        // Initial segment: offsets 100 -> 200.
        NDataflow df = mgr.createDataflow(plan, "test_owner");
        mgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L)));
        df = mgr.getDataflow(df.getId());
        Assert.assertEquals(1L, mgr.getDataflow(df.getId()).getSegments().size());

        // Identical range and offsets: an empty segment id and an unchanged segment count are expected.
        NDataSegment identical = mgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 200L)));
        df = mgr.getDataflow(df.getId());
        Assert.assertEquals("", identical.getId());
        Assert.assertEquals(1L, mgr.getDataflow(df.getId()).getSegments().size());
        Assert.assertEquals(1L, mgr.getDataflow(df.getId()).getSegments().size());

        // Same start offset, end offset 150: still one segment, whose end offset is expected to be 150.
        NDataSegment endAt150 = mgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 150L)));
        df = mgr.getDataflow(df.getId());
        Assert.assertEquals(150L, ((Long) mgr.getDataflow(df.getId()).getSegment(endAt150.getId()).getSegRange().getSourcePartitionOffsetEnd().get(0)).longValue());
        Assert.assertEquals(1L, mgr.getDataflow(df.getId()).getSegments().size());

        // Same start offset, end offset 300: still one segment, whose end offset is expected to be 300.
        NDataSegment endAt300 = mgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 100L), createKafkaPartitionOffset(0, 300L)));
        df = mgr.getDataflow(df.getId());
        Assert.assertEquals(1L, mgr.getDataflow(df.getId()).getSegments().size());
        Assert.assertEquals(300L, ((Long) mgr.getDataflow(df.getId()).getSegment(endAt300.getId()).getSegRange().getSourcePartitionOffsetEnd().get(0)).longValue());

        // Different start offset (500 -> 600): a second segment is expected.
        mgr.appendSegmentForStreaming(df, new SegmentRange.KafkaOffsetPartitionedSegmentRange(0L, 1L, createKafkaPartitionOffset(0, 500L), createKafkaPartitionOffset(0, 600L)));
        NDataflow reloaded = mgr.getDataflow(df.getId());
        Assert.assertEquals(2L, mgr.getDataflow(reloaded.getId()).getSegments().size());
    }

    /**
     * Removes layouts from the {@code nmodel_basic} dataflow and checks the layout count of its
     * last segment: two fewer after removing ids 1000001 and 1, and unchanged after a further
     * removal request for id 100000000.
     *
     * @throws IOException declared by the test signature
     */
    @Test
    public void testRemoveLayouts() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow original = dataflowManager.getDataflowByModelAlias("nmodel_basic");
        int layoutCountBefore = original.getLastSegment().getSegDetails().getLayouts().size();

        NDataflow trimmed = dataflowManager.removeLayouts(original, Lists.newArrayList(1000001L, 1L));
        Assert.assertEquals(layoutCountBefore - 2, trimmed.getLastSegment().getSegDetails().getLayouts().size());

        // Second removal targets a different id; the count is expected to stay the same.
        NDataflow trimmedAgain = dataflowManager.removeLayouts(trimmed, Lists.newArrayList(100000000L));
        Assert.assertEquals(layoutCountBefore - 2, trimmedAgain.getLastSegment().getSegDetails().getLayouts().size());
    }

    /**
     * Removes layout 1 from index plan {@code 89af4ee2-2cdb-4b07-b39e-4c29856309aa} and checks that
     * no segment of the dataflow with the same uuid still lists layout 1 in its layouts map.
     */
    @Test
    public void testCuboidsNotInCube() {
        // The dataflow manager is obtained before the index plan update, as in the original call order.
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        String dataflowId = NIndexPlanManager.getInstance(getTestConfig(), this.projectDefault)
                .updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", indexPlan -> {
                    indexPlan.removeLayouts(Sets.newHashSet(1L), true, true);
                }).getUuid();

        // Typed for-each replaces the raw Iterator and the per-element cast.
        for (NDataSegment segment : dataflowManager.getDataflow(dataflowId).getSegments()) {
            Assert.assertFalse(segment.getLayoutsMap().containsKey(1L));
        }
    }

    /**
     * Checks the number of underlying data models listed by the dataflow manager: 8 for the
     * default project and 0 for project {@code ssb}.
     */
    @Test
    public void testGetModels() {
        NDataflowManager defaultProjectManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        Assert.assertEquals(8L, defaultProjectManager.listUnderliningDataModels().size());

        NDataflowManager ssbProjectManager = NDataflowManager.getInstance(getTestConfig(), "ssb");
        Assert.assertEquals(0L, ssbProjectManager.listUnderliningDataModels().size());
    }

    /**
     * For dataflow {@code 3f8941de-d01c-42b8-91b5-44646390864b} in project {@code broken_test},
     * expects {@code isBroken()} to be false while {@code checkBrokenWithRelatedInfo()} is true.
     */
    @Test
    public void testBrokenDataFlow_WithBrokenModel() {
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), "broken_test").getDataflow("3f8941de-d01c-42b8-91b5-44646390864b");

        // assertFalse/assertTrue replace assertEquals on boxed booleans.
        Assert.assertFalse(dataflow.isBroken());
        Assert.assertTrue(dataflow.checkBrokenWithRelatedInfo());
    }

    /**
     * For dataflow {@code 039eef32-9691-4c88-93ba-d65c58a1ab7a} in project {@code broken_test},
     * expects {@code isBroken()} to be false while {@code checkBrokenWithRelatedInfo()} is true.
     */
    @Test
    public void testBrokenDataFlow_WithBrokenIndexPlan() {
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), "broken_test").getDataflow("039eef32-9691-4c88-93ba-d65c58a1ab7a");

        // assertFalse/assertTrue replace assertEquals on boxed booleans.
        Assert.assertFalse(dataflow.isBroken());
        Assert.assertTrue(dataflow.checkBrokenWithRelatedInfo());
    }

    /**
     * For dataflow {@code f1bb4bbd-a638-442b-a276-e301fde0d7f6} in project {@code broken_test},
     * expects {@code isBroken()} to be true, the id to be preserved, and {@code getAllColumns()}
     * to throw a {@link RuntimeException} whose message mentions the broken-entity call.
     */
    @Test
    public void testBrokenDataFlow() {
        String brokenDataflowId = "f1bb4bbd-a638-442b-a276-e301fde0d7f6";
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), "broken_test").getDataflow(brokenDataflowId);

        // assertTrue replaces assertEquals on a boxed boolean.
        Assert.assertTrue(dataflow.isBroken());
        Assert.assertEquals(brokenDataflowId, dataflow.getId());

        // The expectations must be registered before the call that is supposed to throw.
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("call on Broken Entity's getAllColumns method");
        dataflow.getAllColumns();
    }

    /**
     * For dataflow {@code 039eef32-9691-4c88-93ba-d65c58a1ab7a} in project {@code broken_test},
     * expects {@code isBroken()} to be false, {@code checkBrokenWithRelatedInfo()} to be true, and
     * the lookup by model alias {@code AUTO_MODEL_TEST_ACCOUNT_1} to return an equal dataflow.
     */
    @Test
    public void testGetDataflowByModelAlias_WithBrokenCubePlan() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), "broken_test");
        NDataflow dataflow = dataflowManager.getDataflow("039eef32-9691-4c88-93ba-d65c58a1ab7a");

        // assertFalse/assertTrue replace assertEquals on boxed booleans.
        Assert.assertFalse(dataflow.isBroken());
        Assert.assertTrue(dataflow.checkBrokenWithRelatedInfo());
        Assert.assertEquals(dataflowManager.getDataflowByModelAlias("AUTO_MODEL_TEST_ACCOUNT_1"), dataflow);
    }

    /**
     * For dataflow {@code 3f8941de-d01c-42b8-91b5-44646390864b} in project {@code broken_test},
     * expects {@code isBroken()} to be false, {@code checkBrokenWithRelatedInfo()} to be true, and
     * the lookup by model alias {@code AUTO_MODEL_TEST_COUNTRY_1} to return {@code null}.
     */
    @Test
    public void testGetDataflowByModelAlias_WithBrokenModel() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), "broken_test");
        NDataflow dataflow = dataflowManager.getDataflow("3f8941de-d01c-42b8-91b5-44646390864b");

        // assertFalse/assertTrue replace assertEquals on boxed booleans.
        Assert.assertFalse(dataflow.isBroken());
        Assert.assertTrue(dataflow.checkBrokenWithRelatedInfo());
        Assert.assertNull(dataflowManager.getDataflowByModelAlias("AUTO_MODEL_TEST_COUNTRY_1"));
    }

    /**
     * Updates table {@code DEFAULT.TEST_KYLIN_FACT} (marking a writable copy as top) and checks
     * that looking up the {@code nmodel_basic_inner} dataflow afterwards yields a different object
     * instance than the one obtained before the update.
     */
    @Test
    public void testCacheReload_TableChanged() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow beforeChange = dataflowManager.getDataflowByModelAlias("nmodel_basic_inner");

        NTableMetadataManager tableManager = NTableMetadataManager.getInstance(getTestConfig(), this.projectDefault);
        TableDesc writableFactTable = tableManager.copyForWrite(tableManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT"));
        writableFactTable.setTop(true);
        tableManager.updateTableDesc(writableFactTable);

        NDataflow afterChange = dataflowManager.getDataflowByModelAlias("nmodel_basic_inner");
        Assert.assertNotSame(beforeChange, afterChange);
    }

    /**
     * Removes source table {@code DEFAULT.TEST_KYLIN_FACT} and checks that looking up the
     * {@code nmodel_basic_inner} dataflow afterwards yields a different object instance than the
     * one obtained before the removal.
     */
    @Test
    public void testCacheReload_TableRemoved() {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow beforeRemoval = dataflowManager.getDataflowByModelAlias("nmodel_basic_inner");

        NTableMetadataManager tableManager = NTableMetadataManager.getInstance(getTestConfig(), this.projectDefault);
        tableManager.removeSourceTable("DEFAULT.TEST_KYLIN_FACT");

        NDataflow afterRemoval = dataflowManager.getDataflowByModelAlias("nmodel_basic_inner");
        Assert.assertNotSame(beforeRemoval, afterRemoval);
    }

    /**
     * Builds an {@code NCubeJoinedFlatTableDesc} for the last segment of the fifth listed dataflow
     * with {@code kylin.engine.persist-flattable-enabled} set to {@code false} and then
     * {@code true}, asserting the used-column and all-column counts in each case.
     */
    @Test
    public void testJoinedFlatTableDescDiff() {
        KylinConfig config = getTestConfig();

        // Flag off: 5 used columns and 5 columns overall are expected.
        config.setProperty("kylin.engine.persist-flattable-enabled", "false");
        NDataflow fifthDataflow = (NDataflow) NDataflowManager.getInstance(config, this.projectDefault).listAllDataflows().get(4);
        NDataSegment lastSegment = fifthDataflow.getLastSegment();
        Assert.assertEquals("11124840-b3e3-43db-bcab-2b78da666d00", lastSegment.getId());
        NCubeJoinedFlatTableDesc withoutPersist = new NCubeJoinedFlatTableDesc(lastSegment);
        Assert.assertEquals(5L, withoutPersist.getUsedColumns().size());
        Assert.assertEquals(5L, withoutPersist.getAllColumns().size());

        // Flag on: 26 columns overall are expected, with the used columns still at 5.
        config.setProperty("kylin.engine.persist-flattable-enabled", "true");
        NCubeJoinedFlatTableDesc withPersist = new NCubeJoinedFlatTableDesc(lastSegment);
        Assert.assertEquals(26L, withPersist.getAllColumns().size());
        Assert.assertEquals(5L, withPersist.getUsedColumns().size());
    }

    /**
     * Checks {@code getFusionModelAlias()} for three dataflows of project {@code streaming_test}
     * and one dataflow of the default project, then drops model
     * {@code 741ca86a-1f13-46da-a59f-95fb68615e3a} and expects the alias of that dataflow to be
     * {@code null}.
     */
    @Test
    public void testGetFusionModelAlias() {
        NDataflowManager streamingManager = NDataflowManager.getInstance(getTestConfig(), "streaming_test");
        NDataflow firstFusionFlow = streamingManager.getDataflow("14e00a6f-d910-14b6-ee67-e0a5775012c4");
        Assert.assertEquals("fusion_model", firstFusionFlow.getFusionModelAlias());
        NDataflow secondFusionFlow = streamingManager.getDataflow("3d69e1c0-0165-c144-7dae-8ae5dc0cf16c");
        Assert.assertEquals("fusion_model", secondFusionFlow.getFusionModelAlias());
        NDataflow streamMergeFlow = streamingManager.getDataflow("e78a89dd-847f-4574-8afa-8768b4228b72");
        Assert.assertEquals("stream_merge", streamMergeFlow.getFusionModelAlias());

        NDataflowManager defaultManager = NDataflowManager.getInstance(getTestConfig(), this.projectDefault);
        NDataflow innerFlow = defaultManager.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a");
        Assert.assertEquals("nmodel_basic_inner", innerFlow.getFusionModelAlias());

        // After the model is dropped, the dataflow is looked up again and its alias is expected to be null.
        NDataModelManager modelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), this.projectDefault);
        modelManager.dropModel("741ca86a-1f13-46da-a59f-95fb68615e3a");
        NDataflow innerFlowAfterDrop = defaultManager.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a");
        Assert.assertNull(innerFlowAfterDrop.getFusionModelAlias());
    }
}
