package org.apache.kylin.rest.service;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.SystemPropertiesCache;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.junit.rule.TransactionExceptedException;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
import org.apache.kylin.metadata.streaming.KafkaConfigManager;
import org.apache.kylin.metadata.streaming.StreamingJobRecord;
import org.apache.kylin.metadata.streaming.StreamingJobRecordManager;
import org.apache.kylin.metadata.streaming.StreamingJobStats;
import org.apache.kylin.metadata.streaming.StreamingJobStatsManager;
import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
import org.apache.kylin.rest.request.StreamingJobActionEnum;
import org.apache.kylin.rest.request.StreamingJobFilter;
import org.apache.kylin.rest.response.DataResult;
import org.apache.kylin.rest.response.StreamingJobDataStatsResponse;
import org.apache.kylin.rest.response.StreamingJobResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclUtil;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.request.StreamingJobStatsRequest;
import org.apache.kylin.streaming.request.StreamingJobUpdateRequest;
import org.apache.kylin.streaming.request.StreamingSegmentRequest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.beans.BeanUtils;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:org/apache/kylin/rest/service/StreamingJobServiceTest.class */
public class StreamingJobServiceTest extends CSVSourceTestCase {
    // Time-zone identifiers; not referenced in the visible part of this class.
    private static String[] timeZones = {"GMT+8", "CST", "PST", "UTC"};
    // Project name passed to the service and manager calls in these tests.
    private static String PROJECT = "streaming_test";
    // Model id from which the streaming build/merge job ids are derived via StreamingUtils.getJobId.
    private static String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
    // The dataflow id is the same value as the model id.
    private static String DATAFLOW_ID = MODEL_ID;
    // Listener registered on the event bus in setup() and unregistered in tearDown().
    private final ModelBrokenListener modelBrokenListener = new ModelBrokenListener();

    // JUnit rule providing a temporary folder; not used in the visible part of this class.
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    // Rule used to declare an expected exception (see testUpdateStreamingJobParams).
    @Rule
    public TransactionExceptedException thrown = TransactionExceptedException.none();

    // Spy created from the NUserGroupService class.
    @Mock
    protected IUserGroupService userGroupService = (IUserGroupService) Mockito.spy(NUserGroupService.class);

    // Spy injected into aclEvaluate by setup() through reflection.
    @Mock
    private AclUtil aclUtil = (AclUtil) Mockito.spy(AclUtil.class);

    // Spy injected into both services by setup() through reflection.
    @Mock
    private AclEvaluate aclEvaluate = (AclEvaluate) Mockito.spy(AclEvaluate.class);

    // Service under test, wrapped in a spy so individual methods can be stubbed.
    @InjectMocks
    private StreamingJobService streamingJobService = (StreamingJobService) Mockito.spy(new StreamingJobService());

    // Collaborator of the service under test; wired into it in setup().
    @InjectMocks
    private IndexPlanService indexPlanService = (IndexPlanService) Mockito.spy(new IndexPlanService());

    /**
     * Prepares the fixture: runs the parent setup, sets the Hadoop user property, wires the ACL
     * spies and the index-plan service into the service under test through reflection, and
     * registers the model-broken listener on the event bus.
     */
    @Before
    public void setup() {
        super.setup();
        SystemPropertiesCache.setProperty("HADOOP_USER_NAME", "root");
        ReflectionTestUtils.setField(this.aclEvaluate, "aclUtil", this.aclUtil);
        ReflectionTestUtils.setField(this.streamingJobService, "aclEvaluate", this.aclEvaluate);
        ReflectionTestUtils.setField(this.indexPlanService, "aclEvaluate", this.aclEvaluate);
        ReflectionTestUtils.setField(this.streamingJobService, "indexPlanService", this.indexPlanService);
        try {
            new JdbcRawRecStore(getTestConfig());
        } catch (Exception ignored) {
            // Best-effort: a failure to construct the store is deliberately tolerated so the
            // tests still run. NOTE(review): presumably the constructor only initialises
            // backing storage - confirm that nothing below depends on it succeeding.
        }
        EventBusFactory.getInstance().register(this.modelBrokenListener, false);
    }

