package org.apache.kylin.stream.coordinator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.shaded.com.google.common.base.Function;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.source.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-stream-coordinator-3.1.3.jar:org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.class */
public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
    // Sub-paths appended to zkRoot in init() to build the four base node paths.
    public static final String REPLICA_SET_ROOT = "/replica_sets";
    public static final String RECEIVER_ROOT = "/receivers";
    public static final String CUBE_ROOT = "/cubes";
    public static final String COORDINATOR_NODE = "/coordinator";
    // Child node names used below an individual cube node (<cubeRoot>/<cube>/...).
    public static final String CUBE_BUILD_STATE = "build_state";
    public static final String CUBE_CONSUME_STATE = "consume_state";
    public static final String CUBE_ASSIGNMENT = "assignment";
    // NOTE(review): not referenced anywhere in this class — presumably used by other classes; confirm.
    public static final String CUBE_CONSUME_SRC_STATE = "consume_source_state";
    public static final String CUBE_SRC_CHECKPOINT = "source_checkpoint";
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ZookeeperStreamMetadataStore.class);
    // Absolute ZooKeeper paths of the base nodes; assigned once in init().
    private String replicaSetRoot;
    private String receiverRoot;
    private String cubeRoot;
    private String coordinatorRoot;
    // Minimum interval in milliseconds between debug-level stat reports.
    // reportStat() currently compares against the literal 300000 rather than this constant.
    private static final long REPORT_DURATION = 300000;
    // Operation counters, bumped by the store methods and printed by reportStat().
    private AtomicLong readSuccess = new AtomicLong();
    private AtomicLong readFail = new AtomicLong();
    private AtomicLong writeSuccess = new AtomicLong();
    private AtomicLong writeFail = new AtomicLong();
    // Log format for reportStat(); placeholders are filled in the order
    // readSuccess, writeSuccess, readFail, writeFail.
    String reportTemplate = "[StreamMetadataStoreStats]  read : {} ; write: {} ; read failed: {} ; write failed: {} .";
    // Timestamp (System.currentTimeMillis()) of the last report emitted by reportStat().
    private AtomicLong lastReport = new AtomicLong();
    // Curator client obtained from StreamingUtils; close() closes it.
    private CuratorFramework client = StreamingUtils.getZookeeperClient();
    // Root path under which all nodes managed by this store live.
    private String zkRoot = StreamingUtils.STREAM_ZK_ROOT;

    /**
     * Creates the store and immediately runs {@link #init()}, which resolves the base
     * paths and creates the corresponding ZooKeeper nodes when they are missing.
     *
     * @throws StoreException if {@code init()} fails to check or create a base node
     */
    public ZookeeperStreamMetadataStore() {
        init();
    }

    /**
     * Resolves the base node paths below {@code zkRoot} and creates every one of them
     * (the root first, then replica sets, receivers, cubes and coordinator) if absent.
     *
     * @throws StoreException wrapping any failure raised while checking or creating a node
     */
    private void init() {
        replicaSetRoot = zkRoot + REPLICA_SET_ROOT;
        receiverRoot = zkRoot + RECEIVER_ROOT;
        cubeRoot = zkRoot + CUBE_ROOT;
        coordinatorRoot = zkRoot + COORDINATOR_NODE;

        // Creation order matches the declaration order: root first, then its children.
        final String[] baseNodes = {zkRoot, replicaSetRoot, receiverRoot, cubeRoot, coordinatorRoot};
        try {
            for (String baseNode : baseNodes) {
                createZKNodeIfNotExist(baseNode);
            }
        } catch (Exception ex) {
            logger.error("error when create zk nodes", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Creates the node at {@code str} (with any missing parents) unless it already exists.
     * The existence check and the create are two separate ZooKeeper calls.
     *
     * @param str absolute ZooKeeper path of the node
     * @throws Exception any failure reported by the Curator client
     */
    private void createZKNodeIfNotExist(String str) throws Exception {
        boolean alreadyThere = client.checkExists().forPath(str) != null;
        if (alreadyThere) {
            return;
        }
        client.create().creatingParentsIfNeeded().forPath(str);
    }

    /**
     * Deletes the {@code assignment} node stored under the given cube.
     *
     * @param str name of the cube whose assignment is removed; must be non-empty
     * @throws IllegalArgumentException if {@code str} is null or empty
     * @throws StoreException wrapping any failure of the delete call
     */
    @Override
    public void removeCubeAssignment(String str) {
        logger.trace("Remove cube assignment {}.", str);
        checkPath(str);
        try {
            String assignmentPath = getCubeAssignmentPath(str);
            client.delete().forPath(assignmentPath);
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when remove cube assignment " + str, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Reads the assignment of every cube registered under the cube root.
     * Cubes that have no {@code assignment} node are skipped.
     *
     * @return the deserialized assignments, possibly empty
     * @throws StoreException wrapping any failure while listing or reading nodes
     */
    @Override
    public List<CubeAssignment> getAllCubeAssignments() {
        try {
            List<CubeAssignment> assignments = new ArrayList<>();
            for (String cubeName : client.getChildren().forPath(cubeRoot)) {
                String assignmentPath = getCubeAssignmentPath(cubeName);
                if (client.checkExists().forPath(assignmentPath) == null) {
                    continue; // cube registered but not assigned yet
                }
                byte[] raw = client.getData().forPath(assignmentPath);
                assignments.add(CubeAssignment.deserializeCubeAssignment(raw));
            }
            readSuccess.incrementAndGet();
            return assignments;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get assignments", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Loads all cube assignments and regroups them by replica set via
     * {@code AssignmentUtil.convertCubeAssign2ReplicaSetAssign}.
     *
     * @return the converted map keyed by replica set id
     * @throws StoreException wrapping any failure while loading or converting
     */
    @Override
    public Map<Integer, Map<String, List<Partition>>> getAllReplicaSetAssignments() {
        try {
            List<CubeAssignment> cubeAssignments = getAllCubeAssignments();
            // Counted as a successful read as soon as the assignments are loaded,
            // i.e. before the conversion runs.
            readSuccess.incrementAndGet();
            Map<Integer, Map<String, List<Partition>>> byReplicaSet =
                    AssignmentUtil.convertCubeAssign2ReplicaSetAssign(cubeAssignments);
            return byReplicaSet;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get assignments", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Looks up the partitions assigned to one replica set, keyed by a string taken
     * from the converted assignment map.
     *
     * @param i id of the replica set
     * @return the entry for {@code i}, or {@code null} when the map has no such key
     * @throws StoreException wrapping any failure while loading the assignments
     */
    @Override
    public Map<String, List<Partition>> getAssignmentsByReplicaSet(int i) {
        try {
            Map<Integer, Map<String, List<Partition>>> byReplicaSet = getAllReplicaSetAssignments();
            readSuccess.incrementAndGet();
            Map<String, List<Partition>> forReplicaSet = byReplicaSet.get(i);
            return forReplicaSet;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get assignment for replica set " + i, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Reads the assignment stored for a single cube.
     *
     * @param str name of the cube
     * @return the deserialized assignment, or {@code null} (after a warning is logged)
     *         when the cube has no {@code assignment} node
     * @throws StoreException wrapping any failure while reading or deserializing
     */
    @Override
    public CubeAssignment getAssignmentsByCube(String str) {
        try {
            String assignmentPath = getCubeAssignmentPath(str);
            if (client.checkExists().forPath(assignmentPath) != null) {
                byte[] raw = client.getData().forPath(assignmentPath);
                readSuccess.incrementAndGet();
                return CubeAssignment.deserializeCubeAssignment(raw);
            }
            logger.warn("Cannot find content at {}.", assignmentPath);
            return null;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get cube assignment for " + str, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Loads every replica set found under the replica set root. Each child node name
     * is parsed as the integer id and resolved through {@link #getReplicaSet(int)}.
     *
     * @return the replica sets in the order the children were listed
     * @throws StoreException wrapping any failure, including a non-numeric child name
     */
    @Override
    public List<ReplicaSet> getReplicaSets() {
        List<ReplicaSet> replicaSets = new ArrayList<>();
        try {
            List<String> idNames = client.getChildren().forPath(replicaSetRoot);
            readSuccess.incrementAndGet();
            for (String idName : idNames) {
                int id = Integer.parseInt(idName);
                replicaSets.add(getReplicaSet(id));
            }
            return replicaSets;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get replica sets", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Lists the ids of all replica sets, i.e. the child node names under the replica
     * set root parsed as integers.
     *
     * <p>The names are parsed eagerly inside the try block. A lazily transformed view
     * would defer {@code Integer.valueOf} until the caller touches an element, so a
     * malformed node name would escape as a raw {@code NumberFormatException} instead
     * of being counted, logged and wrapped here.
     *
     * @return a new list holding one id per child node, in listing order
     * @throws StoreException wrapping any failure while listing the children or
     *         parsing a child name
     */
    @Override
    public List<Integer> getReplicaSetIDs() {
        try {
            List<String> idNames = this.client.getChildren().forPath(this.replicaSetRoot);
            List<Integer> ids = new ArrayList<>(idNames.size());
            for (String idName : idNames) {
                ids.add(Integer.valueOf(idName));
            }
            // Only count the read once the whole result is known to be usable.
            this.readSuccess.getAndIncrement();
            return ids;
        } catch (Exception e) {
            this.readFail.getAndIncrement();
            logger.error("Error when get replica sets", e);
            throw new StoreException(e);
        }
    }

    /**
     * Stores a new replica set under the next free id. The id is one more than the
     * largest existing child name parsed as an integer (unparseable names count as 0),
     * or 0 when there are no children. The id is written into {@code replicaSet}
     * before the node is created.
     *
     * @param replicaSet the replica set to persist; its id is overwritten
     * @return the id assigned to the new replica set
     * @throws StoreException wrapping any failure while listing, serializing or creating
     */
    @Override
    public int createReplicaSet(ReplicaSet replicaSet) {
        try {
            int highestId = -1;
            for (String idName : client.getChildren().forPath(replicaSetRoot)) {
                int existingId;
                try {
                    existingId = Integer.valueOf(idName);
                } catch (Exception ignored) {
                    // Names that are not integers are treated as id 0.
                    existingId = 0;
                }
                if (existingId > highestId) {
                    highestId = existingId;
                }
            }
            final int newId = highestId + 1;
            logger.trace("Id of new replica set {} is {}.", replicaSet, newId);
            replicaSet.setReplicaSetID(newId);
            String nodePath = ZKPaths.makePath(replicaSetRoot, String.valueOf(newId));
            client.create().creatingParentsIfNeeded().forPath(nodePath, serializeReplicaSet(replicaSet));
            writeSuccess.incrementAndGet();
            return newId;
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when create replicaSet " + replicaSet, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Overwrites the data of the node named after the replica set's id with the
     * serialized form of {@code replicaSet}.
     *
     * @param replicaSet the replica set to write; its id selects the node
     * @throws StoreException wrapping any failure while serializing or writing
     */
    @Override
    public void updateReplicaSet(ReplicaSet replicaSet) {
        try {
            String nodeName = String.valueOf(replicaSet.getReplicaSetID());
            String nodePath = ZKPaths.makePath(replicaSetRoot, nodeName);
            byte[] payload = serializeReplicaSet(replicaSet);
            client.setData().forPath(nodePath, payload);
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("error when update replicaSet " + replicaSet, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Reads the coordinator node's data and parses it as a JSON-encoded {@link Node}.
     *
     * @return the node described by the coordinator node's data
     * @throws StoreException wrapping any failure while reading or parsing
     */
    @Override
    public Node getCoordinatorNode() {
        try {
            byte[] raw = client.getData().forPath(coordinatorRoot);
            readSuccess.incrementAndGet();
            Node coordinator = (Node) JsonUtil.readValue(raw, Node.class);
            return coordinator;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get coordinator leader", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Writes {@code node}, serialized as JSON bytes, into the coordinator node.
     *
     * @param node the node to record as coordinator
     * @throws StoreException wrapping any failure while serializing or writing
     */
    @Override
    public void setCoordinatorNode(Node node) {
        try {
            byte[] payload = JsonUtil.writeValueAsBytes(node);
            client.setData().forPath(coordinatorRoot, payload);
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when set coordinator leader to " + node, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Stores a source checkpoint at
     * {@code <cubeRoot>/<str>/source_checkpoint/<str2>/<i>}.
     *
     * <p>A missing node is created together with its payload in a single call, so a
     * reader never sees a checkpoint node without its content and a failure cannot
     * leave a content-less node behind. An existing node is overwritten after a
     * warning is logged.
     *
     * @param str  name of the cube; must be non-empty
     * @param str2 second path component below {@code source_checkpoint}
     *             (presumably the segment name — confirm against the interface);
     *             must be non-empty
     * @param i    last path component (presumably the replica set id — confirm)
     * @param str3 checkpoint content to store
     * @throws IllegalArgumentException if {@code str} or {@code str2} is null or empty
     * @throws StoreException wrapping any failure while checking, creating or writing
     */
    @Override
    public void saveSourceCheckpoint(String str, String str2, int i, String str3) {
        checkPath(str, str2);
        logger.trace("Save remote checkpoint {} {} {} with content {}.", str, str2, Integer.valueOf(i), str3);
        try {
            String checkpointPath = ZKPaths.makePath(this.cubeRoot, str, CUBE_SRC_CHECKPOINT, str2, String.valueOf(i));
            // Encode before touching ZooKeeper so an encoding failure has no side effect.
            byte[] payload = Bytes.toBytes(str3);
            if (this.client.checkExists().forPath(checkpointPath) == null) {
                this.client.create().creatingParentsIfNeeded().forPath(checkpointPath, payload);
            } else {
                logger.warn("Checkpoint path already existed under path {}, overwrite with new one.", checkpointPath);
                this.client.setData().forPath(checkpointPath, payload);
            }
            this.writeSuccess.getAndIncrement();
        } catch (Exception e) {
            this.writeFail.getAndIncrement();
            logger.error("Error when save remote checkpoint for " + str + " " + str2, e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads all checkpoints stored under
     * {@code <cubeRoot>/<str>/source_checkpoint/<str2>}. Each child node name is parsed
     * as the integer key and its data is decoded as the string value.
     *
     * @param str  name of the cube
     * @param str2 path component below {@code source_checkpoint}
     * @return the checkpoints keyed by child id, or {@code null} when the parent node
     *         does not exist or its child list is {@code null}
     * @throws StoreException wrapping any failure while listing, reading or parsing
     */
    @Override
    public Map<Integer, String> getSourceCheckpoint(String str, String str2) {
        try {
            Map<Integer, String> checkpoints = new HashMap<>();
            String parentPath = ZKPaths.makePath(cubeRoot, str, CUBE_SRC_CHECKPOINT, str2);
            if (client.checkExists().forPath(parentPath) == null) {
                return null;
            }
            List<String> childNames = client.getChildren().forPath(parentPath);
            if (childNames == null) {
                return null;
            }
            for (String childName : childNames) {
                byte[] raw = client.getData().forPath(ZKPaths.makePath(parentPath, childName));
                checkpoints.put(Integer.valueOf(childName), Bytes.toString(raw));
            }
            readSuccess.incrementAndGet();
            return checkpoints;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error to fetch remote checkpoint for " + str + " " + str2, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Converts a replica set to the byte form stored in ZooKeeper: its JSON string
     * encoded through {@code Bytes.toBytes}.
     *
     * @param replicaSet the replica set to serialize
     * @return the encoded JSON
     * @throws Exception if JSON serialization fails
     */
    private byte[] serializeReplicaSet(ReplicaSet replicaSet) throws Exception {
        String json = JsonUtil.writeValueAsString(replicaSet);
        return Bytes.toBytes(json);
    }

    /**
     * Loads one replica set from the node named after its id. When the node holds no
     * data, a fresh {@link ReplicaSet} carrying only the id is returned instead.
     *
     * @param i id of the replica set
     * @return the stored replica set, or an otherwise default one with id {@code i}
     * @throws StoreException wrapping any failure while reading or parsing
     */
    @Override
    public ReplicaSet getReplicaSet(int i) {
        try {
            String nodePath = ZKPaths.makePath(replicaSetRoot, String.valueOf(i));
            byte[] raw = client.getData().forPath(nodePath);
            ReplicaSet result;
            if (raw == null || raw.length == 0) {
                // Node without content: fall back to an instance that only knows its id.
                result = new ReplicaSet();
                result.setReplicaSetID(i);
            } else {
                result = (ReplicaSet) JsonUtil.readValue(Bytes.toString(raw), ReplicaSet.class);
            }
            readSuccess.incrementAndGet();
            return result;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get replica set " + i, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Deletes the node named after the given replica set id.
     *
     * @param i id of the replica set to remove
     * @throws StoreException wrapping any failure of the delete call
     */
    @Override
    public void removeReplicaSet(int i) {
        try {
            String nodePath = ZKPaths.makePath(replicaSetRoot, String.valueOf(i));
            client.delete().forPath(nodePath);
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when remove replica set " + i, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Lists the registered receivers. Each child name under the receiver root has its
     * underscores replaced by colons and is then parsed with {@code Node.from}.
     *
     * @return one {@link Node} per child of the receiver root
     * @throws StoreException wrapping any failure while listing or parsing
     */
    @Override
    public List<Node> getReceivers() {
        List<Node> receivers = new ArrayList<>();
        try {
            for (String nodeName : client.getChildren().forPath(receiverRoot)) {
                String address = nodeName.replace('_', ':');
                receivers.add(Node.from(address));
            }
            readSuccess.incrementAndGet();
            return receivers;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when fetch receivers", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Lists the names of the child nodes under the cube root.
     *
     * @return the child names exactly as returned by the Curator client
     * @throws StoreException wrapping any failure of the listing call
     */
    @Override
    public List<String> getCubes() {
        try {
            List<String> cubeNames = client.getChildren().forPath(cubeRoot);
            readSuccess.incrementAndGet();
            return cubeNames;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when fetch cubes", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Registers a cube by creating its node under the cube root. Nothing is created
     * when the node already exists; the call still counts as a successful write.
     *
     * @param str name of the cube; must be non-empty
     * @throws IllegalArgumentException if {@code str} is null or empty
     * @throws StoreException wrapping any failure while checking or creating
     */
    @Override
    public void addStreamingCube(String str) {
        checkPath(str);
        try {
            String cubePath = ZKPaths.makePath(cubeRoot, str);
            boolean missing = client.checkExists().forPath(cubePath) == null;
            if (missing) {
                client.create().creatingParentsIfNeeded().forPath(cubePath);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when add cube " + str, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Removes a cube's node together with all of its children. A cube that is not
     * registered is ignored; the call still counts as a successful write.
     *
     * @param str name of the cube; must be non-empty
     * @throws IllegalArgumentException if {@code str} is null or empty
     * @throws StoreException wrapping any failure while checking or deleting
     */
    @Override
    public void removeStreamingCube(String str) {
        logger.trace("Remove cube {}", str);
        checkPath(str);
        try {
            String cubePath = ZKPaths.makePath(cubeRoot, str);
            boolean present = client.checkExists().forPath(cubePath) != null;
            if (present) {
                client.delete().deletingChildrenIfNeeded().forPath(cubePath);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when remove cube " + str, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Reads the consume state stored for a cube.
     *
     * @param str name of the cube
     * @return the stored state, or {@code StreamingCubeConsumeState.RUNNING} when the
     *         state node does not exist or holds no data
     * @throws StoreException wrapping any failure while reading or parsing
     */
    @Override
    public StreamingCubeConsumeState getStreamingCubeConsumeState(String str) {
        try {
            String statePath = getCubeConsumeStatePath(str);
            if (client.checkExists().forPath(statePath) == null) {
                return StreamingCubeConsumeState.RUNNING;
            }
            byte[] raw = client.getData().forPath(statePath);
            readSuccess.incrementAndGet();
            if (raw != null && raw.length > 0) {
                return (StreamingCubeConsumeState) JsonUtil.readValue(raw, StreamingCubeConsumeState.class);
            }
            return StreamingCubeConsumeState.RUNNING;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Error when get streaming cube consume state " + str, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Stores the consume state of a cube as JSON bytes, overwriting the state node
     * when it exists and creating it (with parents) otherwise.
     *
     * @param str name of the cube; must be non-empty
     * @param streamingCubeConsumeState the state to persist
     * @throws IllegalArgumentException if {@code str} is null or empty
     * @throws StoreException wrapping any failure while checking, serializing or writing
     */
    @Override
    public void saveStreamingCubeConsumeState(String str, StreamingCubeConsumeState streamingCubeConsumeState) {
        checkPath(str);
        try {
            String statePath = getCubeConsumeStatePath(str);
            boolean present = client.checkExists().forPath(statePath) != null;
            byte[] payload = JsonUtil.writeValueAsBytes(streamingCubeConsumeState);
            if (present) {
                client.setData().forPath(statePath, payload);
            } else {
                client.create().creatingParentsIfNeeded().forPath(statePath, payload);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when save streaming cube consume state " + str + " with " + streamingCubeConsumeState, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Registers a receiver by creating a node named {@code node.toNormalizeString()}
     * under the receiver root. An existing node is left untouched and a warning is
     * logged; the call still counts as a successful write.
     *
     * @param node the receiver to register
     * @throws StoreException wrapping any failure while checking or creating
     */
    @Override
    public void addReceiver(Node node) {
        logger.trace("Add {}.", node);
        try {
            String receiverPath = ZKPaths.makePath(receiverRoot, node.toNormalizeString());
            if (client.checkExists().forPath(receiverPath) != null) {
                logger.warn("{} exists.", receiverPath);
            } else {
                client.create().creatingParentsIfNeeded().forPath(receiverPath);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when add new receiver " + node, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Unregisters a receiver by deleting its node (and any children) under the
     * receiver root. A receiver that is not registered is ignored; the call still
     * counts as a successful write.
     *
     * @param node the receiver to remove
     * @throws StoreException wrapping any failure while checking or deleting
     */
    @Override
    public void removeReceiver(Node node) {
        logger.trace("Remove {}.", node);
        try {
            String receiverPath = ZKPaths.makePath(receiverRoot, node.toNormalizeString());
            boolean registered = client.checkExists().forPath(receiverPath) != null;
            if (registered) {
                client.delete().deletingChildrenIfNeeded().forPath(receiverPath);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Error when remove receiver " + node, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Persists a cube assignment under the cube named by
     * {@code cubeAssignment.getCubeName()}, creating the assignment node (with
     * parents) when it is missing and overwriting its data otherwise.
     *
     * @param cubeAssignment the assignment to store
     * @throws StoreException wrapping any failure while checking, serializing or writing
     */
    @Override
    public void saveNewCubeAssignment(CubeAssignment cubeAssignment) {
        logger.trace("Try saving new cube assignment for: {}.", cubeAssignment);
        try {
            String assignmentPath = getCubeAssignmentPath(cubeAssignment.getCubeName());
            boolean missing = client.checkExists().forPath(assignmentPath) == null;
            byte[] payload = CubeAssignment.serializeCubeAssignment(cubeAssignment);
            if (missing) {
                client.create().creatingParentsIfNeeded().forPath(assignmentPath, payload);
            } else {
                client.setData().forPath(assignmentPath, payload);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Fail to save cube assignment", ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Closes the underlying Curator client. A failure is logged and not rethrown.
     */
    public void close() {
        try {
            client.close();
        } catch (Exception closeFailure) {
            // Best effort: report the problem but never fail the caller on shutdown.
            logger.error("Exception throws when close assignmentManager", closeFailure);
        }
    }

    /**
     * Marks a replica set as complete for a segment by creating the node
     * {@code <cubeRoot>/<str>/build_state/<str2>/replica_sets/<i>}. An existing node
     * only triggers a warning; the call still counts as a successful write.
     *
     * @param str  name of the cube; must be non-empty
     * @param str2 name of the segment; must be non-empty
     * @param i    id of the replica set
     * @throws IllegalArgumentException if {@code str} or {@code str2} is null or empty
     * @throws StoreException wrapping any failure while checking or creating
     */
    @Override
    public void addCompleteReplicaSetForSegmentBuild(String str, String str2, int i) {
        logger.trace("Add completed rs {} to {} {}", i, str, str2);
        checkPath(str, str2);
        try {
            String markerPath = ZKPaths.makePath(cubeRoot, str, CUBE_BUILD_STATE, str2, "replica_sets", String.valueOf(i));
            if (client.checkExists().forPath(markerPath) != null) {
                logger.warn("ReplicaSet id {} existed under path {}", i, markerPath);
            } else {
                client.create().creatingParentsIfNeeded().forPath(markerPath);
            }
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Fail to add replicaSet Id to segment build state for " + str2 + " " + i, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Writes the build state of a segment, as a JSON string, into the node
     * {@code <cubeRoot>/<str>/build_state/<str2>}. The node is written with
     * {@code setData} only; this method does not create it.
     *
     * @param str        name of the cube; must be non-empty
     * @param str2       name of the segment; must be non-empty
     * @param buildState the state to store
     * @throws IllegalArgumentException if {@code str} or {@code str2} is null or empty
     * @throws StoreException wrapping any failure while serializing or writing
     */
    @Override
    public void updateSegmentBuildState(String str, String str2, SegmentBuildState.BuildState buildState) {
        logger.trace("Update {} {} to state {}", str, str2, buildState);
        checkPath(str, str2);
        try {
            String segmentPath = ZKPaths.makePath(cubeRoot, str, CUBE_BUILD_STATE, str2);
            String json = JsonUtil.writeValueAsString(buildState);
            client.setData().forPath(segmentPath, Bytes.toBytes(json));
            writeSuccess.incrementAndGet();
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Fail to update segment build state for " + str2 + " to " + buildState, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Loads the build state of every segment listed under the cube's
     * {@code build_state} node.
     *
     * @param str name of the cube
     * @return one state per child node, or an empty list when the cube has no
     *         {@code build_state} node
     * @throws StoreException wrapping any failure while listing or reading
     */
    @Override
    public List<SegmentBuildState> getSegmentBuildStates(String str) {
        try {
            String buildStateRoot = getCubeBuildStatePath(str);
            if (client.checkExists().forPath(buildStateRoot) == null) {
                return Lists.newArrayList();
            }
            List<String> segmentNames = client.getChildren().forPath(buildStateRoot);
            readSuccess.incrementAndGet();
            List<SegmentBuildState> states = new ArrayList<>();
            for (String segmentName : segmentNames) {
                states.add(doGetSegmentBuildState(buildStateRoot, segmentName));
            }
            return states;
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Fail to get segment build states " + str, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Loads the build state of a single segment of a cube.
     *
     * @param str  name of the cube
     * @param str2 name of the segment
     * @return the segment's build state
     * @throws StoreException wrapping any failure while reading the segment's nodes
     */
    @Override
    public SegmentBuildState getSegmentBuildState(String str, String str2) {
        try {
            String buildStateRoot = getCubeBuildStatePath(str);
            return doGetSegmentBuildState(buildStateRoot, str2);
        } catch (Exception ex) {
            readFail.incrementAndGet();
            logger.error("Fail to get segment build state for " + str + " " + str2, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Builds a {@link SegmentBuildState} from the node {@code <str>/<str2>}: the
     * node's data, when non-empty, is parsed as the JSON build state, and every child
     * name under its {@code replica_sets} node is added as a completed replica set id.
     *
     * @param str  path of the cube's {@code build_state} node
     * @param str2 name of the segment
     * @return the assembled state
     * @throws Exception any failure from the Curator client or from parsing
     */
    private SegmentBuildState doGetSegmentBuildState(String str, String str2) throws Exception {
        SegmentBuildState result = new SegmentBuildState(str2);
        String segmentPath = ZKPaths.makePath(str, str2);

        byte[] raw = client.getData().forPath(segmentPath);
        readSuccess.incrementAndGet();
        boolean hasState = raw != null && raw.length > 0;
        if (hasState) {
            String json = Bytes.toString(raw);
            result.setState((SegmentBuildState.BuildState) JsonUtil.readValue(json, SegmentBuildState.BuildState.class));
        }

        String completedRoot = ZKPaths.makePath(segmentPath, "replica_sets");
        for (String replicaSetName : client.getChildren().forPath(completedRoot)) {
            result.addCompleteReplicaSet(Integer.parseInt(replicaSetName));
        }
        return result;
    }

    /**
     * Deletes the build state node of a segment together with its children.
     *
     * @param str  name of the cube; must be non-empty
     * @param str2 name of the segment; must be non-empty
     * @return {@code true} if the node existed and was deleted, {@code false} (after a
     *         warning is logged) if there was nothing to delete
     * @throws IllegalArgumentException if {@code str} or {@code str2} is null or empty
     * @throws StoreException wrapping any failure while checking or deleting
     */
    @Override
    public boolean removeSegmentBuildState(String str, String str2) {
        logger.trace("Remove {} {}", str, str2);
        checkPath(str, str2);
        try {
            String segmentPath = ZKPaths.makePath(cubeRoot, str, CUBE_BUILD_STATE, str2);
            if (client.checkExists().forPath(segmentPath) != null) {
                client.delete().deletingChildrenIfNeeded().forPath(segmentPath);
                writeSuccess.incrementAndGet();
                return true;
            }
            logger.warn("Cube segment deep store state does not exisit!, path {} ", segmentPath);
            return false;
        } catch (Exception ex) {
            writeFail.incrementAndGet();
            logger.error("Fail to remove cube segment deep store state " + str + " " + str2, ex);
            throw new StoreException(ex);
        }
    }

    /**
     * Builds the path of a cube's assignment node.
     *
     * @param str name of the cube
     * @return {@code <cubeRoot>/<str>/assignment}
     */
    private String getCubeAssignmentPath(String str) {
        return ZKPaths.makePath(cubeRoot, str, CUBE_ASSIGNMENT);
    }

    /**
     * Builds the path of a cube's build state node.
     *
     * @param str name of the cube
     * @return {@code <cubeRoot>/<str>/build_state}
     */
    private String getCubeBuildStatePath(String str) {
        return ZKPaths.makePath(cubeRoot, str, CUBE_BUILD_STATE);
    }

    /**
     * Builds the path of a cube's consume state node.
     *
     * @param str name of the cube
     * @return {@code <cubeRoot>/<str>/consume_state}
     */
    private String getCubeConsumeStatePath(String str) {
        return ZKPaths.makePath(cubeRoot, str, CUBE_CONSUME_STATE);
    }

    /**
     * Logs the read/write counters of this store.
     *
     * <p>While any failure has been counted, the statistics are logged at WARN level
     * on every call. Otherwise they are logged at DEBUG level, at most once per
     * {@code REPORT_DURATION} milliseconds. The time of the last emitted report is
     * recorded whenever something was logged.
     */
    @Override
    public void reportStat() {
        boolean hasFailures = this.writeFail.get() > 0 || this.readFail.get() > 0;
        if (hasFailures) {
            logger.warn(this.reportTemplate, this.readSuccess.get(), this.writeSuccess.get(),
                    this.readFail.get(), this.writeFail.get());
        } else {
            // Throttle the healthy-path report using the declared interval rather
            // than a duplicated magic number.
            if (System.currentTimeMillis() - this.lastReport.get() < REPORT_DURATION) {
                return;
            }
            logger.debug(this.reportTemplate, this.readSuccess.get(), this.writeSuccess.get(),
                    this.readFail.get(), this.writeFail.get());
        }
        this.lastReport.set(System.currentTimeMillis());
    }

    /**
     * Validates path components before they are used to build a ZooKeeper path.
     *
     * @param strArr the components to check
     * @throws IllegalArgumentException if any component is {@code null} or empty
     */
    private void checkPath(String... strArr) {
        for (int idx = 0; idx < strArr.length; idx++) {
            String component = strArr[idx];
            boolean usable = component != null && !component.isEmpty();
            if (!usable) {
                throw new IllegalArgumentException("Illegal zookeeper path.");
            }
        }
    }
}
