package org.apache.kylin.stream.coordinator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.tool.shaded.com.google.common.base.Function;
import org.apache.kylin.tool.shaded.com.google.common.collect.Lists;
import org.apache.kylin.tool.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.class */
/**
 * {@link StreamMetadataStore} implementation that keeps streaming metadata in ZooKeeper and accesses it
 * through a Curator client obtained from {@code StreamingUtils.getZookeeperClient()}.
 *
 * <p>Node layout below the stream root ({@code StreamingUtils.STREAM_ZK_ROOT}):
 * <pre>
 *   /replica_sets/{id}                                       JSON of the ReplicaSet
 *   /receivers/{normalized node}                             one child per receiver
 *   /coordinator                                             JSON of the coordinator Node
 *   /cubes/{cube}/assignment                                 serialized CubeAssignment
 *   /cubes/{cube}/consume_state                              JSON of StreamingCubeConsumeState
 *   /cubes/{cube}/build_state/{segment}                      JSON of SegmentBuildState.BuildState
 *   /cubes/{cube}/build_state/{segment}/replica_sets/{id}    one child per completed replica set
 *   /cubes/{cube}/source_checkpoint/{str2}/{i}               checkpoint string
 * </pre>
 *
 * <p>Every public operation except {@link #close()} logs a failure and rethrows it as an unchecked
 * {@link StoreException}; {@code close()} only logs.
 */