    /**
     * Undoes the fixture: resets the semi-automatic-mode property to "false", unregisters the
     * model-broken listener, restarts the event bus and cleans up the test metadata.
     */
    @After
    public void tearDown() {
        getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "false");
        // Unregister before restarting the bus; mirrors the register call in setup().
        EventBusFactory.getInstance().unregister(this.modelBrokenListener);
        EventBusFactory.getInstance().restart();
        cleanupTestMetadata();
    }

    /**
     * Exercises {@code getStreamingJobList} with model-name, model-name-list, job-type and status
     * filters, with paging, and with both sort directions on {@code last_modified}.
     */
    @Test
    public void testGetStreamingJobList() throws Exception {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        String buildJobUuid = MODEL_ID + "_build";
        List<String> none = Collections.emptyList();
        StreamingJobStatsManager statsManager = createStatData(jobId);

        // Unfiltered listing of the project.
        DataResult<?> allJobs = this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", none, none, none, PROJECT, "last_modified", true), 0, 20);
        Assert.assertEquals(11L, allJobs.getTotalSize());
        StreamingJobResponse firstJob = (StreamingJobResponse) ((List<?>) allJobs.getValue()).get(0);
        Assert.assertFalse(firstJob.isModelBroken());
        Assert.assertNotNull(firstJob.getPartitionDesc());

        // Filtering by model name.
        Assert.assertEquals(6L, this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("stream_merge", none, none, none, PROJECT, "last_modified", true), 0, 20)
                .getTotalSize());
        Assert.assertEquals(2L, this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("stream_merge1", none, none, none, PROJECT, "last_modified", true), 0, 20)
                .getTotalSize());
        Assert.assertEquals(0L, this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("stream_merge2", none, none, none, PROJECT, "last_modified", true), 0, 20)
                .getTotalSize());

        // Filtering by model-name list and by job type.
        Assert.assertEquals(2L, this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", Arrays.asList("stream_merge1"), none, none, PROJECT, "last_modified",
                        true), 0, 20).getTotalSize());
        DataResult<?> buildJobs = this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", none, Arrays.asList("STREAMING_BUILD"), none, PROJECT, "last_modified",
                        true), 0, 20);
        Assert.assertEquals(6L, ((List<?>) buildJobs.getValue()).size());

        // Filtering by status after switching the build job to RUNNING.
        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        streamingJobManager.updateStreamingJob(buildJobUuid, streamingJobMeta -> {
            streamingJobMeta.setCurrentStatus(JobStatusEnum.RUNNING);
        });
        Assert.assertEquals(3L, this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", none, none, Arrays.asList("RUNNING"), PROJECT, "last_modified", true),
                0, 20).getTotalSize());

        // reverse = true: last_modified must be non-increasing across the first four entries.
        DataResult<?> descendingPage = this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", none, none, none, PROJECT, "last_modified", true), 0, 4);
        List<?> descending = (List<?>) descendingPage.getValue();
        Assert.assertEquals(4L, descending.size());
        for (int i = 0; i < 3; i++) {
            Assert.assertTrue(((StreamingJobResponse) descending.get(i)).getLastModified()
                    >= ((StreamingJobResponse) descending.get(i + 1)).getLastModified());
        }

        // reverse = false with an empty project: last_modified must be non-decreasing.
        DataResult<?> ascendingPage = this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", none, none, none, "", "last_modified", false), 0, 20);
        List<?> ascending = (List<?>) ascendingPage.getValue();
        for (int i = 0; i < 3; i++) {
            Assert.assertTrue(((StreamingJobResponse) ascending.get(i)).getLastModified()
                    <= ((StreamingJobResponse) ascending.get(i + 1)).getLastModified());
        }

        // Paging: the same filter is queried with limit 4 and then with limit 2.
        StreamingJobFilter allProjectsFilter = new StreamingJobFilter("", none, none, none, "", "last_modified",
                true);
        DataResult<?> fourPerPage = this.streamingJobService.getStreamingJobList(allProjectsFilter, 0, 4);
        Assert.assertEquals(11L, fourPerPage.getTotalSize());
        Assert.assertEquals(4L, ((List<?>) fourPerPage.getValue()).size());
        DataResult<?> twoPerPage = this.streamingJobService.getStreamingJobList(allProjectsFilter, 0, 2);
        Assert.assertEquals(2L, ((List<?>) twoPerPage.getValue()).size());

        // A job in LAUNCHING_ERROR is returned by the "ERROR" status filter and keeps its status.
        statsManager.deleteAllStreamingJobStats();
        streamingJobManager.updateStreamingJob(buildJobUuid, streamingJobMeta -> {
            streamingJobMeta.setCurrentStatus(JobStatusEnum.LAUNCHING_ERROR);
        });
        StreamingJobFilter errorFilter = new StreamingJobFilter("", none, none, Arrays.asList("ERROR"), PROJECT,
                "last_modified", true);
        DataResult<?> errorJobs = this.streamingJobService.getStreamingJobList(errorFilter, 0, 20);
        Assert.assertEquals(1L, errorJobs.getTotalSize());
        Assert.assertTrue(((StreamingJobResponse) ((List<?>) errorJobs.getValue()).get(0)).isLaunchingError());
        Assert.assertEquals(JobStatusEnum.LAUNCHING_ERROR,
                streamingJobManager.getStreamingJobByUuid(buildJobUuid).getCurrentStatus());

        // With a stats row carrying negative latencies, the reported data latency is 0.
        statsManager.deleteAllStreamingJobStats();
        statsManager.insert(new StreamingJobStats(jobId, PROJECT, 120L, Double.valueOf(32.22d), 60000L, -500L, -60L,
                Long.valueOf(System.currentTimeMillis() - 100)));
        DataResult<?> latencyJobs = this.streamingJobService.getStreamingJobList(errorFilter, 0, 1);
        Assert.assertEquals(1L, latencyJobs.getTotalSize());
        Assert.assertEquals(0L,
                ((StreamingJobResponse) ((List<?>) latencyJobs.getValue()).get(0)).getDataLatency().longValue());
    }

    /**
     * Listing with a job-id filter returns exactly the job with that id.
     */
    @Test
    public void testGetStreamingJobListByJobId() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        List<String> none = Collections.emptyList();
        StreamingJobFilter byJobId = new StreamingJobFilter("", none, none, none, PROJECT, "last_modified", true,
                Collections.singletonList(jobId));
        DataResult<?> result = this.streamingJobService.getStreamingJobList(byJobId, 0, 20);
        Assert.assertEquals(1L, result.getTotalSize());
        StreamingJobResponse onlyJob = (StreamingJobResponse) ((List<?>) result.getValue()).get(0);
        // Expected value goes first so a failure message reports the two values the right way round.
        Assert.assertEquals(jobId, onlyJob.getId());
    }

    /**
     * {@code getAllStreamingJobsById} with a single id returns exactly that job.
     */
    @Test
    public void testGetAllStreamingJobsByJobId() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        List<?> jobs = this.streamingJobService.getAllStreamingJobsById(PROJECT, Collections.singletonList(jobId));
        Assert.assertEquals(1L, jobs.size());
        // Expected value goes first so a failure message reports the two values the right way round.
        Assert.assertEquals(jobId, ((StreamingJobMeta) jobs.get(0)).getId());
    }

    /**
     * Listing by job id with an empty project must raise a {@link KylinException}.
     */
    @Test
    public void testGetStreamingJobListByJobId_WithoutProject() {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        List<String> none = Collections.emptyList();
        StreamingJobFilter filterWithoutProject = new StreamingJobFilter("", none, none, none, "", "last_modified",
                true, Collections.singletonList(jobId));
        // The first argument of assertThrows is the message shown when no exception is thrown.
        Assert.assertThrows(ErrorCodeServer.REQUEST_PARAMETER_EMPTY_OR_VALUE_EMPTY.getMsg(new Object[]{"project"}), KylinException.class, () -> {
            this.streamingJobService.getStreamingJobList(filterWithoutProject, 0, 20);
        });
    }

    /**
     * {@code getAllStreamingJobsById} with an empty project must raise a {@link KylinException}.
     */
    @Test
    public void testGetAllStreamingJobsById_WithoutProject() {
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        List<String> jobIds = Collections.singletonList(buildJobId);
        String failureMessage = ErrorCodeServer.REQUEST_PARAMETER_EMPTY_OR_VALUE_EMPTY.getMsg(new Object[]{"project"});
        Assert.assertThrows(failureMessage, KylinException.class,
                () -> this.streamingJobService.getAllStreamingJobsById("", jobIds));
    }

    /**
     * Checks the model name and index count reported for a job, the total job count of the
     * project, and the layout counts of two index plans.
     */
    @Test
    public void testGetStreamingJobListOfIndex() {
        StreamingJobStatsManager statsManager = createStatData(StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name()));
        List<String> none = Collections.emptyList();

        DataResult<?> merge1Jobs = this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("stream_merge1", none, none, none, PROJECT, "last_modified", true), 0, 20);
        StreamingJobResponse firstJob = (StreamingJobResponse) ((List<?>) merge1Jobs.getValue()).get(0);
        Assert.assertEquals("stream_merge1", firstJob.getModelName());
        Assert.assertEquals(1L, firstJob.getModelIndexes().intValue());

        DataResult<?> allJobs = this.streamingJobService.getStreamingJobList(
                new StreamingJobFilter("", none, none, none, PROJECT, "last_modified", true), 0, 20);
        Assert.assertEquals(11L, allJobs.getTotalSize());

        NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(getTestConfig(), PROJECT);
        Assert.assertEquals(4L, indexPlanManager.getIndexPlan("4965c827-fbb4-4ea1-a744-3f341a3b030d").getAllLayouts().size());
        Assert.assertEquals(4L, indexPlanManager.getIndexPlan("cd2b9a23-699c-4699-b0dd-38c9412b3dfd").getAllLayouts().size());
        statsManager.deleteAllStreamingJobStats();
    }

    /**
     * Drives {@code setModelInfo} with five different model maps: a regular model, an empty map,
     * a model flagged broken, a model for which {@code isBatchModelBroken} is stubbed to true,
     * and a model whose dataflow is flagged broken. Statement order matters because the stubs
     * are installed on spies between the calls.
     */
    @Test
    public void testSetModelInfo() {
        HashMap hashMap = new HashMap();
        StreamingJobMeta streamingJobByUuid = StreamingJobManager.getInstance(getTestConfig(), PROJECT).getStreamingJobByUuid(StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name()));
        // Case 1: the map key is MODEL_ID, but the model is looked up by a different id.
        hashMap.put(MODEL_ID, ((NDataModelManager) this.streamingJobService.getManager(NDataModelManager.class, PROJECT)).getDataModelDesc("4965c827-fbb4-4ea1-a744-3f341a3b030d"));
        StreamingJobResponse streamingJobResponse = new StreamingJobResponse(streamingJobByUuid);
        this.streamingJobService.setModelInfo(streamingJobResponse, hashMap);
        Assert.assertEquals(64L, streamingJobResponse.getModelIndexes().intValue());
        Assert.assertEquals("SSB_STREAMING.LO_PARTITIONCOLUMN", streamingJobResponse.getPartitionDesc().getPartitionDateColumn());
        // Case 2: no entry for the model - the response is expected to be flagged broken.
        hashMap.clear();
        StreamingJobResponse streamingJobResponse2 = new StreamingJobResponse(streamingJobByUuid);
        this.streamingJobService.setModelInfo(streamingJobResponse2, hashMap);
        Assert.assertTrue(streamingJobResponse2.isModelBroken());
        // Case 3: the model itself is flagged broken.
        NDataModel nDataModel = (NDataModel) Mockito.spy(NDataModel.class);
        nDataModel.setBroken(true);
        hashMap.put(MODEL_ID, nDataModel);
        StreamingJobResponse streamingJobResponse3 = new StreamingJobResponse(streamingJobByUuid);
        this.streamingJobService.setModelInfo(streamingJobResponse3, hashMap);
        Assert.assertTrue(streamingJobResponse3.isModelBroken());
        // Case 4: isBatchModelBroken is stubbed on the service spy to return true for this model.
        NDataModel nDataModel2 = (NDataModel) Mockito.spy(NDataModel.class);
        hashMap.clear();
        hashMap.put(MODEL_ID, nDataModel2);
        Mockito.when(Boolean.valueOf(this.streamingJobService.isBatchModelBroken(nDataModel2))).thenReturn(true);
        StreamingJobResponse streamingJobResponse4 = new StreamingJobResponse(streamingJobByUuid);
        this.streamingJobService.setModelInfo(streamingJobResponse4, hashMap);
        Assert.assertTrue(streamingJobResponse4.isModelBroken());
        // Case 5: a fresh model spy, with the dataflow manager stubbed to return a broken dataflow.
        hashMap.clear();
        hashMap.put(MODEL_ID, Mockito.spy(NDataModel.class));
        NDataflow nDataflow = new NDataflow();
        nDataflow.setBroken(true);
        NDataflowManager nDataflowManager = (NDataflowManager) Mockito.spy(NDataflowManager.getInstance(getTestConfig(), PROJECT));
        Mockito.when(this.streamingJobService.getManager(NDataflowManager.class, PROJECT)).thenReturn(nDataflowManager);
        Mockito.when(nDataflowManager.getDataflow(Mockito.anyString())).thenReturn(nDataflow);
        StreamingJobResponse streamingJobResponse5 = new StreamingJobResponse(streamingJobByUuid);
        this.streamingJobService.setModelInfo(streamingJobResponse5, hashMap);
        Assert.assertTrue(streamingJobResponse5.isModelBroken());
    }

    /**
     * A model that is not a fusion model is not reported as batch-broken.
     */
    @Test
    public void testIsBatchModelBroken() {
        NDataModel model = Mockito.spy(NDataModel.class);
        Mockito.when(model.isFusionModel()).thenReturn(false);
        boolean broken = this.streamingJobService.isBatchModelBroken(model);
        Assert.assertFalse(broken);
    }

    /**
     * A fusion model with the given fusion id, and a {@code null} model, are both reported as
     * batch-broken.
     */
    @Test
    public void testIsBatchModelBroken1() {
        NDataModel fusionModel = Mockito.spy(NDataModel.class);
        Mockito.when(fusionModel.isFusionModel()).thenReturn(true);
        fusionModel.setFusionId("4965c827-fbb4-4ea1-a744-3f341a3b030d");
        boolean brokenForFusionModel = this.streamingJobService.isBatchModelBroken(fusionModel);
        Assert.assertTrue(brokenForFusionModel);
        boolean brokenForNull = this.streamingJobService.isBatchModelBroken((NDataModel) null);
        Assert.assertTrue(brokenForNull);
    }

    /**
     * Checks the latency, consumption-rate and processing-time histories built from the two
     * inserted stats rows, and that a time filter of 0 yields no consumption-rate history.
     */
    @Test
    public void testGetStreamingJobDataStats() throws Exception {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        StreamingJobStatsManager statsManager = mockStreamingJobDataStats(jobId);
        try {
            StreamingJobDataStatsResponse stats = this.streamingJobService.getStreamingJobDataStats(jobId, 1440);
            Assert.assertEquals("500,400", StringUtils.join(stats.getDataLatencyHist(), ","));
            Assert.assertEquals("32,8", StringUtils.join(stats.getConsumptionRateHist(), ","));
            Assert.assertEquals("1200,3200", StringUtils.join(stats.getProcessingTimeHist(), ","));
            Assert.assertNotNull(this.streamingJobService.getStreamingJobDataStats(jobId, 4320));
            Assert.assertNotNull(this.streamingJobService.getStreamingJobDataStats(jobId, 10080));
            Assert.assertNull(this.streamingJobService.getStreamingJobDataStats(jobId, 0).getConsumptionRateHist());
        } finally {
            // Drop the stats table even when an assertion above fails.
            statsManager.dropTable();
        }
    }

    /**
     * Requests stats with a time filter of 12960; if the call throws, the exception must be a
     * {@link KylinException}. The stats table is dropped exactly once on every path.
     */
    @Test
    public void testGetStreamingJobDataStatsException() throws Exception {
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        StreamingJobStatsManager statsManager = mockStreamingJobDataStats(jobId);
        try {
            this.streamingJobService.getStreamingJobDataStats(jobId, 12960);
            // NOTE(review): the test also passes when no exception is thrown - confirm whether
            // a failure should be raised here.
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
        } finally {
            statsManager.dropTable();
        }
    }

    /**
     * Points the test config at an in-memory H2 metadata URL and inserts two stats rows for the
     * given job, created 300 s and 400 s before the current time.
     *
     * @param str id of the job the stats rows belong to
     * @return the stats manager the rows were inserted through
     */
    private StreamingJobStatsManager mockStreamingJobDataStats(String str) {
        getTestConfig().setMetadataUrl("test@jdbc,driverClassName=org.h2.Driver,url=jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1,username=sa,password=");
        final StreamingJobStatsManager statsManager = StreamingJobStatsManager.getInstance();
        final long now = System.currentTimeMillis();

        StreamingJobStats newerRow = new StreamingJobStats(str, PROJECT, 120L, Double.valueOf(32.22d), 1200L, 500L,
                600L, Long.valueOf(now - 300000));
        statsManager.insert(newerRow);

        StreamingJobStats olderRow = new StreamingJobStats(str, PROJECT, 120L, Double.valueOf(8.17d), 3200L, 400L,
                800L, Long.valueOf(now - 400000));
        statsManager.insert(olderRow);

        return statsManager;
    }

    /**
     * Starting the build and merge jobs leaves both in RUNNING status.
     */
    @Test
    public void testUpdateStreamingJobStatusToStart() throws Exception {
        List<String> jobIds = createJobList(MODEL_ID);
        this.streamingJobService.updateStreamingJobStatus(PROJECT, jobIds, "START");

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(buildJobId).getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(mergeJobId).getCurrentStatus());
    }

    /**
     * Starting the jobs with a {@code null} project still leaves both in RUNNING status.
     */
    @Test
    public void testUpdateStatusOfNullPrj() throws Exception {
        List<String> jobIds = createJobList(MODEL_ID);
        this.streamingJobService.updateStreamingJobStatus((String) null, jobIds, "START");

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(buildJobId).getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(mergeJobId).getCurrentStatus());
    }

    /**
     * Starting and then stopping the jobs with an empty project leaves both in STOPPED status.
     */
    @Test
    public void testUpdateStatusOfEmptyProject() throws Exception {
        final String emptyProject = "";
        this.streamingJobService.updateStreamingJobStatus(emptyProject, createJobList(MODEL_ID), "START");
        this.streamingJobService.updateStreamingJobStatus(emptyProject, createJobList(MODEL_ID), "STOP");

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        Assert.assertEquals(JobStatusEnum.STOPPED, jobManager.getStreamingJobByUuid(buildJobId).getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertEquals(JobStatusEnum.STOPPED, jobManager.getStreamingJobByUuid(mergeJobId).getCurrentStatus());
    }

    /**
     * Starting and then stopping the jobs leaves both in STOPPED status.
     */
    @Test
    public void testUpdateStreamingJobStatusToStop() throws Exception {
        this.streamingJobService.updateStreamingJobStatus(PROJECT, createJobList(MODEL_ID), "START");
        this.streamingJobService.updateStreamingJobStatus(PROJECT, createJobList(MODEL_ID), "STOP");

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        Assert.assertEquals(JobStatusEnum.STOPPED, jobManager.getStreamingJobByUuid(buildJobId).getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertEquals(JobStatusEnum.STOPPED, jobManager.getStreamingJobByUuid(mergeJobId).getCurrentStatus());
    }

    /**
     * Builds the ids of the streaming build job and the streaming merge job for a model.
     *
     * @param str model id the job ids are derived from
     * @return a two-element list: build job id first, merge job id second
     */
    private List<String> createJobList(String str) {
        String buildJobId = StreamingUtils.getJobId(str, JobTypeEnum.STREAMING_BUILD.name());
        String mergeJobId = StreamingUtils.getJobId(str, JobTypeEnum.STREAMING_MERGE.name());
        return Arrays.asList(buildJobId, mergeJobId);
    }

    /**
     * Updates the parameters of the build and merge jobs, checks they are stored as given, and
     * then expects a {@link KylinException} for a non-numeric table refresh interval.
     */
    @Test
    public void testUpdateStreamingJobParams() throws Exception {
        HashMap<String, String> buildParams = new HashMap<>();
        buildParams.put("spark.executor.memory", "2g");
        buildParams.put("spark.master", "yarn");
        buildParams.put("spark.driver.memory", "1g");
        buildParams.put("kylin.streaming.duration", "60");
        buildParams.put("spark.executor.cores", "1");
        buildParams.put("spark.executor.instances", "5");
        buildParams.put("kylin.streaming.job-retry-enabled", "true");
        buildParams.put("spark.sql.shuffle.partitions", "10");
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        this.streamingJobService.updateStreamingJobParams(PROJECT, buildJobId, buildParams);

        // NOTE(review): "kylin.streaming.job-retry-enabled" is only set on the build params;
        // confirm whether the merge job was meant to carry it as well.
        HashMap<String, String> mergeParams = new HashMap<>();
        mergeParams.put("spark.executor.memory", "3g");
        mergeParams.put("spark.master", "yarn");
        mergeParams.put("spark.driver.memory", "3g");
        mergeParams.put("kylin.streaming.segment-merge-threshold", "5");
        mergeParams.put("spark.executor.cores", "3");
        mergeParams.put("spark.executor.instances", "6");
        mergeParams.put("spark.sql.shuffle.partitions", "20");
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        this.streamingJobService.updateStreamingJobParams(PROJECT, mergeJobId, mergeParams);

        StreamingJobManager streamingJobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        Assert.assertEquals(buildParams.toString(), streamingJobManager.getStreamingJobByUuid(buildJobId).getParams().toString());
        Assert.assertEquals(mergeParams.toString(), streamingJobManager.getStreamingJobByUuid(mergeJobId).getParams().toString());

        // "2f" is not a valid interval value; the update below is expected to be rejected.
        this.thrown.expect(KylinException.class);
        buildParams.put("kylin.streaming.table-refresh-interval", "2f");
        this.streamingJobService.updateStreamingJobParams(PROJECT, StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name()), buildParams);
    }

    /**
     * Sends a job-info update for the build job and checks that node, process and YARN details
     * are stored on the job and that a last-update time is set.
     */
    @Test
    public void testUpdateStreamingJobInfo() throws Exception {
        final String nodeInfo = "10.3.1.68:7070";
        final String processId = "9876";
        final String yarnAppUrl = "http://spark1:8088/proxy/application_1616466883257_1384/";
        final String yarnAppId = "application_1616466883257_1384";

        StreamingJobUpdateRequest request = new StreamingJobUpdateRequest();
        request.setProject(PROJECT);
        request.setJobType(JobTypeEnum.STREAMING_BUILD.name());
        request.setModelId(MODEL_ID);
        request.setNodeInfo(nodeInfo);
        request.setProcessId(processId);
        request.setYarnAppUrl(yarnAppUrl);
        request.setYarnAppId(yarnAppId);
        this.streamingJobService.updateStreamingJobInfo(request);

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        StreamingJobMeta storedJob = jobManager.getStreamingJobByUuid(buildJobId);
        Assert.assertEquals(nodeInfo, storedJob.getNodeInfo());
        Assert.assertEquals(processId, storedJob.getProcessId());
        Assert.assertEquals(yarnAppUrl, storedJob.getYarnAppUrl());
        Assert.assertEquals(yarnAppId, storedJob.getYarnAppId());
        Assert.assertNotNull(storedJob.getLastUpdateTime());
    }

    /**
     * Adds a segment with layer "0": the stored segment carries file_layer "1", and adding to an
     * unknown dataflow returns an empty string.
     */
    @Test
    public void testAddSegmentForMerge() {
        SegmentRange.KafkaOffsetPartitionedSegmentRange range = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                1613957110000L, 1613957130000L, createKafkaPartitionsOffset(3, 100L),
                createKafkaPartitionsOffset(3, 300L));
        String segmentId = RandomUtil.randomUUIDStr();
        this.streamingJobService.addSegment(PROJECT, DATAFLOW_ID, range, "0", segmentId);

        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow dataflow = dataflowManager.getDataflow(DATAFLOW_ID);
        NDataSegment added = dataflow.getSegment(segmentId);
        Assert.assertEquals(segmentId, added.getId());
        Assert.assertEquals("1", added.getAdditionalInfo().get("file_layer"));

        Assert.assertEquals("",
                this.streamingJobService.addSegment(PROJECT, "not_existed_model", range, "0", segmentId));
    }

    /**
     * Adds a segment without a layer: the stored segment has no file_layer entry, and adding to
     * an unknown dataflow returns an empty string.
     */
    @Test
    public void testAppendSegment() {
        SegmentRange.KafkaOffsetPartitionedSegmentRange range = new SegmentRange.KafkaOffsetPartitionedSegmentRange(
                1613957140000L, 1613957150000L, createKafkaPartitionsOffset(3, 500L),
                createKafkaPartitionsOffset(3, 600L));
        String segmentId = RandomUtil.randomUUIDStr();
        this.streamingJobService.addSegment(PROJECT, DATAFLOW_ID, range, (String) null, segmentId);

        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow dataflow = dataflowManager.getDataflow(DATAFLOW_ID);
        NDataSegment appended = dataflow.getSegment(segmentId);
        Assert.assertEquals(segmentId, appended.getId());
        Assert.assertNull(appended.getAdditionalInfo().get("file_layer"));

        Assert.assertEquals("",
                this.streamingJobService.addSegment(PROJECT, "not_existed_model", range, (String) null, segmentId));
    }

    /**
     * Updating a segment with status "ONLINE" leaves the segment READY and the dataflow ONLINE.
     */
    @Test
    public void testUpdateSegmentForOnline() throws Exception {
        final String dataflowId = "e78a89dd-847f-4574-8afa-8768b4228b73";
        final String segmentId = "c380dd2a-43b8-4268-b73d-2a5f76236901";
        this.streamingJobService.updateSegment(PROJECT, dataflowId, segmentId, (List) null, "ONLINE", 0L);

        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        NDataflow updated = dataflowManager.getDataflow(dataflowId);
        Assert.assertEquals(SegmentStatusEnum.READY, updated.getSegment(segmentId).getStatus());
        Assert.assertEquals(RealizationStatusEnum.ONLINE, updated.getStatus());
    }

    /**
     * Checks how the source count of a segment is updated: an explicit count is stored, and a
     * request built without a count (reported as -1) leaves the stored count unchanged.
     */
    @Test
    public void testUpdateSegmentForCount() throws Exception {
        final String dataflowId = "e78a89dd-847f-4574-8afa-8768b4228b73";
        final String segmentId = "c380dd2a-43b8-4268-b73d-2a5f76236901";

        // Direct call with a source count of 100.
        this.streamingJobService.updateSegment(PROJECT, dataflowId, segmentId, (List) null, "ONLINE", 100L);
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT);
        Assert.assertEquals(100L, dataflowManager.getDataflow(dataflowId).getSegment(segmentId).getSourceCount());

        // Request carrying a source count of 200.
        StreamingSegmentRequest withCount = new StreamingSegmentRequest(PROJECT, dataflowId, 200L);
        withCount.setStatus("ONLINE");
        this.streamingJobService.updateSegment(withCount.getProject(), withCount.getDataflowId(), segmentId,
                (List) null, withCount.getStatus(), withCount.getSourceCount());
        Assert.assertEquals(200L,
                dataflowManager.getDataflow(withCount.getDataflowId()).getSegment(segmentId).getSourceCount());
        Assert.assertEquals(200L, withCount.getSourceCount());

        // Request built without a source count.
        StreamingSegmentRequest withoutCount = new StreamingSegmentRequest(PROJECT, dataflowId);
        withoutCount.setStatus("ONLINE");
        this.streamingJobService.updateSegment(withoutCount.getProject(), withoutCount.getDataflowId(), segmentId,
                (List) null, withoutCount.getStatus(), withoutCount.getSourceCount());
        Assert.assertEquals(200L,
                dataflowManager.getDataflow(withoutCount.getDataflowId()).getSegment(segmentId).getSourceCount());
        Assert.assertEquals(-1L, withoutCount.getSourceCount());
    }

    /**
     * Deletes an existing segment through the service and checks that the dataflow no longer
     * returns it.
     */
    @Test
    public void testDeleteSegment() throws Exception {
        final String dataflowId = "e78a89dd-847f-4574-8afa-8768b4228b73";
        final String segmentId = "c380dd2a-43b8-4268-b73d-2a5f76236901";
        KylinConfig config = getTestConfig();

        // Precondition: the segment exists before deletion.
        NDataflow before = NDataflowManager.getInstance(config, PROJECT).getDataflow(dataflowId);
        NDataSegment target = before.getSegment(segmentId);
        Assert.assertNotNull(target);

        this.streamingJobService.deleteSegment(PROJECT, dataflowId, Arrays.asList(target));

        NDataflow after = NDataflowManager.getInstance(config, PROJECT).getDataflow(dataflowId);
        Assert.assertNull(after.getSegment(segmentId));
    }

    /**
     * Submits one new data layout (layout id 10002) for a segment via {@code updateLayout} and
     * checks that the segment's layout size is 17 both before and after the call.
     */
    @Test
    public void testUpdateLayout() throws Exception {
        final String dataflowId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final String segmentId = "c380dd2a-43b8-4268-b73d-2a5f76236633";
        // Capture the config once; the original referenced an undeclared variable here.
        KylinConfig testConfig = getTestConfig();

        NDataflow dataflow = NDataflowManager.getInstance(testConfig, PROJECT).getDataflow(dataflowId);
        NDataSegment segment = dataflow.getSegment(segmentId);
        Assert.assertEquals(17L, segment.getLayoutSize());

        List<NDataLayout> layouts = new ArrayList<>();
        layouts.add(NDataLayout.newDataLayout(dataflow, segment.getId(), 10002L));
        this.streamingJobService.updateLayout(PROJECT, DATAFLOW_ID, layouts);

        // Re-read from a fresh manager lookup so the assertion reflects the stored state.
        NDataflow reloaded = NDataflowManager.getInstance(testConfig, PROJECT).getDataflow(dataflowId);
        Assert.assertEquals(17L, reloaded.getSegment(segmentId).getLayoutSize());
    }

    /**
     * Sends one stats request for the build job and checks that the job metadata then has a
     * non-null last update time and a last batch count of 123.
     */
    @Test
    public void testCollectStreamingJobStats() {
        final String jobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        StreamingJobStatsRequest request = new StreamingJobStatsRequest(jobId, PROJECT, 123L, Double.valueOf(123.2d), 42L, 30L, 50L, 60L);
        this.streamingJobService.collectStreamingJobStats(request);

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        StreamingJobMeta jobMeta = jobManager.getStreamingJobByUuid(jobId);
        // Read both values first, then assert, so evaluation order matches the assertions below.
        String updatedAt = jobMeta.getLastUpdateTime();
        int batchCount = jobMeta.getLastBatchCount().intValue();
        Assert.assertNotNull(updatedAt);
        Assert.assertEquals(123L, batchCount);
    }

    /**
     * Stubs {@code getStreamingJobStatsManager()} to return {@code null} and checks that
     * {@code collectStreamingJobStats} does not let any exception escape.
     */
    @Test
    public void testCollectStreamingJobStatsException() {
        Mockito.when(this.streamingJobService.getStreamingJobStatsManager()).thenReturn((Object) null);
        try {
            this.streamingJobService.collectStreamingJobStats(new StreamingJobStatsRequest("e78a89dd-847f-4574-8afa-8768b4228b72_build", PROJECT, 123L, Double.valueOf(123.2d), 42L, 30L, 50L, 60L));
        } catch (Exception e) {
            // Fail with the cause attached; a bare Assert.fail() would hide what was thrown.
            throw new AssertionError("collectStreamingJobStats must not throw when the stats manager is null", e);
        }
    }

    /**
     * Queries job info after all streaming job stats have been deleted. The job first reports
     * {@code STOPPED}; once its metadata is set to {@code RUNNING} with a current last-update
     * time, the response reports {@code RUNNING}, a non-null last status duration, a null data
     * latency and a non-null last update time.
     */
    @Test
    public void testGetStreamingJobInfoOfNoData() {
        final String jobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        StreamingJobStatsManager.getInstance().deleteAllStreamingJobStats();

        StreamingJobResponse initialInfo = this.streamingJobService.getStreamingJobInfo(jobId, PROJECT);
        Assert.assertEquals(JobStatusEnum.STOPPED, initialInfo.getCurrentStatus());

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        jobManager.updateStreamingJob(jobId, meta -> {
            meta.setCurrentStatus(JobStatusEnum.RUNNING);
            SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
            meta.setLastUpdateTime(timestampFormat.format(new Date()));
        });

        StreamingJobResponse runningInfo = this.streamingJobService.getStreamingJobInfo(jobId, PROJECT);
        Assert.assertEquals(JobStatusEnum.RUNNING, runningInfo.getCurrentStatus());
        Assert.assertNotNull(runningInfo.getLastStatusDuration());
        Assert.assertNull(runningInfo.getDataLatency());
        Assert.assertNotNull(runningInfo.getLastUpdateTime());
    }

    /**
     * Queries job info when stats rows exist. The job first reports {@code STOPPED}; once its
     * metadata is set to {@code RUNNING} with a start time one hour ago and a current update
     * time, the response reports {@code RUNNING}, a null last status duration, a non-null data
     * latency and a non-null last update time. All stats are deleted at the end.
     */
    @Test
    public void testGetStreamingJobInfo() {
        final String jobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        createStatData(jobId);

        StreamingJobResponse initialInfo = this.streamingJobService.getStreamingJobInfo(jobId, PROJECT);
        Assert.assertEquals(JobStatusEnum.STOPPED, initialInfo.getCurrentStatus());

        KylinConfig config = getTestConfig();
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
        StreamingJobManager.getInstance(config, PROJECT).updateStreamingJob(jobId, meta -> {
            meta.setCurrentStatus(JobStatusEnum.RUNNING);
            // Started one hour ago (3600000 ms), last updated "now".
            Date startedAt = new Date(System.currentTimeMillis() - 3600000);
            meta.setLastStartTime(timestampFormat.format(startedAt));
            Date updatedAt = new Date(System.currentTimeMillis());
            meta.setLastUpdateTime(timestampFormat.format(updatedAt));
        });

        StreamingJobResponse runningInfo = this.streamingJobService.getStreamingJobInfo(jobId, PROJECT);
        Assert.assertEquals(JobStatusEnum.RUNNING, runningInfo.getCurrentStatus());
        Assert.assertNull(runningInfo.getLastStatusDuration());
        Assert.assertNotNull(runningInfo.getDataLatency());
        Assert.assertNotNull(runningInfo.getLastUpdateTime());

        StreamingJobStatsManager.getInstance().deleteAllStreamingJobStats();
    }

    /**
     * Inserts three job records and checks that the service returns them as START, STOP, START
     * with strictly decreasing create times (newest first). All stats are deleted at the end.
     */
    @Test
    public void testGetStreamingJobRecordList() throws Exception {
        final String jobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        createRecordData(jobId);

        List records = this.streamingJobService.getStreamingJobRecordList(jobId);
        Assert.assertEquals(3L, records.size());

        StreamingJobRecord first = (StreamingJobRecord) records.get(0);
        StreamingJobRecord second = (StreamingJobRecord) records.get(1);
        StreamingJobRecord third = (StreamingJobRecord) records.get(2);

        Assert.assertEquals("START", first.getAction());
        Assert.assertEquals("STOP", second.getAction());
        Assert.assertEquals("START", third.getAction());

        // Each record must be strictly newer than the one after it.
        Assert.assertTrue(first.getCreateTime().longValue() > second.getCreateTime().longValue());
        Assert.assertTrue(second.getCreateTime().longValue() > third.getCreateTime().longValue());

        StreamingJobStatsManager.getInstance().deleteAllStreamingJobStats();
    }

    /**
     * Points the test config at an in-memory H2 JDBC metadata URL and inserts 60 stats rows for
     * the given job, timestamped one second apart from 60 seconds ago up to 1 second ago.
     *
     * @param str id of the streaming job the rows are recorded for
     * @return the stats manager the rows were inserted through
     */
    private StreamingJobStatsManager createStatData(String str) {
        getTestConfig().setMetadataUrl("test@jdbc,driverClassName=org.h2.Driver,url=jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1,username=sa,password=");
        StreamingJobStatsManager statsManager = StreamingJobStatsManager.getInstance();
        final long now = System.currentTimeMillis();
        // Oldest row first, so insertion order follows the timestamps.
        int secondsAgo = 60;
        while (secondsAgo > 0) {
            Long createTime = Long.valueOf(now - (secondsAgo * 1000));
            StreamingJobStats row = new StreamingJobStats(str, PROJECT, 120L, Double.valueOf(32.22d), 60000L, 500L, 60000L, createTime);
            statsManager.insert(row);
            secondsAgo--;
        }
        return statsManager;
    }

    /**
     * Points the test config at an in-memory H2 JDBC metadata URL and inserts three records for
     * the given job: START (id 100, created 90s ago), STOP (id 101, created 80s ago) and START
     * (id 102, created 70s ago). The second and third records are property copies of the first
     * with id, action and create time overridden.
     *
     * @param str id of the streaming job the records belong to
     */
    private void createRecordData(String str) {
        getTestConfig().setMetadataUrl("test@jdbc,driverClassName=org.h2.Driver,url=jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1,username=sa,password=");

        // Template record: the oldest START.
        StreamingJobRecord firstStart = new StreamingJobRecord();
        firstStart.setId(100L);
        firstStart.setJobId(str);
        firstStart.setAction("START");
        firstStart.setCreateTime(Long.valueOf(System.currentTimeMillis() - 90000));
        firstStart.setUpdateTime(Long.valueOf(System.currentTimeMillis() - 90000));
        firstStart.setProject(PROJECT);
        StreamingJobRecordManager recordManager = StreamingJobRecordManager.getInstance();
        recordManager.insert(firstStart);

        // STOP, ten seconds after the first START.
        StreamingJobRecord stop = new StreamingJobRecord();
        BeanUtils.copyProperties(firstStart, stop);
        stop.setId(101L);
        stop.setAction("STOP");
        stop.setCreateTime(Long.valueOf(System.currentTimeMillis() - 80000));
        recordManager.insert(stop);

        // Second START, the most recent record.
        StreamingJobRecord secondStart = new StreamingJobRecord();
        BeanUtils.copyProperties(firstStart, secondStart);
        secondStart.setId(102L);
        secondStart.setAction("START");
        secondStart.setCreateTime(Long.valueOf(System.currentTimeMillis() - 70000));
        recordManager.insert(secondStart);
    }

    /**
     * Marks the build and merge jobs as {@code RUNNING}, applies the {@code FORCE_STOP} action
     * to both, and checks that both end up {@code STOPPED}.
     */
    @Test
    public void testForceStopStreamingJob() {
        final String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        final String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        jobManager.updateStreamingJob(buildJobId, meta -> meta.setCurrentStatus(JobStatusEnum.RUNNING));
        jobManager.updateStreamingJob(mergeJobId, meta -> meta.setCurrentStatus(JobStatusEnum.RUNNING));

        String buildTarget = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        String mergeTarget = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        this.streamingJobService.updateStreamingJobStatus(PROJECT, Arrays.asList(buildTarget, mergeTarget), StreamingJobActionEnum.FORCE_STOP.name());

        StreamingJobMeta buildMeta = jobManager.getStreamingJobByUuid(buildJobId);
        StreamingJobMeta mergeMeta = jobManager.getStreamingJobByUuid(mergeJobId);
        Assert.assertEquals(JobStatusEnum.STOPPED, buildMeta.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.STOPPED, mergeMeta.getCurrentStatus());
    }

    /**
     * Marks the build and merge jobs as {@code RUNNING}, applies the {@code RESTART} action to
     * both, and checks that both are {@code RUNNING} afterwards.
     */
    @Test
    public void testRestartStreamingJob() {
        final String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        final String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        jobManager.updateStreamingJob(buildJobId, meta -> meta.setCurrentStatus(JobStatusEnum.RUNNING));
        jobManager.updateStreamingJob(mergeJobId, meta -> meta.setCurrentStatus(JobStatusEnum.RUNNING));

        String buildTarget = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.name());
        String mergeTarget = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.name());
        this.streamingJobService.updateStreamingJobStatus(PROJECT, Arrays.asList(buildTarget, mergeTarget), StreamingJobActionEnum.RESTART.name());

        StreamingJobMeta buildMeta = jobManager.getStreamingJobByUuid(buildJobId);
        StreamingJobMeta mergeMeta = jobManager.getStreamingJobByUuid(mergeJobId);
        Assert.assertEquals(JobStatusEnum.RUNNING, buildMeta.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.RUNNING, mergeMeta.getCurrentStatus());
    }

    /**
     * Publishes a generated driver log and checks that every line of the simple log returned by
     * the service is either one of the generated lines or the {@code "====…"} separator line.
     */
    @Test
    public void testGetStreamingJobSimpleLog() throws IOException {
        final String project = "default";
        final String jobId = "e1ad7bb0-522e-456a-859d-2eab1df448de_build";
        String[] writtenLines = createStreamingLogTmpFile(project, jobId);

        String simpleLog = this.streamingJobService.getStreamingJobSimpleLog(project, jobId);
        String[] returnedLines = org.apache.commons.lang.StringUtils.splitByWholeSeparatorPreserveAllTokens(simpleLog, "\n");

        // Allowed content: everything that was written, plus the separator line.
        List<String> allowedLines = Lists.newArrayList(writtenLines);
        allowedLines.add("================================================================");
        Assert.assertTrue(Sets.newHashSet(allowedLines).containsAll(Sets.newHashSet(returnedLines)));
    }

    /**
     * Publishes a generated driver log, reads the full log stream returned by the service line
     * by line, and checks that it matches the generated lines exactly and in order.
     *
     * @throws IOException if creating the log file or reading the returned stream fails
     */
    @Test
    public void testGetStreamingJobAllLog() throws IOException {
        final String project = "default";
        final String jobId = "e1ad7bb0-522e-456a-859d-2eab1df448de_build";
        String[] expectedLines = createStreamingLogTmpFile(project, jobId);

        String fullLog;
        // try-with-resources closes the reader and then the stream, even when reading fails.
        try (InputStream logStream = this.streamingJobService.getStreamingJobAllLog(project, jobId);
                BufferedReader reader = new BufferedReader(new InputStreamReader(logStream, Charset.defaultCharset()))) {
            StringBuilder joined = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                // Re-join with '\n' between lines only, so there is no trailing separator.
                if (joined.length() > 0) {
                    joined.append('\n');
                }
                joined.append(line);
            }
            fullLog = joined.toString();
        }

        String[] actualLines = org.apache.commons.lang.StringUtils.splitByWholeSeparatorPreserveAllTokens(fullLog, "\n");
        Assert.assertArrayEquals(expectedLines, actualLines);
    }

    /**
     * Creates a temporary {@code driver.<millis>.log} file containing 200 lines of the form
     * {@code "lines: <i>"} and copies it into the streaming job's temporary output directory on
     * the working file system.
     *
     * @param str  project name used to resolve the output directory
     * @param str2 streaming job id used to resolve the output directory
     * @return the lines of the generated file, in file order
     * @throws IOException if the local file or the target file system cannot be written
     */
    public String[] createStreamingLogTmpFile(String str, String str2) throws IOException {
        File logFile = this.temporaryFolder.newFile("driver." + System.currentTimeMillis() + ".log");
        java.nio.file.Path localPath = logFile.toPath();
        for (int lineNo = 0; lineNo < 200; lineNo++) {
            String line = String.format(Locale.ROOT, "lines: %s\n", Integer.valueOf(lineNo));
            Files.write(localPath, line.getBytes(Charset.defaultCharset()), StandardOpenOption.APPEND);
        }
        // Read the file back so the returned lines are exactly what ended up on disk.
        List<String> writtenLines = Files.readAllLines(localPath);

        String targetDir = KylinConfig.getInstanceFromEnv().getStreamingJobTmpOutputStorePath(str, str2);
        Path targetPath = new Path(targetDir);
        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
        fileSystem.mkdirs(targetPath);
        fileSystem.copyFromLocalFile(new Path(logFile.getAbsolutePath()), targetPath);
        return writtenLines.toArray(new String[0]);
    }

    /**
     * Checks that {@code checkModelStatus} does not throw for the model
     * {@code e78a89dd-847f-4574-8afa-8768b4228b72}, using the job type of its build job.
     */
    @Test
    public void testCheckModelStatus() {
        try {
            this.streamingJobService.checkModelStatus(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", StreamingJobManager.getInstance(getTestConfig(), PROJECT).getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build").getJobType());
        } catch (Exception e) {
            // Fail with the cause attached; a bare Assert.fail() would hide what was thrown.
            throw new AssertionError("checkModelStatus must not throw for this model", e);
        }
    }

    /**
     * Removes the Kafka config {@code SSB.P_LINEORDER_STR}, confirms the model is then reported
     * as broken, and expects {@code checkModelStatus} to throw a {@link KylinException}.
     */
    @Test
    public void testCheckModelStatus1() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        KylinConfig config = getTestConfig();
        // Look the job up before the Kafka config is removed.
        StreamingJobMeta buildJob = StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");

        KafkaConfigManager.getInstance(config, PROJECT).removeKafkaConfig("SSB.P_LINEORDER_STR");
        NDataModel model = NDataModelManager.getInstance(config, PROJECT).getDataModelDesc(modelId);
        Assert.assertTrue(model.isBroken());

        this.thrown.expect(KylinException.class);
        this.streamingJobService.checkModelStatus(PROJECT, modelId, buildJob.getJobType());
    }

    /**
     * Removes the source table {@code SSB.LINEORDER_HIVE}. The checked model is then not
     * reported as broken while model {@code cd2b9a23-699c-4699-b0dd-38c9412b3dfd} is, and
     * {@code checkModelStatus} on the checked model is expected to throw a
     * {@link KylinException}.
     */
    @Test
    public void testCheckModelStatus2() {
        final String modelId = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
        KylinConfig config = getTestConfig();
        // Look the job up before the source table is removed.
        StreamingJobMeta buildJob = StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid("4965c827-fbb4-4ea1-a744-3f341a3b030d_build");

        NTableMetadataManager.getInstance(config, PROJECT).removeSourceTable("SSB.LINEORDER_HIVE");
        NDataModelManager modelManager = NDataModelManager.getInstance(config, PROJECT);
        Assert.assertFalse(modelManager.getDataModelDesc(modelId).isBroken());
        Assert.assertTrue(modelManager.getDataModelDesc("cd2b9a23-699c-4699-b0dd-38c9412b3dfd").isBroken());

        this.thrown.expect(KylinException.class);
        this.streamingJobService.checkModelStatus(PROJECT, modelId, buildJob.getJobType());
    }

    /**
     * Sets the model's partition description to {@code null} and expects launching a streaming
     * build job to throw a {@link KylinException} carrying the partition-column start error
     * message.
     */
    @Test
    public void testCheckPartitionColumn() {
        final String modelId = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
        NDataModelManager modelManager = NDataModelManager.getInstance(getTestConfig(), PROJECT);
        modelManager.updateDataModel(modelId, model -> model.setPartitionDesc((PartitionDesc) null));

        String expectedMessage = MsgPicker.getMsg().getPartitionColumnStartError();
        this.thrown.expect(KylinException.class);
        this.thrown.expectMessage(expectedMessage);
        this.streamingJobService.launchStreamingJob(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
    }

    /**
     * Replaces the model's partition description with a bare Mockito spy of
     * {@link PartitionDesc} and expects launching a streaming build job to throw a
     * {@link KylinException} carrying the partition-column start error message.
     */
    @Test
    public void testCheckPartitionColumn1() {
        final String modelId = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
        NDataModelManager modelManager = NDataModelManager.getInstance(getTestConfig(), PROJECT);
        modelManager.updateDataModel(modelId, model -> {
            PartitionDesc spiedDesc = Mockito.spy(PartitionDesc.class);
            model.setPartitionDesc(spiedDesc);
        });

        String expectedMessage = MsgPicker.getMsg().getPartitionColumnStartError();
        this.thrown.expect(KylinException.class);
        this.thrown.expectMessage(expectedMessage);
        this.streamingJobService.launchStreamingJob(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
    }

    /**
     * Replaces the model's partition description with a Mockito spy of {@link PartitionDesc}
     * whose date format is set to {@code "yyyy/MM/dd"}, and expects launching a streaming build
     * job to throw a {@link KylinException} carrying the partition-column start error message.
     */
    @Test
    public void testCheckPartitionColumn2() {
        final String modelId = "4965c827-fbb4-4ea1-a744-3f341a3b030d";
        NDataModelManager modelManager = NDataModelManager.getInstance(getTestConfig(), PROJECT);
        modelManager.updateDataModel(modelId, model -> {
            PartitionDesc spiedDesc = Mockito.spy(PartitionDesc.class);
            spiedDesc.setPartitionDateFormat("yyyy/MM/dd");
            model.setPartitionDesc(spiedDesc);
        });

        String expectedMessage = MsgPicker.getMsg().getPartitionColumnStartError();
        this.thrown.expect(KylinException.class);
        this.thrown.expectMessage(expectedMessage);
        this.streamingJobService.launchStreamingJob(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
    }

    /**
     * Calls {@code checkJobExecutionId} for the build job first with a {@code null} execution
     * id, before any exception is expected, and then with -1, which is expected to throw an
     * {@link IllegalStateException}.
     */
    @Test
    public void testCheckJobExecutionId() {
        final String buildJobId = DATAFLOW_ID + "_build";

        // No expectation is registered yet, so this call must complete normally.
        this.streamingJobService.checkJobExecutionId(PROJECT, buildJobId, (Integer) null);

        this.thrown.expect(IllegalStateException.class);
        this.streamingJobService.checkJobExecutionId(PROJECT, buildJobId, -1);
    }
}
