package org.apache.kylin.stream.coordinator.coordinate;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.StreamingCubingEngine;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-stream-coordinator-3.0.0.jar:org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.class */
public class BuildJobSubmitter implements Runnable {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(BuildJobSubmitter.class);
    /** Coordinator through which the metadata store, cube/executable managers and cluster manager are reached. */
    private StreamingCoordinator coordinator;
    /** Cube name -> sorted set of build jobs currently tracked for that cube. */
    private ConcurrentMap<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> segmentBuildJobCheckList = Maps.newConcurrentMap();
    /** Names of cubes that {@code findSegmentReadyToBuild()} examines on each round. */
    private Set<String> cubeCheckList = new ConcurrentSkipListSet<>();
    /** Number of {@code doRun()} invocations so far; used to trigger the periodic full traversal. */
    private long checkTimes = 0;

    /**
     * Creates a submitter bound to the given coordinator.
     *
     * @param streamingCoordinator coordinator this submitter queries and delegates to
     */
    public BuildJobSubmitter(StreamingCoordinator streamingCoordinator) {
        coordinator = streamingCoordinator;
    }

    /**
     * Re-populates the job track list from the stream metadata store: for every cube
     * known to the store, each segment build state that reports "in building" is
     * registered again together with its job id.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * package-private; it is kept public here so existing callers are unaffected.
     */
    public void restore() {
        logger.info("Restore job submitter");
        for (String cubeName : coordinator.getStreamMetadataStore().getCubes()) {
            List<SegmentBuildState> buildStates = coordinator.getStreamMetadataStore().getSegmentBuildStates(cubeName);
            // Process states in their natural order.
            Collections.sort(buildStates);
            for (SegmentBuildState buildState : buildStates) {
                if (!buildState.isInBuilding()) {
                    continue;
                }
                String segmentName = buildState.getSegmentName();
                String jobId = buildState.getState().getJobId();
                addToJobTrackList(new SegmentJobBuildInfo(cubeName, segmentName, jobId));
            }
        }
    }

    /**
     * Registers a build job so that later rounds check its status.
     *
     * <p>The per-cube set is created on first use. If the set already holds an
     * element that compares equal to the given one, the old element is removed and
     * the new one inserted in its place.
     *
     * @param segmentJobBuildInfo the job/segment pair to track
     */
    void addToJobTrackList(SegmentJobBuildInfo segmentJobBuildInfo) {
        ConcurrentSkipListSet<SegmentJobBuildInfo> tracked = segmentBuildJobCheckList.get(segmentJobBuildInfo.cubeName);
        if (tracked == null) {
            ConcurrentSkipListSet<SegmentJobBuildInfo> fresh = new ConcurrentSkipListSet<>();
            // Another thread may have registered a set in the meantime; keep whichever got there first.
            ConcurrentSkipListSet<SegmentJobBuildInfo> existing = segmentBuildJobCheckList.putIfAbsent(segmentJobBuildInfo.cubeName, fresh);
            tracked = (existing == null) ? fresh : existing;
        }
        logger.trace("Add job {} of segment [{} - {}] to track.", segmentJobBuildInfo.jobID, segmentJobBuildInfo.cubeName, segmentJobBuildInfo.segmentName);
        if (!tracked.add(segmentJobBuildInfo)) {
            logger.debug("Add {} failed because we have a duplicated one.", segmentJobBuildInfo);
            // Replace the stale entry with the incoming one.
            tracked.remove(segmentJobBuildInfo);
            tracked.add(segmentJobBuildInfo);
        }
    }

    /**
     * Queues a cube so that the next round looks for segments of it that can be built.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * package-private.
     *
     * @param str name of the cube to examine
     */
    public void addToCheckList(String str) {
        cubeCheckList.add(str);
    }