public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
    public static final String REPLICA_SET_ROOT = "/replica_sets";
    public static final String RECEIVER_ROOT = "/receivers";
    public static final String CUBE_ROOT = "/cubes";
    public static final String COORDINATOR_NODE = "/coordinator";
    public static final String CUBE_BUILD_STATE = "build_state";
    public static final String CUBE_CONSUME_STATE = "consume_state";
    public static final String CUBE_ASSIGNMENT = "assignment";
    public static final String CUBE_CONSUME_SRC_STATE = "consume_source_state";
    public static final String CUBE_SRC_CHECKPOINT = "source_checkpoint";

    /** Name of the child node, below a segment's build state, that lists completed replica sets. */
    private static final String COMPLETE_REPLICA_SETS_NODE = "replica_sets";

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperStreamMetadataStore.class);

    private final CuratorFramework client = StreamingUtils.getZookeeperClient();
    private final String zkRoot = StreamingUtils.STREAM_ZK_ROOT;
    private final String replicaSetRoot;
    private final String receiverRoot;
    private final String cubeRoot;
    private final String coordinatorRoot;

    /**
     * Computes the root paths and makes sure the corresponding ZooKeeper nodes exist.
     *
     * @throws StoreException if any of the root nodes cannot be checked or created
     */
    public ZookeeperStreamMetadataStore() {
        this.replicaSetRoot = this.zkRoot + REPLICA_SET_ROOT;
        this.receiverRoot = this.zkRoot + RECEIVER_ROOT;
        this.cubeRoot = this.zkRoot + CUBE_ROOT;
        this.coordinatorRoot = this.zkRoot + COORDINATOR_NODE;
        init();
    }

    /** Creates the stream root and the four top-level nodes when they are missing. */
    private void init() {
        try {
            createZKNodeIfNotExist(this.zkRoot);
            createZKNodeIfNotExist(this.replicaSetRoot);
            createZKNodeIfNotExist(this.receiverRoot);
            createZKNodeIfNotExist(this.cubeRoot);
            createZKNodeIfNotExist(this.coordinatorRoot);
        } catch (Exception e) {
            logger.error("error when create zk nodes", e);
            throw new StoreException(e);
        }
    }

    /** Creates {@code str} (and any missing parents) unless a node already exists at that path. */
    private void createZKNodeIfNotExist(String str) throws Exception {
        if (this.client.checkExists().forPath(str) == null) {
            this.client.create().creatingParentsIfNeeded().forPath(str);
        }
    }

    /**
     * Deletes the assignment node of a cube.
     *
     * @param str cube name
     * @throws StoreException if the delete fails; no existence check is made first, so a missing node
     *         is reported as a failure too
     */
    @Override
    public void removeCubeAssignment(String str) {
        try {
            this.client.delete().forPath(getCubeAssignmentPath(str));
        } catch (Exception e) {
            logger.error("error when remove cube assignment", e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads the assignment of every cube below the cube root that has an assignment node.
     *
     * @return the deserialized assignments; cubes without an assignment node are skipped
     * @throws StoreException if ZooKeeper access or deserialization fails
     */
    @Override
    public List<CubeAssignment> getAllCubeAssignments() {
        try {
            List<CubeAssignment> assignments = Lists.newArrayList();
            for (String cube : this.client.getChildren().forPath(this.cubeRoot)) {
                String assignmentPath = getCubeAssignmentPath(cube);
                if (this.client.checkExists().forPath(assignmentPath) != null) {
                    assignments.add(
                            CubeAssignment.deserializeCubeAssignment(this.client.getData().forPath(assignmentPath)));
                }
            }
            return assignments;
        } catch (Exception e) {
            logger.error("error when get assignments", e);
            throw new StoreException(e);
        }
    }

    /**
     * Converts all cube assignments into a per-replica-set view using
     * {@code AssignmentUtil.convertCubeAssign2ReplicaSetAssign}.
     *
     * @return the converted map (keys are presumably replica set ids, inner keys cube names)
     * @throws StoreException if reading or converting the assignments fails
     */
    @Override
    public Map<Integer, Map<String, List<Partition>>> getAllReplicaSetAssignments() {
        try {
            return AssignmentUtil.convertCubeAssign2ReplicaSetAssign(getAllCubeAssignments());
        } catch (StoreException e) {
            // Already logged and wrapped by getAllCubeAssignments(); do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            logger.error("error when get assignments", e);
            throw new StoreException(e);
        }
    }

    /**
     * Looks up one entry of {@link #getAllReplicaSetAssignments()}.
     *
     * @param i key to look up (presumably the replica set id)
     * @return the entry for {@code i}, or {@code null} when the map has no such key
     * @throws StoreException if the assignments cannot be loaded
     */
    @Override
    public Map<String, List<Partition>> getAssignmentsByReplicaSet(int i) {
        try {
            return getAllReplicaSetAssignments().get(i);
        } catch (StoreException e) {
            // Already logged and wrapped by the delegate.
            throw e;
        } catch (Exception e) {
            logger.error("error when get assignments", e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads the assignment of a single cube.
     *
     * @param str cube name
     * @return the deserialized assignment, or {@code null} when the cube has no assignment node
     * @throws StoreException if ZooKeeper access or deserialization fails
     */
    @Override
    public CubeAssignment getAssignmentsByCube(String str) {
        try {
            String assignmentPath = getCubeAssignmentPath(str);
            if (this.client.checkExists().forPath(assignmentPath) == null) {
                return null;
            }
            return CubeAssignment.deserializeCubeAssignment(this.client.getData().forPath(assignmentPath));
        } catch (Exception e) {
            logger.error("error when get cube assignment", e);
            throw new StoreException(e);
        }
    }

    /**
     * Loads every replica set stored below the replica set root.
     *
     * @return one {@link ReplicaSet} per child node
     * @throws StoreException if the children cannot be listed, a child name is not an integer, or a
     *         replica set cannot be read
     */
    @Override
    public List<ReplicaSet> getReplicaSets() {
        List<ReplicaSet> replicaSets = Lists.newArrayList();
        try {
            for (String child : this.client.getChildren().forPath(this.replicaSetRoot)) {
                replicaSets.add(getReplicaSet(Integer.parseInt(child)));
            }
            return replicaSets;
        } catch (StoreException e) {
            // Already logged and wrapped by getReplicaSet().
            throw e;
        } catch (Exception e) {
            logger.error("error when get replica sets", e);
            throw new StoreException(e);
        }
    }

    /**
     * Lists the ids of all replica sets, parsed from the child node names of the replica set root.
     *
     * @return a new list with the parsed ids
     * @throws StoreException if the children cannot be listed or a child name is not an integer
     */
    @Override
    public List<Integer> getReplicaSetIDs() {
        try {
            List<String> children = this.client.getChildren().forPath(this.replicaSetRoot);
            // Lists.transform is a lazy view: copy it here so that a malformed node name fails inside
            // this try block instead of later, at the caller, as a raw NumberFormatException.
            return new ArrayList<Integer>(Lists.transform(children, new Function<String, Integer>() {
                @Override
                @Nullable
                public Integer apply(@Nullable String str) {
                    return Integer.valueOf(str);
                }
            }));
        } catch (Exception e) {
            logger.error("error when get replica sets", e);
            throw new StoreException(e);
        }
    }

    /**
     * Stores a new replica set under the next free id (largest existing id + 1, or 0 when there is
     * none) and writes that id into {@code replicaSet}.
     *
     * <p>The id is picked by listing the children and then creating the node; the two steps are not
     * atomic.
     *
     * @param replicaSet replica set to persist; its id is overwritten with the allocated one
     * @return the allocated replica set id
     * @throws StoreException if listing, serializing or creating the node fails
     */
    @Override
    public int createReplicaSet(ReplicaSet replicaSet) {
        try {
            List<Integer> existingIds = Lists.transform(this.client.getChildren().forPath(this.replicaSetRoot),
                    new Function<String, Integer>() {
                        @Override
                        @Nullable
                        public Integer apply(@Nullable String str) {
                            try {
                                return Integer.valueOf(str);
                            } catch (NumberFormatException ignored) {
                                // A child whose name is not a number is counted as id 0.
                                return 0;
                            }
                        }
                    });
            int maxId = existingIds.isEmpty() ? -1 : Collections.max(existingIds);
            int newId = maxId + 1;
            replicaSet.setReplicaSetID(newId);
            String replicaSetPath = ZKPaths.makePath(this.replicaSetRoot, String.valueOf(newId));
            this.client.create().creatingParentsIfNeeded().forPath(replicaSetPath, serializeReplicaSet(replicaSet));
            return newId;
        } catch (Exception e) {
            logger.error("error when create replicaSet:" + replicaSet, e);
            throw new StoreException(e);
        }
    }

    /**
     * Overwrites the stored JSON of an existing replica set, addressed by its id.
     *
     * @param replicaSet replica set to write
     * @throws StoreException if serialization or the ZooKeeper write fails
     */
    @Override
    public void updateReplicaSet(ReplicaSet replicaSet) {
        try {
            String replicaSetPath = ZKPaths.makePath(this.replicaSetRoot,
                    String.valueOf(replicaSet.getReplicaSetID()));
            this.client.setData().forPath(replicaSetPath, serializeReplicaSet(replicaSet));
        } catch (Exception e) {
            logger.error("error when update replicaSet:" + replicaSet.getReplicaSetID(), e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads the coordinator node's data and parses it as a {@link Node}.
     *
     * @return the parsed coordinator node
     * @throws StoreException if the data cannot be read or parsed
     */
    @Override
    public Node getCoordinatorNode() {
        try {
            return JsonUtil.readValue(this.client.getData().forPath(this.coordinatorRoot), Node.class);
        } catch (Exception e) {
            logger.error("error when get coordinator", e);
            throw new StoreException(e);
        }
    }

    /**
     * Writes {@code node} as JSON into the coordinator node.
     *
     * @param node coordinator to record
     * @throws StoreException if serialization or the ZooKeeper write fails
     */
    @Override
    public void setCoordinatorNode(Node node) {
        try {
            this.client.setData().forPath(this.coordinatorRoot, JsonUtil.writeValueAsBytes(node));
        } catch (Exception e) {
            logger.error("error when set coordinator", e);
            throw new StoreException(e);
        }
    }

    /**
     * Stores a checkpoint string at {@code /cubes/{str}/source_checkpoint/{str2}/{i}}, creating the
     * node when needed and overwriting it otherwise.
     *
     * @param str cube name
     * @param str2 second path element (presumably the segment name -- confirm against the interface)
     * @param i third path element (presumably the replica set id -- confirm against the interface)
     * @param str3 checkpoint content
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public void saveSourceCheckpoint(String str, String str2, int i, String str3) {
        try {
            String checkpointPath = ZKPaths.makePath(this.cubeRoot, str, CUBE_SRC_CHECKPOINT, str2,
                    String.valueOf(i));
            byte[] checkpointBytes = Bytes.toBytes(str3);
            if (this.client.checkExists().forPath(checkpointPath) == null) {
                // Create the node together with its payload so that a concurrent reader never sees a
                // freshly created node that does not hold the checkpoint yet.
                this.client.create().creatingParentsIfNeeded().forPath(checkpointPath, checkpointBytes);
            } else {
                logger.warn("checkpoint path already existed under path {}", checkpointPath);
                this.client.setData().forPath(checkpointPath, checkpointBytes);
            }
        } catch (Exception e) {
            logger.error("fail to save source checkpoint", e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads all checkpoints stored below {@code /cubes/{str}/source_checkpoint/{str2}}.
     *
     * @param str cube name
     * @param str2 second path element (presumably the segment name -- confirm against the interface)
     * @return child name parsed as integer mapped to the child's data as a string, or {@code null}
     *         when the path does not exist or its child list is {@code null}
     * @throws StoreException if ZooKeeper access fails or a child name is not an integer
     */
    @Override
    public Map<Integer, String> getSourceCheckpoint(String str, String str2) {
        try {
            String checkpointRoot = ZKPaths.makePath(this.cubeRoot, str, CUBE_SRC_CHECKPOINT, str2);
            if (this.client.checkExists().forPath(checkpointRoot) == null) {
                return null;
            }
            List<String> children = this.client.getChildren().forPath(checkpointRoot);
            if (children == null) {
                return null;
            }
            Map<Integer, String> checkpoints = Maps.newHashMap();
            for (String child : children) {
                byte[] data = this.client.getData().forPath(ZKPaths.makePath(checkpointRoot, child));
                checkpoints.put(Integer.valueOf(child), Bytes.toString(data));
            }
            return checkpoints;
        } catch (Exception e) {
            logger.error("fail to get source checkpoint", e);
            throw new StoreException(e);
        }
    }

    /** Serializes a replica set to the bytes of its JSON string. */
    private byte[] serializeReplicaSet(ReplicaSet replicaSet) throws Exception {
        return Bytes.toBytes(JsonUtil.writeValueAsString(replicaSet));
    }

    /**
     * Reads one replica set.
     *
     * @param i replica set id
     * @return the parsed replica set, or an otherwise default {@link ReplicaSet} carrying only the id
     *         when the node holds no data
     * @throws StoreException if the node cannot be read or its data cannot be parsed
     */
    @Override
    public ReplicaSet getReplicaSet(int i) {
        try {
            byte[] data = this.client.getData().forPath(ZKPaths.makePath(this.replicaSetRoot, String.valueOf(i)));
            if (data != null && data.length > 0) {
                return JsonUtil.readValue(Bytes.toString(data), ReplicaSet.class);
            }
            ReplicaSet emptyReplicaSet = new ReplicaSet();
            emptyReplicaSet.setReplicaSetID(i);
            return emptyReplicaSet;
        } catch (Exception e) {
            logger.error("error when get replica set:" + i, e);
            throw new StoreException(e);
        }
    }

    /**
     * Deletes the node of a replica set.
     *
     * @param i replica set id
     * @throws StoreException if the delete fails; no existence check is made first
     */
    @Override
    public void removeReplicaSet(int i) {
        try {
            this.client.delete().forPath(ZKPaths.makePath(this.replicaSetRoot, String.valueOf(i)));
        } catch (Exception e) {
            logger.error("error when remove replica set:" + i, e);
            throw new StoreException(e);
        }
    }

    /**
     * Lists the registered receivers.
     *
     * @return one {@link Node} per child of the receiver root, built with {@code Node.from} after
     *         replacing every {@code '_'} in the child name with {@code ':'}
     * @throws StoreException if the children cannot be listed or a name cannot be converted
     */
    @Override
    public List<Node> getReceivers() {
        List<Node> receivers = Lists.newArrayList();
        try {
            for (String child : this.client.getChildren().forPath(this.receiverRoot)) {
                receivers.add(Node.from(child.replace('_', ':')));
            }
            return receivers;
        } catch (Exception e) {
            logger.error("error when fetch receivers", e);
            throw new StoreException(e);
        }
    }

    /**
     * Lists the names of all children of the cube root.
     *
     * @return the child names as returned by ZooKeeper
     * @throws StoreException if the children cannot be listed
     */
    @Override
    public List<String> getCubes() {
        try {
            return this.client.getChildren().forPath(this.cubeRoot);
        } catch (Exception e) {
            logger.error("error when fetch cubes", e);
            throw new StoreException(e);
        }
    }

    /**
     * Creates the node of a cube unless it already exists.
     *
     * @param str cube name
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public void addStreamingCube(String str) {
        try {
            String cubePath = ZKPaths.makePath(this.cubeRoot, str);
            if (this.client.checkExists().forPath(cubePath) == null) {
                this.client.create().creatingParentsIfNeeded().forPath(cubePath);
            }
        } catch (Exception e) {
            logger.error("error when add cube", e);
            throw new StoreException(e);
        }
    }

    /**
     * Deletes the node of a cube together with all of its children; does nothing when it is absent.
     *
     * @param str cube name
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public void removeStreamingCube(String str) {
        try {
            String cubePath = ZKPaths.makePath(this.cubeRoot, str);
            if (this.client.checkExists().forPath(cubePath) != null) {
                this.client.delete().deletingChildrenIfNeeded().forPath(cubePath);
            }
        } catch (Exception e) {
            logger.error("error when remove cube", e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads the consume state of a cube.
     *
     * @param str cube name
     * @return the stored state, or {@link StreamingCubeConsumeState#RUNNING} when the state node is
     *         missing or holds no data
     * @throws StoreException if the node cannot be read or its data cannot be parsed
     */
    @Override
    public StreamingCubeConsumeState getStreamingCubeConsumeState(String str) {
        try {
            String consumeStatePath = getCubeConsumeStatePath(str);
            if (this.client.checkExists().forPath(consumeStatePath) == null) {
                return StreamingCubeConsumeState.RUNNING;
            }
            byte[] data = this.client.getData().forPath(consumeStatePath);
            if (data == null || data.length <= 0) {
                return StreamingCubeConsumeState.RUNNING;
            }
            return JsonUtil.readValue(data, StreamingCubeConsumeState.class);
        } catch (Exception e) {
            logger.error("error when get streaming cube consume state", e);
            throw new StoreException(e);
        }
    }

    /**
     * Writes the consume state of a cube as JSON, creating the state node when it is missing.
     *
     * @param str cube name
     * @param streamingCubeConsumeState state to store
     * @throws StoreException if serialization or the ZooKeeper write fails
     */
    @Override
    public void saveStreamingCubeConsumeState(String str, StreamingCubeConsumeState streamingCubeConsumeState) {
        try {
            String consumeStatePath = getCubeConsumeStatePath(str);
            byte[] stateBytes = JsonUtil.writeValueAsBytes(streamingCubeConsumeState);
            if (this.client.checkExists().forPath(consumeStatePath) != null) {
                this.client.setData().forPath(consumeStatePath, stateBytes);
            } else {
                this.client.create().creatingParentsIfNeeded().forPath(consumeStatePath, stateBytes);
            }
        } catch (Exception e) {
            logger.error("error when save streaming cube consume state", e);
            throw new StoreException(e);
        }
    }

    /**
     * Registers a receiver by creating a child named {@code node.toNormalizeString()} below the
     * receiver root, unless it already exists.
     *
     * @param node receiver to register
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public void addReceiver(Node node) {
        try {
            String receiverPath = ZKPaths.makePath(this.receiverRoot, node.toNormalizeString());
            if (this.client.checkExists().forPath(receiverPath) == null) {
                this.client.create().creatingParentsIfNeeded().forPath(receiverPath);
            }
        } catch (Exception e) {
            logger.error("error when add new receiver", e);
            throw new StoreException(e);
        }
    }

    /**
     * Removes a receiver's node (and its children); does nothing when the node is absent.
     *
     * @param node receiver to remove
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public void removeReceiver(Node node) {
        try {
            String receiverPath = ZKPaths.makePath(this.receiverRoot, node.toNormalizeString());
            if (this.client.checkExists().forPath(receiverPath) != null) {
                this.client.delete().deletingChildrenIfNeeded().forPath(receiverPath);
            }
        } catch (Exception e) {
            logger.error("error when remove receiver:" + node, e);
            throw new StoreException(e);
        }
    }

    /**
     * Stores the assignment of a cube, creating the assignment node or overwriting its data.
     *
     * @param cubeAssignment assignment to store; its cube name selects the node
     * @throws StoreException if serialization or the ZooKeeper write fails
     */
    @Override
    public void saveNewCubeAssignment(CubeAssignment cubeAssignment) {
        logger.info("try saving new cube assignment for:" + cubeAssignment.getCubeName());
        try {
            String assignmentPath = getCubeAssignmentPath(cubeAssignment.getCubeName());
            byte[] assignmentBytes = CubeAssignment.serializeCubeAssignment(cubeAssignment);
            if (this.client.checkExists().forPath(assignmentPath) == null) {
                this.client.create().creatingParentsIfNeeded().forPath(assignmentPath, assignmentBytes);
            } else {
                this.client.setData().forPath(assignmentPath, assignmentBytes);
            }
        } catch (Exception e) {
            logger.error("fail to save cube assignment", e);
            throw new StoreException(e);
        }
    }

    /** Closes the Curator client; a failure is logged and not propagated. */
    public void close() {
        try {
            this.client.close();
        } catch (Exception e) {
            logger.error("exception throws when close assignmentManager", e);
        }
    }

    /**
     * Records that a replica set has completed a segment by creating the node
     * {@code /cubes/{str}/build_state/{str2}/replica_sets/{i}}; only warns when it already exists.
     *
     * @param str cube name
     * @param str2 segment name
     * @param i replica set id
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public void addCompleteReplicaSetForSegmentBuild(String str, String str2, int i) {
        try {
            String replicaSetPath = ZKPaths.makePath(this.cubeRoot, str, CUBE_BUILD_STATE, str2,
                    COMPLETE_REPLICA_SETS_NODE, String.valueOf(i));
            if (this.client.checkExists().forPath(replicaSetPath) == null) {
                this.client.create().creatingParentsIfNeeded().forPath(replicaSetPath);
            } else {
                logger.warn("ReplicaSet id {} existed under path {}", i, replicaSetPath);
            }
        } catch (Exception e) {
            logger.error("fail to add replicaSet Id to segment build state", e);
            throw new StoreException(e);
        }
    }

    /**
     * Overwrites the build state stored at {@code /cubes/{str}/build_state/{str2}}.
     *
     * @param str cube name
     * @param str2 segment name
     * @param buildState state to store as JSON
     * @throws StoreException if serialization or the ZooKeeper write fails (the node is not created
     *         here)
     */
    @Override
    public void updateSegmentBuildState(String str, String str2, SegmentBuildState.BuildState buildState) {
        try {
            String segmentPath = ZKPaths.makePath(this.cubeRoot, str, CUBE_BUILD_STATE, str2);
            this.client.setData().forPath(segmentPath, Bytes.toBytes(JsonUtil.writeValueAsString(buildState)));
        } catch (Exception e) {
            logger.error("fail to update segment build state", e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads the build state of every segment recorded for a cube.
     *
     * @param str cube name
     * @return one state per child of the cube's build state node; empty when that node does not exist
     * @throws StoreException if ZooKeeper access or parsing fails
     */
    @Override
    public List<SegmentBuildState> getSegmentBuildStates(String str) {
        try {
            String buildStatePath = getCubeBuildStatePath(str);
            List<SegmentBuildState> states = Lists.newArrayList();
            if (this.client.checkExists().forPath(buildStatePath) == null) {
                return states;
            }
            for (String segment : this.client.getChildren().forPath(buildStatePath)) {
                states.add(doGetSegmentBuildState(buildStatePath, segment));
            }
            return states;
        } catch (Exception e) {
            logger.error("fail to get segment build states", e);
            throw new StoreException(e);
        }
    }

    /**
     * Reads the build state of one segment.
     *
     * @param str cube name
     * @param str2 segment name
     * @return the segment's build state
     * @throws StoreException if ZooKeeper access or parsing fails
     */
    @Override
    public SegmentBuildState getSegmentBuildState(String str, String str2) {
        try {
            return doGetSegmentBuildState(getCubeBuildStatePath(str), str2);
        } catch (Exception e) {
            logger.error("fail to get cube segment remote store state", e);
            throw new StoreException(e);
        }
    }

    /**
     * Builds a {@link SegmentBuildState} from the segment node's data (when non-empty) and from the
     * children of its {@code replica_sets} node.
     *
     * @param str path of the cube's build state node
     * @param str2 segment name
     */
    private SegmentBuildState doGetSegmentBuildState(String str, String str2) throws Exception {
        SegmentBuildState segmentState = new SegmentBuildState(str2);
        String segmentPath = ZKPaths.makePath(str, str2);
        byte[] stateBytes = this.client.getData().forPath(segmentPath);
        if (stateBytes != null && stateBytes.length > 0) {
            segmentState.setState(JsonUtil.readValue(Bytes.toString(stateBytes), SegmentBuildState.BuildState.class));
        }
        String replicaSetsPath = ZKPaths.makePath(segmentPath, COMPLETE_REPLICA_SETS_NODE);
        for (String replicaSetId : this.client.getChildren().forPath(replicaSetsPath)) {
            segmentState.addCompleteReplicaSet(Integer.parseInt(replicaSetId));
        }
        return segmentState;
    }

    /**
     * Deletes the build state node of a segment together with its children.
     *
     * @param str cube name
     * @param str2 segment name
     * @return {@code true} when the node existed and was deleted, {@code false} when it did not exist
     * @throws StoreException if the ZooKeeper access fails
     */
    @Override
    public boolean removeSegmentBuildState(String str, String str2) {
        try {
            String segmentPath = ZKPaths.makePath(this.cubeRoot, str, CUBE_BUILD_STATE, str2);
            if (this.client.checkExists().forPath(segmentPath) == null) {
                logger.warn("cube segment deep store state does not exisit!, path {} ", segmentPath);
                return false;
            }
            this.client.delete().deletingChildrenIfNeeded().forPath(segmentPath);
            return true;
        } catch (Exception e) {
            logger.error("fail to remove cube segment deep store state", e);
            throw new StoreException(e);
        }
    }

    /** Returns {@code /cubes/{str}/assignment} below the stream root. */
    private String getCubeAssignmentPath(String str) {
        return ZKPaths.makePath(this.cubeRoot, str, CUBE_ASSIGNMENT);
    }

    /** Returns {@code /cubes/{str}/build_state} below the stream root. */
    private String getCubeBuildStatePath(String str) {
        return ZKPaths.makePath(this.cubeRoot, str, CUBE_BUILD_STATE);
    }

    /** Returns {@code /cubes/{str}/consume_state} below the stream root. */
    private String getCubeConsumeStatePath(String str) {
        return ZKPaths.makePath(this.cubeRoot, str, CUBE_CONSUME_STATE);
    }
}
