package org.apache.seatunnel.app.service.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.seatunnel.app.common.Constants;
import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.dao.IJobInstanceHistoryDao;
import org.apache.seatunnel.app.dal.dao.IJobMetricsDao;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.entity.JobInstanceHistory;
import org.apache.seatunnel.app.dal.entity.JobMetrics;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import org.apache.seatunnel.app.domain.response.metrics.JobSummaryMetricsRes;
import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IJobMetricsService;
import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.class */
public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements IJobMetricsService {
    private static final Logger log = LoggerFactory.getLogger(JobMetricsServiceImpl.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Resource
    private IJobMetricsDao jobMetricsDao;

    @Resource
    private IJobInstanceHistoryDao jobInstanceHistoryDao;

    @Resource
    private IJobInstanceDao jobInstanceDao;

    /**
     * Returns one summary entry per pipeline of the given job instance.
     *
     * <p>Checks the {@code JOB_METRICS_SUMMARY} permission for the user, reads the job
     * engine id from the stored job instance, collects the per-pipeline metrics and
     * converts each of them into a summary response.
     *
     * @param num id of the requesting user; must not be null
     * @param l id of the job instance; must not be null
     * @return the pipeline summaries of the job instance
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public List<JobPipelineSummaryMetricsRes> getJobPipelineSummaryMetrics(@NonNull Integer num, @NonNull Long l) {
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (l == null) {
            throw new NullPointerException("jobInstanceId is marked non-null but is null");
        }
        funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, num.intValue());
        JobInstance storedInstance = this.jobInstanceDao.getJobInstance(l);
        String jobEngineId = storedInstance.getJobEngineId();
        List<JobMetrics> pipelineMetrics = getJobPipelineMetrics(num, l, jobEngineId);
        return summaryMetrics(pipelineMetrics);
    }

    /**
     * Builds the job-level summary (total rows read and written plus the engine status)
     * for one job instance.
     *
     * <p>The status is asked from the metrics extractor of the instance's engine; the row
     * counts are the sums over all pipeline metrics of the job.
     *
     * @param num id of the requesting user; must not be null
     * @param l id of the job instance; must not be null
     * @param str job engine id, parsed as a {@code long} for the response; must not be null
     * @return the summary of the job
     * @throws NullPointerException if any argument is null
     * @throws NumberFormatException if {@code str} is not a valid {@code long}
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public JobSummaryMetricsRes getJobSummaryMetrics(@NonNull Integer num, @NonNull Long l, @NonNull String str) {
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (l == null) {
            throw new NullPointerException("jobInstanceId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, num.intValue());
        JobInstance jobInstance = this.jobInstanceDao.getJobInstance(l);
        Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        String engineStatus = extractor.getJobStatus(str);
        List<JobMetrics> pipelineMetrics = getJobPipelineMetrics(num, l, str);
        // Parse the engine id before summing so failures surface in the same order as before.
        long instanceId = l.longValue();
        long engineId = Long.parseLong(str);
        long totalRead = pipelineMetrics.stream().mapToLong(JobMetrics::getReadRowCount).sum();
        long totalWritten = pipelineMetrics.stream().mapToLong(JobMetrics::getWriteRowCount).sum();
        return new JobSummaryMetricsRes(instanceId, engineId, totalRead, totalWritten, engineStatus);
    }

    /**
     * Builds the summary metrics of several job instances at once, keyed by job instance id.
     *
     * <p>The metrics of all running jobs are fetched from the engine once, using the engine
     * name and version of the first loaded instance, and then combined with the stored
     * metrics according to the sync task type.
     *
     * <p>This method never returns {@code null}: when no instance is found, or when
     * {@code str} is neither {@code "BATCH"} nor {@code "STREAMING"}, an empty map is
     * returned.
     *
     * @param num id of the requesting user; must not be null
     * @param map job instance id to job engine id; must not be null
     * @param list ids of the job instances to summarise; must not be null
     * @param str sync task type, {@code "BATCH"} or {@code "STREAMING"}; must not be null
     * @return summaries keyed by job instance id, possibly empty
     * @throws NullPointerException if any argument is null
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(@NonNull Integer num, @NonNull Map<Long, Long> map, @NonNull List<Long> list, @NonNull String str) {
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (map == null) {
            throw new NullPointerException("jobInstanceIdAndJobEngineIdMap is marked non-null but is null");
        }
        if (list == null) {
            throw new NullPointerException("jobInstanceIdList is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("syncTaskType is marked non-null but is null");
        }
        log.info("jobInstanceIdAndJobEngineIdMap={}", map);
        funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, num.intValue());
        List<JobInstance> allJobInstance = this.jobInstanceDao.getAllJobInstance(list);
        if (allJobInstance.isEmpty()) {
            log.warn("getALLJobSummaryMetrics : allJobInstance is empty, task id list is {}", list);
            return new HashMap<>();
        }
        // All instances are queried against the engine of the first one.
        JobInstance first = allJobInstance.get(0);
        Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine = getAllRunningJobMetricsFromEngine(first.getEngineName(), first.getEngineVersion());
        Map<Long, JobSummaryMetricsRes> result;
        switch (str) {
            case "BATCH":
                result = getMatricsListIfTaskTypeIsBatch(allJobInstance, num, allRunningJobMetricsFromEngine, map);
                break;
            case "STREAMING":
                result = getMatricsListIfTaskTypeIsStreaming(allJobInstance, num, allRunningJobMetricsFromEngine, map);
                break;
            default:
                // Previously this fell through and returned null; an empty map matches the
                // "no instances" exit above and spares callers a null check.
                log.warn("getALLJobSummaryMetrics : unsupported syncTaskType {}, returning an empty result", str);
                result = new HashMap<>();
                break;
        }
        log.info("result is {}", result);
        return result;
    }

    /**
     * Builds the summary metrics of BATCH job instances, keyed by job instance id.
     *
     * <p>For every instance:
     * <ul>
     *   <li>status {@code null}, FAILED or RUNNING: if the engine reports the job, the live
     *       metrics are returned and persisted; otherwise the stored metrics are returned
     *       (when any exist) and a RUNNING instance is marked {@code "FINISHED"};</li>
     *   <li>status {@code "FINISHED"} or {@code "CANCELED"}: the stored metrics are returned;</li>
     *   <li>any other status: the instance is skipped.</li>
     * </ul>
     *
     * @param list job instances to summarise
     * @param num id of the requesting user
     * @param map metrics of the jobs currently running on the engine, keyed by job engine id
     * @param map2 job instance id to job engine id
     * @return summaries keyed by job instance id
     */
    private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsBatch(List<JobInstance> list, Integer num, Map<Long, HashMap<Integer, JobMetrics>> map, Map<Long, Long> map2) {
        HashMap<Long, JobSummaryMetricsRes> summaries = new HashMap<>();
        log.info("allRunningJobMetricsFromEngine is {}", map.toString());
        for (JobInstance jobInstance : list) {
            log.info("jobEngineId={}", jobInstance.getJobEngineId());
            String status = jobInstance.getJobStatus();
            boolean mayStillRun = status == null || status.equals(Constants.FAILED) || status.equals(Constants.RUNNING);
            if (!mayStillRun) {
                if (status.equals("FINISHED") || status.equals("CANCELED")) {
                    String engineId = Long.toString(map2.get(jobInstance.getId()).longValue());
                    JobSummaryMetricsRes stored = getJobSummaryMetricsResByDb(jobInstance, num, engineId);
                    log.info("jobStatus=finish oe canceled,JobSummaryMetricsRes={}", stored);
                    summaries.put(jobInstance.getId(), stored);
                }
                continue;
            }
            boolean knownToEngine = !map.isEmpty() && map.containsKey(map2.get(jobInstance.getId()));
            if (knownToEngine) {
                summaries.put(jobInstance.getId(), getRunningJobMetricsFromEngine(map, map2, jobInstance));
                modifyAndUpdateJobInstanceAndJobMetrics(jobInstance, map, map2, num);
                continue;
            }
            log.info("The job does not exist on the engine, it is directly returned from the database");
            String engineId = Long.toString(map2.get(jobInstance.getId()).longValue());
            JobSummaryMetricsRes stored = getJobSummaryMetricsResByDb(jobInstance, num, engineId);
            if (stored != null) {
                summaries.put(jobInstance.getId(), stored);
            }
            // A job recorded as RUNNING that the engine no longer reports is marked finished.
            if (Constants.RUNNING.equals(jobInstance.getJobStatus())) {
                jobInstance.setJobStatus("FINISHED");
                this.jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
            }
        }
        return summaries;
    }

    /**
     * Marks a job instance as RUNNING and persists the live engine metrics for it.
     *
     * <p>When no metrics are stored for the instance yet, the engine metrics are inserted
     * and the instance is updated. Otherwise every stored pipeline row receives the live
     * read/write row counts and the RUNNING status, and the instance and rows are updated.
     *
     * <p>A stored pipeline that the engine does not report keeps its stored row counts
     * (a warning is logged) instead of failing the whole update with a
     * {@link NullPointerException}.
     *
     * @param jobInstance the job instance to update; its status is set to RUNNING
     * @param map metrics of the jobs currently running on the engine, keyed by job engine id
     * @param map2 job instance id to job engine id
     * @param num id of the requesting user, recorded on newly inserted metrics
     */
    private void modifyAndUpdateJobInstanceAndJobMetrics(JobInstance jobInstance, Map<Long, HashMap<Integer, JobMetrics>> map, Map<Long, Long> map2, Integer num) {
        jobInstance.setJobStatus(Constants.RUNNING);
        HashMap<Integer, JobMetrics> engineMetrics = map.get(map2.get(jobInstance.getId()));
        List<JobMetrics> storedMetrics = this.jobMetricsDao.getByInstanceId(jobInstance.getId());
        log.info("001jobMetricsFromDb={}", storedMetrics);
        if (storedMetrics.isEmpty()) {
            log.info("002jobMetricsFromDb == null");
            syncMetricsToDbRunning(jobInstance, num, engineMetrics);
            this.jobInstanceDao.update(jobInstance);
            return;
        }
        // One pass instead of three separate forEach calls over the same list.
        for (JobMetrics stored : storedMetrics) {
            JobMetrics live = engineMetrics.get(stored.getPipelineId());
            if (live == null) {
                log.warn("No engine metrics for pipeline {} of job instance {}, keeping stored row counts", stored.getPipelineId(), jobInstance.getId());
            } else {
                stored.setReadRowCount(live.getReadRowCount());
                stored.setWriteRowCount(live.getWriteRowCount());
            }
            stored.setStatus(Constants.RUNNING);
        }
        updateJobInstanceAndMetrics(jobInstance, storedMetrics);
    }

    /**
     * Builds the summary metrics of STREAMING job instances, keyed by job instance id.
     *
     * <p>For every instance:
     * <ul>
     *   <li>status {@code "CANCELED"}: the stored metrics are returned;</li>
     *   <li>otherwise, if the engine reports the job: the live metrics are persisted and
     *       returned;</li>
     *   <li>otherwise, status {@code "FINISHED"} or FAILED: the stored metrics are returned;</li>
     *   <li>otherwise the current status is asked from the engine; when a status is obtained
     *       it is written to the instance and to all of its stored metrics and the stored
     *       summary is returned. When the lookup fails or yields {@code null}, the instance
     *       is left out of the result.</li>
     * </ul>
     *
     * <p>A failed status lookup is logged together with the exception so the cause is not
     * lost. Any other exception raised while handling an instance is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param list job instances to summarise
     * @param num id of the requesting user
     * @param map metrics of the jobs currently running on the engine, keyed by job engine id
     * @param map2 job instance id to job engine id
     * @return summaries keyed by job instance id; a value may be {@code null} when no
     *     metrics are stored for that instance
     */
    private Map<Long, JobSummaryMetricsRes> getMatricsListIfTaskTypeIsStreaming(List<JobInstance> list, Integer num, Map<Long, HashMap<Integer, JobMetrics>> map, Map<Long, Long> map2) {
        Map<Long, JobSummaryMetricsRes> summaries = new HashMap<>();
        for (JobInstance jobInstance : list) {
            try {
                Long instanceId = jobInstance.getId();
                String status = jobInstance.getJobStatus();
                if (status != null && status.equals("CANCELED")) {
                    summaries.put(instanceId, getJobSummaryMetricsResByDb(jobInstance, num, Long.toString(map2.get(instanceId).longValue())));
                    continue;
                }
                // Whatever the stored status says, a job the engine still reports is running.
                boolean reportedByEngine = !map.isEmpty() && map.containsKey(map2.get(instanceId));
                if (reportedByEngine) {
                    modifyAndUpdateJobInstanceAndJobMetrics(jobInstance, map, map2, num);
                    summaries.put(instanceId, getRunningJobMetricsFromEngine(map, map2, jobInstance));
                    continue;
                }
                boolean ended = status != null && (status.equals("FINISHED") || status.equals(Constants.FAILED));
                if (ended) {
                    summaries.put(instanceId, getJobSummaryMetricsResByDb(jobInstance, num, Long.toString(map2.get(instanceId).longValue())));
                    continue;
                }
                String jobEngineId = String.valueOf(map2.get(instanceId));
                String engineStatus = null;
                try {
                    engineStatus = getJobStatusByJobEngineId(jobEngineId);
                } catch (Exception e) {
                    // Best effort: the instance is skipped, but the cause is kept in the log.
                    log.warn("getMetricsListIfTaskTypeIsStreaming getJobStatusByJobEngineId is exception jobInstanceId is : {}", instanceId, e);
                }
                if (engineStatus == null) {
                    continue;
                }
                jobInstance.setJobStatus(engineStatus);
                this.jobInstanceDao.update(jobInstance);
                summaries.put(instanceId, getJobSummaryMetricsResByDb(jobInstance, num, jobEngineId));
                List<JobMetrics> storedMetrics = getJobMetricsFromDb(jobInstance, num, jobEngineId);
                for (JobMetrics stored : storedMetrics) {
                    stored.setStatus(engineStatus);
                    this.jobMetricsDao.getJobMetricsMapper().updateById(stored);
                }
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }
        return summaries;
    }

    /**
     * Builds the RUNNING summary of a job from the live engine metrics.
     *
     * <p>The row counts are the sums over all pipelines the engine reports for the job.
     * The response carries the job's real engine id, as the other summary builders in this
     * class do, instead of the former hard-coded placeholder {@code 1}.
     *
     * @param map metrics of the jobs currently running on the engine, keyed by job engine id;
     *     must contain an entry for the job
     * @param map2 job instance id to job engine id; must contain the instance
     * @param jobInstance the job instance to summarise
     * @return the summary with status RUNNING
     */
    private JobSummaryMetricsRes getRunningJobMetricsFromEngine(Map<Long, HashMap<Integer, JobMetrics>> map, Map<Long, Long> map2, JobInstance jobInstance) {
        Long jobEngineId = map2.get(jobInstance.getId());
        HashMap<Integer, JobMetrics> pipelineMetrics = map.get(jobEngineId);
        log.info("0706jobMetricsFromEngine={}", pipelineMetrics);
        long totalRead = pipelineMetrics.values().stream().mapToLong(JobMetrics::getReadRowCount).sum();
        long totalWritten = pipelineMetrics.values().stream().mapToLong(JobMetrics::getWriteRowCount).sum();
        log.info("jobInstance={}", jobInstance);
        return new JobSummaryMetricsRes(jobInstance.getId().longValue(), jobEngineId.longValue(), totalRead, totalWritten, Constants.RUNNING);
    }

    /**
     * Builds the summary of a job from the metrics stored in the database.
     *
     * @param jobInstance the job instance to summarise
     * @param num id of the requesting user
     * @param str job engine id, handed to {@code getJobMetricsFromDb}
     * @return the summary carrying the instance's own engine id and status, or {@code null}
     *     when no metrics are stored for the instance
     */
    private JobSummaryMetricsRes getJobSummaryMetricsResByDb(JobInstance jobInstance, Integer num, String str) {
        List<JobMetrics> storedMetrics = getJobMetricsFromDb(jobInstance, num, str);
        if (storedMetrics.isEmpty()) {
            return null;
        }
        long instanceId = jobInstance.getId().longValue();
        long engineId = Long.parseLong(jobInstance.getJobEngineId());
        long totalRead = storedMetrics.stream().mapToLong(JobMetrics::getReadRowCount).sum();
        long totalWritten = storedMetrics.stream().mapToLong(JobMetrics::getWriteRowCount).sum();
        return new JobSummaryMetricsRes(instanceId, engineId, totalRead, totalWritten, jobInstance.getJobStatus());
    }

    /**
     * Fetches the metrics of all running jobs from the given engine.
     *
     * @param str engine name
     * @param str2 engine version
     * @return whatever the engine's metrics extractor returns from
     *     {@code getAllRunningJobMetrics()}
     */
    private Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetricsFromEngine(String str, String str2) {
        Engine engine = new Engine(str, str2);
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        return extractor.getAllRunningJobMetrics();
    }

    /**
     * Persists a job instance and then each of its metrics rows.
     *
     * <p>Does nothing when either argument is {@code null}.
     *
     * @param jobInstance the job instance to update
     * @param list the metrics rows to update by id
     */
    private void updateJobInstanceAndMetrics(JobInstance jobInstance, List<JobMetrics> list) {
        if (jobInstance != null && list != null) {
            this.jobInstanceDao.update(jobInstance);
            for (JobMetrics metrics : list) {
                this.jobMetricsDao.getJobMetricsMapper().updateById(metrics);
            }
        }
    }

    /**
     * Asks the SeaTunnel engine proxy for the status of a job.
     *
     * @param str job engine id
     * @return the status returned by the engine proxy
     */
    private String getJobStatusByJobEngineId(String str) {
        SeaTunnelEngineProxy engineProxy = SeaTunnelEngineProxy.getInstance();
        return engineProxy.getJobStatus(str);
    }

    /**
     * Fetches the metrics of one job from the engine as a map.
     *
     * @param jobInstance supplies the engine name and version; must not be null
     * @param str job engine id; must not be null
     * @return the result of the extractor's {@code getMetricsByJobEngineIdRTMap}; callers
     *     look entries up by pipeline id
     * @throws NullPointerException if either argument is null
     */
    private Map<Integer, JobMetrics> getJobMetricsFromEngineMap(@NonNull JobInstance jobInstance, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        log.info("enter getJobMetricsFromEngine");
        Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        return extractor.getMetricsByJobEngineIdRTMap(str);
    }

    /**
     * Picks the source of a job's pipeline metrics from the job's engine status.
     *
     * <ul>
     *   <li>no status ({@code str2 == null}): the stored metrics;</li>
     *   <li>an end status: the stored metrics, or, when none are stored, the engine metrics
     *       after they have been synchronised to the database;</li>
     *   <li>any other status: the engine metrics.</li>
     * </ul>
     *
     * @param jobInstance the job instance; must not be null
     * @param num id of the requesting user; must not be null
     * @param str job engine id; must not be null
     * @param str2 engine status of the job, may be {@code null}
     * @param iEngineMetricsExtractor decides whether {@code str2} is an end status; must
     *     not be null
     * @return the pipeline metrics of the job
     * @throws NullPointerException if a required argument is null
     */
    private List<JobMetrics> getJobPipelineDetailMetrics(@NonNull JobInstance jobInstance, @NonNull Integer num, @NonNull String str, String str2, @NonNull IEngineMetricsExtractor iEngineMetricsExtractor) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        if (iEngineMetricsExtractor == null) {
            throw new NullPointerException("engineMetricsExtractor is marked non-null but is null");
        }
        if (str2 == null) {
            return getJobMetricsFromDb(jobInstance, num, str);
        }
        if (!iEngineMetricsExtractor.isJobEndStatus(str2)) {
            return getJobMetricsFromEngine(jobInstance, str);
        }
        List<JobMetrics> storedMetrics = getJobMetricsFromDb(jobInstance, num, str);
        if (!CollectionUtils.isEmpty(storedMetrics)) {
            return storedMetrics;
        }
        // Ended job without stored metrics: persist them first, then answer from the engine.
        syncMetricsToDb(jobInstance, num, str);
        return getJobMetricsFromEngine(jobInstance, str);
    }

    /**
     * Returns the detailed metrics of every pipeline of the given job instance.
     *
     * <p>Checks the {@code JOB_DETAIL} permission for the user, reads the job engine id
     * from the stored job instance and wraps each pipeline's metrics into a detail response.
     *
     * @param num id of the requesting user; must not be null
     * @param l id of the job instance; must not be null
     * @return the pipeline detail responses of the job instance
     * @throws NullPointerException if either argument is null
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(@NonNull Integer num, @NonNull Long l) {
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (l == null) {
            throw new NullPointerException("jobInstanceId is marked non-null but is null");
        }
        funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_DETAIL, num.intValue());
        String jobEngineId = this.jobInstanceDao.getJobInstance(l).getJobEngineId();
        List<JobMetrics> pipelineMetrics = getJobPipelineMetrics(num, l, jobEngineId);
        return pipelineMetrics.stream()
                .map(this::wrapperJobMetrics)
                .collect(Collectors.toList());
    }

    /**
     * Loads the job instance, asks its engine for the job status and returns the pipeline
     * metrics from the source selected by {@code getJobPipelineDetailMetrics}.
     *
     * @param num id of the requesting user; must not be null
     * @param l id of the job instance; must not be null
     * @param str job engine id; must not be null
     * @return the pipeline metrics of the job
     * @throws NullPointerException if any argument is null
     */
    private List<JobMetrics> getJobPipelineMetrics(@NonNull Integer num, @NonNull Long l, @NonNull String str) {
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (l == null) {
            throw new NullPointerException("jobInstanceId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        JobInstance jobInstance = this.jobInstanceDao.getJobInstance(l);
        Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        String engineStatus = extractor.getJobStatus(str);
        return getJobPipelineDetailMetrics(jobInstance, num, str, engineStatus, extractor);
    }

    /**
     * Returns the DAG of a job instance.
     *
     * <p>Checks the {@code JOB_DAG} permission for the user. A history record stored in the
     * database is used when present. Otherwise, if the engine reports the job as ended, the
     * history is first synchronised to the database and read back; if not, it is taken
     * directly from the engine.
     *
     * @param num id of the requesting user; must not be null
     * @param l id of the job instance; must not be null
     * @return the parsed DAG, or {@code null} when no history record is available
     * @throws NullPointerException if either argument is null
     * @throws RuntimeException wrapping the {@link IOException} when the stored DAG cannot
     *     be parsed
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public JobDAG getJobDAG(@NonNull Integer num, @NonNull Long l) throws JsonProcessingException {
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (l == null) {
            throw new NullPointerException("jobInstanceId is marked non-null but is null");
        }
        funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_DAG, num.intValue());
        JobInstance jobInstance = this.jobInstanceDao.getJobInstance(l);
        String jobEngineId = jobInstance.getJobEngineId();
        JobInstanceHistory storedHistory = getJobHistoryFromDb(jobInstance, num, jobEngineId);
        if (storedHistory != null) {
            return parseJobDag(storedHistory);
        }
        Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        JobInstanceHistory history;
        if (extractor.isJobEnd(jobEngineId)) {
            syncHistoryJobInfoToDb(jobInstance, jobEngineId);
            history = getJobHistoryFromDb(jobInstance, num, jobEngineId);
        } else {
            history = getJobHistoryFromEngine(jobInstance, jobEngineId);
        }
        return history == null ? null : parseJobDag(history);
    }

    /** Parses the DAG JSON of a history record, wrapping any {@link IOException}. */
    private JobDAG parseJobDag(JobInstanceHistory history) {
        try {
            return OBJECT_MAPPER.readValue(history.getDag(), JobDAG.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the history record of a job from the engine.
     *
     * @param jobInstance supplies the engine name and version; must not be null
     * @param str job engine id
     * @return the result of the extractor's {@code getJobHistoryById}
     * @throws NullPointerException if {@code jobInstance} is null
     */
    private JobInstanceHistory getJobHistoryFromEngine(@NonNull JobInstance jobInstance, String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        return extractor.getJobHistoryById(str);
    }

    /**
     * Reads the stored history record of a job instance.
     *
     * <p>Before reading, the engine id is attached to the instance if the instance does not
     * have one yet.
     *
     * @param jobInstance the job instance; must not be null
     * @param num id of the requesting user
     * @param str job engine id
     * @return the stored history; {@code getJobDAG} treats {@code null} as "none stored"
     * @throws NullPointerException if {@code jobInstance} is null, or if {@code num} or
     *     {@code str} is null (rejected by {@code relationJobInstanceAndJobEngineId})
     */
    private JobInstanceHistory getJobHistoryFromDb(@NonNull JobInstance jobInstance, Integer num, String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        relationJobInstanceAndJobEngineId(jobInstance, num, str);
        Long instanceId = jobInstance.getId();
        return this.jobInstanceHistoryDao.getByInstanceId(instanceId);
    }

    /**
     * Synchronises everything known about a job from the engine into the database.
     *
     * <p>In order: attaches the engine id to the instance if it has none, stores the
     * metrics, stores the history record, and finally stamps the instance's end time.
     *
     * @param jobInstance the job instance to synchronise; must not be null
     * @param num id of the acting user; must not be null
     * @param str job engine id; must not be null
     * @throws NullPointerException if any argument is null
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public void syncJobDataToDb(@NonNull JobInstance jobInstance, @NonNull Integer num, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        // Make sure the instance carries its engine id before anything is stored for it.
        relationJobInstanceAndJobEngineId(jobInstance, num, str);
        syncMetricsToDb(jobInstance, num, str);
        syncHistoryJobInfoToDb(jobInstance, str);
        // Sets the end time to "now" and persists the instance.
        syncCompleteJobInfoToDb(jobInstance);
    }

    /**
     * Copies the engine metrics of a job into the database.
     *
     * <p>When nothing is stored for the instance yet, every engine metric receives a
     * generated id, the instance id and the user id, and all of them are inserted in one
     * batch (nothing is inserted when the engine reports no metrics). Otherwise each stored
     * row is refreshed with the engine's QPS and row counts for the same pipeline and with
     * the job's current engine status.
     *
     * <p>A stored pipeline that the engine does not report keeps its stored numbers and
     * only receives the status (a warning is logged) instead of aborting the
     * synchronisation with a {@link NullPointerException}.
     *
     * @param jobInstance the job instance; must not be null
     * @param num id of the acting user; must not be null
     * @param str job engine id; must not be null
     * @throws NullPointerException if any argument is null
     * @throws SeatunnelException with {@code JOB_RUN_GENERATE_UUID_ERROR} when an id cannot
     *     be generated for a new metrics row
     */
    private void syncMetricsToDb(@NonNull JobInstance jobInstance, @NonNull Integer num, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        Map<Integer, JobMetrics> engineMetrics = getJobMetricsFromEngineMap(jobInstance, str);
        List<JobMetrics> storedMetrics = getJobMetricsFromDb(jobInstance, num, str);
        if (storedMetrics.isEmpty()) {
            List<JobMetrics> toInsert = new ArrayList<>(engineMetrics.values());
            for (JobMetrics metrics : toInsert) {
                try {
                    metrics.setId(Long.valueOf(CodeGenerateUtils.getInstance().genCode()));
                    metrics.setJobInstanceId(jobInstance.getId());
                    metrics.setCreateUserId(num);
                    metrics.setUpdateUserId(num);
                } catch (CodeGenerateUtils.CodeGenerateException e) {
                    // Logged here because the cause is not attached to the exception below.
                    log.error("Failed to generate an id for the metrics of job instance {}", jobInstance.getId(), e);
                    throw new SeatunnelException(SeatunnelErrorEnum.JOB_RUN_GENERATE_UUID_ERROR);
                }
            }
            if (!toInsert.isEmpty()) {
                this.jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(toInsert);
            }
            return;
        }
        String engineStatus = getJobStatusByJobEngineId(str);
        for (JobMetrics stored : storedMetrics) {
            JobMetrics live = engineMetrics.get(stored.getPipelineId());
            if (live == null) {
                log.warn("No engine metrics for pipeline {} of job instance {}, keeping stored values", stored.getPipelineId(), jobInstance.getId());
            } else {
                stored.setWriteQps(live.getWriteQps());
                stored.setReadQps(live.getReadQps());
                stored.setReadRowCount(live.getReadRowCount());
                stored.setWriteRowCount(live.getWriteRowCount());
            }
            stored.setStatus(engineStatus);
            this.jobMetricsDao.getJobMetricsMapper().updateById(stored);
        }
    }

    /**
     * Stores the engine's history record of a job under the job instance's id.
     *
     * <p>The record is inserted when none is stored for the instance yet and updated
     * otherwise. When the engine returns no history, nothing is written and a warning is
     * logged; previously this case failed with a {@link NullPointerException}.
     *
     * @param jobInstance the job instance; must not be null
     * @param str job engine id; must not be null
     * @throws NullPointerException if either argument is null
     */
    private void syncHistoryJobInfoToDb(@NonNull JobInstance jobInstance, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        JobInstanceHistory engineHistory = getJobHistoryFromEngine(jobInstance, str);
        if (engineHistory == null) {
            log.warn("Engine returned no history for job engine id {} (job instance {}), nothing to sync", str, jobInstance.getId());
            return;
        }
        // The history row shares its primary key with the job instance.
        engineHistory.setId(jobInstance.getId());
        if (this.jobInstanceHistoryDao.getByInstanceId(jobInstance.getId()) == null) {
            this.jobInstanceHistoryDao.insert(engineHistory);
        } else {
            this.jobInstanceHistoryDao.updateJobInstanceHistory(engineHistory);
        }
    }

    /**
     * Stamps the job instance's end time with the current time and persists the instance.
     *
     * @param jobInstance the job instance; must not be null
     * @throws NullPointerException if {@code jobInstance} is null
     */
    private void syncCompleteJobInfoToDb(@NonNull JobInstance jobInstance) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        Date finishedAt = new Date();
        jobInstance.setEndTime(finishedAt);
        this.jobInstanceDao.update(jobInstance);
    }

    /**
     * Attaches a job engine id to a job instance that does not have one yet.
     *
     * <p>When the instance's engine id is empty, it is set to {@code str}, the update user
     * is set to {@code num} and the instance is persisted. An instance that already has an
     * engine id is left untouched.
     *
     * @param jobInstance the job instance; must not be null
     * @param num id of the acting user; must not be null
     * @param str job engine id to attach; must not be null
     * @throws NullPointerException if any argument is null
     */
    private void relationJobInstanceAndJobEngineId(@NonNull JobInstance jobInstance, @NonNull Integer num, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        boolean alreadyLinked = !StringUtils.isEmpty(jobInstance.getJobEngineId());
        if (alreadyLinked) {
            return;
        }
        jobInstance.setJobEngineId(str);
        jobInstance.setUpdateUserId(num);
        this.jobInstanceDao.update(jobInstance);
    }

    /**
     * Fetches the pipeline metrics of one job from the engine as a list.
     *
     * @param jobInstance supplies the engine name and version; must not be null
     * @param str job engine id; must not be null
     * @return the result of the extractor's {@code getMetricsByJobEngineId}
     * @throws NullPointerException if either argument is null
     */
    private List<JobMetrics> getJobMetricsFromEngine(@NonNull JobInstance jobInstance, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
        IEngineMetricsExtractor extractor = new EngineMetricsExtractorFactory(engine).getEngineMetricsExtractor();
        return extractor.getMetricsByJobEngineId(str);
    }

    /**
     * Converts pipeline metrics into pipeline summary responses, preserving order.
     *
     * @param list the pipeline metrics; must not be null
     * @return one summary (pipeline id, read rows, written rows, status) per element
     * @throws NullPointerException if {@code list} is null
     */
    private List<JobPipelineSummaryMetricsRes> summaryMetrics(@NonNull List<JobMetrics> list) {
        if (list == null) {
            throw new NullPointerException("jobPipelineDetailedMetrics is marked non-null but is null");
        }
        List<JobPipelineSummaryMetricsRes> summaries = new ArrayList<>();
        for (JobMetrics metrics : list) {
            int pipelineId = metrics.getPipelineId().intValue();
            summaries.add(new JobPipelineSummaryMetricsRes(pipelineId, metrics.getReadRowCount(), metrics.getWriteRowCount(), metrics.getStatus()));
        }
        return summaries;
    }

    /**
     * Reads the stored pipeline metrics of a job instance.
     *
     * <p>Before reading, the engine id is attached to the instance if the instance does not
     * have one yet.
     *
     * @param jobInstance the job instance; must not be null
     * @param num id of the requesting user; must not be null
     * @param str job engine id; must not be null
     * @return the stored metrics of the instance
     * @throws NullPointerException if any argument is null
     */
    private List<JobMetrics> getJobMetricsFromDb(@NonNull JobInstance jobInstance, @NonNull Integer num, @NonNull String str) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("jobEngineId is marked non-null but is null");
        }
        relationJobInstanceAndJobEngineId(jobInstance, num, str);
        Long instanceId = jobInstance.getId();
        return this.jobMetricsDao.getByInstanceId(instanceId);
    }

    /**
     * Splits a metrics query key of the form {@code <jobInstanceId>::<jobEngineId>}.
     *
     * <p>The key must split on {@code "::"} into exactly two parts and the first part must
     * be a valid {@code long}. Every malformed key, including one with a non-numeric
     * instance id, is reported with the same {@code JOB_METRICS_QUERY_KEY_ERROR}; previously
     * a non-numeric id escaped as a raw {@link NumberFormatException}.
     *
     * @param str the key to split; must not be null
     * @return the job instance id (left) and the job engine id (right)
     * @throws NullPointerException if {@code str} is null
     * @throws SeatunnelException with {@code JOB_METRICS_QUERY_KEY_ERROR} if the key is
     *     malformed
     */
    @Override // org.apache.seatunnel.app.service.IJobMetricsService
    public ImmutablePair<Long, String> getInstanceIdAndEngineId(@NonNull String str) {
        if (str == null) {
            throw new NullPointerException("key is marked non-null but is null");
        }
        // A key without "::" splits into a single part, so the length check covers that case.
        String[] parts = str.split("::");
        if (parts.length != 2) {
            throw new SeatunnelException(SeatunnelErrorEnum.JOB_METRICS_QUERY_KEY_ERROR, new Object[]{str});
        }
        try {
            return new ImmutablePair<>(Long.valueOf(parts[0]), parts[1]);
        } catch (NumberFormatException e) {
            // Logged here because the cause is not attached to the exception below.
            log.warn("Job metrics query key {} has a non-numeric job instance id", str, e);
            throw new SeatunnelException(SeatunnelErrorEnum.JOB_METRICS_QUERY_KEY_ERROR, new Object[]{str});
        }
    }

    /**
     * Wraps a stored metrics row into a pipeline detail response.
     *
     * @param jobMetrics the metrics row; must not be null
     * @return a response carrying the row's id, pipeline id, row counts, table names, QPS
     *     values, record delay and status
     * @throws NullPointerException if {@code jobMetrics} is null
     */
    private JobPipelineDetailMetricsRes wrapperJobMetrics(@NonNull JobMetrics jobMetrics) {
        if (jobMetrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        return new JobPipelineDetailMetricsRes(
                jobMetrics.getId(),
                jobMetrics.getPipelineId(),
                jobMetrics.getReadRowCount(),
                jobMetrics.getWriteRowCount(),
                jobMetrics.getSourceTableNames(),
                jobMetrics.getSinkTableNames(),
                jobMetrics.getReadQps(),
                jobMetrics.getWriteQps(),
                jobMetrics.getRecordDelay(),
                jobMetrics.getStatus());
    }

    /**
     * Inserts the live metrics of a running job as new rows.
     *
     * <p>Every metric in the map receives a generated id, the instance id and the user id.
     * The rows are inserted in one batch; nothing is inserted when the map is empty.
     *
     * @param jobInstance the job instance the metrics belong to; must not be null
     * @param num id of the acting user; must not be null
     * @param map the engine metrics of the job; callers look entries up by pipeline id;
     *     must not be null
     * @throws NullPointerException if any argument is null
     */
    private void syncMetricsToDbRunning(@NonNull JobInstance jobInstance, @NonNull Integer num, @NonNull Map<Integer, JobMetrics> map) {
        if (jobInstance == null) {
            throw new NullPointerException("jobInstance is marked non-null but is null");
        }
        if (num == null) {
            throw new NullPointerException("userId is marked non-null but is null");
        }
        if (map == null) {
            throw new NullPointerException("jobMetricsMap is marked non-null but is null");
        }
        List<JobMetrics> toInsert = new ArrayList<>();
        for (JobMetrics metrics : map.values()) {
            metrics.setId(Long.valueOf(CodeGenerateUtils.getInstance().genCode()));
            metrics.setJobInstanceId(jobInstance.getId());
            metrics.setCreateUserId(num);
            metrics.setUpdateUserId(num);
            toInsert.add(metrics);
        }
        if (!toInsert.isEmpty()) {
            log.info("003list={}", toInsert);
            this.jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(toInsert);
        }
    }
}