    /**
     * Forgets a cube entirely: removes it from the cube check list and drops every
     * build job tracked for it.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * package-private.
     *
     * @param str name of the cube to stop tracking
     */
    public void clearCheckList(String str) {
        cubeCheckList.remove(str);
        segmentBuildJobCheckList.remove(str);
    }

    /**
     * Runs one round via {@link #doRun()}, but only when the coordinator reports
     * itself as lead. Any exception is logged and swallowed so that it does not
     * escape this {@link Runnable}.
     */
    @Override
    public void run() {
        try {
            if (!coordinator.isLead()) {
                return;
            }
            doRun();
        } catch (Exception e) {
            logger.error("Unexpected error", e);
        }
    }

    /**
     * Executes one submitter round:
     * <ol>
     *   <li>dump the tracked jobs (trace level) and report metadata store stats;</li>
     *   <li>check the earliest tracked job of every cube and stop tracking those
     *       that {@link #traceEarliestSegmentBuildJob()} returns;</li>
     *   <li>submit build jobs for cubes in the check list;</li>
     *   <li>on the 1st, 101st, 201st, ... round, traverse all enabled streaming
     *       cubes and submit whatever is ready.</li>
     * </ol>
     */
    void doRun() {
        checkTimes++;
        logger.debug("\n========================================================================= {}", checkTimes);
        dumpSegmentBuildJobCheckList();
        coordinator.getStreamMetadataStore().reportStat();

        for (SegmentJobBuildInfo finished : traceEarliestSegmentBuildJob()) {
            ConcurrentSkipListSet<SegmentJobBuildInfo> tracked = segmentBuildJobCheckList.get(finished.cubeName);
            if (tracked == null) {
                // clearCheckList() may have dropped the whole cube since the job was inspected;
                // nothing is left to remove, and the rest of the round must still run.
                logger.trace("Job {} is not tracked any more, skip removing it.", finished.jobID);
                continue;
            }
            logger.trace("Remove job {} from check list.", finished.jobID);
            tracked.remove(finished);
        }

        findSegmentReadyToBuild();

        if (checkTimes % 100 == 1) {
            logger.info("Force traverse all cubes periodically.");
            for (StreamingCubeInfo cubeInfo : coordinator.getEnableStreamingCubes()) {
                for (String segmentName : checkSegmentBuildJobFromMetadata(cubeInfo.getCubeName())) {
                    submitSegmentBuildJob(cubeInfo.getCubeName(), segmentName);
                }
            }
        }
    }

