package org.apache.kylin.stream.coordinator.coordinate;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
import org.apache.kylin.stream.coordinator.StreamingUtils;
import org.apache.kylin.stream.coordinator.assign.Assigner;
import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner;
import org.apache.kylin.stream.coordinator.assign.DefaultAssigner;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.doctor.ClusterDoctor;
import org.apache.kylin.stream.coordinator.exception.CoordinateException;
import org.apache.kylin.stream.coordinator.exception.NotLeadCoordinatorException;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
import org.apache.kylin.stream.core.model.StopConsumersRequest;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.UnAssignRequest;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.class */
public class StreamingCoordinator implements CoordinatorClient {
    private static final int DEFAULT_PORT = 7070;
    private ScheduledExecutorService streamingJobSubmitExecutor;
    private ScheduledExecutorService clusterStateCheckExecutor;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) StreamingCoordinator.class);
    private static volatile StreamingCoordinator instance = null;
    private volatile boolean isLead = false;
    private StreamMetadataStore streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
    private ReceiverClusterManager clusterManager = new ReceiverClusterManager(this);
    private ReceiverAdminClient receiverAdminClient = new HttpReceiverAdminClient();
    private Assigner assigner = getAssigner();
    private CuratorFramework zkClient = StreamingUtils.getZookeeperClient();
    private CoordinatorLeaderSelector selector = new CoordinatorLeaderSelector();
    private BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(this);
    private ClusterDoctor clusterDoctor = new ClusterDoctor();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator$CoordinatorLeaderSelector.class */
    public class CoordinatorLeaderSelector extends LeaderSelectorListenerAdapter implements Closeable {
        private LeaderSelector leaderSelector;

        public CoordinatorLeaderSelector() {
            this.leaderSelector = new LeaderSelector(StreamingCoordinator.this.zkClient, StreamingUtils.COORDINATOR_LEAD, this);
            this.leaderSelector.autoRequeue();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.leaderSelector.close();
        }

        public void start() {
            this.leaderSelector.start();
        }

        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
            StreamingCoordinator.logger.info("Current node become the lead coordinator.");
            StreamingCoordinator.this.streamMetadataStore.setCoordinatorNode(NodeUtil.getCurrentNode(StreamingCoordinator.DEFAULT_PORT));
            StreamingCoordinator.this.isLead = true;
            StreamingCoordinator.this.buildJobSubmitter.restore();
            do {
                try {
                    Thread.sleep(300000L);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
            } while (this.leaderSelector.hasLeadership());
            StreamingCoordinator.logger.info("Become the follower coordinator.");
            StreamingCoordinator.this.isLead = false;
        }
    }

    private StreamingCoordinator() {
        if (ServerMode.SERVER_MODE.canServeStreamingCoordinator()) {
            this.streamingJobSubmitExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("streaming_job_submitter"));
            this.clusterStateCheckExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("cluster_state_checker"));
            start();
        }
    }

    public static StreamingCoordinator getInstance() {
        if (instance == null) {
            synchronized (StreamingCoordinator.class) {
                if (instance == null) {
                    instance = new StreamingCoordinator();
                }
            }
        }
        return instance;
    }

    private void start() {
        this.selector.start();
        this.streamingJobSubmitExecutor.scheduleAtFixedRate(this.buildJobSubmitter, 0L, 2L, TimeUnit.MINUTES);
        this.clusterStateCheckExecutor.scheduleAtFixedRate(this.clusterDoctor, 5L, 10L, TimeUnit.MINUTES);
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void assignCube(String str) {
        checkLead();
        this.streamMetadataStore.addStreamingCube(str);
        StreamingCubeInfo streamCubeInfo = getStreamCubeInfo(str);
        if (this.streamMetadataStore.getAssignmentsByCube(streamCubeInfo.getCubeName()) != null) {
            logger.warn("Cube {} is already assigned.", streamCubeInfo.getCubeName());
            return;
        }
        List<ReplicaSet> replicaSets = this.streamMetadataStore.getReplicaSets();
        if (replicaSets == null || replicaSets.isEmpty()) {
            throw new IllegalStateException("No replicaSet is configured in system");
        }
        doAssignCube(str, this.assigner.assign(streamCubeInfo, replicaSets, this.streamMetadataStore.getAllCubeAssignments()));
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void unAssignCube(String str) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        if (assignmentsByCube == null) {
            return;
        }
        ArrayList newArrayList = Lists.newArrayList();
        try {
            logger.info("Send unAssign cube:{} request to receivers", str);
            Iterator<Integer> it = assignmentsByCube.getReplicaSetIDs().iterator();
            while (it.hasNext()) {
                ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(it.next().intValue());
                UnAssignRequest unAssignRequest = new UnAssignRequest();
                unAssignRequest.setCube(str);
                for (Node node : replicaSet.getNodes()) {
                    try {
                        unAssignToReceiver(node, unAssignRequest);
                    } catch (IOException e) {
                        logger.error("Exception throws when unAssign receiver", (Throwable) e);
                        newArrayList.add(node);
                    }
                }
            }
            logger.debug("Remove temp hdfs files for {}", str);
            removeCubeHDFSFiles(str);
            logger.debug("Clear cube info from job check list");
            this.buildJobSubmitter.clearCheckList(str);
            logger.debug("Commit unassign {} transaction.", str);
            this.streamMetadataStore.removeStreamingCube(str);
            AssignmentsCache.getInstance().clearCubeCache(str);
            if (!newArrayList.isEmpty()) {
                throw new CoordinateException("unAssign fail for receivers:" + String.join(",", (Iterable<? extends CharSequence>) newArrayList.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.toList())));
            }
        } catch (Exception e2) {
            throw new CoordinateException(e2);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void reAssignCube(String str, CubeAssignment cubeAssignment) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        if (assignmentsByCube != null) {
            this.clusterManager.reassignCubeImpl(str, assignmentsByCube, cubeAssignment);
        } else {
            logger.info("no previous cube assign exists, use the new assignment:{}", cubeAssignment);
            doAssignCube(str, cubeAssignment);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void segmentRemoteStoreComplete(Node node, String str, Pair<Long, Long> pair) {
        checkLead();
        logger.info("Segment remote store complete signal received for cube:{}, segment:{}, by {}.", str, pair, node);
        this.buildJobSubmitter.addToCheckList(str);
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void pauseConsumers(String str) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        PauseConsumersRequest pauseConsumersRequest = new PauseConsumersRequest();
        pauseConsumersRequest.setCube(str);
        try {
            Iterator<Integer> it = assignmentsByCube.getReplicaSetIDs().iterator();
            while (it.hasNext()) {
                this.clusterManager.pauseConsumersInReplicaSet(this.streamMetadataStore.getReplicaSet(it.next().intValue()), pauseConsumersRequest);
            }
            logger.debug("Committing pauseConsumers {} transaction.", str);
            this.streamMetadataStore.saveStreamingCubeConsumeState(str, StreamingCubeConsumeState.PAUSED);
            logger.debug("Committed pauseConsumers {} transaction.", str);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void resumeConsumers(String str) {
        checkLead();
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        ResumeConsumerRequest resumeConsumerRequest = new ResumeConsumerRequest();
        resumeConsumerRequest.setCube(str);
        try {
            Iterator<Integer> it = assignmentsByCube.getReplicaSetIDs().iterator();
            while (it.hasNext()) {
                this.clusterManager.resumeConsumersInReplicaSet(this.streamMetadataStore.getReplicaSet(it.next().intValue()), resumeConsumerRequest);
            }
            logger.debug("Committing resumeConsumers {} transaction.", str);
            this.streamMetadataStore.saveStreamingCubeConsumeState(str, StreamingCubeConsumeState.RUNNING);
            logger.debug("Committed resumeConsumers {} transaction.", str);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public void replicaSetLeaderChange(int i, Node node) {
        checkLead();
        Map<String, List<Partition>> assignmentsByReplicaSet = this.streamMetadataStore.getAssignmentsByReplicaSet(i);
        if (assignmentsByReplicaSet == null || assignmentsByReplicaSet.isEmpty()) {
            return;
        }
        Iterator<String> it = assignmentsByReplicaSet.keySet().iterator();
        while (it.hasNext()) {
            AssignmentsCache.getInstance().clearCubeCache(it.next());
        }
    }

    private void doAssignCube(String str, CubeAssignment cubeAssignment) {
        HashSet<ReplicaSet> newHashSet = Sets.newHashSet();
        try {
            for (Integer num : cubeAssignment.getReplicaSetIDs()) {
                ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(num.intValue());
                this.clusterManager.assignCubeToReplicaSet(replicaSet, str, cubeAssignment.getPartitionsByReplicaSetID(num), true, false);
                newHashSet.add(replicaSet);
            }
            logger.debug("Committing assignment {} transaction.", str);
            this.streamMetadataStore.saveNewCubeAssignment(cubeAssignment);
            logger.debug("Committed assignment {} transaction.", str);
        } catch (Exception e) {
            logger.debug("Starting roll back success receivers.");
            for (ReplicaSet replicaSet2 : newHashSet) {
                UnAssignRequest unAssignRequest = new UnAssignRequest();
                unAssignRequest.setCube(str);
                unAssignFromReplicaSet(replicaSet2, unAssignRequest);
            }
            throw new CoordinateException(e);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() {
        checkLead();
        return reBalancePlan(getEnableStreamingCubes(), this.streamMetadataStore.getReplicaSets());
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void reBalance(Map<Integer, Map<String, List<Partition>>> map) {
        checkLead();
        this.clusterManager.doReBalance(this.streamMetadataStore.getAllCubeAssignments(), AssignmentUtil.convertReplicaSetAssign2CubeAssign(map));
    }

    Map<Integer, Map<String, List<Partition>>> reBalancePlan(List<StreamingCubeInfo> list, List<ReplicaSet> list2) {
        return this.assigner.reBalancePlan(list2, list, this.streamMetadataStore.getAllCubeAssignments());
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void createReplicaSet(ReplicaSet replicaSet) {
        List<ReplicaSet> replicaSets = this.streamMetadataStore.getReplicaSets();
        for (Node node : replicaSet.getNodes()) {
            for (ReplicaSet replicaSet2 : replicaSets) {
                if (replicaSet2.containPhysicalNode(node)) {
                    throw new CoordinateException(String.format(Locale.ROOT, "The receiver node %s is already exists in the set %s", node, replicaSet2));
                }
            }
        }
        int createReplicaSet = this.streamMetadataStore.createReplicaSet(replicaSet);
        try {
            for (Node node2 : replicaSet.getNodes()) {
                logger.trace("Notify {} that it has been added to {} .", node2, Integer.valueOf(createReplicaSet));
                addReceiverToReplicaSet(node2, createReplicaSet);
            }
        } catch (IOException e) {
            logger.warn("Create replica set failed.", (Throwable) e);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void removeReplicaSet(int i) {
        ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(i);
        if (replicaSet == null) {
            return;
        }
        if (replicaSet.getNodes() != null && !replicaSet.getNodes().isEmpty()) {
            throw new CoordinateException("Cannot remove rs, because there are nodes in it.");
        }
        Map<String, List<Partition>> assignmentsByReplicaSet = this.streamMetadataStore.getAssignmentsByReplicaSet(i);
        if (assignmentsByReplicaSet != null && !assignmentsByReplicaSet.isEmpty()) {
            throw new CoordinateException("Cannot remove rs, because there are assignments.");
        }
        this.streamMetadataStore.removeReplicaSet(i);
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void addNodeToReplicaSet(Integer num, String str) {
        ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(num.intValue());
        Node fromNormalizeString = Node.fromNormalizeString(str);
        for (ReplicaSet replicaSet2 : this.streamMetadataStore.getReplicaSets()) {
            if (replicaSet2.getReplicaSetID() != num.intValue() && replicaSet2.containPhysicalNode(fromNormalizeString)) {
                logger.error("Error add Node {} to replicaSet {}, already exist in replicaSet {} ", str, num, Integer.valueOf(replicaSet2.getReplicaSetID()));
                throw new IllegalStateException("Node exists in ReplicaSet!");
            }
        }
        replicaSet.addNode(fromNormalizeString);
        this.streamMetadataStore.updateReplicaSet(replicaSet);
        try {
            Map<String, List<Partition>> assignmentsByReplicaSet = this.streamMetadataStore.getAssignmentsByReplicaSet(num.intValue());
            if (assignmentsByReplicaSet == null || assignmentsByReplicaSet.isEmpty()) {
                return;
            }
            addReceiverToReplicaSet(fromNormalizeString, num.intValue());
            Iterator<String> it = assignmentsByReplicaSet.keySet().iterator();
            while (it.hasNext()) {
                AssignmentsCache.getInstance().clearCubeCache(it.next());
            }
        } catch (IOException e) {
            logger.warn("fail to add receiver to replicaSet ", (Throwable) e);
        }
    }

    @Override // org.apache.kylin.stream.coordinator.client.CoordinatorClient
    public synchronized void removeNodeFromReplicaSet(Integer num, String str) {
        ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(num.intValue());
        Node fromNormalizeString = Node.fromNormalizeString(str);
        replicaSet.removeNode(fromNormalizeString);
        this.streamMetadataStore.updateReplicaSet(replicaSet);
        try {
            Map<String, List<Partition>> assignmentsByReplicaSet = this.streamMetadataStore.getAssignmentsByReplicaSet(num.intValue());
            removeReceiverFromReplicaSet(fromNormalizeString);
            if (assignmentsByReplicaSet != null) {
                Iterator<String> it = assignmentsByReplicaSet.keySet().iterator();
                while (it.hasNext()) {
                    AssignmentsCache.getInstance().clearCubeCache(it.next());
                }
            }
        } catch (IOException e) {
            logger.warn("Remove node from replicaSet failed.", (Throwable) e);
        }
    }

    public void stopConsumers(String str) {
        CubeAssignment assignmentsByCube = this.streamMetadataStore.getAssignmentsByCube(str);
        StopConsumersRequest stopConsumersRequest = new StopConsumersRequest();
        stopConsumersRequest.setCube(str);
        try {
            Iterator<Integer> it = assignmentsByCube.getReplicaSetIDs().iterator();
            while (it.hasNext()) {
                this.clusterManager.stopConsumersInReplicaSet(this.streamMetadataStore.getReplicaSet(it.next().intValue()), stopConsumersRequest);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void unAssignFromReplicaSet(ReplicaSet replicaSet, UnAssignRequest unAssignRequest) {
        Iterator<Node> it = replicaSet.getNodes().iterator();
        while (it.hasNext()) {
            try {
                unAssignToReceiver(it.next(), unAssignRequest);
            } catch (IOException e) {
                logger.error("Error when roll back assignment", (Throwable) e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assignToReceiver(Node node, AssignRequest assignRequest) throws IOException {
        this.receiverAdminClient.assign(node, assignRequest);
    }

    private void unAssignToReceiver(Node node, UnAssignRequest unAssignRequest) throws IOException {
        this.receiverAdminClient.unAssign(node, unAssignRequest);
    }

    private void addReceiverToReplicaSet(Node node, int i) throws IOException {
        this.receiverAdminClient.addToReplicaSet(node, i);
    }

    private void removeReceiverFromReplicaSet(Node node) throws IOException {
        this.receiverAdminClient.removeFromReplicaSet(node);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startConsumersForReceiver(Node node, StartConsumersRequest startConsumersRequest) throws IOException {
        this.receiverAdminClient.startConsumers(node, startConsumersRequest);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerStatsResponse stopConsumersForReceiver(Node node, StopConsumersRequest stopConsumersRequest) throws IOException {
        return this.receiverAdminClient.stopConsumers(node, stopConsumersRequest);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerStatsResponse pauseConsumersForReceiver(Node node, PauseConsumersRequest pauseConsumersRequest) throws IOException {
        return this.receiverAdminClient.pauseConsumers(node, pauseConsumersRequest);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConsumerStatsResponse resumeConsumersForReceiver(Node node, ResumeConsumerRequest resumeConsumerRequest) throws IOException {
        return this.receiverAdminClient.resumeConsumers(node, resumeConsumerRequest);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void makeCubeImmutableForReceiver(Node node, String str) throws IOException {
        this.receiverAdminClient.makeCubeImmutable(node, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyReceiverBuildSuccess(Node node, String str, String str2) throws IOException {
        this.receiverAdminClient.segmentBuildComplete(node, str, str2);
    }

    public ExecutableManager getExecutableManager() {
        return ExecutableManager.getInstance(getConfig());
    }

    public CubeManager getCubeManager() {
        return CubeManager.getInstance(getConfig());
    }

    public KylinConfig getConfig() {
        return KylinConfig.getInstanceFromEnv();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<StreamingCubeInfo> getEnableStreamingCubes() {
        List<StreamingCubeInfo> streamingCubes = getStreamingCubes();
        ArrayList newArrayList = Lists.newArrayList();
        for (StreamingCubeInfo streamingCubeInfo : streamingCubes) {
            if (getCubeManager().getCube(streamingCubeInfo.getCubeName()).getStatus() == RealizationStatusEnum.READY) {
                newArrayList.add(streamingCubeInfo);
            }
        }
        return newArrayList;
    }

    List<StreamingCubeInfo> getStreamingCubes() {
        List<String> cubes = this.streamMetadataStore.getCubes();
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<String> it = cubes.iterator();
        while (it.hasNext()) {
            StreamingCubeInfo streamCubeInfo = getStreamCubeInfo(it.next());
            if (streamCubeInfo != null) {
                newArrayList.add(streamCubeInfo);
            }
        }
        return newArrayList;
    }

    public StreamingCubeInfo getStreamCubeInfo(String str) {
        CubeInstance cube = getCubeManager().getCube(str);
        if (cube == null) {
            return null;
        }
        return new StreamingCubeInfo(str, StreamingSourceFactory.getStreamingSource(cube).load(str), cube.getConfig().getStreamingCubeConsumerTasksNum());
    }

    public void removeCubeHDFSFiles(String str) {
        String streamingCubeFilePath = HDFSUtil.getStreamingCubeFilePath(str);
        try {
            HadoopUtil.getFileSystem(streamingCubeFilePath).delete(new Path(streamingCubeFilePath), true);
        } catch (Exception e) {
            logger.error("Error when remove hdfs file, hdfs path:{}", streamingCubeFilePath);
        }
    }

    private void checkLead() {
        if (this.isLead) {
            return;
        }
        try {
            throw new NotLeadCoordinatorException("Current coordinator is not lead, please check host " + this.streamMetadataStore.getCoordinatorNode());
        } catch (StoreException e) {
            throw new NotLeadCoordinatorException("Lead coordinator can not found.", e);
        }
    }

    private Assigner getAssigner() {
        Assigner defaultAssigner;
        String streamingAssigner = getConfig().getStreamingAssigner();
        logger.debug("Using assigner {}", streamingAssigner);
        boolean z = -1;
        switch (streamingAssigner.hashCode()) {
            case 296702381:
                if (streamingAssigner.equals("CubePartitionRoundRobinAssigner")) {
                    z = true;
                    break;
                }
                break;
            case 631189277:
                if (streamingAssigner.equals("DefaultAssigner")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                defaultAssigner = new DefaultAssigner();
                break;
            case true:
                defaultAssigner = new CubePartitionRoundRobinAssigner();
                break;
            default:
                defaultAssigner = new DefaultAssigner();
                break;
        }
        return defaultAssigner;
    }

    public ReceiverClusterManager getClusterManager() {
        return this.clusterManager;
    }

    public boolean isLead() {
        return this.isLead;
    }

    public StreamMetadataStore getStreamMetadataStore() {
        return this.streamMetadataStore;
    }

    public ReceiverAdminClient getReceiverAdminClient() {
        return this.receiverAdminClient;
    }
}
