package org.apache.kylin.rest.service;

import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.msg.Message;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.FusionModelManager;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.streaming.StreamingJobRecord;
import org.apache.kylin.metadata.streaming.StreamingJobRecordManager;
import org.apache.kylin.metadata.streaming.StreamingJobStats;
import org.apache.kylin.metadata.streaming.StreamingJobStatsManager;
import org.apache.kylin.rest.request.StreamingJobActionEnum;
import org.apache.kylin.rest.request.StreamingJobFilter;
import org.apache.kylin.rest.response.DataResult;
import org.apache.kylin.rest.response.StreamingJobDataStatsResponse;
import org.apache.kylin.rest.response.StreamingJobResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.ModelUtils;
import org.apache.kylin.rest.util.PagingUtil;
import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.request.StreamingJobStatsRequest;
import org.apache.kylin.streaming.request.StreamingJobUpdateRequest;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component("streamingJobService")
/* loaded from: input_file:org/apache/kylin/rest/service/StreamingJobService.class */
public class StreamingJobService extends BasicService {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(StreamingJobService.class);

    /** Spring-injected bean qualified as "indexPlanService"; this class uses it to obtain the index plan manager. */
    @Autowired
    @Qualifier("indexPlanService")
    IndexPlanService indexPlanService;

    /** Spring-injected helper used to check project operation permissions. */
    @Autowired
    private AclEvaluate aclEvaluate;

    /**
     * Validates the model behind a streaming job and submits the job to the project's scheduler.
     *
     * @param str         project name
     * @param str2        model id
     * @param jobTypeEnum type of the streaming job to launch
     */
    public void launchStreamingJob(String str, String str2, JobTypeEnum jobTypeEnum) {
        // Fails with a KylinException when the model (or its batch model) is broken.
        checkModelStatus(str, str2, jobTypeEnum);

        final String partitionColumnError = MsgPicker.getMsg().getPartitionColumnStartError();
        ModelUtils.checkPartitionColumn(str, str2, partitionColumnError);

        initDefaultParser(str);

        final StreamingScheduler scheduler = StreamingScheduler.getInstance(str);
        scheduler.submitJob(str, str2, jobTypeEnum);
    }

    /**
     * Verifies that the model of the given streaming job is not broken.
     *
     * @param str         project name used to look up the streaming job manager
     * @param str2        model id
     * @param jobTypeEnum streaming job type; combined with the model id to form the job id
     * @throws KylinException with {@code JOB_START_FAILURE} when the model, or the batch model of a
     *                        fusion model, is broken
     */
    public void checkModelStatus(String str, String str2, JobTypeEnum jobTypeEnum) {
        final String jobId = StreamingUtils.getJobId(str2, jobTypeEnum.name());
        final KylinConfig config = KylinConfig.getInstanceFromEnv();
        final StreamingJobManager jobManager = (StreamingJobManager) getManager(StreamingJobManager.class, str);
        final StreamingJobMeta jobMeta = jobManager.getStreamingJobByUuid(jobId);

        // The model is resolved through the project recorded on the job metadata.
        final NDataModel model = NDataModelManager.getInstance(config, jobMeta.getProject())
                .getDataModelDesc(jobMeta.getModelId());

        final boolean usable = !model.isBroken() && !isBatchModelBroken(model);
        if (usable) {
            return;
        }
        throw new KylinException(ServerErrorCode.JOB_START_FAILURE,
                String.format(Locale.ROOT, MsgPicker.getMsg().getJobBrokenModelStartFailure(), model.getAlias()));
    }

    /**
     * Asks the project's streaming scheduler to stop a job.
     *
     * @param str         project name
     * @param str2        model id
     * @param jobTypeEnum type of the streaming job to stop
     */
    public void stopStreamingJob(String str, String str2, JobTypeEnum jobTypeEnum) {
        final StreamingScheduler scheduler = StreamingScheduler.getInstance(str);
        scheduler.stopJob(str2, jobTypeEnum);
    }