    /**
     * Inspects the first (lowest-ordered) tracked job of every cube.
     *
     * <ul>
     *   <li>SUCCEED: the cluster manager is told the segment build is complete, the
     *       cube is put back on the check list and the job is added to the result.</li>
     *   <li>ERROR: the job is resumed, at most five times per tracked entry; after
     *       that only a warning is logged.</li>
     *   <li>Any other state, or a job that cannot be found: nothing is done.</li>
     * </ul>
     *
     * @return the jobs that finished successfully in this round
     * @throws StoreException rethrown after logging when raised while checking a job
     */
    List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() {
        List<SegmentJobBuildInfo> succeededJobs = new ArrayList<>();
        for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> cubeEntry : segmentBuildJobCheckList.entrySet()) {
            ConcurrentSkipListSet<SegmentJobBuildInfo> trackedJobs = cubeEntry.getValue();
            if (trackedJobs.isEmpty()) {
                logger.trace("Skip {}", cubeEntry.getKey());
                continue;
            }

            SegmentJobBuildInfo earliest = trackedJobs.first();
            logger.debug("Check the cube:{} segment:{} build status.", earliest.cubeName, earliest.segmentName);
            try {
                CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(earliest.jobID);
                if (cubingJob == null) {
                    logger.error("Cannot find metadata of current job.");
                    continue;
                }

                ExecutableState jobState = cubingJob.getStatus();
                logger.debug("Current job state {}", jobState);

                if (ExecutableState.SUCCEED.equals(jobState)) {
                    CubeInstance cubeCopy = coordinator.getCubeManager().getCube(earliest.cubeName).latestCopyForWrite();
                    CubeSegment builtSegment = cubeCopy.getSegment(earliest.segmentName, null);
                    logger.info("The cube:{} segment:{} is ready to be promoted.", earliest.cubeName, earliest.segmentName);
                    coordinator.getClusterManager().segmentBuildComplete(cubingJob, cubeCopy, builtSegment, earliest);
                    // Look at this cube again next round for further segments.
                    addToCheckList(cubeCopy.getName());
                    succeededJobs.add(earliest);
                } else if (ExecutableState.ERROR.equals(jobState)) {
                    if (earliest.retryCnt >= 5) {
                        logger.warn("Job:{} is error, exceed max retry. Kylin admin could resume it or discard it(to let new building job be sumbitted) .", earliest);
                    } else {
                        logger.info("Job:{} is error, resume the job.", earliest);
                        coordinator.getExecutableManager().resumeJob(earliest.jobID);
                        earliest.retryCnt++;
                    }
                }
            } catch (StoreException storeEx) {
                logger.error("Error when check streaming segment job build state:" + earliest, storeEx);
                throw storeEx;
            }
        }
        return succeededJobs;
    }

    /**
     * Walks the cube check list and submits a build job for every candidate segment
     * of each cube. A cube is taken off the list only when all of its candidates
     * were submitted successfully (which includes the case of no candidates at all).
     */
    void findSegmentReadyToBuild() {
        for (Iterator<String> cubes = cubeCheckList.iterator(); cubes.hasNext();) {
            String cubeName = cubes.next();
            boolean allSubmitted = true;
            for (String segmentName : checkSegmentBuildJobFromMetadata(cubeName)) {
                if (!submitSegmentBuildJob(cubeName, segmentName)) {
                    allSubmitted = false;
                    logger.debug("Failed to submit building job for {}.", segmentName);
                }
            }
            if (!allSubmitted) {
                // Keep the cube queued so the failed segments are retried next round.
                continue;
            }
            cubes.remove();
            logger.debug("Removed {} from check list.", cubeName);
        }
    }

    /**
     * Determines which segments of a cube should get a build job submitted now.
     *
     * <p>Rules applied, in order, to the cube's segment build states (sorted in
     * their natural order):
     * <ul>
     *   <li>nothing is returned while {@link #isInOptimize(CubeInstance)} is true;</li>
     *   <li>a state whose segment start is before the end of the latest READY
     *       segment is removed from the metadata store and skipped;</li>
     *   <li>a state that is already in building is skipped unless
     *       {@link #checkSegmentBuildingJob} says a new job must be submitted;</li>
     *   <li>the remaining states are candidates when every assigned replica set has
     *       delivered its data and either build quota is left or the segment needs a
     *       rebuild.</li>
     * </ul>
     * The quota is the cube's configured maximum of building segments (forced to 1
     * while the cube has no READY segment) minus the segments currently building.
     *
     * @param str cube name
     * @return names of the segments to submit, possibly empty
     */
    List<String> checkSegmentBuildJobFromMetadata(String str) {
        List<String> candidates = new ArrayList<>();
        CubeInstance cube = coordinator.getCubeManager().getCube(str);
        if (isInOptimize(cube)) {
            return candidates;
        }

        int maxBuildingSegments = cube.getConfig().getMaxBuildingSegments();
        CubeSegment latestReadySegment = cube.getLatestReadySegment();
        long minSegmentStart = -1;
        if (latestReadySegment != null) {
            minSegmentStart = ((Long) latestReadySegment.getTSRange().end.v).longValue();
        } else {
            logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", str);
            maxBuildingSegments = 1;
        }

        Set<Integer> assignedReplicaSets = coordinator.getStreamMetadataStore().getAssignmentsByCube(str).getReplicaSetIDs();
        List<SegmentBuildState> segmentStates = coordinator.getStreamMetadataStore().getSegmentBuildStates(str);
        int leftQuota = maxBuildingSegments - cube.getBuildingSegments().size();
        boolean quotaForNewSegment = true;

        Collections.sort(segmentStates);
        for (int i = 0; i < segmentStates.size(); i++) {
            boolean needRebuild = false;
            if (leftQuota <= 0) {
                logger.info("No left quota to build segments for cube:{} at {}", str, leftQuota);
                quotaForNewSegment = false;
            }

            SegmentBuildState segmentState = segmentStates.get(i);
            if (CubeSegment.parseSegmentName(segmentState.getSegmentName()).getFirst().longValue() < minSegmentStart) {
                logger.warn("The cube segment state is not correct because it belongs to historcial part, cube:{} segment:{}, clear it.", str, segmentState.getSegmentName());
                coordinator.getStreamMetadataStore().removeSegmentBuildState(str, segmentState.getSegmentName());
                continue;
            }

            if (segmentState.isInBuilding()) {
                needRebuild = checkSegmentBuildingJob(segmentState, str, cube);
                if (!needRebuild) {
                    // The existing job is still being handled (running, resumed, too young, ...).
                    // Listing the segment again would make submitSegmentBuildJob() drop the
                    // segment under construction and start a duplicate job.
                    continue;
                }
            }
            // States that are not in building (e.g. waiting) fall through to the readiness check.

            if (!checkSegmentIsReadyToBuild(segmentStates, i, assignedReplicaSets)) {
                logger.debug("Segment {} {} is not ready to submit a building job.", str, segmentState);
            } else if (quotaForNewSegment || needRebuild) {
                candidates.add(segmentState.getSegmentName());
                leftQuota--;
            }
        }

        if (logger.isDebugEnabled() && !candidates.isEmpty()) {
            logger.debug("{} Candidate segment list to be built : {}.", str, String.join(", ", candidates));
        }
        return candidates;
    }

    /**
     * Tells whether building new segments is currently forbidden for the cube.
     *
     * <p>That is the case when the cube has any READY_PENDING segment, or when a
     * NEW segment's last build job is a {@link CubingJob} of type OPTIMIZE.
     *
     * @param cubeInstance cube to inspect
     * @return {@code true} if no new segment build may be submitted
     */
    private boolean isInOptimize(CubeInstance cubeInstance) {
        Segments<CubeSegment> readyPendingSegments = cubeInstance.getSegments(SegmentStatusEnum.READY_PENDING);
        if (readyPendingSegments.size() > 0) {
            logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", cubeInstance.getName(), readyPendingSegments);
            return true;
        }

        String optimizeJobType = CubingJob.CubingJobTypeEnum.OPTIMIZE.toString();
        for (CubeSegment newSegment : cubeInstance.getSegments(SegmentStatusEnum.NEW)) {
            String lastBuildJobID = newSegment.getLastBuildJobID();
            if (lastBuildJobID == null) {
                continue;
            }
            AbstractExecutable job = coordinator.getExecutableManager().getJob(lastBuildJobID);
            // instanceof is false for null, so a missing job is skipped as well.
            if (job instanceof CubingJob && optimizeJobType.equals(((CubingJob) job).getJobType())) {
                logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cubeInstance.getName());
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the cube segment for the given streaming segment name and submits a
     * streaming cubing job for it.
     *
     * <p>If the cube already contains a segment with exactly the same time range,
     * that segment is dropped first. After the job has been added it is put on the
     * job track list and the segment build state in the metadata store is set to
     * BUILDING with the job id and the current time.
     *
     * @param str  cube name
     * @param str2 segment name, parsed by {@code CubeSegment.parseSegmentName} into a start/end pair
     * @return {@code true} when the job was submitted and the state recorded;
     *         {@code false} when any step threw (the exception is logged, not propagated)
     */
    boolean submitSegmentBuildJob(String str, String str2) {
        logger.info("Try submit streaming segment build job, cube:{} segment:{}", str, str2);
        CubeInstance cube = coordinator.getCubeManager().getCube(str);
        try {
            Pair<Long, Long> segmentRange = CubeSegment.parseSegmentName(str2);

            // Look for a segment covering exactly the same range; the last match wins.
            CubeSegment existingSegment = null;
            for (CubeSegment candidate : cube.getSegments()) {
                SegmentRange.TSRange tsRange = candidate.getTSRange();
                if (tsRange.start.v.equals(segmentRange.getFirst()) && segmentRange.getSecond().equals(tsRange.end.v)) {
                    existingSegment = candidate;
                }
            }
            if (existingSegment != null) {
                logger.warn("Segment {} exists, it will be forced deleted.", str2);
                coordinator.getCubeManager().updateCubeDropSegments(cube, existingSegment);
            }

            logger.debug("Create segment for {} {} .", str, str2);
            CubeSegment newSegment = coordinator.getCubeManager().appendSegment(cube, new SegmentRange.TSRange(segmentRange.getFirst(), segmentRange.getSecond()));
            DefaultChainedExecutable cubingJob = getStreamingCubingJob(newSegment);
            coordinator.getExecutableManager().addJob(cubingJob);
            String jobId = cubingJob.getId();
            newSegment.setLastBuildJobID(jobId);
            addToJobTrackList(new SegmentJobBuildInfo(str, str2, jobId));

            SegmentBuildState.BuildState buildState = new SegmentBuildState.BuildState();
            buildState.setBuildStartTime(System.currentTimeMillis());
            buildState.setState(SegmentBuildState.BuildState.State.BUILDING);
            buildState.setJobId(jobId);
            logger.debug("Commit building job {} for {} {} .", jobId, str, str2);
            coordinator.getStreamMetadataStore().updateSegmentBuildState(str, str2, buildState);
            return true;
        } catch (Exception e) {
            // Best effort: the caller keeps the cube on its check list and retries later.
            logger.error("Streaming job submit fail, cubeName:" + str + " segment:" + str2, e);
            return false;
        }
    }

    /**
     * Decides whether a segment that is recorded as "in building" needs a new build
     * job to be submitted.
     *
     * <p>Returns {@code true} only when the recorded job has disappeared, or when it
     * is DISCARDED and auto-resubmission of discarded jobs is enabled in the Kylin
     * configuration. In every other case {@code false} is returned; as side effects
     * an ERROR job is resumed, and the build state is removed from the metadata
     * store when the job SUCCEEDed and the segment is READY. Jobs recorded as
     * started less than 15 minutes ago are not inspected at all.
     *
     * @param segmentBuildState build state carrying job id and build start time
     * @param str               cube name
     * @param cubeInstance      cube used to look up the segment
     * @return {@code true} if a new build job should be submitted for the segment
     */
    boolean checkSegmentBuildingJob(SegmentBuildState segmentBuildState, String str, CubeInstance cubeInstance) {
        final long gracePeriodMillis = 15L * 60 * 1000;

        String jobId = segmentBuildState.getState().getJobId();
        logger.debug("There is segment in building, cube:{} segment:{} jobId:{}", str, segmentBuildState.getSegmentName(), jobId);
        long buildStartTime = segmentBuildState.getState().getBuildStartTime();
        if (buildStartTime == 0 || jobId == null) {
            logger.info("Unknown state {}", segmentBuildState);
            return false;
        }

        long elapsedMillis = System.currentTimeMillis() - buildStartTime;
        if (elapsedMillis < gracePeriodMillis) {
            return false;
        }

        CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(jobId);
        if (cubingJob == null) {
            logger.warn("Looks like cubing job is dropped manually, it will be submitted a new one.");
            return true;
        }

        ExecutableState jobState = cubingJob.getStatus();

        if (ExecutableState.SUCCEED.equals(jobState)) {
            CubeSegment cubeSegment = cubeInstance.getSegment(segmentBuildState.getSegmentName(), null);
            boolean segmentReady = cubeSegment != null && SegmentStatusEnum.READY == cubeSegment.getStatus();
            if (segmentReady) {
                logger.info("Job:{} is already succeed, and segment:{} is ready, remove segment build state", jobId, segmentBuildState.getSegmentName());
                coordinator.getStreamMetadataStore().removeSegmentBuildState(str, segmentBuildState.getSegmentName());
            }
            return false;
        }

        if (ExecutableState.ERROR.equals(jobState)) {
            logger.info("Job:{} is error, resume the job.", jobId);
            coordinator.getExecutableManager().resumeJob(jobId);
            return false;
        }

        if (ExecutableState.DISCARDED.equals(jobState)) {
            if (KylinConfig.getInstanceFromEnv().isAutoResubmitDiscardJob()) {
                logger.debug("Job:{} is discard, resubmit it later.", jobId);
                return true;
            }
            logger.debug("Job:{} is discard, please resubmit yourself.", jobId);
            return false;
        }

        logger.info("Job:{} is in running, job state: {}.", jobId, jobState);
        return false;
    }

    /**
     * Checks whether every replica set in {@code set} has finished the segment at
     * position {@code i}.
     *
     * <p>A replica set that has not completed this segment is still counted as done
     * when it has completed any later segment in the list.
     *
     * @param list segment build states; entries after {@code i} are treated as later segments
     * @param i    index of the segment under test
     * @param set  ids of the replica sets to check
     * @return {@code true} when no replica set is left outstanding
     */
    boolean checkSegmentIsReadyToBuild(List<SegmentBuildState> list, int i, Set<Integer> set) {
        SegmentBuildState target = list.get(i);
        Set<Integer> outstanding = Sets.newHashSet(Sets.difference(set, target.getCompleteReplicaSets()));
        if (outstanding.isEmpty()) {
            return true;
        }

        for (int laterIdx = i + 1; laterIdx < list.size(); laterIdx++) {
            SegmentBuildState later = list.get(laterIdx);
            Set<Integer> laterCompleted = later.getCompleteReplicaSets();
            for (Iterator<Integer> pending = outstanding.iterator(); pending.hasNext();) {
                Integer replicaSetId = pending.next();
                if (!laterCompleted.contains(replicaSetId)) {
                    continue;
                }
                logger.info("the replica set:{} doesn't have data for segment:{}, but have data for later segment:{}", replicaSetId, target.getSegmentName(), later.getSegmentName());
                pending.remove();
            }
        }

        if (!outstanding.isEmpty()) {
            logger.debug("Not ready data for replica sets: {}", outstanding);
            return false;
        }
        return true;
    }

    /**
     * Returns the cube check list.
     *
     * @return the internal set itself, not a copy
     */
    public Set<String> getCubeCheckList() {
        return cubeCheckList;
    }

    /**
     * Builds the streaming cubing job for a segment, submitted as user "SYSTEM".
     *
     * @param cubeSegment segment the job will build
     * @return the job created by a new {@link StreamingCubingEngine}
     */
    public DefaultChainedExecutable getStreamingCubingJob(CubeSegment cubeSegment) {
        StreamingCubingEngine engine = new StreamingCubingEngine();
        return engine.createStreamingCubingJob(cubeSegment, "SYSTEM");
    }

    /**
     * Writes the whole job track list ({@code cube:jobs} for every cube) as a single
     * trace-level log line. Does nothing when trace logging is disabled.
     */
    void dumpSegmentBuildJobCheckList() {
        if (!logger.isTraceEnabled()) {
            return;
        }
        StringBuilder dump = new StringBuilder("Dump JobCheckList:\t");
        for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> cubeJobs : segmentBuildJobCheckList.entrySet()) {
            dump.append(cubeJobs.getKey());
            dump.append(":");
            dump.append(cubeJobs.getValue());
        }
        logger.trace(dump.toString());
    }
}
