package org.apache.kylin.stream.coordinator;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.Metadata;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.StreamingCubingEngine;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kylin.shaded.com.google.common.base.Function;
import org.apache.kylin.shaded.com.google.common.collect.Collections2;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.MapDifference;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.stream.coordinator.assign.Assigner;
import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner;
import org.apache.kylin.stream.coordinator.assign.DefaultAssigner;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
import org.apache.kylin.stream.coordinator.exception.CoordinateException;
import org.apache.kylin.stream.coordinator.exception.NotLeadCoordinatorException;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
import org.apache.kylin.stream.core.model.StopConsumersRequest;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.UnAssignRequest;
import org.apache.kylin.stream.core.source.ISourcePosition;
import org.apache.kylin.stream.core.source.ISourcePositionHandler;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
/* loaded from: input_file:WEB-INF/lib/kylin-stream-coordinator-3.1.3.jar:org/apache/kylin/stream/coordinator/Coordinator.class */
public class Coordinator implements CoordinatorClient {
    /** Port passed to {@code NodeUtil.getCurrentNode} when this node records itself as the coordinator. */
    private static final int DEFAULT_PORT = 7070;
    /** Metadata store consulted for cubes, assignments, replica sets, build states and the coordinator node. */
    private StreamMetadataStore streamMetadataStore;
    /** Strategy used by {@code assignCube} to compute a cube assignment over the available replica sets. */
    private Assigner assigner;
    /** Admin client for the receivers; can be swapped through {@code setReceiverAdminClient}. */
    private ReceiverAdminClient receiverAdminClient;
    /** ZooKeeper client handed to the leader selector. */
    private CuratorFramework zkClient;
    /** Leader-election listener that switches {@link #isLead} on and off. */
    private CoordinatorLeaderSelector selector;
    /** Whether this node currently acts as lead coordinator; written by the leader selector, read by {@code checkLead} and the job checker. */
    private volatile boolean isLead;
    /** Executor that periodically runs {@link #jobStatusChecker}. */
    private ScheduledExecutorService streamingJobCheckExecutor;
    /** Periodic task that tracks segment build jobs and pending cubes. */
    private StreamingBuildJobStatusChecker jobStatusChecker;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) Coordinator.class);
    /** Lazily created singleton returned by {@code getInstance()}. */
    private static volatile Coordinator instance = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-stream-coordinator-3.1.3.jar:org/apache/kylin/stream/coordinator/Coordinator$CoordinatorLeaderSelector.class */
    /**
     * Leader-election listener for the coordinator.
     *
     * <p>Wraps a Curator {@link LeaderSelector} on the {@code StreamingUtils.COORDINATOR_LEAD} path and
     * keeps the enclosing coordinator's {@code isLead} flag in sync with the election result.
     */
    public class CoordinatorLeaderSelector extends LeaderSelectorListenerAdapter implements Closeable {
        private final LeaderSelector leaderSelector;

        /** Creates the selector on the coordinator's ZooKeeper client and enables automatic re-queueing. */
        public CoordinatorLeaderSelector() {
            this.leaderSelector = new LeaderSelector(Coordinator.this.zkClient, StreamingUtils.COORDINATOR_LEAD, this);
            this.leaderSelector.autoRequeue();
        }

        /**
         * Closes the underlying leader selector.
         *
         * @throws IOException declared by {@link Closeable#close()}
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.leaderSelector.close();
        }

        /** Starts taking part in the leader election. */
        public void start() {
            this.leaderSelector.start();
        }

        /**
         * Invoked when this node has been granted leadership; returning from this method gives it up.
         *
         * <p>Registers the current node as coordinator, marks the coordinator as lead, restores the job
         * status checker and then blocks, re-checking leadership after every sleep period.
         *
         * @param curatorFramework the Curator client (unused)
         * @throws Exception if registering the node or restoring the job checker fails
         */
        @Override // org.apache.curator.framework.recipes.leader.LeaderSelectorListener
        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
            Coordinator.logger.info("current node become the lead coordinator");
            Coordinator.this.streamMetadataStore.setCoordinatorNode(NodeUtil.getCurrentNode(Coordinator.DEFAULT_PORT));
            Coordinator.this.isLead = true;
            boolean interrupted = false;
            try {
                Coordinator.this.restoreJobStatusChecker();
                do {
                    try {
                        Thread.sleep(Metadata.TOPIC_EXPIRY_MS);
                    } catch (InterruptedException e) {
                        // An interrupt is how leadership gets cancelled; swallowing it and looping on
                        // hasLeadership() would keep this node acting as lead, so stop leading here.
                        interrupted = true;
                        break;
                    }
                } while (this.leaderSelector.hasLeadership());
            } finally {
                // Always drop the lead flag, including when the restore above throws, so a node that
                // is about to release leadership never keeps serving lead-only requests.
                Coordinator.logger.info("become the follower coordinator");
                Coordinator.this.isLead = false;
                if (interrupted) {
                    // Restore the interrupt status for the caller once our own cleanup is done.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-stream-coordinator-3.1.3.jar:org/apache/kylin/stream/coordinator/Coordinator$SegmentJobBuildInfo.class */
    public class SegmentJobBuildInfo implements Comparable<SegmentJobBuildInfo> {
        public String cubeName;
        public String segmentName;
        public String jobID;
        public int retryCnt = 0;

        public SegmentJobBuildInfo(String str, String str2, String str3) {
            this.cubeName = str;
            this.segmentName = str2;
            this.jobID = str3;
        }

        public String toString() {
            return "SegmentJobBuildInfo{cubeName='" + this.cubeName + "', segmentName='" + this.segmentName + "', jobID='" + this.jobID + "', retryCnt=" + this.retryCnt + '}';
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SegmentJobBuildInfo segmentJobBuildInfo = (SegmentJobBuildInfo) obj;
            if (this.cubeName != null) {
                if (!this.cubeName.equals(segmentJobBuildInfo.cubeName)) {
                    return false;
                }
            } else if (segmentJobBuildInfo.cubeName != null) {
                return false;
            }
            if (this.segmentName != null) {
                if (!this.segmentName.equals(segmentJobBuildInfo.segmentName)) {
                    return false;
                }
            } else if (segmentJobBuildInfo.segmentName != null) {
                return false;
            }
            return this.jobID != null ? this.jobID.equals(segmentJobBuildInfo.jobID) : segmentJobBuildInfo.jobID == null;
        }

        public int hashCode() {
            return (31 * ((31 * (this.cubeName != null ? this.cubeName.hashCode() : 0)) + (this.segmentName != null ? this.segmentName.hashCode() : 0))) + (this.jobID != null ? this.jobID.hashCode() : 0);
        }

        @Override // java.lang.Comparable
        public int compareTo(SegmentJobBuildInfo segmentJobBuildInfo) {
            return !this.cubeName.equals(segmentJobBuildInfo.cubeName) ? this.cubeName.compareTo(segmentJobBuildInfo.cubeName) : this.segmentName.compareTo(segmentJobBuildInfo.segmentName);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kylin-stream-coordinator-3.1.3.jar:org/apache/kylin/stream/coordinator/Coordinator$StreamingBuildJobStatusChecker.class */
    public class StreamingBuildJobStatusChecker implements Runnable {
        private int maxJobTryCnt;
        private ConcurrentMap<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> segmentBuildJobMap;
        private CopyOnWriteArrayList<String> pendingCubeName;

        private StreamingBuildJobStatusChecker() {
            this.maxJobTryCnt = 5;
            this.segmentBuildJobMap = Maps.newConcurrentMap();
            this.pendingCubeName = Lists.newCopyOnWriteArrayList();
        }

        public void addSegmentBuildJob(SegmentJobBuildInfo segmentJobBuildInfo) {
            ConcurrentSkipListSet<SegmentJobBuildInfo> concurrentSkipListSet = this.segmentBuildJobMap.get(segmentJobBuildInfo.cubeName);
            if (concurrentSkipListSet == null) {
                concurrentSkipListSet = new ConcurrentSkipListSet<>();
                ConcurrentSkipListSet<SegmentJobBuildInfo> putIfAbsent = this.segmentBuildJobMap.putIfAbsent(segmentJobBuildInfo.cubeName, concurrentSkipListSet);
                if (putIfAbsent != null) {
                    concurrentSkipListSet = putIfAbsent;
                }
            }
            concurrentSkipListSet.add(segmentJobBuildInfo);
        }

        public void addPendingCube(String str) {
            if (this.pendingCubeName.contains(str)) {
                return;
            }
            this.pendingCubeName.add(str);
        }

        public void clearCheckCube(String str) {
            if (this.pendingCubeName.contains(str)) {
                this.pendingCubeName.remove(str);
            }
            this.segmentBuildJobMap.remove(str);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (Coordinator.this.isLead) {
                    doRun();
                }
            } catch (Exception e) {
                Coordinator.logger.error("error", (Throwable) e);
            }
        }

        private void doRun() {
            ArrayList<SegmentJobBuildInfo> newArrayList = Lists.newArrayList();
            for (ConcurrentSkipListSet<SegmentJobBuildInfo> concurrentSkipListSet : this.segmentBuildJobMap.values()) {
                if (!concurrentSkipListSet.isEmpty()) {
                    SegmentJobBuildInfo first = concurrentSkipListSet.first();
                    Coordinator.logger.info("check the cube:{} segment:{} build status", first.cubeName, first.segmentName);
                    try {
                        CubingJob cubingJob = (CubingJob) Coordinator.this.getExecutableManager().getJob(first.jobID);
                        ExecutableState status = cubingJob.getStatus();
                        if (ExecutableState.SUCCEED.equals(status)) {
                            Coordinator.logger.info("job:{} is complete", first);
                            CubeInstance latestCopyForWrite = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(first.cubeName).latestCopyForWrite();
                            CubeSegment segment = latestCopyForWrite.getSegment(first.segmentName, null);
                            Coordinator.logger.info("the cube:{} segment:{} is ready", first.cubeName, first.segmentName);
                            Coordinator.this.segmentBuildComplete(cubingJob, latestCopyForWrite, segment, first);
                            newArrayList.add(first);
                        } else if (ExecutableState.ERROR.equals(status) && first.retryCnt < this.maxJobTryCnt) {
                            Coordinator.logger.info("job:{} is error, resume the job", first);
                            Coordinator.this.getExecutableManager().resumeJob(first.jobID);
                            first.retryCnt++;
                        }
                    } catch (Exception e) {
                        Coordinator.logger.error("error when check streaming segment job build state:" + first, (Throwable) e);
                    }
                }
            }
            for (SegmentJobBuildInfo segmentJobBuildInfo : newArrayList) {
                this.segmentBuildJobMap.get(segmentJobBuildInfo.cubeName).remove(segmentJobBuildInfo);
            }
            ArrayList newArrayList2 = Lists.newArrayList();
            Iterator<String> it = this.pendingCubeName.iterator();
            while (it.hasNext()) {
                String next = it.next();
                Coordinator.logger.info("check the pending cube:{} ", next);
                try {
                    if (Coordinator.this.tryFindAndBuildSegment(next)) {
                        newArrayList2.add(next);
                    }
                } catch (Exception e2) {
                    Coordinator.logger.error("error when try to find and build cube segment:{}" + next, (Throwable) e2);
                }
            }
            Iterator it2 = newArrayList2.iterator();
            while (it2.hasNext()) {
                this.pendingCubeName.remove((String) it2.next());
            }
        }
    }

    /**
     * Creates the coordinator with the metadata store from {@code StreamMetadataStoreFactory} and an
     * HTTP receiver admin client, then starts it when the server mode allows serving as a streaming
     * coordinator. Private: instances are obtained through {@code getInstance()}.
     */
    private Coordinator() {
        this.isLead = false;
        this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
        this.receiverAdminClient = new HttpReceiverAdminClient();
        this.assigner = getAssigner();
        // zkClient has to be set before the selector is created: the selector's constructor reads it.
        this.zkClient = StreamingUtils.getZookeeperClient();
        this.selector = new CoordinatorLeaderSelector();
        this.jobStatusChecker = new StreamingBuildJobStatusChecker();
        // Single scheduler thread dedicated to the job status checker.
        this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("streaming_job_status_checker"));
        if (ServerMode.SERVER_MODE.canServeStreamingCoordinator()) {
            start();
        }
    }

    /**
     * Test-only constructor that takes the metadata store and receiver admin client from the caller.
     * Unlike the private constructor it unconditionally marks the instance as lead at the end.
     *
     * @param streamMetadataStore metadata store to use
     * @param receiverAdminClient receiver admin client to use
     */
    @VisibleForTesting
    public Coordinator(StreamMetadataStore streamMetadataStore, ReceiverAdminClient receiverAdminClient) {
        this.isLead = false;
        this.streamMetadataStore = streamMetadataStore;
        this.receiverAdminClient = receiverAdminClient;
        this.assigner = getAssigner();
        // zkClient has to be set before the selector is created: the selector's constructor reads it.
        this.zkClient = StreamingUtils.getZookeeperClient();
        this.selector = new CoordinatorLeaderSelector();
        this.jobStatusChecker = new StreamingBuildJobStatusChecker();
        this.streamingJobCheckExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("streaming_job_status_checker"));
        if (ServerMode.SERVER_MODE.canServeStreamingCoordinator()) {
            start();
        }
        // Forced to lead regardless of the election outcome, so lead-only calls work in tests.
        this.isLead = true;
    }

    /**
     * Returns the shared coordinator, creating it on first use.
     *
     * @return the singleton instance
     */
    public static Coordinator getInstance() {
        if (instance != null) {
            return instance;
        }
        synchronized (Coordinator.class) {
            // Re-check under the lock: another thread may have created the instance meanwhile.
            if (instance == null) {
                instance = new Coordinator();
            }
        }
        return instance;
    }

    /** Joins the leader election and schedules the job status checker to run every two minutes, starting immediately. */
    public void start() {
        final long initialDelayMinutes = 0L;
        final long periodMinutes = 2L;
        selector.start();
        streamingJobCheckExecutor.scheduleAtFixedRate(jobStatusChecker, initialDelayMinutes, periodMinutes, TimeUnit.MINUTES);
    }

    /**
     * Replaces the receiver admin client; intended for tests.
     *
     * @param receiverAdminClient the client to use from now on
     */
    @VisibleForTesting
    public void setReceiverAdminClient(ReceiverAdminClient receiverAdminClient) {
        this.receiverAdminClient = receiverAdminClient;
    }

    /**
     * Test hook: records the current node as coordinator in the metadata store and marks this
     * instance as lead without going through the election.
     */
    @VisibleForTesting
    public void setToLeader() {
        Node currentNode = NodeUtil.getCurrentNode(DEFAULT_PORT);
        streamMetadataStore.setCoordinatorNode(currentNode);
        isLead = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Re-registers with the job status checker every segment that the metadata store reports as
     * being built, cube by cube, after sorting each cube's build states.
     */
    public void restoreJobStatusChecker() {
        logger.info("restore job status checker");
        for (String cubeName : streamMetadataStore.getCubes()) {
            List<SegmentBuildState> buildStates = streamMetadataStore.getSegmentBuildStates(cubeName);
            Collections.sort(buildStates);
            for (SegmentBuildState buildState : buildStates) {
                if (!buildState.isInBuilding()) {
                    continue;
                }
                SegmentJobBuildInfo buildInfo = new SegmentJobBuildInfo(cubeName, buildState.getSegmentName(), buildState.getState().getJobId());
                jobStatusChecker.addSegmentBuildJob(buildInfo);
            }
        }
    }

    /**
     * Registers a streaming cube and assigns it to replica sets using the configured assigner.
     * Does nothing beyond a warning when the cube already has an assignment.
     *
     * @param str name of the cube to assign
     * @throws IllegalArgumentException if no cube with that name exists
     * @throws IllegalStateException if no replica set is configured
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void assignCube(String str) {
        checkLead();
        this.streamMetadataStore.addStreamingCube(str);
        StreamingCubeInfo streamCubeInfo = getStreamCubeInfo(str);
        if (streamCubeInfo == null) {
            // getStreamCubeInfo returns null for an unknown cube; fail with a clear message
            // instead of a NullPointerException on the next line.
            throw new IllegalArgumentException("cube " + str + " does not exist, can not assign it");
        }
        if (this.streamMetadataStore.getAssignmentsByCube(streamCubeInfo.getCubeName()) != null) {
            logger.warn("cube {} is already assigned.", streamCubeInfo.getCubeName());
            return;
        }
        List<ReplicaSet> replicaSets = this.streamMetadataStore.getReplicaSets();
        if (replicaSets == null || replicaSets.isEmpty()) {
            throw new IllegalStateException("no replicaSet is configured in system");
        }
        doAssignCube(str, this.assigner.assign(streamCubeInfo, replicaSets, this.streamMetadataStore.getAllCubeAssignments()));
    }

    /**
     * Removes a cube's assignment: asks every receiver of every assigned replica set to un-assign the
     * cube, then removes the cube's HDFS files, its job-check entries, its metadata and its cached
     * assignment. Returns silently when the cube has no assignment.
     *
     * <p>A receiver that fails to un-assign does not stop the cleanup; the failures are reported
     * together once the cleanup is done.
     *
     * @param str name of the cube to un-assign
     * @throws CoordinateException if the cleanup fails or at least one receiver could not be un-assigned
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void unAssignCube(String str) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        if (assignmentsByCube == null) {
            return;
        }
        List<Node> failedReceivers = Lists.newArrayList();
        try {
            logger.info("send unAssign cube:{} request to receivers", str);
            for (Integer replicaSetId : assignmentsByCube.getReplicaSetIDs()) {
                ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(replicaSetId.intValue());
                UnAssignRequest unAssignRequest = new UnAssignRequest();
                unAssignRequest.setCube(str);
                for (Node receiver : replicaSet.getNodes()) {
                    try {
                        unAssignToReceiver(receiver, unAssignRequest);
                    } catch (Exception e) {
                        logger.error("exception throws when unAssign receiver", (Throwable) e);
                        // Remember the receiver so the failure is reported below; previously the
                        // list was never filled and the failure check could not trigger.
                        failedReceivers.add(receiver);
                    }
                }
            }
            logger.info("remove temp hdfs files");
            removeCubeHDFSFiles(str);
            logger.info("clear cube info from job check list");
            this.jobStatusChecker.clearCheckCube(str);
            logger.info("remove cube info in metadata store");
            this.streamMetadataStore.removeStreamingCube(str);
            AssignmentsCache.getInstance().clearCubeCache(str);
        } catch (Exception e2) {
            throw new CoordinateException(e2);
        }
        // Reported outside the try block so the exception is not caught and wrapped a second time.
        if (!failedReceivers.isEmpty()) {
            throw new CoordinateException("unAssign fail for receivers:" + failedReceivers);
        }
    }

    /**
     * Moves a cube to a new assignment; when the cube has no assignment yet, the new one is applied
     * as a first-time assignment.
     *
     * @param str            cube name
     * @param cubeAssignment the assignment to switch to
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void reAssignCube(String str, CubeAssignment cubeAssignment) {
        checkLead();
        CubeAssignment previousAssignment = streamMetadataStore.getAssignmentsByCube(str);
        if (previousAssignment == null) {
            logger.info("no previous cube assign exists, use the new assignment:{}", cubeAssignment);
            doAssignCube(str, cubeAssignment);
            return;
        }
        reassignCubeImpl(str, previousAssignment, cubeAssignment);
    }

    /**
     * Handles a receiver's notification that a segment has been stored remotely by trying to find
     * and build a segment for the cube.
     *
     * @param node the notifying node (not used here)
     * @param str  cube name
     * @param pair the segment as reported by the receiver; only logged here
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void segmentRemoteStoreComplete(Node node, String str, Pair<Long, Long> pair) {
        checkLead();
        logger.info("segment remote store complete signal received for cube:{}, segment:{}, try to find proper segment to build", str, pair);
        // The result of the attempt is not inspected here.
        tryFindAndBuildSegment(str);
    }

    /**
     * Computes a re-balance plan for the enabled streaming cubes over the replica sets currently in
     * the metadata store, without applying it.
     *
     * @return the plan produced by {@code reBalancePlan}; presumably keyed by replica set ID, then
     *         cube name, with the partitions as values (TODO confirm against reBalancePlan)
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() {
        checkLead();
        return reBalancePlan(getEnableStreamingCubes(), this.streamMetadataStore.getReplicaSets());
    }

    /**
     * Applies a re-balance: converts the given replica-set oriented plan into cube assignments and
     * re-balances from the assignments currently stored to the converted ones.
     *
     * @param map the plan to apply, in the shape returned by {@code reBalanceRecommend()}
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void reBalance(Map<Integer, Map<String, List<Partition>>> map) {
        checkLead();
        doReBalance(this.streamMetadataStore.getAllCubeAssignments(), AssignmentUtil.convertReplicaSetAssign2CubeAssign(map));
    }

    /**
     * Pauses the consumers of a cube in every replica set it is assigned to, then records the cube's
     * consume state as {@code PAUSED}.
     *
     * @param str cube name
     * @throws IllegalStateException if the cube has no assignment
     * @throws RuntimeException wrapping the {@link IOException} raised while pausing or saving the state
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void pauseConsumers(String str) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        if (assignmentsByCube == null) {
            // The store returns null for an unassigned cube (unAssignCube guards the same way);
            // report that clearly instead of failing with a NullPointerException.
            throw new IllegalStateException("cube " + str + " has no assignment, can not pause its consumers");
        }
        PauseConsumersRequest pauseConsumersRequest = new PauseConsumersRequest();
        pauseConsumersRequest.setCube(str);
        try {
            for (Integer replicaSetId : assignmentsByCube.getReplicaSetIDs()) {
                pauseConsumersInReplicaSet(this.streamMetadataStore.getReplicaSet(replicaSetId.intValue()), pauseConsumersRequest);
            }
            this.streamMetadataStore.saveStreamingCubeConsumeState(str, StreamingCubeConsumeState.PAUSED);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Resumes the consumers of a cube in every replica set it is assigned to, then records the cube's
     * consume state as {@code RUNNING}.
     *
     * @param str cube name
     * @throws IllegalStateException if the cube has no assignment
     * @throws RuntimeException wrapping the {@link IOException} raised while resuming or saving the state
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void resumeConsumers(String str) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        if (assignmentsByCube == null) {
            // The store returns null for an unassigned cube (unAssignCube guards the same way);
            // report that clearly instead of failing with a NullPointerException.
            throw new IllegalStateException("cube " + str + " has no assignment, can not resume its consumers");
        }
        ResumeConsumerRequest resumeConsumerRequest = new ResumeConsumerRequest();
        resumeConsumerRequest.setCube(str);
        try {
            for (Integer replicaSetId : assignmentsByCube.getReplicaSetIDs()) {
                resumeConsumersInReplicaSet(this.streamMetadataStore.getReplicaSet(replicaSetId.intValue()), resumeConsumerRequest);
            }
            this.streamMetadataStore.saveStreamingCubeConsumeState(str, StreamingCubeConsumeState.RUNNING);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reacts to a leader change inside a replica set by dropping the cached assignment of every cube
     * assigned to that replica set.
     *
     * @param i    replica set ID
     * @param node the new leader (not used here)
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void replicaSetLeaderChange(int i, Node node) {
        checkLead();
        Map<String, List<Partition>> cubePartitions = streamMetadataStore.getAssignmentsByReplicaSet(i);
        boolean nothingAssigned = cubePartitions == null || cubePartitions.isEmpty();
        if (!nothingAssigned) {
            for (String cubeName : cubePartitions.keySet()) {
                AssignmentsCache.getInstance().clearCubeCache(cubeName);
            }
        }
    }

    /**
     * Rejects the call unless this coordinator is currently lead.
     *
     * @throws NotLeadCoordinatorException naming the coordinator node recorded in the metadata store,
     *         or stating that it could not be found when the store lookup fails
     */
    private void checkLead() {
        if (!isLead) {
            String message;
            try {
                message = "Current coordinator is not lead, please check host " + streamMetadataStore.getCoordinatorNode();
            } catch (StoreException e) {
                throw new NotLeadCoordinatorException("Lead coordinator can not found.", e);
            }
            throw new NotLeadCoordinatorException(message);
        }
    }

    /**
     * Builds the streaming description of a cube from the cube's streaming source and its configured
     * number of consumer tasks.
     *
     * @param str cube name
     * @return the cube's streaming info, or {@code null} when no cube with that name exists
     */
    public StreamingCubeInfo getStreamCubeInfo(String str) {
        CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).getCube(str);
        if (cubeInstance != null) {
            IStreamingSource source = StreamingSourceFactory.getStreamingSource(cubeInstance);
            return new StreamingCubeInfo(str, source.load(str), cubeInstance.getConfig().getStreamingCubeConsumerTasksNum());
        }
        return null;
    }

    /** Returns the Kylin configuration obtained from {@code KylinConfig.getInstanceFromEnv()}. */
    private KylinConfig getConfig() {
        return KylinConfig.getInstanceFromEnv();
    }

    /**
     * Assigns a cube to every replica set named in the assignment and then persists the assignment.
     * If anything fails, the replica sets that were already assigned are un-assigned again
     * (best effort) before the failure is rethrown.
     *
     * @param str            cube name
     * @param cubeAssignment the assignment to apply
     * @throws RuntimeException wrapping the failure that aborted the assignment
     */
    private void doAssignCube(String str, CubeAssignment cubeAssignment) {
        // Replica sets that completed assignment; these are the ones rolled back on failure.
        Set<ReplicaSet> assignedReplicaSets = Sets.newHashSet();
        try {
            for (Integer num : cubeAssignment.getReplicaSetIDs()) {
                ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(num.intValue());
                assignCubeToReplicaSet(replicaSet, str, cubeAssignment.getPartitionsByReplicaSetID(num), true, false);
                assignedReplicaSets.add(replicaSet);
            }
            this.streamMetadataStore.saveNewCubeAssignment(cubeAssignment);
        } catch (Exception e) {
            for (ReplicaSet assigned : assignedReplicaSets) {
                try {
                    UnAssignRequest unAssignRequest = new UnAssignRequest();
                    unAssignRequest.setCube(str);
                    unAssignFromReplicaSet(assigned, unAssignRequest);
                } catch (IOException e2) {
                    // Log the rollback failure itself; the original failure is rethrown below.
                    logger.error("error when roll back assignment", (Throwable) e2);
                }
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Switches a cube from its previous assignment to a new one and persists the result.
     *
     * <p>Replica sets that only appear in the previous assignment are kept in the saved assignment
     * with an empty partition list.
     *
     * @param str             cube name
     * @param cubeAssignment  the previous assignment
     * @param cubeAssignment2 the new assignment; modified in place and returned
     * @return {@code cubeAssignment2}
     */
    private CubeAssignment reassignCubeImpl(String str, CubeAssignment cubeAssignment, CubeAssignment cubeAssignment2) {
        logger.info("start cube reBalance, cube:{}, previous assignments:{}, new assignments:{}", str, cubeAssignment, cubeAssignment2);
        boolean unchanged = cubeAssignment2.equals(cubeAssignment);
        if (unchanged) {
            logger.info("the new assignment is the same as the previous assignment, do nothing for this reassignment");
            return cubeAssignment2;
        }
        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        doReassign(cubeInstance, cubeAssignment, cubeAssignment2);
        MapDifference<Integer, List<Partition>> assignmentDiff = Maps.difference(cubeAssignment.getAssignments(), cubeAssignment2.getAssignments());
        for (Integer removedReplicaSetId : assignmentDiff.entriesOnlyOnLeft().keySet()) {
            cubeAssignment2.addAssignment(removedReplicaSetId, Lists.<Partition>newArrayList());
        }
        streamMetadataStore.saveNewCubeAssignment(cubeAssignment2);
        AssignmentsCache.getInstance().clearCubeCache(str);
        return cubeAssignment2;
    }

    void doReassign(CubeInstance cubeInstance, CubeAssignment cubeAssignment, CubeAssignment cubeAssignment2) {
        String cubeName = cubeAssignment.getCubeName();
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cubeInstance);
        MapDifference difference = Maps.difference(cubeAssignment.getAssignments(), cubeAssignment2.getAssignments());
        Map entriesInCommon = difference.entriesInCommon();
        Map entriesOnlyOnRight = difference.entriesOnlyOnRight();
        Map entriesOnlyOnLeft = difference.entriesOnlyOnLeft();
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList<ReplicaSet> newArrayList2 = Lists.newArrayList();
        try {
            Iterator<Map.Entry<Integer, List<Partition>>> it = cubeAssignment.getAssignments().entrySet().iterator();
            while (it.hasNext()) {
                Integer key = it.next().getKey();
                if (entriesInCommon.containsKey(key)) {
                    logger.info("the assignment is not changed for cube:{}, replicaSet:{}", cubeName, key);
                } else {
                    ReplicaSet replicaSet = getStreamMetadataStore().getReplicaSet(key.intValue());
                    newArrayList.add(syncAndStopConsumersInRs(streamingSource, cubeName, replicaSet));
                    newArrayList2.add(replicaSet);
                }
            }
            ISourcePosition mergePositions = streamingSource.getSourcePositionHandler().mergePositions(newArrayList, ISourcePositionHandler.MergeStrategy.KEEP_LARGE);
            logger.info("the consumer position for cube:{} is:{}", cubeName, mergePositions);
            ArrayList<ReplicaSet> newArrayList3 = Lists.newArrayList();
            ArrayList<ReplicaSet> newArrayList4 = Lists.newArrayList();
            HashSet<Node> hashSet = new HashSet();
            try {
                for (Map.Entry<Integer, List<Partition>> entry : cubeAssignment2.getAssignments().entrySet()) {
                    Integer key2 = entry.getKey();
                    if (!entriesInCommon.containsKey(key2)) {
                        ReplicaSet replicaSet2 = getStreamMetadataStore().getReplicaSet(key2.intValue());
                        logger.info("assign cube:{} to replicaSet:{}", cubeName, key2);
                        assignCubeToReplicaSet(replicaSet2, cubeName, entry.getValue(), false, true);
                        newArrayList3.add(replicaSet2);
                    }
                }
                Iterator<Map.Entry<Integer, List<Partition>>> it2 = cubeAssignment2.getAssignments().entrySet().iterator();
                while (it2.hasNext()) {
                    Integer key3 = it2.next().getKey();
                    if (!entriesInCommon.containsKey(key3)) {
                        ConsumerStartProtocol consumerStartProtocol = new ConsumerStartProtocol(streamingSource.getSourcePositionHandler().serializePosition(mergePositions.advance()));
                        ReplicaSet replicaSet3 = getStreamMetadataStore().getReplicaSet(key3.intValue());
                        StartConsumersRequest startConsumersRequest = new StartConsumersRequest();
                        startConsumersRequest.setCube(cubeName);
                        startConsumersRequest.setStartProtocol(consumerStartProtocol);
                        logger.info("start consumers for cube:{}, replicaSet:{}, startRequest:{}", cubeName, key3, startConsumersRequest);
                        startConsumersInReplicaSet(replicaSet3, startConsumersRequest);
                        newArrayList4.add(replicaSet3);
                    }
                }
                Iterator it3 = entriesOnlyOnLeft.entrySet().iterator();
                while (it3.hasNext()) {
                    Integer num = (Integer) ((Map.Entry) it3.next()).getKey();
                    logger.info("make cube immutable for cube:{}, replicaSet{}", cubeName, num);
                    hashSet.addAll(makeCubeImmutableInReplicaSet(getStreamMetadataStore().getReplicaSet(num.intValue()), cubeName));
                }
                if (!hashSet.isEmpty()) {
                    throw new IOException();
                }
                logger.info("finish cube reBalance, cube:{}", cubeName);
            } catch (IOException e) {
                logger.error("fail to start consumers for cube:" + cubeName, (Throwable) e);
                Set set = (Set) newArrayList4.stream().map((v0) -> {
                    return v0.getReplicaSetID();
                }).collect(Collectors.toSet());
                for (ReplicaSet replicaSet4 : newArrayList4) {
                    try {
                        StopConsumersRequest stopConsumersRequest = new StopConsumersRequest();
                        stopConsumersRequest.setCube(cubeName);
                        if (entriesOnlyOnRight.containsKey(Integer.valueOf(replicaSet4.getReplicaSetID()))) {
                            stopConsumersRequest.setRemoveData(true);
                        }
                        stopConsumersInReplicaSet(replicaSet4, stopConsumersRequest);
                        set.remove(Integer.valueOf(replicaSet4.getReplicaSetID()));
                    } catch (IOException e2) {
                        logger.error("fail to stop consumers for cube:" + cubeName + " replicaSet:" + replicaSet4.getReplicaSetID(), (Throwable) e2);
                    }
                }
                Set set2 = (Set) newArrayList3.stream().map((v0) -> {
                    return v0.getReplicaSetID();
                }).collect(Collectors.toSet());
                for (ReplicaSet replicaSet5 : newArrayList3) {
                    try {
                        assignCubeToReplicaSet(replicaSet5, cubeName, cubeAssignment.getPartitionsByReplicaSetID(Integer.valueOf(replicaSet5.getReplicaSetID())), true, true);
                        set2.remove(Integer.valueOf(replicaSet5.getReplicaSetID()));
                    } catch (IOException e3) {
                        logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + replicaSet5.getReplicaSetID(), (Throwable) e3);
                    }
                }
                HashSet hashSet2 = new HashSet(hashSet);
                for (Node node : hashSet) {
                    try {
                        makeCubeImmutableForReceiver(node, cubeName);
                        hashSet2.remove(node);
                    } catch (IOException e4) {
                        logger.error("fail to make cube immutable for cube:" + cubeName + " to " + node, (Throwable) e4);
                    }
                }
                StringBuilder sb = new StringBuilder();
                try {
                    sb.append("FailStarted:").append(JsonUtil.writeValueAsString(set)).append(";");
                    sb.append("FailAssigned:").append(JsonUtil.writeValueAsString(set2)).append(";");
                    sb.append("FailRemotedPresisted:").append(JsonUtil.writeValueAsString(hashSet2));
                } catch (JsonProcessingException e5) {
                    logger.error("", (Throwable) e5);
                }
                String sb2 = sb.toString();
                if (!set.isEmpty()) {
                    throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.START_NEW, sb2, e);
                }
                if (!set2.isEmpty()) {
                    throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.ASSIGN_NEW, sb2, e);
                }
                if (!hashSet2.isEmpty()) {
                    throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.MAKE_IMMUTABLE, sb2, e);
                }
                throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_SUCCESS, ClusterStateException.TransactionStep.ASSIGN_NEW, sb2, e);
            }
        } catch (Exception e6) {
            logger.error("fail to sync assign replicaSet for cube:" + cubeName, (Throwable) e6);
            Set set3 = (Set) newArrayList2.stream().map((v0) -> {
                return v0.getReplicaSetID();
            }).collect(Collectors.toSet());
            for (ReplicaSet replicaSet6 : newArrayList2) {
                StartConsumersRequest startConsumersRequest2 = new StartConsumersRequest();
                startConsumersRequest2.setCube(cubeName);
                try {
                    startConsumersInReplicaSet(replicaSet6, startConsumersRequest2);
                    set3.remove(Integer.valueOf(replicaSet6.getReplicaSetID()));
                } catch (IOException e7) {
                    logger.error("fail to start consumers for cube:" + cubeName + " replicaSet:" + replicaSet6.getReplicaSetID(), (Throwable) e7);
                }
            }
            if (set3.isEmpty()) {
                throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_SUCCESS, ClusterStateException.TransactionStep.STOP_AND_SNYC, "", e6);
            }
            StringBuilder sb3 = new StringBuilder();
            try {
                sb3.append("Fail restart:").append(JsonUtil.writeValueAsString(set3));
            } catch (JsonProcessingException e8) {
                logger.error("", (Throwable) e8);
            }
            throw new ClusterStateException(cubeName, ClusterStateException.ClusterState.ROLLBACK_FAILED, ClusterStateException.TransactionStep.STOP_AND_SNYC, sb3.toString(), e6);
        }
    }

    /**
     * Brings the consumers of a cube inside one replica set to a common source position.
     * <ul>
     *   <li>No nodes: nothing is called and {@code null} is returned.</li>
     *   <li>Exactly one node: the consumers are stopped and the position reported by the
     *       first response is parsed and returned.</li>
     *   <li>Several nodes: the consumers are paused, the reported positions are merged with
     *       {@code KEEP_LARGE}, and a resume request carrying the merged position is sent.</li>
     * </ul>
     *
     * @param iStreamingSource source whose position handler parses, merges and serializes positions
     * @param str cube name
     * @param replicaSet replica set whose receivers are contacted
     * @return the resulting position, or {@code null} when the replica set has no nodes
     * @throws IOException if a receiver call fails
     */
    private ISourcePosition syncAndStopConsumersInRs(final IStreamingSource iStreamingSource, String str, ReplicaSet replicaSet) throws IOException {
        int nodeCount = replicaSet.getNodes().size();
        if (nodeCount < 1) {
            return null;
        }
        if (nodeCount == 1) {
            Node onlyNode = replicaSet.getNodes().iterator().next();
            StopConsumersRequest stopRequest = new StopConsumersRequest();
            stopRequest.setCube(str);
            logger.info("stop consumers for cube:{}, receiver:{}", str, onlyNode);
            ISourcePositionHandler singleNodeHandler = iStreamingSource.getSourcePositionHandler();
            List<ConsumerStatsResponse> stopped = stopConsumersInReplicaSet(replicaSet, stopRequest);
            return singleNodeHandler.parsePosition(stopped.get(0).getConsumePosition());
        }

        logger.info("sync consume for cube:{}, replicaSet:{}", str, Integer.valueOf(replicaSet.getReplicaSetID()));
        PauseConsumersRequest pauseRequest = new PauseConsumersRequest();
        pauseRequest.setCube(str);
        ISourcePositionHandler mergeHandler = iStreamingSource.getSourcePositionHandler();
        List<ConsumerStatsResponse> pausedStats = pauseConsumersInReplicaSet(replicaSet, pauseRequest);
        Function<ConsumerStatsResponse, ISourcePosition> toPosition = new Function<ConsumerStatsResponse, ISourcePosition>() {
            @Override
            @Nullable
            public ISourcePosition apply(@Nullable ConsumerStatsResponse stats) {
                return iStreamingSource.getSourcePositionHandler().parsePosition(stats.getConsumePosition());
            }
        };
        ISourcePosition merged = mergeHandler.mergePositions(Lists.transform(pausedStats, toPosition), ISourcePositionHandler.MergeStrategy.KEEP_LARGE);

        ResumeConsumerRequest resumeRequest = new ResumeConsumerRequest();
        resumeRequest.setCube(str);
        resumeRequest.setResumeToPosition(iStreamingSource.getSourcePositionHandler().serializePosition(merged));
        resumeConsumersInReplicaSet(replicaSet, resumeRequest);
        return merged;
    }

    /**
     * Asks the assigner for a re-balance plan, passing all cube assignments currently
     * held in the metadata store.
     *
     * @param list streaming cubes handed to the assigner
     * @param list2 replica sets handed to the assigner
     * @return the plan produced by the assigner
     */
    public Map<Integer, Map<String, List<Partition>>> reBalancePlan(List<StreamingCubeInfo> list, List<ReplicaSet> list2) {
        return assigner.reBalancePlan(list2, list, streamMetadataStore.getAllCubeAssignments());
    }

    /**
     * Returns the streaming cubes whose cube instance status is {@code READY}.
     *
     * @return a new mutable list, in the order produced by {@link #getStreamingCubes()}
     */
    private List<StreamingCubeInfo> getEnableStreamingCubes() {
        return getStreamingCubes().stream()
                .filter(info -> CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
                        .getCube(info.getCubeName())
                        .getStatus() == RealizationStatusEnum.READY)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Resolves every cube name listed in the metadata store to its streaming cube info,
     * leaving out names for which {@code getStreamCubeInfo} returns {@code null}.
     *
     * @return a new list of the resolved cube infos
     */
    private List<StreamingCubeInfo> getStreamingCubes() {
        List<String> cubeNames = this.streamMetadataStore.getCubes();
        List<StreamingCubeInfo> resolved = Lists.newArrayList();
        for (String cubeName : cubeNames) {
            StreamingCubeInfo info = getStreamCubeInfo(cubeName);
            if (info == null) {
                continue;
            }
            resolved.add(info);
        }
        return resolved;
    }

    /**
     * Recursively deletes the streaming cube directory of the given cube.
     * Deletion is best-effort: any failure is logged and not propagated.
     *
     * @param str cube name
     */
    public void removeCubeHDFSFiles(String str) {
        String streamingCubeFilePath = HDFSUtil.getStreamingCubeFilePath(str);
        try {
            HadoopUtil.getFileSystem(streamingCubeFilePath).delete(new Path(streamingCubeFilePath), true);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause and stack trace are logged.
            logger.error("error when remove hdfs file, hdfs path:{}", streamingCubeFilePath, e);
        }
    }

    /**
     * Registers a new replica set after checking that none of its nodes already belongs
     * to an existing replica set, then notifies each node of its replica set id.
     * A failure while notifying receivers is logged as a warning and not propagated.
     *
     * @param replicaSet replica set to create
     * @throws CoordinateException if one of the nodes is already contained in another replica set
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void createReplicaSet(ReplicaSet replicaSet) {
        List<ReplicaSet> existingSets = this.streamMetadataStore.getReplicaSets();
        for (Node candidate : replicaSet.getNodes()) {
            for (ReplicaSet existing : existingSets) {
                if (!existing.containPhysicalNode(candidate)) {
                    continue;
                }
                throw new CoordinateException(String.format(Locale.ROOT, "The receiver node %s is already exists in the set %s", candidate, existing));
            }
        }
        int newReplicaSetId = this.streamMetadataStore.createReplicaSet(replicaSet);
        try {
            for (Node member : replicaSet.getNodes()) {
                addReceiverToReplicaSet(member, newReplicaSetId);
            }
        } catch (IOException e) {
            logger.warn("create replica set fail", e);
        }
    }

    /**
     * Removes a replica set from the metadata store. Does nothing when the id is unknown.
     *
     * @param i replica set id
     * @throws CoordinateException if the replica set still has nodes or assignments
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void removeReplicaSet(int i) {
        ReplicaSet target = this.streamMetadataStore.getReplicaSet(i);
        if (target == null) {
            return;
        }
        boolean hasNodes = target.getNodes() != null && target.getNodes().size() > 0;
        if (hasNodes) {
            throw new CoordinateException("cannot remove rs, because there are nodes in it");
        }
        Map<String, List<Partition>> assignments = this.streamMetadataStore.getAssignmentsByReplicaSet(i);
        boolean hasAssignments = assignments != null && !assignments.isEmpty();
        if (hasAssignments) {
            throw new CoordinateException("cannot remove rs, because there are assignments");
        }
        this.streamMetadataStore.removeReplicaSet(i);
    }

    /**
     * Adds a receiver node to a replica set and persists the change. When the replica set
     * has assignments, the receiver is additionally told its replica set id and the
     * assignment cache of every assigned cube is cleared; an {@link IOException} in that
     * step is logged as a warning only.
     *
     * @param num replica set id
     * @param str node in normalized string form
     * @throws IllegalStateException if the node is already contained in a different replica set
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void addNodeToReplicaSet(Integer num, String str) {
        int replicaSetId = num.intValue();
        ReplicaSet target = this.streamMetadataStore.getReplicaSet(replicaSetId);
        Node receiver = Node.fromNormalizeString(str);
        for (ReplicaSet other : this.streamMetadataStore.getReplicaSets()) {
            boolean isOtherSet = other.getReplicaSetID() != replicaSetId;
            if (isOtherSet && other.containPhysicalNode(receiver)) {
                logger.error("error add Node {} to replicaSet {}, already exist in replicaSet {} ", str, num, Integer.valueOf(other.getReplicaSetID()));
                throw new IllegalStateException("Node exists in ReplicaSet!");
            }
        }
        target.addNode(receiver);
        this.streamMetadataStore.updateReplicaSet(target);
        try {
            Map<String, List<Partition>> assignments = this.streamMetadataStore.getAssignmentsByReplicaSet(replicaSetId);
            boolean nothingAssigned = assignments == null || assignments.isEmpty();
            if (nothingAssigned) {
                return;
            }
            addReceiverToReplicaSet(receiver, replicaSetId);
            for (String cubeName : assignments.keySet()) {
                AssignmentsCache.getInstance().clearCubeCache(cubeName);
            }
        } catch (IOException e) {
            logger.warn("fail to add receiver to replicaSet ", e);
        }
    }

    /**
     * Removes a receiver node from a replica set, persists the change, tells the receiver
     * it left the replica set and clears the assignment cache of every cube assigned to
     * that replica set. An {@link IOException} from the receiver call is logged as a warning only.
     *
     * @param num replica set id
     * @param str node in normalized string form
     */
    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void removeNodeFromReplicaSet(Integer num, String str) {
        int replicaSetId = num.intValue();
        ReplicaSet target = this.streamMetadataStore.getReplicaSet(replicaSetId);
        Node receiver = Node.fromNormalizeString(str);
        target.removeNode(receiver);
        this.streamMetadataStore.updateReplicaSet(target);
        try {
            Map<String, List<Partition>> assignments = this.streamMetadataStore.getAssignmentsByReplicaSet(replicaSetId);
            removeReceiverFromReplicaSet(receiver);
            if (assignments == null) {
                return;
            }
            for (String cubeName : assignments.keySet()) {
                AssignmentsCache.getInstance().clearCubeCache(cubeName);
            }
        } catch (IOException e) {
            logger.warn("remove node from replicaSet fail", e);
        }
    }

    /**
     * Sends a stop-consumers request for the cube to every replica set listed in the
     * cube's assignment.
     *
     * @param str cube name
     * @throws RuntimeException wrapping the {@link IOException} of a failed receiver call
     */
    public void stopConsumers(String str) {
        CubeAssignment assignment = this.streamMetadataStore.getAssignmentsByCube(str);
        StopConsumersRequest stopRequest = new StopConsumersRequest();
        stopRequest.setCube(str);
        try {
            for (Integer replicaSetId : assignment.getReplicaSetIDs()) {
                ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(replicaSetId.intValue());
                stopConsumersInReplicaSet(replicaSet, stopRequest);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies a new set of cube assignments: both lists are indexed by cube name, the cube
     * name sets must be equal, and every cube whose assignment differs is reassigned
     * through {@code reassignCubeImpl}.
     *
     * @param list previous assignments
     * @param list2 new assignments
     * @throws RuntimeException wrapping any exception raised while comparing or reassigning,
     *         including the {@link IllegalStateException} raised when the cube name sets differ
     */
    private void doReBalance(List<CubeAssignment> list, List<CubeAssignment> list2) {
        Map<String, CubeAssignment> previousByCube = Maps.newHashMap();
        Map<String, CubeAssignment> updatedByCube = Maps.newHashMap();
        for (CubeAssignment previous : list) {
            previousByCube.put(previous.getCubeName(), previous);
        }
        for (CubeAssignment updated : list2) {
            updatedByCube.put(updated.getCubeName(), updated);
        }
        try {
            Set<String> previousCubes = previousByCube.keySet();
            Set<String> updatedCubes = updatedByCube.keySet();
            if (!previousCubes.equals(updatedCubes)) {
                logger.error("previous assignment cubes:" + previousCubes + ", new assignment cubes:" + updatedCubes);
                throw new IllegalStateException("previous cube assignments");
            }
            MapDifference<String, CubeAssignment> difference = Maps.difference(previousByCube, updatedByCube);
            for (Map.Entry<String, MapDifference.ValueDifference<CubeAssignment>> changed : difference.entriesDiffering().entrySet()) {
                String cubeName = changed.getKey();
                MapDifference.ValueDifference<CubeAssignment> change = changed.getValue();
                reassignCubeImpl(cubeName, change.leftValue(), change.rightValue());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends an assign request for a cube to every node of a replica set.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param str cube name
     * @param list partitions to assign
     * @param z value placed in the request's start-consumers flag
     * @param z2 when {@code true}, the first node failure is rethrown immediately; when
     *        {@code false}, failures are logged and the remaining nodes are still tried
     * @throws IOException on the first failure if {@code z2} is set, otherwise the last
     *         failure when no node accepted the request
     */
    public void assignCubeToReplicaSet(ReplicaSet replicaSet, String str, List<Partition> list, boolean z, boolean z2) throws IOException {
        AssignRequest request = new AssignRequest();
        request.setCubeName(str);
        request.setPartitions(list);
        request.setStartConsumers(z);

        boolean anyNodeAssigned = false;
        IOException lastFailure = null;
        for (Node node : replicaSet.getNodes()) {
            try {
                assignToReceiver(node, request);
                anyNodeAssigned = true;
            } catch (IOException e) {
                if (z2) {
                    throw e;
                }
                lastFailure = e;
                logger.error("cube:" + str + " consumers start fail for node:" + node.toString(), e);
            }
        }
        if (anyNodeAssigned || lastFailure == null) {
            return;
        }
        throw lastFailure;
    }

    /**
     * Forwards an assign request to a single receiver node via the receiver admin client.
     *
     * @param node target receiver
     * @param assignRequest request to send
     * @throws IOException if the receiver call fails
     */
    private void assignToReceiver(Node node, AssignRequest assignRequest) throws IOException {
        receiverAdminClient.assign(node, assignRequest);
    }

    /**
     * Sends the un-assign request to every node of the replica set, stopping at the first failure.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param unAssignRequest request to send
     * @throws IOException if a receiver call fails
     */
    private void unAssignFromReplicaSet(ReplicaSet replicaSet, UnAssignRequest unAssignRequest) throws IOException {
        for (Node node : replicaSet.getNodes()) {
            unAssignToReceiver(node, unAssignRequest);
        }
    }

    /**
     * Forwards an un-assign request to a single receiver node via the receiver admin client.
     *
     * @param node target receiver
     * @param unAssignRequest request to send
     * @throws IOException if the receiver call fails
     */
    private void unAssignToReceiver(Node node, UnAssignRequest unAssignRequest) throws IOException {
        receiverAdminClient.unAssign(node, unAssignRequest);
    }

    /**
     * Tells a receiver node, via the receiver admin client, that it belongs to the given replica set.
     *
     * @param node target receiver
     * @param i replica set id
     * @throws IOException if the receiver call fails
     */
    private void addReceiverToReplicaSet(Node node, int i) throws IOException {
        receiverAdminClient.addToReplicaSet(node, i);
    }

    /**
     * Tells a receiver node, via the receiver admin client, that it was removed from its replica set.
     *
     * @param node target receiver
     * @throws IOException if the receiver call fails
     */
    private void removeReceiverFromReplicaSet(Node node) throws IOException {
        receiverAdminClient.removeFromReplicaSet(node);
    }

    /**
     * Forwards a start-consumers request to a single receiver node via the receiver admin client.
     *
     * @param node target receiver
     * @param startConsumersRequest request to send
     * @throws IOException if the receiver call fails
     */
    private void startConsumersForReceiver(Node node, StartConsumersRequest startConsumersRequest) throws IOException {
        receiverAdminClient.startConsumers(node, startConsumersRequest);
    }

    /**
     * Forwards a stop-consumers request to a single receiver node via the receiver admin client.
     *
     * @param node target receiver
     * @param stopConsumersRequest request to send
     * @return the response returned by the receiver admin client
     * @throws IOException if the receiver call fails
     */
    private ConsumerStatsResponse stopConsumersForReceiver(Node node, StopConsumersRequest stopConsumersRequest) throws IOException {
        ConsumerStatsResponse response = receiverAdminClient.stopConsumers(node, stopConsumersRequest);
        return response;
    }

    /**
     * Forwards a pause-consumers request to a single receiver node via the receiver admin client.
     *
     * @param node target receiver
     * @param pauseConsumersRequest request to send
     * @return the response returned by the receiver admin client
     * @throws IOException if the receiver call fails
     */
    private ConsumerStatsResponse pauseConsumersForReceiver(Node node, PauseConsumersRequest pauseConsumersRequest) throws IOException {
        ConsumerStatsResponse response = receiverAdminClient.pauseConsumers(node, pauseConsumersRequest);
        return response;
    }

    /**
     * Forwards a resume-consumers request to a single receiver node via the receiver admin client.
     *
     * @param node target receiver
     * @param resumeConsumerRequest request to send
     * @return the response returned by the receiver admin client
     * @throws IOException if the receiver call fails
     */
    public ConsumerStatsResponse resumeConsumersForReceiver(Node node, ResumeConsumerRequest resumeConsumerRequest) throws IOException {
        ConsumerStatsResponse response = receiverAdminClient.resumeConsumers(node, resumeConsumerRequest);
        return response;
    }

    /**
     * Asks a single receiver node, via the receiver admin client, to make the cube immutable.
     *
     * @param node target receiver
     * @param str cube name
     * @throws IOException if the receiver call fails
     */
    private void makeCubeImmutableForReceiver(Node node, String str) throws IOException {
        receiverAdminClient.makeCubeImmutable(node, str);
    }

    /**
     * Sends the start-consumers request to every node of the replica set, stopping at the first failure.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param startConsumersRequest request to send
     * @throws IOException if a receiver call fails
     */
    public void startConsumersInReplicaSet(ReplicaSet replicaSet, StartConsumersRequest startConsumersRequest) throws IOException {
        for (Node node : replicaSet.getNodes()) {
            startConsumersForReceiver(node, startConsumersRequest);
        }
    }

    /**
     * Asks every node of the replica set to make the cube immutable. A node failure is
     * logged and collected instead of aborting the loop.
     *
     * @param replicaSet replica set whose nodes are contacted
     * @param str cube name
     * @return the nodes for which the call failed; empty when all succeeded
     * @throws IOException declared by the signature; per-node failures are caught inside the loop
     */
    public List<Node> makeCubeImmutableInReplicaSet(ReplicaSet replicaSet, String str) throws IOException {
        List<Node> failedNodes = new ArrayList<>();
        for (Node node : replicaSet.getNodes()) {
            try {
                makeCubeImmutableForReceiver(node, str);
            } catch (IOException e) {
                String message = String.format(Locale.ROOT, "Convert %s to immutable for node %s failed.", str, node.toNormalizeString());
                logger.error(message, e);
                failedNodes.add(node);
            }
        }
        return failedNodes;
    }

    /**
     * Sends the stop-consumers request to every node of the replica set and collects the
     * responses in node iteration order. The first failure aborts the loop.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param stopConsumersRequest request to send
     * @return one response per node
     * @throws IOException if a receiver call fails
     */
    public List<ConsumerStatsResponse> stopConsumersInReplicaSet(ReplicaSet replicaSet, StopConsumersRequest stopConsumersRequest) throws IOException {
        List<ConsumerStatsResponse> responses = Lists.newArrayList();
        for (Node node : replicaSet.getNodes()) {
            responses.add(stopConsumersForReceiver(node, stopConsumersRequest));
        }
        return responses;
    }

    /**
     * Pauses the cube's consumers on every node of the replica set and collects the responses.
     * <p>
     * If pausing a node fails, every node that was already paused is asked to resume
     * (rollback) and the original pause failure is rethrown. A failure while resuming one
     * node is logged and attached to the original exception as a suppressed exception, so
     * it neither hides the root cause nor prevents the remaining nodes from being resumed.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param pauseConsumersRequest request to send
     * @return one response per node, in node iteration order
     * @throws IOException the failure of the pause call that triggered the rollback
     */
    public List<ConsumerStatsResponse> pauseConsumersInReplicaSet(ReplicaSet replicaSet, PauseConsumersRequest pauseConsumersRequest) throws IOException {
        List<ConsumerStatsResponse> responses = Lists.newArrayList();
        List<Node> pausedNodes = Lists.newArrayList();
        try {
            for (Node node : replicaSet.getNodes()) {
                responses.add(pauseConsumersForReceiver(node, pauseConsumersRequest));
                pausedNodes.add(node);
            }
            return responses;
        } catch (IOException e) {
            logger.info("roll back pause consumers for receivers:" + pausedNodes);
            ResumeConsumerRequest resumeConsumerRequest = new ResumeConsumerRequest();
            resumeConsumerRequest.setCube(pauseConsumersRequest.getCube());
            for (Node pausedNode : pausedNodes) {
                try {
                    resumeConsumersForReceiver(pausedNode, resumeConsumerRequest);
                } catch (IOException rollbackFailure) {
                    // Keep rolling back the other nodes; remember this failure on the root cause.
                    logger.error("fail to resume consumers for cube:" + pauseConsumersRequest.getCube() + " receiver:" + pausedNode, rollbackFailure);
                    e.addSuppressed(rollbackFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Sends the resume-consumers request to every node of the replica set and collects the
     * responses in node iteration order. The first failure aborts the loop.
     *
     * @param replicaSet replica set whose nodes receive the request
     * @param resumeConsumerRequest request to send
     * @return one response per node
     * @throws IOException if a receiver call fails
     */
    public List<ConsumerStatsResponse> resumeConsumersInReplicaSet(ReplicaSet replicaSet, ResumeConsumerRequest resumeConsumerRequest) throws IOException {
        List<ConsumerStatsResponse> responses = Lists.newArrayList();
        for (Node node : replicaSet.getNodes()) {
            responses.add(resumeConsumersForReceiver(node, resumeConsumerRequest));
        }
        return responses;
    }

    /**
     * @return the stream metadata store held by this coordinator
     */
    public StreamMetadataStore getStreamMetadataStore() {
        return streamMetadataStore;
    }

    /**
     * @return the {@link ExecutableManager} instance for the configuration returned by {@code getConfig()}
     */
    public ExecutableManager getExecutableManager() {
        ExecutableManager manager = ExecutableManager.getInstance(getConfig());
        return manager;
    }

    /**
     * @return the {@link CubeManager} instance for the configuration returned by {@code getConfig()}
     */
    public CubeManager getCubeManager() {
        CubeManager manager = CubeManager.getInstance(getConfig());
        return manager;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Looks for segments of the cube that can be built and submits a build for each, in
     * order, stopping at the first submission that fails. When a submission fails the cube
     * is registered as pending with the job status checker.
     *
     * @param str cube name
     * @return {@code true} when there are no build states, nothing to build, or every
     *         submission succeeded; {@code false} when a submission failed
     */
    public synchronized boolean tryFindAndBuildSegment(String str) {
        if (this.streamMetadataStore.getSegmentBuildStates(str).isEmpty()) {
            logger.info("no segment build states for cube:{} found in the metadata store", str);
            return true;
        }
        boolean allSubmitted = true;
        List<String> buildableSegments = findSegmentsCanBuild(str);
        if (buildableSegments != null && !buildableSegments.isEmpty()) {
            logger.info("try to trigger cube building for cube:{}, segments:{}", str, buildableSegments);
            for (String segmentName : buildableSegments) {
                if (!triggerSegmentBuild(str, segmentName)) {
                    // Later segments are not submitted once one submission has failed.
                    allSubmitted = false;
                    break;
                }
            }
        }
        if (!allSubmitted) {
            this.jobStatusChecker.addPendingCube(str);
        }
        return allSubmitted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finalizes a finished streaming segment build.
     * <ol>
     *   <li>Returns without doing anything when the previous segment is not ready.</li>
     *   <li>Promotes the segment unless its status is already {@code READY}.</li>
     *   <li>Notifies every receiver of every replica set assigned to the cube; a failing
     *       receiver is logged and skipped.</li>
     *   <li>For a replica set with no partitions assigned, asks the leader for its cube
     *       stats and, when no local segments are reported, removes the replica set from
     *       the assignment and saves the assignment.</li>
     *   <li>Removes the segment build state, deletes the segment's HDFS files and looks
     *       for further segments to build.</li>
     * </ol>
     *
     * @param cubingJob the finished job
     * @param cubeInstance cube the segment belongs to
     * @param cubeSegment segment that was built
     * @param segmentJobBuildInfo cube and segment names of the build
     * @throws IOException if promoting the segment fails
     */
    public void segmentBuildComplete(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment, SegmentJobBuildInfo segmentJobBuildInfo) throws IOException {
        if (!checkPreviousSegmentReady(cubeSegment)) {
            logger.warn("the segment:{}'s previous segment is not ready, will not set the segment to ready", cubeSegment);
            return;
        }
        if (!SegmentStatusEnum.READY.equals(cubeSegment.getStatus())) {
            promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
        }
        String cubeName = segmentJobBuildInfo.cubeName;
        String segmentName = segmentJobBuildInfo.segmentName;
        CubeAssignment assignment = this.streamMetadataStore.getAssignmentsByCube(cubeName);
        // Iterate over a snapshot of the ids: removeAssignment() below mutates the assignment,
        // which would break the iteration if getReplicaSetIDs() returns a live view.
        List<Integer> replicaSetIds = new ArrayList<>(assignment.getReplicaSetIDs());
        for (Integer replicaSetId : replicaSetIds) {
            ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(replicaSetId.intValue());
            for (Node node : replicaSet.getNodes()) {
                try {
                    this.receiverAdminClient.segmentBuildComplete(node, cubeName, segmentName);
                } catch (IOException e) {
                    logger.error("error when remove cube segment for receiver:" + node, e);
                }
            }
            if (assignment.getPartitionsByReplicaSetID(replicaSetId).isEmpty()) {
                logger.info("no partition is assign to the replicaSet:{}, check whether there are local segments on the rs.", replicaSetId);
                Node leader = replicaSet.getLeader();
                try {
                    if (this.receiverAdminClient.getReceiverCubeStats(leader, cubeName).getSegmentStatsMap().keySet().isEmpty()) {
                        logger.info("no local segments exist for replicaSet:{}, cube:{}, update assignments.", replicaSetId, cubeName);
                        assignment.removeAssignment(replicaSetId);
                        this.streamMetadataStore.saveNewCubeAssignment(assignment);
                    }
                } catch (IOException e) {
                    logger.error("error when get receiver cube stats from:" + leader, e);
                }
            }
        }
        this.streamMetadataStore.removeSegmentBuildState(cubeName, segmentName);
        logger.info("try to remove the hdfs files for cube:{} segment:{}", cubeName, segmentName);
        removeHDFSFiles(cubeName, segmentName);
        logger.info("try to find segments for cube:{} build", cubeName);
        tryFindAndBuildSegment(segmentJobBuildInfo.cubeName);
    }

    /**
     * Copies the job's statistics and the merged source checkpoint onto the segment and
     * promotes it through the cube manager. The checkpoint is the {@code KEEP_SMALL} merge
     * of all source checkpoints stored for the cube and segment.
     *
     * @param cubingJob finished job supplying record count and sizes
     * @param cubeInstance cube the segment belongs to
     * @param cubeSegment segment to update and promote
     * @throws IOException if promoting the segment fails
     */
    private void promoteNewSegment(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment) throws IOException {
        long sourceRecordCount = cubingJob.findSourceRecordCount();
        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
        long cubeSizeBytes = cubingJob.findCubeSizeBytes();

        Map<Integer, String> checkpoints = this.streamMetadataStore.getSourceCheckpoint(cubeInstance.getName(), cubeSegment.getName());
        final ISourcePositionHandler positionHandler = StreamingSourceFactory.getStreamingSource(cubeInstance).getSourcePositionHandler();
        Function<String, ISourcePosition> parseCheckpoint = new Function<String, ISourcePosition>() {
            @Override
            @Nullable
            public ISourcePosition apply(@Nullable String serialized) {
                return positionHandler.parsePosition(serialized);
            }
        };
        ISourcePosition mergedCheckpoint = positionHandler.mergePositions(Collections2.transform(checkpoints.values(), parseCheckpoint), ISourcePositionHandler.MergeStrategy.KEEP_SMALL);

        cubeSegment.setLastBuildJobID(cubingJob.getId());
        cubeSegment.setLastBuildTime(System.currentTimeMillis());
        cubeSegment.setSizeKB(cubeSizeBytes / 1024);
        cubeSegment.setInputRecords(sourceRecordCount);
        cubeSegment.setInputRecordsSize(sourceSizeBytes);
        cubeSegment.setStreamSourceCheckpoint(positionHandler.serializePosition(mergedCheckpoint));
        getCubeManager().promoteNewlyBuiltSegments(cubeInstance, cubeSegment);
    }

    /**
     * Checks whether the segment directly preceding the given one is ready. The preceding
     * boundary is the largest segment end time in the cube that does not exceed this
     * segment's start time.
     *
     * @param cubeSegment segment whose predecessor is checked
     * @return {@code true} when no segment ends at or before this segment's start, or when
     *         a segment ending exactly at the preceding boundary has status {@code READY}
     */
    private boolean checkPreviousSegmentReady(CubeSegment cubeSegment) {
        long segmentStart = ((Long) cubeSegment.getTSRange().start.v).longValue();
        Segments<CubeSegment> allSegments = cubeSegment.getCubeInstance().getSegments();

        long previousEnd = -1;
        for (CubeSegment candidate : allSegments) {
            long candidateEnd = ((Long) candidate.getTSRange().end.v).longValue();
            if (candidateEnd <= segmentStart && candidateEnd > previousEnd) {
                previousEnd = candidateEnd;
            }
        }
        if (previousEnd == -1) {
            return true;
        }

        for (CubeSegment candidate : allSegments) {
            boolean endsAtBoundary = ((Long) candidate.getTSRange().end.v).longValue() == previousEnd;
            if (endsAtBoundary && SegmentStatusEnum.READY.equals(candidate.getStatus())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Recursively deletes the streaming segment directory of the given cube segment.
     * Deletion is best-effort: any failure is logged and not propagated.
     *
     * @param str cube name
     * @param str2 segment name
     */
    private void removeHDFSFiles(String str, String str2) {
        String streamingSegmentFilePath = HDFSUtil.getStreamingSegmentFilePath(str, str2);
        try {
            HadoopUtil.getFileSystem(streamingSegmentFilePath).delete(new Path(streamingSegmentFilePath), true);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause and stack trace are logged.
            logger.error("error when remove hdfs file, hdfs path:{}", streamingSegmentFilePath, e);
        }
    }

    /**
     * Submits a streaming cubing job for one segment: appends the segment to the cube,
     * creates and registers the job, hands it to the job status checker and stores a
     * {@code BUILDING} build state carrying the job id and start time.
     *
     * @param str cube name
     * @param str2 segment name, parsed into the segment's time range
     * @return {@code true} when every step succeeded; {@code false} when any step threw
     *         (the failure is logged)
     */
    private boolean triggerSegmentBuild(String str, String str2) {
        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        try {
            Pair<Long, Long> range = CubeSegment.parseSegmentName(str2);
            logger.info("submit streaming segment build, cube:{} segment:{}", str, str2);
            SegmentRange.TSRange tsRange = new SegmentRange.TSRange(range.getFirst(), range.getSecond());
            CubeSegment newSegment = getCubeManager().appendSegment(cubeInstance, tsRange);
            DefaultChainedExecutable executable = new StreamingCubingEngine().createStreamingCubingJob(newSegment, "SYSTEM");
            getExecutableManager().addJob(executable);
            CubingJob job = (CubingJob) executable;
            newSegment.setLastBuildJobID(job.getId());
            this.jobStatusChecker.addSegmentBuildJob(new SegmentJobBuildInfo(str, str2, job.getId()));

            SegmentBuildState.BuildState building = new SegmentBuildState.BuildState();
            building.setBuildStartTime(System.currentTimeMillis());
            building.setState(SegmentBuildState.BuildState.State.BUILDING);
            building.setJobId(job.getId());
            this.streamMetadataStore.updateSegmentBuildState(str, str2, building);
            return true;
        } catch (Exception e) {
            logger.error("streaming job submit fail, cubeName:" + str + " segment:" + str2, e);
            return false;
        }
    }

    /**
     * Selects the segments of a cube for which a build may be submitted now.
     * <p>
     * Returns an empty list while the cube is in optimize. Otherwise the stored segment
     * build states are sorted and scanned in order: states starting before the end of the
     * latest ready segment are removed from the metadata store; states flagged as in
     * building have their job inspected once the build has been running for at least
     * 2400000 ms (40 minutes); the remaining states are collected until the build quota is
     * used up or {@code checkSegmentIsReadyToBuild} rejects one.
     *
     * @param str cube name
     * @return names of the segments selected for building, possibly empty
     */
    private List<String> findSegmentsCanBuild(String str) {
        ArrayList newArrayList = Lists.newArrayList();
        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        if (isInOptimize(cube)) {
            return newArrayList;
        }
        int maxBuildingSegments = cube.getConfig().getMaxBuildingSegments();
        CubeSegment latestReadySegment = cube.getLatestReadySegment();
        // End time of the latest ready segment; stays -1 when the cube has no ready segment.
        long j = -1;
        if (latestReadySegment != null) {
            j = ((Long) latestReadySegment.getTSRange().end.v).longValue();
        } else {
            logger.info("there is no ready segments for cube:{}, so only allow 1 segment build concurrently", str);
            maxBuildingSegments = 1;
        }
        Set<Integer> replicaSetIDs = this.streamMetadataStore.getAssignmentsByCube(str).getReplicaSetIDs();
        List<SegmentBuildState> segmentBuildStates = this.streamMetadataStore.getSegmentBuildStates(str);
        Collections.sort(segmentBuildStates);
        int size = cube.getBuildingSegments().size();
        // Remaining quota: allowed concurrent builds minus segments the cube reports as building.
        int i = maxBuildingSegments - size;
        for (int i2 = 0; i2 < segmentBuildStates.size(); i2++) {
            SegmentBuildState segmentBuildState = segmentBuildStates.get(i2);
            if (CubeSegment.parseSegmentName(segmentBuildState.getSegmentName()).getFirst().longValue() < j) {
                // Stale state: the segment starts before the end of the latest ready segment.
                logger.warn("the cube segment state is not clear correctly, cube:{} segment:{}, clear it", str, segmentBuildState.getSegmentName());
                this.streamMetadataStore.removeSegmentBuildState(str, segmentBuildState.getSegmentName());
            } else {
                if (segmentBuildState.isInBuilding()) {
                    // Note: 'size' is incremented here but not read again in this method.
                    size++;
                    String jobId = segmentBuildState.getState().getJobId();
                    logger.info("there is segment in building, cube:{} segment:{} jobId:{}", str, segmentBuildState.getSegmentName(), jobId);
                    long buildStartTime = segmentBuildState.getState().getBuildStartTime();
                    if (buildStartTime != 0 && jobId != null) {
                        // Only inspect the job once the build has run for at least 40 minutes.
                        if (System.currentTimeMillis() - buildStartTime >= 2400000) {
                            // NOTE(review): getJob(jobId) is dereferenced without a null check — confirm it cannot return null.
                            CubingJob cubingJob = (CubingJob) getExecutableManager().getJob(jobId);
                            ExecutableState status = cubingJob.getStatus();
                            if (ExecutableState.SUCCEED.equals(status)) {
                                CubeSegment segment = cube.getSegment(segmentBuildState.getSegmentName(), null);
                                if (segment != null && SegmentStatusEnum.READY == segment.getStatus()) {
                                    logger.info("job:{} is already succeed, and segment:{} is ready, remove segment build state", jobId, segmentBuildState.getSegmentName());
                                    this.streamMetadataStore.removeSegmentBuildState(str, segmentBuildState.getSegmentName());
                                }
                            } else if (ExecutableState.ERROR.equals(status)) {
                                logger.info("job:{} is error, resume the job", jobId);
                                getExecutableManager().resumeJob(jobId);
                            } else if (ExecutableState.DISCARDED.equals(status)) {
                                // Discarded job: reset the stored state to WAIT with a zero start time.
                                logger.info("job:{} is discard, reset the job state in metaStore", jobId);
                                SegmentBuildState.BuildState buildState = new SegmentBuildState.BuildState();
                                buildState.setBuildStartTime(0L);
                                buildState.setState(SegmentBuildState.BuildState.State.WAIT);
                                buildState.setJobId(cubingJob.getId());
                                this.streamMetadataStore.updateSegmentBuildState(str, segmentBuildState.getSegmentName(), buildState);
                                segmentBuildState.setState(buildState);
                                logger.info("segment:{} is discard", segmentBuildState.getSegmentName());
                            } else {
                                logger.info("job:{} is in running, job state: {}", jobId, status);
                            }
                        }
                    }
                }
                // NOTE(review): a state that is in building is not skipped above; it falls through
                // to the quota and readiness checks below and can be added to the result again.
                // Confirm this is intended.
                if (i <= 0) {
                    logger.info("No left quota to build segments for cube:{}", str);
                    return newArrayList;
                }
                // Stop scanning at the first state that checkSegmentIsReadyToBuild rejects.
                if (!checkSegmentIsReadyToBuild(segmentBuildStates, i2, replicaSetIDs)) {
                    break;
                }
                newArrayList.add(segmentBuildState.getSegmentName());
                i--;
            }
        }
        return newArrayList;
    }

    /**
     * Decides whether building new segments must be held back for the given cube.
     *
     * <p>Building is blocked in two situations:
     * <ul>
     *   <li>the cube has at least one segment in {@code READY_PENDING} status;</li>
     *   <li>some {@code NEW} segment's last build job is a {@link CubingJob} whose job
     *       type is {@code OPTIMIZE}.</li>
     * </ul>
     *
     * @param cubeInstance the cube to inspect
     * @return {@code true} if either condition above holds, {@code false} otherwise
     */
    private boolean isInOptimize(CubeInstance cubeInstance) {
        Segments<CubeSegment> readyPendingSegments = cubeInstance.getSegments(SegmentStatusEnum.READY_PENDING);
        if (!readyPendingSegments.isEmpty()) {
            logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", cubeInstance.getName(), readyPendingSegments);
            return true;
        }

        String optimizeJobType = CubingJob.CubingJobTypeEnum.OPTIMIZE.toString();
        for (CubeSegment newSegment : cubeInstance.getSegments(SegmentStatusEnum.NEW)) {
            String lastBuildJobId = newSegment.getLastBuildJobID();
            if (lastBuildJobId == null) {
                // Segment has no recorded build job, so there is nothing to look up.
                continue;
            }
            AbstractExecutable job = getExecutableManager().getJob(lastBuildJobId);
            // instanceof is false for null, so a missing job is skipped here as well.
            if (!(job instanceof CubingJob)) {
                continue;
            }
            if (optimizeJobType.equals(((CubingJob) job).getJobType())) {
                logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cubeInstance.getName());
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether the segment at position {@code i} of {@code list} can be built.
     *
     * <p>The segment is ready when every replica set in {@code set} is either listed in
     * the segment's own complete replica sets, or listed in the complete replica sets of
     * some later entry of {@code list} (index greater than {@code i}).
     *
     * @param list segment build states; entries after {@code i} are treated as later segments
     * @param i    index in {@code list} of the segment being checked
     * @param set  replica set ids that are expected to cover the segment
     * @return {@code true} if no replica set remains unaccounted for, {@code false} otherwise
     */
    private boolean checkSegmentIsReadyToBuild(List<SegmentBuildState> list, int i, Set<Integer> set) {
        SegmentBuildState current = list.get(i);
        // Mutable copy: replica sets that have not reported this segment as complete.
        Set<Integer> pendingReplicaSets = Sets.newHashSet(Sets.difference(set, current.getCompleteReplicaSets()));
        if (pendingReplicaSets.isEmpty()) {
            return true;
        }
        for (int laterIdx = i + 1; laterIdx < list.size(); laterIdx++) {
            SegmentBuildState later = list.get(laterIdx);
            Set<Integer> laterComplete = later.getCompleteReplicaSets();
            Iterator<Integer> pendingIt = pendingReplicaSets.iterator();
            while (pendingIt.hasNext()) {
                Integer replicaSetId = pendingIt.next();
                if (laterComplete.contains(replicaSetId)) {
                    // A replica set that already completed a later segment is no longer waited on.
                    logger.info("the replica set:{} doesn't have data for segment:{}, but have data for later segment:{}", replicaSetId, current.getSegmentName(), later.getSegmentName());
                    pendingIt.remove();
                }
            }
        }
        return pendingReplicaSets.isEmpty();
    }

    /**
     * Creates the {@link Assigner} selected by the streaming assigner name in the config.
     *
     * <p>{@code "CubePartitionRoundRobinAssigner"} yields a
     * {@link CubePartitionRoundRobinAssigner}. {@code "DefaultAssigner"} and any
     * unrecognized name yield a {@link DefaultAssigner}.
     *
     * @return a newly created assigner instance
     */
    private Assigner getAssigner() {
        String assignerName = getConfig().getStreamingAssigner();
        logger.debug("Using assigner {}", assignerName);
        // NOTE(review): a null name makes the switch throw NullPointerException —
        // confirm that the config always supplies a value.
        switch (assignerName) {
            case "CubePartitionRoundRobinAssigner":
                return new CubePartitionRoundRobinAssigner();
            case "DefaultAssigner":
            default:
                return new DefaultAssigner();
        }
    }
}