    /**
     * Forcibly stops a streaming job: marks it STOPPING, kills it with final state STOPPED, and
     * keeps the scheduler's job listener skipped for that job while doing so.
     *
     * @param str         project name
     * @param str2        model id
     * @param jobTypeEnum type of the streaming job to kill
     */
    public void forceStopStreamingJob(String str, String str2, JobTypeEnum jobTypeEnum) {
        final StreamingScheduler scheduler = StreamingScheduler.getInstance(str);
        final String jobId = StreamingUtils.getJobId(str2, jobTypeEnum.name());

        scheduler.skipJobListener(str, jobId, true);
        try {
            MetaInfoUpdater.updateJobState(str, jobId, JobStatusEnum.STOPPING);
            scheduler.killJob(str2, jobTypeEnum, JobStatusEnum.STOPPED);
        } finally {
            // Always re-enable the listener, exactly once, whether or not the kill succeeded.
            scheduler.skipJobListener(str, jobId, false);
        }
    }

    /**
     * Replaces the parameters of a streaming job after a permission check and parameter validation.
     *
     * @param str  project name
     * @param str2 streaming job id
     * @param map  new job parameters
     */
    public void updateStreamingJobParams(String str, String str2, Map<String, String> map) {
        this.aclEvaluate.checkProjectOperationPermission(str);
        checkJobParams(map);

        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final StreamingJobManager jobManager = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            jobManager.updateStreamingJob(str2, jobMeta -> {
                jobMeta.setParams(map);
            });
            return null;
        }, str);
    }

    /**
     * Validates job parameters by parsing the table refresh interval entry.
     *
     * @param map job parameters; the value under "kylin.streaming.table-refresh-interval" is parsed
     */
    private void checkJobParams(Map<String, String> map) {
        final String refreshInterval = map.get("kylin.streaming.table-refresh-interval");
        StreamingUtils.parseTableRefreshInterval(refreshInterval);
    }

    /**
     * Applies an action (START, STOP, FORCE_STOP or RESTART) to each of the given streaming jobs.
     *
     * <p>Job ids are split on {@code _}: the first part is used as the model id and the second
     * part, upper-cased and prefixed with {@code STREAMING_}, as the {@link JobTypeEnum} name.
     *
     * @param str  project name; used for a job whose id is not found among the known streaming jobs
     * @param list ids of the jobs to act on
     * @param str2 action name, validated by {@code StreamingJobActionEnum.validate}
     * @throws KylinException with {@code INVALID_PARAMETER} when a job id contains no {@code _}
     */
    public void updateStreamingJobStatus(String str, List<String> list, String str2) {
        StreamingJobActionEnum.validate(str2);

        final Map<String, StreamingJobMeta> jobsById = getAllStreamingJobs(str).stream()
                .collect(Collectors.toMap(StreamingJobMeta::getUuid, jobMeta -> jobMeta));

        for (String jobId : list) {
            // Resolve the project per job so that a project found for an earlier job is never
            // reused for a later job that is not in the lookup map.
            final StreamingJobMeta knownJob = jobsById.get(jobId);
            final String jobProject = knownJob != null ? knownJob.getProject() : str;
            this.aclEvaluate.checkProjectOperationPermission(jobProject);

            final String[] idParts = jobId.split("\\_");
            if (idParts.length < 2) {
                throw new KylinException(ServerErrorCode.INVALID_PARAMETER,
                        String.format(Locale.ROOT, "Invalid streaming job id: %s", jobId));
            }
            final String modelId = idParts[0];
            final StreamingJobActionEnum action = StreamingJobActionEnum.valueOf(str2);
            final JobTypeEnum jobType = JobTypeEnum.valueOf("STREAMING_" + idParts[1].toUpperCase(Locale.ROOT));

            switch (action) {
                case START:
                    launchStreamingJob(jobProject, modelId, jobType);
                    break;
                case STOP:
                    stopStreamingJob(jobProject, modelId, jobType);
                    break;
                case FORCE_STOP:
                    forceStopStreamingJob(jobProject, modelId, jobType);
                    break;
                case RESTART:
                    forceStopStreamingJob(jobProject, modelId, jobType);
                    launchStreamingJob(jobProject, modelId, jobType);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Copies runtime information from the request onto the stored streaming job, stamps the
     * last-update time and increments the job execution id (starting at 1 when none is stored).
     *
     * @param streamingJobUpdateRequest request carrying project, model id, job type and runtime info
     * @return the value returned by the job manager's update call
     */
    public StreamingJobMeta updateStreamingJobInfo(StreamingJobUpdateRequest streamingJobUpdateRequest) {
        final String project = streamingJobUpdateRequest.getProject();
        final String modelId = streamingJobUpdateRequest.getModelId();
        final String jobType = streamingJobUpdateRequest.getJobType();

        return (StreamingJobMeta) EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final StreamingJobManager jobManager = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
            final String jobId = StreamingUtils.getJobId(modelId, jobType);
            return jobManager.updateStreamingJob(jobId, jobMeta -> {
                jobMeta.setProcessId(streamingJobUpdateRequest.getProcessId());
                jobMeta.setNodeInfo(streamingJobUpdateRequest.getNodeInfo());
                jobMeta.setYarnAppId(streamingJobUpdateRequest.getYarnAppId());
                jobMeta.setYarnAppUrl(streamingJobUpdateRequest.getYarnAppUrl());

                // NOTE(review): no explicit time zone is set here, so the JVM default zone applies.
                final SimpleDateFormat timestampFormat =
                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
                jobMeta.setLastUpdateTime(timestampFormat.format(new Date()));

                final Integer previousExecutionId = jobMeta.getJobExecutionId();
                final int nextExecutionId = previousExecutionId == null ? 1 : previousExecutionId.intValue() + 1;
                jobMeta.setJobExecutionId(Integer.valueOf(nextExecutionId));
            });
        }, project);
    }

    /**
     * Returns the records stored for a streaming job.
     *
     * @param str streaming job id
     * @return the records returned by the record manager's {@code queryByJobId}
     */
    public List<StreamingJobRecord> getStreamingJobRecordList(String str) {
        final StreamingJobRecordManager recordManager = StreamingJobRecordManager.getInstance();
        return recordManager.queryByJobId(str);
    }

    /**
     * Creates a segment on a dataflow, either by appending or by merging.
     *
     * <p>When {@code str3} is empty a streaming segment is appended. Otherwise {@code str3} is
     * parsed as an integer and the value plus one is passed to {@code mergeSegments}.
     *
     * @param str          project name
     * @param str2         dataflow id
     * @param segmentRange range of the segment to create
     * @param str3         integer string selecting the merge path, or empty to append
     * @param str4         passed through as the last argument of the append/merge call
     * @return id of the created segment, or an empty string when the dataflow does not exist
     */
    public String addSegment(String str, String str2, SegmentRange<?> segmentRange, String str3, String str4) {
        final boolean appendOnly = StringUtils.isEmpty(str3);
        // Parsed before the transaction starts, as in the merge path of the two-branch form.
        final Integer mergeArgument = appendOnly ? null : Integer.valueOf(Integer.parseInt(str3) + 1);

        final NDataSegment created = (NDataSegment) EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            final NDataflow dataflow = dataflowManager.getDataflow(str2);
            if (dataflow == null) {
                return null;
            }
            if (appendOnly) {
                return dataflowManager.appendSegmentForStreaming(dataflow, segmentRange, str4);
            }
            return dataflowManager.mergeSegments(dataflow, segmentRange, true, mergeArgument, str4);
        }, str);

        return getSegmentId(created);
    }

    /**
     * Returns the id of a segment, or an empty string for {@code null}.
     *
     * @param nDataSegment segment, may be {@code null}
     * @return the segment id, or "" when no segment is given
     */
    private String getSegmentId(NDataSegment nDataSegment) {
        if (nDataSegment == null) {
            return "";
        }
        return nDataSegment.getId();
    }

    /**
     * Marks a segment READY and optionally updates its source count, removes other segments and
     * sets the dataflow status, all in one transaction. Does nothing when the dataflow is missing.
     *
     * @param str  project name
     * @param str2 dataflow id
     * @param str3 id of the segment to update
     * @param list segments to remove, or {@code null} for none
     * @param str4 name of a {@link RealizationStatusEnum} to set on the dataflow, or empty to leave it
     * @param l    new source count; {@code -1} or {@code null} leaves the stored count unchanged
     */
    public void updateSegment(String str, String str2, String str3, List<NDataSegment> list, String str4, Long l) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            final NDataflow dataflow = dataflowManager.getDataflow(str2);
            if (dataflow == null) {
                return 0;
            }

            final NDataSegment segment = dataflow.copy().getSegment(str3);
            // A null count is skipped like -1 instead of failing on unboxing.
            if (l != null && l.longValue() != -1L) {
                segment.setSourceCount(l.longValue());
            }
            segment.setStatus(SegmentStatusEnum.READY);

            final NDataflowUpdate update = new NDataflowUpdate(str2);
            update.setToUpdateSegs(new NDataSegment[]{segment});
            if (list != null) {
                update.setToRemoveSegs((NDataSegment[]) list.toArray(new NDataSegment[0]));
            }
            if (!StringUtils.isEmpty(str4)) {
                update.setStatus(RealizationStatusEnum.valueOf(str4));
            }
            dataflowManager.updateDataflow(update);
            return 0;
        }, str);
    }

    /**
     * Removes the given segments from a dataflow inside a transaction.
     * The update is only applied when the dataflow exists.
     *
     * @param str  project name
     * @param str2 dataflow id
     * @param list segments to remove, or {@code null} for none
     */
    public void deleteSegment(String str, String str2, List<NDataSegment> list) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            final NDataflowUpdate update = new NDataflowUpdate(str2);
            if (list != null) {
                final NDataSegment[] segmentsToRemove = (NDataSegment[]) list.toArray(new NDataSegment[0]);
                update.setToRemoveSegs(segmentsToRemove);
            }

            final boolean dataflowExists = dataflowManager.getDataflow(str2) != null;
            if (dataflowExists) {
                dataflowManager.updateDataflow(update);
            }
            return 0;
        }, str);
    }

    /**
     * Adds or updates layouts on a dataflow inside a transaction.
     * The update is only applied when the dataflow exists.
     *
     * @param str  project name
     * @param str2 dataflow id
     * @param list layouts to add or update
     */
    public void updateLayout(String str, String str2, List<NDataLayout> list) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final NDataflowUpdate update = new NDataflowUpdate(str2);
            final NDataLayout[] layouts = (NDataLayout[]) list.toArray(new NDataLayout[0]);
            update.setToAddOrUpdateLayouts(layouts);

            final NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            final boolean dataflowExists = dataflowManager.getDataflow(str2) != null;
            if (dataflowExists) {
                dataflowManager.updateDataflow(update);
            }
            return 0;
        }, str);
    }

    /**
     * Stores one batch of streaming job statistics and refreshes the job's last-update time and
     * last batch count.
     *
     * <p>Writing the statistics row is best effort: a failure is logged with its cause and the
     * job metadata is still updated.
     *
     * @param streamingJobStatsRequest statistics reported for a streaming job
     */
    public void collectStreamingJobStats(StreamingJobStatsRequest streamingJobStatsRequest) {
        final StreamingJobStatsManager statsManager = getStreamingJobStatsManager();
        final String jobId = streamingJobStatsRequest.getJobId();
        final String project = streamingJobStatsRequest.getProject();
        final Long batchRowNum = streamingJobStatsRequest.getBatchRowNum();

        try {
            statsManager.insert(new StreamingJobStats(jobId, project, batchRowNum,
                    streamingJobStatsRequest.getRowsPerSecond(), streamingJobStatsRequest.getProcessingTime(),
                    streamingJobStatsRequest.getMinDataLatency(), streamingJobStatsRequest.getMaxDataLatency(),
                    streamingJobStatsRequest.getTriggerStartTime()));
        } catch (Exception e) {
            // Keep going, but pass the exception to the logger so the cause is recorded.
            logger.error("Write streaming job stats failed...", e);
        }

        final Date now = new Date();
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            final KylinConfig config = KylinConfig.getInstanceFromEnv();
            // The timestamp is rendered in the time zone configured in KylinConfig.
            final SimpleDateFormat timestampFormat =
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
            timestampFormat.setTimeZone(TimeZone.getTimeZone(config.getTimeZone()));

            StreamingJobManager.getInstance(config, project).updateStreamingJob(jobId, jobMeta -> {
                jobMeta.setLastUpdateTime(timestampFormat.format(now));
                jobMeta.setLastBatchCount(Integer.valueOf(batchRowNum.intValue()));
            });
            return null;
        }, project);
    }

    /**
     * Returns the streaming job statistics manager obtained from
     * {@code StreamingJobStatsManager.getInstance()}.
     *
     * @return the statistics manager
     */
    public StreamingJobStatsManager getStreamingJobStatsManager() {
        final StreamingJobStatsManager statsManager = StreamingJobStatsManager.getInstance();
        return statsManager;
    }

    /**
     * Lists streaming jobs matching a filter, sorted and paged.
     *
     * <p>Jobs are loaded by id when the filter carries job ids, otherwise all jobs of the project
     * are loaded. Each job is turned into a response, filtered by model name, model names, job
     * types and statuses, sorted (by "last_modified" when no sort key is given), and cut to the
     * requested page. Model information is attached only to the jobs on the returned page.
     *
     * @param streamingJobFilter filter, sort key and sort direction
     * @param i                  first paging argument, passed to {@code PagingUtil.cutPage}
     * @param i2                 second paging argument, passed to {@code PagingUtil.cutPage}
     * @return the page of responses together with the total number of matching jobs
     */
    public DataResult<List<StreamingJobResponse>> getStreamingJobList(StreamingJobFilter streamingJobFilter, int i, int i2) {
        final String project = streamingJobFilter.getProject();
        if (!StringUtils.isEmpty(project)) {
            this.aclEvaluate.checkProjectOperationPermission(project);
        }

        final List<StreamingJobMeta> jobMetas;
        if (CollectionUtils.isNotEmpty(streamingJobFilter.getJobIds())) {
            jobMetas = getAllStreamingJobsById(project, streamingJobFilter.getJobIds());
        } else {
            jobMetas = getAllStreamingJobs(project);
        }

        // Data latencies are only queried for build jobs.
        final StreamingJobStatsManager statsManager = StreamingJobStatsManager.getInstance();
        final List<String> buildJobIds = jobMetas.stream()
                .filter(jobMeta -> JobTypeEnum.STREAMING_BUILD == jobMeta.getJobType())
                .map(jobMeta -> StreamingUtils.getJobId(jobMeta.getModelId(), jobMeta.getJobType().name()))
                .collect(Collectors.toList());
        final Map<String, Long> latencyByJobId = statsManager.queryDataLatenciesByJobIds(buildJobIds);

        final List<StreamingJobResponse> responses = jobMetas.stream()
                .map(jobMeta -> buildJobResponse(jobMeta, latencyByJobId))
                .collect(Collectors.toList());

        final List<StreamingJobResponse> matched = responses.stream()
                .filter(response -> matchesFilter(response, streamingJobFilter))
                .sorted(propertyComparator(
                        StringUtils.isEmpty(streamingJobFilter.getSortBy()) ? "last_modified" : streamingJobFilter.getSortBy(),
                        !streamingJobFilter.isReverse()))
                .collect(Collectors.toList());

        final List<StreamingJobResponse> page = new ArrayList<>(PagingUtil.cutPage(matched, i, i2));
        if (CollectionUtils.isNotEmpty(page)) {
            final Map<String, NDataModel> modelsById = getAllDataModels(project);
            for (StreamingJobResponse response : page) {
                setModelInfo(response, modelsById);
            }
        }
        return new DataResult<>(page, matched.size(), i, i2);
    }

    /** Builds the response for one job: status mapping, data latency and time since the last record. */
    private StreamingJobResponse buildJobResponse(StreamingJobMeta jobMeta, Map<String, Long> latencyByJobId) {
        final StreamingJobResponse response = new StreamingJobResponse(jobMeta);
        // A launching error is reported as ERROR with the launching-error flag set.
        if (JobStatusEnum.LAUNCHING_ERROR == response.getCurrentStatus()) {
            response.setLaunchingError(true);
            response.setCurrentStatus(JobStatusEnum.ERROR);
        }

        final String jobId = StreamingUtils.getJobId(response.getModelId(), response.getJobType().name());
        if (latencyByJobId != null && latencyByJobId.containsKey(jobId)) {
            response.setDataLatency(convertDataLatency(latencyByJobId.get(jobId)));
        }

        final StreamingJobRecord latestRecord = StreamingJobRecordManager.getInstance().getLatestOneByJobId(jobId);
        if (latestRecord != null) {
            final long elapsedMillis = System.currentTimeMillis() - latestRecord.getCreateTime().longValue();
            response.setLastStatusDuration(Long.valueOf(elapsedMillis));
        }
        return response;
    }

    /** Tells whether a response passes every criterion that is set on the filter. */
    private boolean matchesFilter(StreamingJobResponse response, StreamingJobFilter filter) {
        if (!StringUtils.isEmpty(filter.getModelName())
                && !response.getModelName().contains(filter.getModelName())) {
            return false;
        }
        if (!CollectionUtils.isEmpty(filter.getModelNames())
                && !filter.getModelNames().contains(response.getModelName())) {
            return false;
        }
        if (!CollectionUtils.isEmpty(filter.getJobTypes())
                && !filter.getJobTypes().contains(response.getJobType().name())) {
            return false;
        }
        if (!CollectionUtils.isEmpty(filter.getStatuses())
                && !filter.getStatuses().contains(response.getCurrentStatus().name())) {
            return false;
        }
        return true;
    }

    /**
     * Fills model-related fields of a job response.
     *
     * <p>The response is flagged as model-broken when the model is missing from {@code map}, is
     * broken, has a broken batch model, or when its dataflow is missing or reports itself broken.
     * Otherwise the number of layouts in the index plan and the partition description are set.
     *
     * @param streamingJobResponse response to update; its id is used to derive the model id
     * @param map                  data models keyed by model id
     */
    void setModelInfo(StreamingJobResponse streamingJobResponse, Map<String, NDataModel> map) {
        final String modelId = StreamingUtils.getModelId(streamingJobResponse.getId());
        final NDataModel model = map.get(modelId);

        boolean broken = model == null || model.isBroken() || isBatchModelBroken(model);
        if (!broken) {
            final NDataflow dataflow = ((NDataflowManager) getManager(NDataflowManager.class,
                    streamingJobResponse.getProject())).getDataflow(modelId);
            // A model without a dataflow is treated as broken rather than dereferencing null.
            broken = dataflow == null || dataflow.checkBrokenWithRelatedInfo();
        }

        if (broken) {
            streamingJobResponse.setModelBroken(true);
            return;
        }

        final NIndexPlanManager indexPlanManager = (NIndexPlanManager) this.indexPlanService
                .getManager(NIndexPlanManager.class, streamingJobResponse.getProject());
        final int layoutCount = indexPlanManager.getIndexPlan(modelId).getAllLayouts().size();
        streamingJobResponse.setModelIndexes(Integer.valueOf(layoutCount));
        streamingJobResponse.setPartitionDesc(model.getPartitionDesc());
    }

    /**
     * Returns the named project as a one-element list, or every project when no name is given.
     *
     * @param str project name; empty or {@code null} selects all projects
     * @return the selected project instances
     */
    private List<ProjectInstance> getAllProjects(String str) {
        final NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
        if (StringUtils.isEmpty(str)) {
            return projectManager.listAllProjects();
        }
        return Arrays.asList(projectManager.getProject(str));
    }

    /**
     * Collects the data models of the selected project(s), keyed by model uuid.
     *
     * @param str project name; empty or {@code null} selects all projects
     * @return models of every selected project, keyed by uuid
     */
    private Map<String, NDataModel> getAllDataModels(String str) {
        final KylinConfig config = KylinConfig.getInstanceFromEnv();
        final Map<String, NDataModel> modelsById = new HashMap<>();
        for (ProjectInstance project : getAllProjects(str)) {
            final List<NDataModel> models = NDataModelManager.getInstance(config, project.getName()).listAllModels();
            if (CollectionUtils.isNotEmpty(models)) {
                for (NDataModel model : models) {
                    // Without this the map stays empty and every lookup by model id misses.
                    modelsById.put(model.getUuid(), model);
                }
            }
        }
        return modelsById;
    }

    /**
     * Looks up streaming jobs by id within one project.
     *
     * @param str  project name; required when ids are given
     * @param list job ids to look up
     * @return one lookup result per id, in input order; an empty list when no ids are given
     * @throws KylinException with {@code REQUEST_PARAMETER_EMPTY_OR_VALUE_EMPTY} when ids are
     *                        given but the project is empty
     */
    List<StreamingJobMeta> getAllStreamingJobsById(String str, List<String> list) {
        if (CollectionUtils.isEmpty(list)) {
            return Collections.emptyList();
        }
        if (StringUtils.isEmpty(str)) {
            throw new KylinException(ErrorCodeServer.REQUEST_PARAMETER_EMPTY_OR_VALUE_EMPTY, new Object[]{"project"});
        }

        final StreamingJobManager jobManager = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
        final List<StreamingJobMeta> jobs = new ArrayList<>();
        for (String jobId : list) {
            jobs.add(jobManager.getStreamingJobByUuid(jobId));
        }
        return jobs;
    }

    /**
     * Collects the streaming job metadata of the selected project(s).
     *
     * @param str project name; empty or {@code null} selects all projects
     * @return a new list holding the job metadata of every selected project
     */
    private List<StreamingJobMeta> getAllStreamingJobs(String str) {
        final KylinConfig config = KylinConfig.getInstanceFromEnv();
        final List<StreamingJobMeta> jobs = new ArrayList<>();
        for (ProjectInstance project : getAllProjects(str)) {
            final StreamingJobManager jobManager = StreamingJobManager.getInstance(config, project.getName());
            jobs.addAll(jobManager.listAllStreamingJobMeta());
        }
        return jobs;
    }

    /**
     * Tells whether the batch model behind a fusion model is broken.
     *
     * @param nDataModel model to inspect
     * @return {@code false} for a non-fusion model; otherwise the broken flag of the batch model,
     *         or {@code true} when resolving it throws any exception
     */
    public boolean isBatchModelBroken(NDataModel nDataModel) {
        try {
            if (!nDataModel.isFusionModel()) {
                return false;
            }
            final FusionModelManager fusionManager =
                    FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), nDataModel.getProject());
            return fusionManager.getFusionModel(nDataModel.getFusionId()).getBatchModel().isBroken();
        } catch (Exception ignored) {
            // Any failure while resolving the batch model is reported as "broken".
            return true;
        }
    }

    /**
     * Returns the statistics history of a streaming job for the last {@code num} minutes.
     *
     * @param str streaming job id
     * @param num length of the time window in minutes; values of 0 or less yield an empty response
     * @return consumption rate, processing time, data latency and creation time series
     * @throws KylinException with {@code INVALID_PARAMETER} when {@code num} exceeds 10080
     */
    public StreamingJobDataStatsResponse getStreamingJobDataStats(String str, Integer num) {
        final StreamingJobDataStatsResponse response = new StreamingJobDataStatsResponse();
        final Message msg = MsgPicker.getMsg();
        final Calendar windowStart = Calendar.getInstance(TimeZone.getDefault(), Locale.getDefault(Locale.Category.FORMAT));

        final int minutes = num.intValue();
        if (minutes <= 0) {
            return response;
        }
        // 10080 minutes = 7 days.
        if (minutes > 10080) {
            throw new KylinException(ServerErrorCode.INVALID_PARAMETER, msg.getIllegalTimeFilter());
        }

        windowStart.add(Calendar.MINUTE, -minutes);
        final List<StreamingJobStats> stats =
                StreamingJobStatsManager.getInstance().queryStreamingJobStats(windowStart.getTimeInMillis(), str);

        response.setConsumptionRateHist(stats.stream()
                .map(stat -> Integer.valueOf(stat.getRowsPerSecond().intValue()))
                .collect(Collectors.toList()));
        response.setProcessingTimeHist(stats.stream()
                .map(stat -> stat.getProcessingTime())
                .collect(Collectors.toList()));
        response.setDataLatencyHist(stats.stream()
                .map(stat -> convertDataLatency(stat.getMinDataLatency()))
                .collect(Collectors.toList()));
        response.setCreateTime(stats.stream()
                .map(stat -> stat.getCreateTime())
                .collect(Collectors.toList()));
        return response;
    }

    /**
     * Normalizes a data latency value: negative values become 0, others are returned unchanged.
     *
     * @param l latency value, may be {@code null}
     * @return {@code null} for a {@code null} input, otherwise the latency clamped to be at least 0
     */
    private Long convertDataLatency(Long l) {
        // A missing latency stays missing instead of failing on unboxing.
        if (l == null) {
            return null;
        }
        return Math.max(l.longValue(), 0L);
    }

    /**
     * Builds the response for a single streaming job, including the minimum data latency of its
     * latest statistics entry and the time elapsed since its latest record.
     *
     * @param str  streaming job id
     * @param str2 project name
     * @return the job response
     */
    public StreamingJobResponse getStreamingJobInfo(String str, String str2) {
        final StreamingJobManager jobManager = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), str2);
        final StreamingJobResponse response = new StreamingJobResponse(jobManager.getStreamingJobByUuid(str));

        final StreamingJobStats latestStats = StreamingJobStatsManager.getInstance().getLatestOneByJobId(str);
        if (latestStats != null) {
            response.setDataLatency(latestStats.getMinDataLatency());
        }

        final StreamingJobRecord latestRecord = StreamingJobRecordManager.getInstance().getLatestOneByJobId(str);
        if (latestRecord != null) {
            final long elapsedMillis = System.currentTimeMillis() - latestRecord.getCreateTime().longValue();
            response.setLastStatusDuration(Long.valueOf(elapsedMillis));
        }
        return response;
    }

    /**
     * Opens the verbose message stream of a streaming job's output after a permission check.
     * The output is requested with {@code Integer.MAX_VALUE} as the second argument.
     *
     * @param str  project name
     * @param str2 streaming job id
     * @return the verbose message stream of the job output
     */
    public InputStream getStreamingJobAllLog(String str, String str2) {
        this.aclEvaluate.checkProjectOperationPermission(str);
        final NExecutableManager executableManager = (NExecutableManager) getManager(NExecutableManager.class, str);
        return executableManager.getStreamingOutputFromHDFS(str2, Integer.MAX_VALUE).getVerboseMsgStream();
    }

    /**
     * Returns the verbose message of a streaming job's output after a permission check.
     *
     * @param str  project name
     * @param str2 streaming job id
     * @return the verbose message of the job output
     */
    public String getStreamingJobSimpleLog(String str, String str2) {
        this.aclEvaluate.checkProjectOperationPermission(str);
        final NExecutableManager executableManager = (NExecutableManager) getManager(NExecutableManager.class, str);
        return executableManager.getStreamingOutputFromHDFS(str2).getVerboseMsg();
    }

    /**
     * Verifies that the given execution id equals the one stored on the streaming job.
     *
     * @param str  project name
     * @param str2 streaming job id
     * @param num  execution id to compare with the stored one
     * @throws IllegalStateException when the ids differ
     */
    public void checkJobExecutionId(String str, String str2, Integer num) {
        final StreamingJobManager jobManager = (StreamingJobManager) getManager(StreamingJobManager.class, str);
        final Integer storedExecutionId = jobManager.getStreamingJobByUuid(str2).getJobExecutionId();
        final boolean sameExecution = ObjectUtils.equals(num, storedExecutionId);
        final String errorMessage = String.format(Locale.ROOT, "JobExecutionId(%d) is invalid", num);
        Preconditions.checkState(sameExecution, errorMessage);
    }
}
