package org.apache.kylin.stream.server;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.coordinator.StreamingUtils;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.client.HttpCoordinatorClient;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.consumer.EndPositionStopCondition;
import org.apache.kylin.stream.core.consumer.IConsumerProvider;
import org.apache.kylin.stream.core.consumer.IStopConsumptionCondition;
import org.apache.kylin.stream.core.consumer.StreamingConsumerChannel;
import org.apache.kylin.stream.core.metrics.StreamingMetrics;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.storage.StreamingCubeSegment;
import org.apache.kylin.stream.core.storage.StreamingSegmentManager;
import org.apache.kylin.stream.core.storage.columnar.ColumnarStoreCache;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
import org.apache.kylin.stream.server.ReplicaSetLeaderSelector;
import org.apache.kylin.stream.server.retention.RetentionPolicyInfo;
import org.apache.kylin.tool.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kylin.tool.shaded.com.google.common.collect.Lists;
import org.apache.kylin.tool.shaded.com.google.common.collect.Maps;
import org.apache.kylin.tool.shaded.com.google.common.collect.Sets;
import org.apache.kylin.tool.shaded.org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.AntPathMatcher;

/* loaded from: input_file:org/apache/kylin/stream/server/StreamingServer.class */
public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeListener, IConsumerProvider {
    /** Port passed to NodeUtil.getCurrentNode when resolving the node identity of this receiver. */
    public static final int DEFAULT_PORT = 9090;
    /** Wait budget used when stopping a consumer; presumably milliseconds - TODO confirm against StreamingConsumerChannel.stop. */
    private static final int CONSUMER_STOP_WAIT_TIMEOUT = 10000;
    /** Leader selector of the joined replica set; only assigned once a replica set has been joined. */
    private ReplicaSetLeaderSelector leaderSelector;
    private static final Logger logger = LoggerFactory.getLogger(StreamingServer.class);
    /** Lazily created singleton, see getInstance(). */
    private static volatile StreamingServer instance = null;
    /** Registered consumer channel per cube name. */
    private Map<String, StreamingConsumerChannel> cubeConsumerMap = Maps.newHashMap();
    /** Partitions assigned to this receiver, keyed by cube name. */
    private Map<String, List<Partition>> assignments = Maps.newHashMap();
    /** Segment manager per cube name; typed (non-raw) concurrent map. */
    private Map<String, StreamingSegmentManager> streamingSegmentManagerMap = new ConcurrentHashMap<>();
    /** Id of the replica set this receiver belongs to, or -1 when it belongs to none. */
    private int replicaSetID = -1;
    /** Whether this receiver currently is the leader of its replica set. */
    private volatile boolean isLeader = false;
    private CuratorFramework streamZKClient = StreamingUtils.getZookeeperClient();
    private StreamMetadataStore streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
    // Must be declared after streamMetadataStore: the initializer reads that field.
    private CoordinatorClient coordinatorClient = new HttpCoordinatorClient(this.streamMetadataStore);
    private Node currentNode = NodeUtil.getCurrentNode(DEFAULT_PORT);
    /** Local directory under which the segment data of every cube is stored. */
    private final String baseStorePath = calLocalSegmentCacheDir();
    private ScheduledExecutorService segmentStateCheckerExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("segment_state_check"));
    private ExecutorService segmentFlushExecutor = Executors.newFixedThreadPool(5, new NamedThreadFactory("segment_flush"));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/stream/server/StreamingServer$SegmentHDFSFlusher.class */
    /**
     * Copies the local data folder of one streaming segment to a remote (HDFS) path.
     * The data is first copied to a sibling ".tmp" path and then renamed to the final
     * path, so the final path only appears once the copy has finished.
     */
    public static class SegmentHDFSFlusher implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(SegmentHDFSFlusher.class);
        private final StreamingCubeSegment segment;
        private final String hdfsPath;

        /**
         * @param streamingCubeSegment the segment whose local data folder is uploaded
         * @param str the final remote path of the segment data
         */
        public SegmentHDFSFlusher(StreamingCubeSegment streamingCubeSegment, String str) {
            this.segment = streamingCubeSegment;
            this.hdfsPath = str;
        }

        /**
         * Uploads the segment data unless the remote path already exists.
         *
         * @throws IOException when a file system operation fails, including a rename of
         *         the temp path that reports failure
         */
        public void flushToHDFS() throws IOException {
            logger.info("start to flush cube:{} segment:{} to hdfs:{}", this.segment.getCubeName(), this.segment.getSegmentName(), this.hdfsPath);
            FileSystem fileSystem = HadoopUtil.getFileSystem(this.hdfsPath);
            String localFolder = this.segment.getDataSegmentFolder().getPath();
            Path remotePath = new Path(this.hdfsPath);
            if (fileSystem.exists(remotePath)) {
                logger.info("the remote path:{} is already exist, skip copy data to remote", remotePath);
                return;
            }
            Path tempPath = new Path(this.hdfsPath + ".tmp");
            // A leftover temp directory comes from an earlier, unfinished attempt; start over.
            if (fileSystem.exists(tempPath) && fileSystem.getFileStatus(tempPath).isDirectory()) {
                logger.warn("target temp path: {} is an existed directory, try to delete it.", tempPath);
                fileSystem.delete(tempPath, true);
                logger.warn("target temp path: {} is deleted.", tempPath);
            }
            fileSystem.copyFromLocalFile(new Path(localFolder), tempPath);
            logger.info("data copy to remote temp path:{}", tempPath);
            // rename signals failure through its return value; surface it so that callers
            // do not treat the segment as remotely persisted when the final path is missing.
            if (!fileSystem.rename(tempPath, remotePath)) {
                throw new IOException("fail to rename the temp path:" + tempPath + " to:" + remotePath);
            }
            logger.info("successfully rename the temp path to:{}", remotePath);
        }

        /**
         * Runs {@link #flushToHDFS()}; any failure is logged and rethrown wrapped in an
         * {@link IllegalStateException}.
         */
        @Override
        public void run() {
            try {
                flushToHDFS();
            } catch (Exception e) {
                logger.error("error when flush segment data to hdfs", e);
                throw new IllegalStateException(e);
            }
        }
    }

    /** Not instantiable from outside; instances are obtained through getInstance(). */
    private StreamingServer() {
        // all state is set up by the field initializers
    }

    /**
     * Replaces the client used to notify the coordinator.
     *
     * @param coordinatorClient the client to use from now on
     */
    @VisibleForTesting
    public void setCoordinatorClient(CoordinatorClient coordinatorClient) {
        this.coordinatorClient = coordinatorClient;
    }

    /**
     * Returns the process-wide server, creating it on first use.
     *
     * @return the singleton instance
     */
    public static synchronized StreamingServer getInstance() {
        StreamingServer current = instance;
        if (current == null) {
            current = new StreamingServer();
            instance = current;
        }
        return current;
    }

    /**
     * Starts the receiver: registers it in the metadata store, re-joins the replica set
     * that already contains this node (if any), then starts metrics, the periodic
     * segment state checker and the shutdown hook.
     *
     * @throws Exception when registering the receiver fails
     */
    public void start() throws Exception {
        registerReceiver();

        ReplicaSet ownReplicaSet = findBelongReplicaSet();
        if (ownReplicaSet != null) {
            addToReplicaSet(ownReplicaSet.getReplicaSetID());
        }

        startMetrics();
        startSegmentStateChecker();
        addShutdownHook();
    }

    /**
     * Schedules a task that runs every 60 seconds (first run after 60 seconds) and, for
     * every cube segment manager, hands the segments that require remote persistence to
     * handleImmutableCubeSegments together with the cube's retention policy.
     *
     * No exception is allowed to escape the task: scheduleAtFixedRate suppresses all
     * subsequent executions once a run throws, which would silently stop the checker.
     */
    private void startSegmentStateChecker() {
        this.segmentStateCheckerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    for (StreamingSegmentManager segmentManager : StreamingServer.this.getAllCubeSegmentManagers()) {
                        // Resolved inside the try below so a failure there is logged as well;
                        // stays null in the log message when the name could not be read.
                        String cubeName = null;
                        try {
                            CubeInstance cubeInstance = segmentManager.getCubeInstance();
                            cubeName = cubeInstance.getName();
                            RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo();
                            String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
                            Map<String, String> policyProperties = cubeInstance.getConfig().getStreamingSegmentRetentionPolicyProperties(policyName);
                            retentionPolicyInfo.setName(policyName);
                            retentionPolicyInfo.setProperties(policyProperties);
                            Collection<StreamingCubeSegment> segments = segmentManager.getRequireRemotePersistSegments();
                            if (!segments.isEmpty()) {
                                StreamingServer.logger.info("found cube {} segments:{} are immutable, retention policy is: {}", cubeName, segments, retentionPolicyInfo.getName());
                                StreamingServer.this.handleImmutableCubeSegments(cubeName, segmentManager, segments, retentionPolicyInfo);
                            }
                        } catch (Exception e) {
                            // one failing cube must not prevent the remaining cubes from being checked
                            StreamingServer.logger.error("error when handle cube:" + cubeName, e);
                        }
                    }
                } catch (Exception e) {
                    // last line of defence, keeps the periodic schedule alive
                    StreamingServer.logger.error("error when check segment state", e);
                }
            }
        }, 60L, 60L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches immutable segments according to the retention policy: with the full
     * build policy the segments are sent to full build, but only when this receiver is
     * the leader; with any other policy the segments are purged by retention time.
     *
     * @param str cube name
     * @param streamingSegmentManager segment manager of the cube
     * @param collection the immutable segments to handle
     * @param retentionPolicyInfo retention policy name and properties of the cube
     * @throws Exception when sending the segments to full build fails
     */
    public void handleImmutableCubeSegments(String str, StreamingSegmentManager streamingSegmentManager, Collection<StreamingCubeSegment> collection, RetentionPolicyInfo retentionPolicyInfo) throws Exception {
        boolean fullBuildPolicy = RetentionPolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(retentionPolicyInfo.getName());
        if (fullBuildPolicy) {
            if (this.isLeader) {
                sendSegmentsToFullBuild(str, streamingSegmentManager, collection);
            }
            return;
        }
        purgeSegments(str, collection, retentionPolicyInfo.getProperties());
    }

    /**
     * Uploads the given segments through the flush executor and, segment by segment once
     * its upload finished, records the completion and the source checkpoint in the
     * metadata store, notifies the coordinator and saves the segment state as
     * REMOTE_PERSISTED.
     *
     * @param str cube name
     * @param streamingSegmentManager segment manager used to read the smallest source position
     * @param collection segments to upload
     * @throws Exception when an upload or one of the follow-up steps fails
     */
    private void sendSegmentsToFullBuild(String str, StreamingSegmentManager streamingSegmentManager, Collection<StreamingCubeSegment> collection) throws Exception {
        // submit every upload first so that they can run in parallel
        List<Future<?>> flushFutures = new ArrayList<>();
        for (StreamingCubeSegment toFlush : collection) {
            String remotePath = HDFSUtil.getStreamingSegmentFilePath(str, toFlush.getSegmentName()) + AntPathMatcher.DEFAULT_PATH_SEPARATOR + this.replicaSetID;
            flushFutures.add(this.segmentFlushExecutor.submit(new SegmentHDFSFlusher(toFlush, remotePath)));
        }

        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cube);

        // futures were added in iteration order, so position n belongs to the n-th segment
        int position = 0;
        for (StreamingCubeSegment flushed : collection) {
            flushFutures.get(position).get();
            String cubeName = flushed.getCubeName();
            String segmentName = flushed.getSegmentName();

            logger.info("Save remote store state to metadata store.");
            this.streamMetadataStore.addCompleteReplicaSetForSegmentBuild(cubeName, segmentName, this.replicaSetID);

            logger.info("save remote checkpoint to metadata store");
            String checkpoint = streamingSource.getSourcePositionHandler().serializePosition(streamingSegmentManager.getSmallestSourcePosition(flushed));
            this.streamMetadataStore.saveSourceCheckpoint(flushed.getCubeName(), flushed.getSegmentName(), this.replicaSetID, checkpoint);

            logger.info("Send notification to coordinator for cube {} segment {}.", str, flushed.getSegmentName());
            this.coordinatorClient.segmentRemoteStoreComplete(this.currentNode, flushed.getCubeName(), new Pair<>(Long.valueOf(flushed.getDateRangeStart()), Long.valueOf(flushed.getDateRangeEnd())));
            logger.info("Send notification success.");

            flushed.saveState(StreamingCubeSegment.State.REMOTE_PERSISTED);
            logger.info("Commit cube {} segment {}  status converted to {}.", flushed.getCubeName(), flushed.getSegmentName(), StreamingCubeSegment.State.REMOTE_PERSISTED.name());
            position++;
        }
    }

    /**
     * Purges every given segment that was created longer ago than the retention time and,
     * when at least one segment was purged, resumes the cube consumer if it is paused.
     *
     * @param str cube name
     * @param collection candidate segments
     * @param map retention policy properties; "retentionTimeInSec" is read from it
     * @throws NumberFormatException when "retentionTimeInSec" is missing or not a number
     */
    private void purgeSegments(String str, Collection<StreamingCubeSegment> collection, Map<String, String> map) {
        long retentionMillis = Long.parseLong(map.get("retentionTimeInSec")) * 1000;
        boolean anyPurged = false;
        for (StreamingCubeSegment candidate : collection) {
            long ageMillis = System.currentTimeMillis() - candidate.getCreateTime();
            if (ageMillis <= retentionMillis) {
                continue;
            }
            logger.info("purge segment:{}", candidate);
            getStreamingSegmentManager(str).purgeSegment(candidate.getSegmentName());
            anyPurged = true;
        }
        if (anyPurged) {
            resumeConsumerIfPaused(str);
        }
    }

    /**
     * Registers a JVM shutdown hook that stops every registered cube consumer, waiting
     * up to CONSUMER_STOP_WAIT_TIMEOUT for each one.
     */
    private void addShutdownHook() {
        Thread stopConsumersHook = new Thread() {
            @Override
            public void run() {
                StreamingServer.logger.info("start to shut down streaming receiver");
                for (Map.Entry<String, StreamingConsumerChannel> consumerEntry : StreamingServer.this.cubeConsumerMap.entrySet()) {
                    String cubeName = consumerEntry.getKey();
                    StreamingServer.logger.info("start to stop consumer for cube:{}", cubeName);
                    consumerEntry.getValue().stop(CONSUMER_STOP_WAIT_TIMEOUT);
                    StreamingServer.logger.info("finish to stop consumer for cube:{}", cubeName);
                }
                StreamingServer.logger.info("streaming receiver shut down successfully");
            }
        };
        Runtime.getRuntime().addShutdownHook(stopConsumersHook);
    }

    /** Starts the shared StreamingMetrics instance. */
    private void startMetrics() {
        StreamingMetrics metrics = StreamingMetrics.getInstance();
        metrics.start();
    }

    /**
     * Looks up the replica set that contains the current node.
     *
     * @return the first replica set in the metadata store containing this node, or null
     *         when no replica set contains it
     */
    private ReplicaSet findBelongReplicaSet() {
        ReplicaSet owner = null;
        for (ReplicaSet candidate : this.streamMetadataStore.getReplicaSets()) {
            if (candidate.containPhysicalNode(this.currentNode)) {
                owner = candidate;
                break;
            }
        }
        return owner;
    }

    /**
     * Adds the current node as a receiver to the stream metadata store.
     *
     * @throws Exception when the metadata store rejects or fails the registration
     */
    private void registerReceiver() throws Exception {
        Node self = this.currentNode;
        logger.info("register receiver: {}", self);
        this.streamMetadataStore.addReceiver(self);
    }

    /**
     * Creates the leader selector for the given replica set, registers this server as
     * leader change listener and starts the selector.
     *
     * @param i id of the replica set whose leader election is joined
     */
    private void joinReplicaSetLeaderElection(int i) {
        ReplicaSetLeaderSelector selector = new ReplicaSetLeaderSelector(this.streamZKClient, this.currentNode, i);
        this.leaderSelector = selector;
        selector.addLeaderChangeListener(this);
        selector.start();
    }

    /**
     * Records the partition assignments of several cubes; an existing assignment of a
     * cube contained in the given map is replaced.
     *
     * @param map partitions to consume, keyed by cube name
     */
    public synchronized void assign(Map<String, List<Partition>> map) {
        Map<String, List<Partition>> current = this.assignments;
        current.putAll(map);
    }

    /**
     * Records the partition assignment of one cube, replacing any previous one.
     *
     * @param str cube name
     * @param list partitions to consume for the cube
     */
    public synchronized void assign(String str, List<Partition> list) {
        Map<String, List<Partition>> current = this.assignments;
        current.put(str, list);
    }

    /**
     * Removes a cube from this receiver: stops its consumer, drops its assignment and
     * removes its data.
     *
     * @param str cube name
     */
    public synchronized void unAssign(String str) {
        // the consumer has to be stopped before its assignment and data disappear
        stopConsumer(str);

        this.assignments.remove(str);

        removeCubeData(str);
    }

    /**
     * Starts a consumer for each of the given cubes, without a start protocol.
     *
     * @param list names of the cubes to start consuming
     */
    public synchronized void startConsumers(List<String> list) {
        for (String cubeName : list) {
            startConsumer(cubeName, null);
        }
    }

    /**
     * Starts the consumer of a cube for the partitions assigned to this receiver.
     * <ul>
     * <li>When a consumer is already registered for the cube, nothing is started; its
     * partitions must match the assignment, otherwise an exception is thrown.</li>
     * <li>When the cube has no assigned partitions, nothing is started.</li>
     * <li>Otherwise the cube metadata is reloaded and a new consumer is created and
     * started. A failure is logged, not propagated.</li>
     * </ul>
     *
     * @param str cube name
     * @param consumerStartProtocol start protocol handed to the new consumer, may be null
     * @throws IllegalStateException when a consumer is registered whose partitions differ
     *         from the assigned ones (including the case that nothing is assigned)
     */
    public synchronized void startConsumer(String str, ConsumerStartProtocol consumerStartProtocol) {
        List<Partition> list = this.assignments.get(str);
        StreamingConsumerChannel streamingConsumerChannel = this.cubeConsumerMap.get(str);
        if (streamingConsumerChannel != null) {
            // Compare sorted copies: sorting in place would reorder the shared assignment
            // list and the consumer's own partition list, and a missing assignment (null)
            // must be reported as a conflict instead of failing with a NullPointerException.
            List<Partition> assignedPartitions = list == null ? new ArrayList<Partition>() : new ArrayList<>(list);
            List<Partition> consumedPartitions = new ArrayList<>(streamingConsumerChannel.getConsumePartitions());
            Collections.sort(assignedPartitions);
            Collections.sort(consumedPartitions);
            if (!assignedPartitions.equals(consumedPartitions)) {
                throw new IllegalStateException(String.format(Locale.ROOT, "The running consumer for cube:%s partition:%s is conflict with assign partitions:%s, should stop the consumer first.", str, consumedPartitions, assignedPartitions));
            }
            logger.info("The consumer for cube:{} is already running, skip starting", str);
            return;
        }
        if (list == null || list.isEmpty()) {
            logger.info("partitions is empty for cube:{}", str);
            return;
        }
        logger.info("create and start new consumer for cube:{}", str);
        try {
            reloadCubeMetadata(str);
            createNewConsumer(str, list, consumerStartProtocol).start();
        } catch (Exception e) {
            logger.error("consumer start fail for cube:" + str, e);
            // createNewConsumer registers the channel before it is started; drop a channel
            // that failed to start, otherwise every later start attempt would be skipped
            // as "already running". No consumer was registered on entry to this branch.
            this.cubeConsumerMap.remove(str);
        }
    }

    /**
     * Stops and unregisters the consumer of a cube.
     *
     * @param str cube name
     * @return a response carrying the cube name and the consume position of the stopped
     *         consumer; an unpopulated response when no consumer is registered
     */
    public synchronized ConsumerStatsResponse stopConsumer(String str) {
        logger.info("stop consumers for cube: {}", str);
        ConsumerStatsResponse response = new ConsumerStatsResponse();
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(str);
        if (consumer == null) {
            return response;
        }
        consumer.stop(CONSUMER_STOP_WAIT_TIMEOUT);
        this.cubeConsumerMap.remove(str);
        response.setCubeName(str);
        response.setConsumePosition(consumer.getSourceConsumeInfo());
        return response;
    }

    /** Stops the consumer of every cube that currently has one registered. */
    public synchronized void stopAllConsumers() {
        // iterate over a snapshot: stopConsumer removes entries from the map
        List<String> cubeNames = new ArrayList<>(this.cubeConsumerMap.keySet());
        for (String cubeName : cubeNames) {
            stopConsumer(cubeName);
        }
    }

    /**
     * Pauses the consumer of a cube.
     *
     * @param str cube name
     * @return a response carrying the cube name and, when a consumer is registered, its
     *         consume position after the pause
     */
    public synchronized ConsumerStatsResponse pauseConsumer(String str) {
        logger.info("pause consumers for cube: {}", str);
        ConsumerStatsResponse response = new ConsumerStatsResponse();
        response.setCubeName(str);
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(str);
        if (consumer == null) {
            logger.warn("the consumer for cube:{} does not exist ", str);
            return response;
        }
        consumer.pause(true);
        response.setConsumePosition(consumer.getSourceConsumeInfo());
        return response;
    }

    /**
     * Resumes the consumer of a cube. Without a position the consumer is simply resumed;
     * with a position it is resumed with an end-position stop condition and unregistered
     * from this server.
     *
     * @param str cube name
     * @param str2 serialized source position to consume up to; null or empty for a plain resume
     * @return a response carrying the cube name and, when a consumer is registered, its
     *         consume position
     */
    public synchronized ConsumerStatsResponse resumeConsumer(String str, String str2) {
        logger.info("resume consumers for cube: {}", str);
        ConsumerStatsResponse response = new ConsumerStatsResponse();
        response.setCubeName(str);
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(str);
        if (consumer == null) {
            logger.warn("the consumer for cube:{} does not exist", str);
            return response;
        }
        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cube);
        boolean resumeToPosition = str2 != null && !str2.isEmpty();
        if (resumeToPosition) {
            EndPositionStopCondition stopCondition = new EndPositionStopCondition(streamingSource.getSourcePositionHandler().parsePosition(str2));
            consumer.resumeToStopCondition(stopCondition);
            this.cubeConsumerMap.remove(str);
        } else {
            consumer.resume();
        }
        response.setConsumePosition(consumer.getSourceConsumeInfo());
        return response;
    }

    /**
     * Makes this receiver a member of the given replica set: joins the leader election,
     * loads the assignments of the replica set, restores the local segments of the
     * assigned cubes and starts their consumers.
     *
     * @param i id of the replica set to join
     * @throws IllegalStateException when the receiver already belongs to another replica set
     */
    public void addToReplicaSet(int i) {
        logger.info("add the node to the replicaSet:{}, join the group leader election.", i);
        if (this.replicaSetID == i) {
            logger.info("the receiver already in the replica set:{}, return", i);
            return;
        }
        if (this.replicaSetID != -1) {
            throw new IllegalStateException("the receiver is in replica set:" + this.replicaSetID + ", please remove first");
        }
        this.replicaSetID = i;
        joinReplicaSetLeaderElection(i);

        Map<String, List<Partition>> replicaSetAssignments = this.streamMetadataStore.getAssignmentsByReplicaSet(i);
        if (replicaSetAssignments != null) {
            assign(replicaSetAssignments);
            List<String> assignedCubes = new ArrayList<>(replicaSetAssignments.keySet());
            initLocalSegmentManager(assignedCubes);
            startConsumers(assignedCubes);
        } else {
            // nothing assigned: local data of every cube is treated as not assigned
            initLocalSegmentManager(new ArrayList<String>());
        }
    }

    /**
     * Takes this receiver out of its replica set: closes the leader selector, resets the
     * replica set id and leader flag, clears the assignments, stops all consumers and
     * removes the data of every cube that has a segment manager.
     */
    public void removeFromReplicaSet() {
        ReplicaSetLeaderSelector selector = this.leaderSelector;
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                logger.error("error happens when close leader selector", e);
            }
        }
        this.replicaSetID = -1;
        this.isLeader = false;
        this.assignments.clear();
        stopAllConsumers();
        // iterate over a snapshot: removeCubeData removes entries from the map
        List<String> cubeNames = new ArrayList<>(this.streamingSegmentManagerMap.keySet());
        for (String cubeName : cubeNames) {
            removeCubeData(cubeName);
        }
    }

    /**
     * Collects the statistics of this receiver: assignments, leader flag, the statistics
     * of every cube that is assigned, has a consumer or has a segment manager, and the
     * columnar store cache statistics.
     *
     * @return the collected statistics
     */
    public ReceiverStats getReceiverStats() {
        ReceiverStats stats = new ReceiverStats();
        stats.setAssignments(this.assignments);
        stats.setLead(this.isLeader);

        // union of all cube names known to this receiver
        Set<String> knownCubes = new HashSet<>();
        knownCubes.addAll(this.assignments.keySet());
        knownCubes.addAll(this.cubeConsumerMap.keySet());
        knownCubes.addAll(this.streamingSegmentManagerMap.keySet());
        for (String cubeName : knownCubes) {
            stats.addCubeStats(cubeName, getCubeStats(cubeName));
        }

        stats.setCacheStats(ColumnarStoreCache.getInstance().getCacheStats());
        return stats;
    }

    /**
     * Collects the statistics of one cube. Consumer statistics are filled in when the
     * cube has a registered consumer, segment related statistics when it has a segment
     * manager.
     *
     * @param str cube name
     * @return the statistics; left unpopulated when the cube has neither a consumer nor
     *         a segment manager
     */
    public ReceiverCubeStats getCubeStats(String str) {
        ReceiverCubeStats cubeStats = new ReceiverCubeStats();

        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(str);
        if (consumer != null) {
            cubeStats.setConsumerStats(consumer.getConsumerStats());
        }

        StreamingSegmentManager segmentManager = this.streamingSegmentManagerMap.get(str);
        if (segmentManager == null) {
            return cubeStats;
        }
        cubeStats.setSegmentStatsMap(segmentManager.getSegmentStats());
        cubeStats.setTotalIngest(segmentManager.getIngestCount());
        cubeStats.setLatestEventTime(StreamingSegmentManager.resetTimestampByTimeZone(segmentManager.getLatestEventTime()));
        cubeStats.setLatestEventIngestTime(StreamingSegmentManager.resetTimestampByTimeZone(segmentManager.getLatestEventIngestTime()));
        cubeStats.setLongLatencyInfo(segmentManager.getLongLatencyInfo());
        return cubeStats;
    }

    /**
     * Stops and unregisters the consumer of a cube (if one is registered) and makes all
     * segments of the cube immutable (if the cube has a segment manager).
     *
     * @param str cube name
     */
    public void makeCubeImmutable(String str) {
        if (this.cubeConsumerMap.containsKey(str)) {
            logger.info("before make cube immutable, stop consumer for cube:{}", str);
            StreamingConsumerChannel runningConsumer = this.cubeConsumerMap.get(str);
            runningConsumer.stop(CONSUMER_STOP_WAIT_TIMEOUT);
            this.cubeConsumerMap.remove(str);
        }
        StreamingSegmentManager segmentManager = this.streamingSegmentManagerMap.get(str);
        if (segmentManager != null) {
            segmentManager.makeAllSegmentsImmutable();
        }
    }

    /**
     * Makes one segment of a cube immutable; does nothing when the cube has no segment
     * manager on this receiver.
     *
     * @param str cube name
     * @param str2 segment name
     */
    public void makeCubeSegmentImmutable(String str, String str2) {
        StreamingSegmentManager segmentManager = this.streamingSegmentManagerMap.get(str);
        if (segmentManager != null) {
            segmentManager.makeSegmentImmutable(str2);
        }
    }

    /**
     * Forwards the completion of a remote segment build to the cube's segment manager
     * and, unless the manager returns an empty result, resumes the cube consumer if it
     * is paused.
     *
     * @param str cube name
     * @param str2 name of the segment whose remote build completed
     */
    public void remoteSegmentBuildComplete(String str, String str2) {
        StreamingSegmentManager segmentManager = getStreamingSegmentManager(str);
        boolean nothingReturned = segmentManager.remoteSegmentBuildComplete(str2).isEmpty();
        if (!nothingReturned) {
            resumeConsumerIfPaused(str);
        }
    }

    /**
     * Resumes the consumer of a cube when it is paused and the consume state recorded in
     * the metadata store is either absent or RUNNING.
     *
     * @param str cube name
     */
    private void resumeConsumerIfPaused(String str) {
        StreamingConsumerChannel consumer = getConsumer(str);
        if (consumer != null && consumer.isPaused()) {
            StreamingCubeConsumeState recordedState = this.streamMetadataStore.getStreamingCubeConsumeState(str);
            boolean mayRun = recordedState == null || recordedState == StreamingCubeConsumeState.RUNNING;
            if (mayRun) {
                logger.info("resume the cube consumer:{} after remove some local immutable segments", str);
                consumer.resume();
            }
        }
    }

    /**
     * Creates the consumer channel of a cube and registers it in the consumer map; the
     * channel is not started here. The minimum accepted event time is the end of the
     * latest ready segment or, without such a segment, the partition start date of the
     * cube descriptor; it is only applied when positive. The channel is paused right
     * away when the consume state in the metadata store is PAUSED.
     *
     * @param str cube name
     * @param list partitions to consume
     * @param consumerStartProtocol start protocol handed to the streaming connector
     * @return the registered, not yet started channel
     * @throws IOException declared by the signature; no checked exception is visible in the body
     */
    private StreamingConsumerChannel createNewConsumer(String str, List<Partition> list, ConsumerStartProtocol consumerStartProtocol) throws IOException {
        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(str);
        StreamingSegmentManager segmentManager = getStreamingSegmentManager(str);
        StreamingConsumerChannel consumer = new StreamingConsumerChannel(str, StreamingSourceFactory.getStreamingSource(cube).createStreamingConnector(str, list, consumerStartProtocol, segmentManager), segmentManager, IStopConsumptionCondition.NEVER_STOP);

        long minAcceptEventTime = cube.getDescriptor().getPartitionDateStart();
        CubeSegment latestReadySegment = cube.getLatestReadySegment();
        if (latestReadySegment != null) {
            // data up to the end of the latest ready segment is already built
            minAcceptEventTime = ((Long) latestReadySegment.getTSRange().end.v).longValue();
        }
        if (minAcceptEventTime > 0) {
            consumer.setMinAcceptEventTime(minAcceptEventTime);
        }

        StreamingCubeConsumeState recordedState = this.streamMetadataStore.getStreamingCubeConsumeState(str);
        if (recordedState == StreamingCubeConsumeState.PAUSED) {
            consumer.pause(false);
        }

        this.cubeConsumerMap.put(str, consumer);
        return consumer;
    }

    /**
     * Leader election callback: when this receiver belongs to a replica set, records it
     * as leader in the metadata store and notifies the coordinator (failures are only
     * logged). The leader flag is set in every case.
     */
    @Override
    public void becomeLeader() {
        int ownReplicaSetId = this.replicaSetID;
        if (ownReplicaSetId != -1) {
            logger.info("become leader of the replicaSet:{}", ownReplicaSetId);
            try {
                ReplicaSet ownReplicaSet = this.streamMetadataStore.getReplicaSet(this.replicaSetID);
                ownReplicaSet.setLeader(this.currentNode);
                this.streamMetadataStore.updateReplicaSet(ownReplicaSet);
                this.coordinatorClient.replicaSetLeaderChange(this.replicaSetID, this.currentNode);
            } catch (Exception e) {
                logger.error("error when send lead change notification to coordinator", e);
            }
        }
        this.isLeader = true;
    }

    /** Leader election callback: clears the leader flag of this receiver. */
    @Override
    public void becomeFollower() {
        this.isLeader = false;
        int ownReplicaSetId = this.replicaSetID;
        if (ownReplicaSetId != -1) {
            logger.info("become follower of the replicaSet:{}", ownReplicaSetId);
        }
    }

    /**
     * Returns the registered consumer channel of a cube.
     *
     * @param str cube name
     * @return the channel, or null when no consumer is registered for the cube
     */
    @Override
    public StreamingConsumerChannel getConsumer(String str) {
        StreamingConsumerChannel consumer = this.cubeConsumerMap.get(str);
        return consumer;
    }

    /**
     * Returns the segment manager of a cube, creating and registering it on first use.
     * Creation is guarded by a lock on the manager map and re-checked under the lock so
     * that only one manager is created per cube.
     *
     * @param str cube name
     * @return the segment manager currently registered for the cube
     */
    public StreamingSegmentManager getStreamingSegmentManager(String str) {
        boolean missing = this.streamingSegmentManagerMap.get(str) == null;
        if (missing) {
            synchronized (this.streamingSegmentManagerMap) {
                // another thread may have created the manager while this one waited for the lock
                if (this.streamingSegmentManagerMap.get(str) == null) {
                    KylinConfig config = KylinConfig.getInstanceFromEnv();
                    CubeInstance cube = CubeManager.getInstance(config).getCube(str);
                    StreamingSegmentManager created = new StreamingSegmentManager(this.baseStorePath, cube, StreamingSourceFactory.getStreamingSource(cube).getSourcePositionHandler(), this);
                    this.streamingSegmentManagerMap.put(str, created);
                }
            }
        }
        return this.streamingSegmentManagerMap.get(str);
    }

    /**
     * Unregisters the segment manager of a cube, closes it and purges all of its
     * segments. The manager is obtained through getStreamingSegmentManager, which
     * creates one when the cube has none yet.
     *
     * @param str cube name
     */
    public void removeCubeData(String str) {
        logger.info("remove cube data: {}", str);
        StreamingSegmentManager segmentManager = getStreamingSegmentManager(str);
        if (segmentManager == null) {
            return;
        }
        this.streamingSegmentManagerMap.remove(str);
        segmentManager.close();
        segmentManager.purgeAllSegments();
    }

    /**
     * Uploads an immutable segment to its remote path again, synchronously in the
     * calling thread.
     *
     * @param str cube name
     * @param str2 segment name
     * @throws IllegalStateException when the segment does not exist or is still active
     * @throws RuntimeException when copying the segment to the remote path fails
     */
    public void reSubmitCubeSegment(String str, String str2) {
        StreamingCubeSegment segment = getStreamingSegmentManager(str).getSegmentByName(str2);
        if (segment == null) {
            throw new IllegalStateException("cannot find segment:" + str2);
        }
        if (segment.isActive()) {
            throw new IllegalStateException("the segment must be immutable:" + segment);
        }
        String remotePath = HDFSUtil.getStreamingSegmentFilePath(str, str2) + AntPathMatcher.DEFAULT_PATH_SEPARATOR + this.replicaSetID;
        SegmentHDFSFlusher flusher = new SegmentHDFSFlusher(segment, remotePath);
        try {
            flusher.flushToHDFS();
        } catch (IOException e) {
            throw new RuntimeException("fail to copy segment to hdfs:" + segment, e);
        }
    }

    /**
     * Returns the segment managers of all cubes known to this receiver.
     *
     * @return the values view of the segment manager map (not a copy)
     */
    public Collection<StreamingSegmentManager> getAllCubeSegmentManagers() {
        Collection<StreamingSegmentManager> managers = this.streamingSegmentManagerMap.values();
        return managers;
    }

    /**
     * Brings the local segment store in line with the given cubes: for every cube folder
     * below the base store path, the segments are restored when the cube is listed, and
     * the folder is deleted when it is not. Failures for a single cube are logged and do
     * not stop the processing of the remaining folders.
     *
     * @param list names of the cubes assigned to this receiver
     */
    private void initLocalSegmentManager(List<String> list) {
        File baseStoreDir = new File(this.baseStorePath);
        if (!baseStoreDir.exists() && !baseStoreDir.mkdirs()) {
            logger.warn("fail to create local segment store dir:{}", baseStoreDir);
        }
        File[] cubeFolders = baseStoreDir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File file3) {
                return file3.isDirectory();
            }
        });
        // listFiles returns null when the path is not a directory or an I/O error occurs
        if (cubeFolders == null) {
            logger.warn("cannot list local segment store dir:{}, skip restoring local segments", baseStoreDir);
            return;
        }
        for (File cubeFolder : cubeFolders) {
            String cubeName = cubeFolder.getName();
            if (list.contains(cubeName)) {
                try {
                    getStreamingSegmentManager(cubeName).restoreSegmentsFromLocal();
                } catch (Exception e) {
                    logger.error("local cube store init fail", e);
                }
            } else {
                logger.info("remove the cube:{} data, because it is not assigned to this node", cubeName);
                try {
                    FileUtils.deleteDirectory(cubeFolder);
                } catch (IOException e2) {
                    logger.error("error happens when remove cube folder", e2);
                }
            }
        }
    }

    /**
     * Reloads the locally cached metadata a consumer of the cube depends on: the source
     * table, the data model, the cube descriptor, the cube itself and the streaming
     * source configuration of the cube's root fact table. The cube, its descriptor and
     * its model are first read from the resource store to find out which entries to reload.
     *
     * @param str cube name
     * @throws IOException when reading from the resource store or reloading fails
     * @throws IllegalArgumentException when the cube's model does not map to exactly one project
     */
    private void reloadCubeMetadata(String str) throws IOException {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        ResourceStore store = ResourceStore.getStore(config);

        // follow the chain cube -> cube descriptor -> data model in the resource store
        CubeInstance storedCube = (CubeInstance) store.getResource(CubeInstance.concatResourcePath(str), CubeManager.CUBE_SERIALIZER);
        CubeDesc storedCubeDesc = (CubeDesc) store.getResource(CubeDesc.concatResourcePath(storedCube.getDescName()), CubeDescManager.CUBE_DESC_SERIALIZER);
        DataModelDesc storedModel = (DataModelDesc) store.getResource(DataModelDesc.concatResourcePath(storedCubeDesc.getModelName()), new JsonSerializer(DataModelDesc.class));

        ProjectManager projectManager = ProjectManager.getInstance(config);
        List<ProjectInstance> owningProjects = projectManager.findProjectsByModel(storedModel.getName());
        if (owningProjects.isEmpty()) {
            // the local project cache may be stale, reload it once and look again
            projectManager.reloadAll();
            owningProjects = projectManager.findProjectsByModel(storedModel.getName());
        }
        if (owningProjects.size() != 1) {
            throw new IllegalArgumentException("the cube:" + str + " is not in any project");
        }
        String projectName = owningProjects.get(0).getName();

        TableMetadataManager.getInstance(config).reloadSourceTableQuietly(storedModel.getRootFactTableName(), projectName);
        DataModelManager.getInstance(config).reloadDataModel(storedModel.getName());
        CubeDescManager.getInstance(config).reloadCubeDescLocal(str);
        StreamingSourceConfigManager sourceConfigManager = StreamingSourceConfigManager.getInstance(config);
        CubeInstance reloadedCube = CubeManager.getInstance(config).reloadCubeQuietly(str);
        sourceConfigManager.reloadStreamingConfigLocal(reloadedCube.getRootFactTable());
    }

    /**
     * Resolves the directory of the local segment cache. The configured streaming index
     * path is used as is when it is absolute or when no Kylin home is known; otherwise
     * it is resolved against the Kylin home.
     *
     * @return the resolved directory path
     */
    private String calLocalSegmentCacheDir() {
        String kylinHome = KylinConfig.getKylinHome();
        String configuredPath = KylinConfig.getInstanceFromEnv().getStreamingIndexPath();
        boolean useAsConfigured = new File(configuredPath).isAbsolute() || kylinHome == null || kylinHome.equals("");
        String resolvedPath;
        if (useAsConfigured) {
            resolvedPath = configuredPath;
        } else {
            resolvedPath = new File(kylinHome, configuredPath).getAbsolutePath();
        }
        logger.info("Using {} to store local segment cache.", resolvedPath);
        return resolvedPath;
    }
}
