package org.apache.kylin.stream.coordinator.coordinate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
import org.apache.kylin.stream.core.model.StopConsumersRequest;
import org.apache.kylin.stream.core.source.ISourcePosition;
import org.apache.kylin.stream.core.source.ISourcePositionHandler;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kylin.tool.shaded.com.google.common.base.Function;
import org.apache.kylin.tool.shaded.com.google.common.collect.Collections2;
import org.apache.kylin.tool.shaded.com.google.common.collect.Lists;
import org.apache.kylin.tool.shaded.com.google.common.collect.MapDifference;
import org.apache.kylin.tool.shaded.com.google.common.collect.Maps;
import org.apache.kylin.tool.shaded.org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.class */
public class ReceiverClusterManager {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ReceiverClusterManager.class);
    StreamingCoordinator coordinator;

    /**
     * Creates a manager that performs its receiver and metadata operations through the given coordinator.
     * Decompiler note: this constructor was package-private before access modifiers were changed.
     *
     * @param streamingCoordinator coordinator stored by this manager and returned by {@link #getCoordinator()}
     */
    public ReceiverClusterManager(StreamingCoordinator streamingCoordinator) {
        coordinator = streamingCoordinator;
    }

    /**
     * @return the coordinator that was handed to the constructor
     */
    public StreamingCoordinator getCoordinator() {
        return coordinator;
    }

    /**
     * Re-assigns every cube whose assignment differs between the previous and the new assignment list.
     * Decompiler note: this method was package-private before access modifiers were changed.
     *
     * Both lists are indexed by cube name (a later entry with the same cube name replaces an earlier
     * one) and must cover exactly the same set of cubes. Cubes whose assignment objects are equal in
     * both lists are skipped; all others are handed to {@link #reassignCubeImpl}.
     *
     * @param list  previous cube assignments
     * @param list2 new cube assignments
     * @throws RuntimeException wrapping any failure raised while re-balancing, including the
     *                          IllegalStateException raised when the two lists cover different cubes
     */
    public void doReBalance(List<CubeAssignment> list, List<CubeAssignment> list2) {
        Map<String, CubeAssignment> previousByCube = new HashMap<>();
        for (CubeAssignment previous : list) {
            previousByCube.put(previous.getCubeName(), previous);
        }
        Map<String, CubeAssignment> updatedByCube = new HashMap<>();
        for (CubeAssignment updated : list2) {
            updatedByCube.put(updated.getCubeName(), updated);
        }
        try {
            Set<String> previousCubes = previousByCube.keySet();
            Set<String> updatedCubes = updatedByCube.keySet();
            if (!previousCubes.equals(updatedCubes)) {
                logger.error("previous assignment cubes:{}, new assignment cubes:{}", previousCubes, updatedCubes);
                throw new IllegalStateException("previous cube assignments");
            }
            Map<String, MapDifference.ValueDifference<CubeAssignment>> changedCubes =
                    Maps.difference(previousByCube, updatedByCube).entriesDiffering();
            for (Map.Entry<String, MapDifference.ValueDifference<CubeAssignment>> changed : changedCubes.entrySet()) {
                MapDifference.ValueDifference<CubeAssignment> change = changed.getValue();
                reassignCubeImpl(changed.getKey(), change.leftValue(), change.rightValue());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Re-assigns a single cube and, when that succeeds, persists the new assignment.
     * Decompiler note: this method was package-private before access modifiers were changed.
     *
     * Nothing happens when the two assignments are equal. Otherwise the receivers are switched over by
     * {@link #doReassignWithoutCommit}; afterwards every replica set that only exists in the previous
     * assignment is added to the new assignment with an empty partition list, the new assignment is
     * saved to the stream metadata store and the cube's entry in the assignments cache is cleared.
     *
     * @param str             cube name
     * @param cubeAssignment  previous assignment of the cube
     * @param cubeAssignment2 new assignment of the cube; mutated as described above
     */
    public void reassignCubeImpl(String str, CubeAssignment cubeAssignment, CubeAssignment cubeAssignment2) {
        logger.info("start cube reBalance, cube:{}, previous assignments:{}, new assignments:{}", str, cubeAssignment, cubeAssignment2);
        if (cubeAssignment2.equals(cubeAssignment)) {
            logger.info("the new assignment is the same as the previous assignment, do nothing for this reassignment");
            return;
        }

        CubeInstance cube = getCoordinator().getCubeManager().getCube(str);
        doReassignWithoutCommit(cube, cubeAssignment, cubeAssignment2);

        // Replica sets dropped by the new assignment are kept in it with no partitions.
        Set<Integer> droppedReplicaSetIds =
                Maps.difference(cubeAssignment.getAssignments(), cubeAssignment2.getAssignments()).entriesOnlyOnLeft().keySet();
        for (Integer droppedId : droppedReplicaSetIds) {
            cubeAssignment2.addAssignment(droppedId, Lists.newArrayList());
        }

        logger.info("Commit reassign {} transaction.", str);
        getCoordinator().getStreamMetadataStore().saveNewCubeAssignment(cubeAssignment2);
        AssignmentsCache.getInstance().clearCubeCache(str);
    }

    /**
     * Switches the receivers of a cube from its previous assignment to a new one without persisting
     * the new assignment (the caller is expected to commit it).
     *
     * Steps, in order:
     * 1. Stop and sync: every replica set of the previous assignment whose entry is not identical in
     *    the new assignment goes through {@link #syncAndStopConsumersInRs}; the returned positions are
     *    merged with KEEP_LARGE.
     * 2. Assign: every replica set of the new assignment whose entry is not identical in the previous
     *    one receives its partitions, without starting consumers.
     * 3. Start: consumers on those replica sets are started from the merged position after advance().
     * 4. Make immutable: replica sets that only exist in the previous assignment are asked to make
     *    the cube immutable.
     *
     * An IOException in steps 2-4 triggers {@link #rollbackAssignAndStart}; any other exception
     * triggers {@link #rollbackStopAndSync}. Both end in a ClusterStateException describing the step
     * and whether the rollback succeeded.
     *
     * @param cubeInstance    cube whose streaming source is used to read, merge and serialize positions
     * @param cubeAssignment  previous assignment of the cube
     * @param cubeAssignment2 new assignment of the cube
     * @throws ClusterStateException whenever any step fails
     */
    void doReassignWithoutCommit(CubeInstance cubeInstance, CubeAssignment cubeAssignment, CubeAssignment cubeAssignment2) {
        String cubeName = cubeAssignment.getCubeName();
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cubeInstance);
        MapDifference<Integer, List<Partition>> difference = Maps.difference(cubeAssignment.getAssignments(), cubeAssignment2.getAssignments());
        Map<Integer, List<Partition>> unchanged = difference.entriesInCommon();
        Map<Integer, List<Partition>> onlyInNew = difference.entriesOnlyOnRight();
        Map<Integer, List<Partition>> onlyInPrevious = difference.entriesOnlyOnLeft();

        List<ISourcePosition> stoppedPositions = new ArrayList<>();
        List<ReplicaSet> stoppedReplicaSets = new ArrayList<>();
        try {
            // Step 1: stop and sync the replica sets whose assignment changes.
            for (Integer replicaSetId : cubeAssignment.getAssignments().keySet()) {
                if (unchanged.containsKey(replicaSetId)) {
                    logger.info("the assignment is not changed for cube:{}, replicaSet:{}", cubeName, replicaSetId);
                    continue;
                }
                ReplicaSet replicaSet = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetId.intValue());
                stoppedPositions.add(syncAndStopConsumersInRs(streamingSource, cubeName, replicaSet));
                stoppedReplicaSets.add(replicaSet);
            }
            ISourcePosition mergedPosition = streamingSource.getSourcePositionHandler().mergePositions(stoppedPositions, ISourcePositionHandler.MergeStrategy.KEEP_LARGE);
            logger.info("the consumer position for cube:{} is:{}", cubeName, mergedPosition);

            List<ReplicaSet> assignedReplicaSets = new ArrayList<>();
            List<ReplicaSet> startedReplicaSets = new ArrayList<>();
            Set<Node> immutableFailedNodes = new HashSet<>();
            try {
                // Step 2: hand the new partitions to every changed or new replica set.
                for (Map.Entry<Integer, List<Partition>> newEntry : cubeAssignment2.getAssignments().entrySet()) {
                    Integer replicaSetId = newEntry.getKey();
                    if (unchanged.containsKey(replicaSetId)) {
                        continue;
                    }
                    ReplicaSet replicaSet = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetId.intValue());
                    logger.info("assign cube:{} to replicaSet:{}", cubeName, replicaSetId);
                    assignCubeToReplicaSet(replicaSet, cubeName, newEntry.getValue(), false, true);
                    assignedReplicaSets.add(replicaSet);
                }

                // Step 3: start consumers on those replica sets from the merged position.
                for (Integer replicaSetId : cubeAssignment2.getAssignments().keySet()) {
                    if (unchanged.containsKey(replicaSetId)) {
                        continue;
                    }
                    ConsumerStartProtocol startProtocol = new ConsumerStartProtocol(streamingSource.getSourcePositionHandler().serializePosition(mergedPosition.advance()));
                    ReplicaSet replicaSet = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetId.intValue());
                    StartConsumersRequest startRequest = new StartConsumersRequest();
                    startRequest.setCube(cubeName);
                    startRequest.setStartProtocol(startProtocol);
                    logger.info("start consumers for cube:{}, replicaSet:{}, startRequest:{}", cubeName, replicaSetId, startRequest);
                    startConsumersInReplicaSet(replicaSet, startRequest);
                    startedReplicaSets.add(replicaSet);
                }

                // Step 4: replica sets that lost the cube entirely make it immutable.
                for (Integer replicaSetId : onlyInPrevious.keySet()) {
                    logger.info("make cube immutable for cube:{}, replicaSet{}", cubeName, replicaSetId);
                    ReplicaSet replicaSet = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetId.intValue());
                    immutableFailedNodes.addAll(makeCubeImmutableInReplicaSet(replicaSet, cubeName));
                }
                if (!immutableFailedNodes.isEmpty()) {
                    throw new IOException("Failed to convert to immutable state. ");
                }
                logger.info("Finish cube reassign for cube:{} .", cubeName);
            } catch (IOException e) {
                throw rollbackAssignAndStart(cubeName, cubeAssignment, onlyInNew, startedReplicaSets, assignedReplicaSets, immutableFailedNodes, e);
            }
        } catch (ClusterStateException e) {
            // Already the final verdict of the step 2-4 rollback. It must not fall into the generic
            // handler below, which would run a second rollback and report the wrong step.
            throw e;
        } catch (Exception e) {
            throw rollbackStopAndSync(cubeName, stoppedReplicaSets, e);
        }
    }

    /**
     * Rolls back a failed assign/start/make-immutable phase and builds the exception to throw.
     *
     * Started replica sets are stopped again (removing data on those that only exist in the new
     * assignment), assigned replica sets get their previous partitions back with consumers started,
     * and nodes that failed to become immutable are retried once. Individual rollback failures are
     * logged and reflected in the returned exception.
     *
     * @return ROLLBACK_FAILED for the first category with remaining failures (START_NEW, ASSIGN_NEW,
     *         MAKE_IMMUTABLE, in that order), otherwise ROLLBACK_SUCCESS / ASSIGN_NEW
     */
    private ClusterStateException rollbackAssignAndStart(String cubeName, CubeAssignment previousAssignment,
            Map<Integer, List<Partition>> onlyInNew, List<ReplicaSet> startedReplicaSets,
            List<ReplicaSet> assignedReplicaSets, Set<Node> immutableFailedNodes, IOException cause) {
        logger.error("Fail to start consumers for cube:" + cubeName, cause);

        Set<Integer> failedToStop = startedReplicaSets.stream().map(ReplicaSet::getReplicaSetID).collect(Collectors.toSet());
        for (ReplicaSet replicaSet : startedReplicaSets) {
            try {
                StopConsumersRequest stopRequest = new StopConsumersRequest();
                stopRequest.setCube(cubeName);
                if (onlyInNew.containsKey(Integer.valueOf(replicaSet.getReplicaSetID()))) {
                    stopRequest.setRemoveData(true);
                }
                stopConsumersInReplicaSet(replicaSet, stopRequest);
                failedToStop.remove(Integer.valueOf(replicaSet.getReplicaSetID()));
            } catch (IOException e) {
                logger.error("fail to stop consumers for cube:" + cubeName + " replicaSet:" + replicaSet.getReplicaSetID(), e);
            }
        }

        Set<Integer> failedToReassign = assignedReplicaSets.stream().map(ReplicaSet::getReplicaSetID).collect(Collectors.toSet());
        for (ReplicaSet replicaSet : assignedReplicaSets) {
            try {
                List<Partition> previousPartitions = previousAssignment.getPartitionsByReplicaSetID(Integer.valueOf(replicaSet.getReplicaSetID()));
                assignCubeToReplicaSet(replicaSet, cubeName, previousPartitions, true, true);
                failedToReassign.remove(Integer.valueOf(replicaSet.getReplicaSetID()));
            } catch (IOException e) {
                logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + replicaSet.getReplicaSetID(), e);
            }
        }

        Set<Node> stillNotImmutable = new HashSet<>(immutableFailedNodes);
        for (Node node : immutableFailedNodes) {
            try {
                getCoordinator().makeCubeImmutableForReceiver(node, cubeName);
                stillNotImmutable.remove(node);
            } catch (IOException e) {
                logger.error("fail to make cube immutable for cube:" + cubeName + " to " + node, e);
            }
        }

        StringBuilder failureInfo = new StringBuilder();
        try {
            failureInfo.append("FailStarted:").append(JsonUtil.writeValueAsString(failedToStop)).append(";");
            failureInfo.append("FailAssigned:").append(JsonUtil.writeValueAsString(failedToReassign)).append(";");
            failureInfo.append("FailRemotedPresisted:").append(JsonUtil.writeValueAsString(stillNotImmutable));
        } catch (JsonProcessingException e) {
            logger.error("", e);
        }
        String failureDetail = failureInfo.toString();

        if (!failedToStop.isEmpty()) {
            return new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.START_NEW, failureDetail, cause);
        }
        if (!failedToReassign.isEmpty()) {
            return new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.ASSIGN_NEW, failureDetail, cause);
        }
        if (!stillNotImmutable.isEmpty()) {
            return new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.MAKE_IMMUTABLE, failureDetail, cause);
        }
        return new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_SUCCESS, ClusterStateException.TransactionStep.ASSIGN_NEW, failureDetail, cause);
    }

    /**
     * Rolls back a failed stop-and-sync phase by sending a plain start request (cube name only) to
     * every replica set that had already been stopped, and builds the exception to throw.
     *
     * @return ROLLBACK_SUCCESS when every replica set accepted the start request, otherwise
     *         ROLLBACK_FAILED listing the replica set ids that could not be restarted
     */
    private ClusterStateException rollbackStopAndSync(String cubeName, List<ReplicaSet> stoppedReplicaSets, Exception cause) {
        logger.error("fail to sync assign replicaSet for cube:" + cubeName, cause);

        Set<Integer> failedToRestart = stoppedReplicaSets.stream().map(ReplicaSet::getReplicaSetID).collect(Collectors.toSet());
        for (ReplicaSet replicaSet : stoppedReplicaSets) {
            StartConsumersRequest restartRequest = new StartConsumersRequest();
            restartRequest.setCube(cubeName);
            try {
                startConsumersInReplicaSet(replicaSet, restartRequest);
                failedToRestart.remove(Integer.valueOf(replicaSet.getReplicaSetID()));
            } catch (IOException e) {
                logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + replicaSet.getReplicaSetID(), e);
            }
        }
        if (failedToRestart.isEmpty()) {
            return new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_SUCCESS, ClusterStateException.TransactionStep.STOP_AND_SNYC, "", cause);
        }

        StringBuilder failureInfo = new StringBuilder();
        try {
            failureInfo.append("Fail restart:").append(JsonUtil.writeValueAsString(failedToRestart));
        } catch (JsonProcessingException e) {
            logger.error("", e);
        }
        return new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.STOP_AND_SNYC, failureInfo.toString(), cause);
    }

    /**
     * Brings the consumers of one replica set to a common position for the given cube.
     *
     * With no nodes, nothing is done and null is returned. With exactly one node, the consumers are
     * stopped and the position reported by the first stop response is returned. With several nodes,
     * the consumers are paused, the reported positions are merged with KEEP_LARGE, and the consumers
     * are resumed towards that merged position, which is then returned.
     *
     * @param iStreamingSource source whose position handler parses, merges and serializes positions
     * @param str              cube name
     * @param replicaSet       replica set to operate on
     * @return the position described above, or null for a replica set without nodes
     * @throws IOException if a stop, pause or resume request fails
     */
    ISourcePosition syncAndStopConsumersInRs(final IStreamingSource iStreamingSource, String str, ReplicaSet replicaSet) throws IOException {
        int nodeCount = replicaSet.getNodes().size();
        if (nodeCount < 1) {
            return null;
        }

        if (nodeCount == 1) {
            Node onlyNode = replicaSet.getNodes().iterator().next();
            StopConsumersRequest stopRequest = new StopConsumersRequest();
            stopRequest.setCube(str);
            logger.info("stop consumers for cube:{}, receiver:{}", str, onlyNode);
            ISourcePositionHandler stopHandler = iStreamingSource.getSourcePositionHandler();
            List<ConsumerStatsResponse> stopResponses = stopConsumersInReplicaSet(replicaSet, stopRequest);
            return stopHandler.parsePosition(stopResponses.get(0).getConsumePosition());
        }

        logger.info("sync consume for cube:{}, replicaSet:{}", str, Integer.valueOf(replicaSet.getReplicaSetID()));
        PauseConsumersRequest pauseRequest = new PauseConsumersRequest();
        pauseRequest.setCube(str);
        ISourcePositionHandler mergeHandler = iStreamingSource.getSourcePositionHandler();
        List<ConsumerStatsResponse> pauseResponses = pauseConsumersInReplicaSet(replicaSet, pauseRequest);
        // Lazy view: each response is parsed when mergePositions walks the list.
        List<ISourcePosition> pausedPositions = Lists.transform(pauseResponses,
                response -> iStreamingSource.getSourcePositionHandler().parsePosition(response.getConsumePosition()));
        ISourcePosition syncedPosition = mergeHandler.mergePositions(pausedPositions, ISourcePositionHandler.MergeStrategy.KEEP_LARGE);

        ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest();
        resumeRequest.setCube(str);
        resumeRequest.setResumeToPosition(iStreamingSource.getSourcePositionHandler().serializePosition(syncedPosition));
        resumeConsumersInReplicaSet(replicaSet, resumeRequest);
        return syncedPosition;
    }

    /**
     * Sends the start request to every node of the replica set, in iteration order.
     *
     * @param replicaSet            replica set whose nodes receive the request
     * @param startConsumersRequest request forwarded unchanged to each node
     * @throws IOException from the first node that fails; later nodes are not contacted
     */
    void startConsumersInReplicaSet(ReplicaSet replicaSet, StartConsumersRequest startConsumersRequest) throws IOException {
        for (Node receiver : replicaSet.getNodes()) {
            getCoordinator().startConsumersForReceiver(receiver, startConsumersRequest);
        }
    }

    /**
     * Asks every node of the replica set to make the cube immutable, continuing past failures.
     *
     * @param replicaSet replica set whose nodes are contacted
     * @param str        cube name
     * @return the nodes for which the request raised an IOException; empty when all succeeded
     */
    List<Node> makeCubeImmutableInReplicaSet(ReplicaSet replicaSet, String str) {
        List<Node> failedNodes = new ArrayList<>();
        for (Node receiver : replicaSet.getNodes()) {
            try {
                getCoordinator().makeCubeImmutableForReceiver(receiver, str);
            } catch (IOException e) {
                String message = String.format(Locale.ROOT, "Convert %s to immutable for node %s failed.", str, receiver.toNormalizeString());
                logger.error(message, e);
                failedNodes.add(receiver);
            }
        }
        return failedNodes;
    }

    /**
     * Sends the resume request to every node of the replica set and collects the responses.
     * Decompiler note: this method was package-private before access modifiers were changed.
     *
     * @param replicaSet            replica set whose nodes receive the request
     * @param resumeConsumerRequest request forwarded unchanged to each node
     * @return one response per node, in node iteration order
     * @throws IOException from the first node that fails; later nodes are not contacted
     */
    public List<ConsumerStatsResponse> resumeConsumersInReplicaSet(ReplicaSet replicaSet, ResumeConsumerRequest resumeConsumerRequest) throws IOException {
        List<ConsumerStatsResponse> responses = new ArrayList<>();
        for (Node receiver : replicaSet.getNodes()) {
            ConsumerStatsResponse response = getCoordinator().resumeConsumersForReceiver(receiver, resumeConsumerRequest);
            responses.add(response);
        }
        return responses;
    }

    /**
     * Sends the stop request to every node of the replica set and collects the responses.
     * Decompiler note: this method was package-private before access modifiers were changed.
     *
     * @param replicaSet           replica set whose nodes receive the request
     * @param stopConsumersRequest request forwarded unchanged to each node
     * @return one response per node, in node iteration order
     * @throws IOException from the first node that fails; later nodes are not contacted
     */
    public List<ConsumerStatsResponse> stopConsumersInReplicaSet(ReplicaSet replicaSet, StopConsumersRequest stopConsumersRequest) throws IOException {
        List<ConsumerStatsResponse> responses = new ArrayList<>();
        for (Node receiver : replicaSet.getNodes()) {
            ConsumerStatsResponse response = getCoordinator().stopConsumersForReceiver(receiver, stopConsumersRequest);
            responses.add(response);
        }
        return responses;
    }

    /**
     * Pauses the consumers on every node of the replica set and collects the responses.
     * Decompiler note: this method was package-private before access modifiers were changed.
     *
     * If pausing a node fails with an IOException, every node that had already been paused receives a
     * resume request carrying only the cube name. The rollback is best effort: a failing resume is
     * logged and attached to the original exception as a suppressed exception, and the remaining
     * nodes are still resumed. The original pause failure is always the exception that is rethrown.
     *
     * @param replicaSet            replica set whose nodes are paused
     * @param pauseConsumersRequest request forwarded unchanged to each node
     * @return one response per node, in node iteration order
     * @throws IOException the failure raised by the node that could not be paused
     */
    public List<ConsumerStatsResponse> pauseConsumersInReplicaSet(ReplicaSet replicaSet, PauseConsumersRequest pauseConsumersRequest) throws IOException {
        List<ConsumerStatsResponse> responses = new ArrayList<>();
        List<Node> pausedNodes = new ArrayList<>();
        try {
            for (Node receiver : replicaSet.getNodes()) {
                responses.add(getCoordinator().pauseConsumersForReceiver(receiver, pauseConsumersRequest));
                pausedNodes.add(receiver);
            }
            return responses;
        } catch (IOException e) {
            logger.info("Roll back pause consumers for receivers: {}", pausedNodes);
            ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest();
            resumeRequest.setCube(pauseConsumersRequest.getCube());
            for (Node pausedNode : pausedNodes) {
                try {
                    getCoordinator().resumeConsumersForReceiver(pausedNode, resumeRequest);
                } catch (IOException resumeFailure) {
                    // Keep going so one unreachable receiver does not leave the others paused,
                    // and so the original pause failure is not masked.
                    logger.error("fail to resume consumers for cube:" + pauseConsumersRequest.getCube() + " receiver:" + pausedNode, resumeFailure);
                    e.addSuppressed(resumeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Sends one assign request (cube, partitions, start-consumers flag) to every node of the replica set.
     * Decompiler note: this method was package-private before access modifiers were changed.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param str        cube name
     * @param list       partitions placed in the request
     * @param z          value of the request's start-consumers flag
     * @param z2         when true, the first node failure is rethrown immediately; when false, failures
     *                   are logged and only reported if no node at all accepted the request
     * @throws IOException as described for {@code z2}; in the lenient case the last failure is thrown
     */
    public void assignCubeToReplicaSet(ReplicaSet replicaSet, String str, List<Partition> list, boolean z, boolean z2) throws IOException {
        AssignRequest request = new AssignRequest();
        request.setCubeName(str);
        request.setPartitions(list);
        request.setStartConsumers(z);

        boolean anyNodeAssigned = false;
        IOException lastFailure = null;
        for (Node receiver : replicaSet.getNodes()) {
            try {
                getCoordinator().assignToReceiver(receiver, request);
                anyNodeAssigned = true;
            } catch (IOException e) {
                if (z2) {
                    throw e;
                }
                lastFailure = e;
                logger.error("Cube:" + str + " consumers start fail for node:" + receiver.toString(), e);
            }
        }

        if (anyNodeAssigned || lastFailure == null) {
            return;
        }
        throw lastFailure;
    }

    /**
     * Finalizes a streaming segment whose cubing job has finished.
     * Decompiler note: this method was protected before access modifiers were changed.
     *
     * The segment is promoted (unless it is already READY) only if {@link #checkPreviousSegmentReady}
     * accepts it. Afterwards every node of every replica set assigned to the cube is notified of the
     * build success; notification failures are logged and ignored. A replica set that has no
     * partitions assigned and whose leader reports no segment stats for the cube is removed from the
     * assignment, which is then saved. Finally the segment build state is removed from the metadata
     * store and the segment's HDFS files are deleted.
     *
     * @param cubingJob           finished job, used as the source of the segment's build statistics
     * @param cubeInstance        cube that owns the segment
     * @param cubeSegment         segment to finalize
     * @param segmentJobBuildInfo supplies the cube name and segment name
     * @return false when the previous segment is not ready yet (nothing is changed), true otherwise
     * @throws StoreException if promoting the segment fails with an IOException
     */
    public boolean segmentBuildComplete(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment, SegmentJobBuildInfo segmentJobBuildInfo) {
        String cubeName = segmentJobBuildInfo.cubeName;
        String segmentName = segmentJobBuildInfo.segmentName;
        if (!checkPreviousSegmentReady(cubeSegment)) {
            logger.warn("Segment:{}'s previous segment is not ready, will not set the segment to ready.", cubeSegment);
            return false;
        }
        if (SegmentStatusEnum.READY.equals(cubeSegment.getStatus())) {
            logger.debug("Segment status is: {}", cubeSegment.getStatus());
        } else {
            try {
                promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
                logger.debug("Promote {} succeed.", segmentName);
            } catch (IOException e) {
                throw new StoreException("Promote failed because of metadata store.", e);
            }
        }

        CubeAssignment assignment = getCoordinator().getStreamMetadataStore().getAssignmentsByCube(cubeName);
        // Iterate over a snapshot: the loop may call removeAssignment(), and getReplicaSetIDs() may be
        // a live view of the assignment (not visible from here), which would break the iteration.
        List<Integer> replicaSetIds = Lists.newArrayList(assignment.getReplicaSetIDs());
        for (Integer replicaSetId : replicaSetIds) {
            ReplicaSet replicaSet = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetId.intValue());
            for (Node node : replicaSet.getNodes()) {
                try {
                    getCoordinator().notifyReceiverBuildSuccess(node, cubeName, segmentName);
                } catch (IOException e) {
                    // Best effort: one unreachable receiver must not block finalization.
                    logger.error("error when remove cube segment for receiver:" + node, e);
                }
            }
            if (!assignment.getPartitionsByReplicaSetID(replicaSetId).isEmpty()) {
                continue;
            }
            logger.info("No partition is assign to the replicaSet:{}, check whether there are local segments on the rs.", replicaSetId);
            Node leader = replicaSet.getLeader();
            try {
                if (getCoordinator().getReceiverAdminClient().getReceiverCubeStats(leader, cubeName).getSegmentStatsMap().keySet().isEmpty()) {
                    logger.info("no local segments exist for replicaSet:{}, cube:{}, update assignments.", replicaSetId, cubeName);
                    assignment.removeAssignment(replicaSetId);
                    getCoordinator().getStreamMetadataStore().saveNewCubeAssignment(assignment);
                }
            } catch (IOException e) {
                logger.error("error when get receiver cube stats from:" + leader, e);
            }
        }

        getCoordinator().getStreamMetadataStore().removeSegmentBuildState(cubeName, segmentName);
        logger.info("Try to remove the hdfs files for cube:{} segment:{}", cubeName, segmentName);
        removeHDFSFiles(cubeName, segmentName);
        return true;
    }

    /**
     * Copies the build results of a cubing job onto the segment and promotes it.
     *
     * The segment receives the job id, the current time as last build time, the cube size in KB, the
     * source record count and source size, and a stream source checkpoint obtained by merging all
     * stored source checkpoints of the segment with KEEP_SMALL. It is then passed to the cube
     * manager's promoteNewlyBuiltSegments.
     *
     * @param cubingJob    finished job providing the statistics and the job id
     * @param cubeInstance cube that owns the segment
     * @param cubeSegment  segment to update and promote
     * @throws IOException propagated from the calls made here
     */
    void promoteNewSegment(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment) throws IOException {
        logger.debug("Try transfer segment's {} state to ready.", cubeSegment.getName());
        long sourceRecords = cubingJob.findSourceRecordCount();
        long sourceBytes = cubingJob.findSourceSizeBytes();
        long cubeBytes = cubingJob.findCubeSizeBytes();

        Map<Integer, String> checkpoints = getCoordinator().getStreamMetadataStore().getSourceCheckpoint(cubeInstance.getName(), cubeSegment.getName());
        final ISourcePositionHandler positionHandler = StreamingSourceFactory.getStreamingSource(cubeInstance).getSourcePositionHandler();
        // Lazy view: each checkpoint string is parsed when mergePositions walks the collection.
        ISourcePosition mergedCheckpoint = positionHandler.mergePositions(
                Collections2.<String, ISourcePosition>transform(checkpoints.values(), checkpoint -> positionHandler.parsePosition(checkpoint)),
                ISourcePositionHandler.MergeStrategy.KEEP_SMALL);

        cubeSegment.setLastBuildJobID(cubingJob.getId());
        cubeSegment.setLastBuildTime(System.currentTimeMillis());
        cubeSegment.setSizeKB(cubeBytes / FileUtils.ONE_KB);
        cubeSegment.setInputRecords(sourceRecords);
        cubeSegment.setInputRecordsSize(sourceBytes);
        cubeSegment.setStreamSourceCheckpoint(positionHandler.serializePosition(mergedCheckpoint));
        getCoordinator().getCubeManager().promoteNewlyBuiltSegments(cubeInstance, cubeSegment);
    }

    /**
     * Tells whether the segment directly preceding the given one (by time range) is READY.
     *
     * The preceding boundary is the largest range end among the cube's segments that is not greater
     * than the given segment's range start.
     *
     * @param cubeSegment segment about to be promoted
     * @return true when no segment ends at or before this segment's start, or when at least one
     *         segment ending exactly at the preceding boundary has status READY; false otherwise
     */
    boolean checkPreviousSegmentReady(CubeSegment cubeSegment) {
        long segmentStart = ((Long) cubeSegment.getTSRange().start.v).longValue();
        Segments<CubeSegment> segments = cubeSegment.getCubeInstance().getSegments();

        // First pass: find the latest end that does not pass this segment's start.
        long previousEnd = -1;
        for (CubeSegment candidate : segments) {
            long candidateEnd = ((Long) candidate.getTSRange().end.v).longValue();
            if (candidateEnd <= segmentStart && candidateEnd > previousEnd) {
                previousEnd = candidateEnd;
            }
        }
        if (previousEnd == -1) {
            return true;
        }

        // Second pass: several segments may share that end; one READY segment is enough.
        for (CubeSegment candidate : segments) {
            long candidateEnd = ((Long) candidate.getTSRange().end.v).longValue();
            if (candidateEnd == previousEnd && SegmentStatusEnum.READY.equals(candidate.getStatus())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Recursively deletes the streaming segment directory of the given cube segment from HDFS.
     * Best effort: any failure is logged together with its cause and not propagated.
     *
     * @param str  cube name
     * @param str2 segment name
     */
    private void removeHDFSFiles(String str, String str2) {
        String segmentPath = HDFSUtil.getStreamingSegmentFilePath(str, str2);
        try {
            FileSystem fileSystem = HadoopUtil.getFileSystem(segmentPath);
            logger.info("Deleting segment data in HDFS {}", segmentPath);
            fileSystem.delete(new Path(segmentPath), true);
        } catch (Exception e) {
            // A trailing Throwable is logged by SLF4J as the exception, so the stack trace is kept.
            logger.error("error when remove hdfs file, hdfs path:{}", segmentPath, e);
        }
    }
}
