package io.kgraph.pregel;

import ch.qos.logback.core.AsyncAppenderBase;
import io.kgraph.pregel.PregelState;
import java.util.Base64;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-0.3.0.jar:io/kgraph/pregel/ZKUtils.class */
/**
 * Static helpers for the ZooKeeper znodes that live under
 * {@code /kafka-graphs/pregel-<id>}, accessed through Apache Curator.
 *
 * <p>Two kinds of paths are built here:
 * <ul>
 *   <li>barrier paths, {@code <root>/barriers/(rcv-|snd-)<superstep>}, whose children are
 *       added, removed and counted, and which may carry a {@value #READY} marker child;</li>
 *   <li>aggregate paths, {@code <root>/aggregates/<superstep>}.</li>
 * </ul>
 *
 * <p>In every method the {@code str} parameter that follows the Curator client is the
 * identifier appended to {@link #PREGEL_PATH} to form the root path, unless documented
 * otherwise.
 */
public class ZKUtils {
    private static final Logger log = LoggerFactory.getLogger(ZKUtils.class);

    public static final String GRAPHS_PATH = "/kafka-graphs";
    public static final String PREGEL_PATH = GRAPHS_PATH + "/pregel-";
    public static final String AGGREGATES = "aggregates";
    public static final String BARRIERS = "barriers";
    public static final String GROUP = "group";
    public static final String LEADER = "leader";
    public static final String READY = "ready";
    public static final String SUPERSTEP = "superstep";

    /** Base sleep time, in milliseconds, handed to the exponential-backoff retry policy. */
    private static final int RETRY_BASE_SLEEP_MS = 1000;

    /** Maximum number of retries handed to the exponential-backoff retry policy. */
    private static final int RETRY_MAX_RETRIES = 3;

    /**
     * Creates and starts a Curator client that uses an exponential-backoff retry policy.
     *
     * @param str the ZooKeeper connection string
     * @return a client on which {@code start()} has already been called
     */
    public static CuratorFramework createCurator(String str) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            str, new ExponentialBackoffRetry(RETRY_BASE_SLEEP_MS, RETRY_MAX_RETRIES));
        client.start();
        return client;
    }

    /**
     * Inspects the cached children of the barrier for {@code pregelState} and, when exactly
     * {@code i} children other than {@value #READY} are present, moves on to the next state.
     *
     * <ul>
     *   <li>If the cache has no entry for the current barrier, or the count differs from
     *       {@code i}, {@code pregelState} is returned unchanged.</li>
     *   <li>Otherwise, if the barrier of {@code pregelState.next()} has no cache entry or no
     *       children other than {@value #READY}, {@code pregelState.complete()} is returned.</li>
     *   <li>Otherwise a {@value #READY} node is created under the next barrier and
     *       {@code pregelState.next()} is returned.</li>
     * </ul>
     *
     * @param curatorFramework client used to create the ready node
     * @param str identifier appended to {@link #PREGEL_PATH}
     * @param pregelState the current state
     * @param treeCache cache from which barrier children are read
     * @param i the required number of non-ready children of the current barrier
     *          (presumably the group size; confirm against callers)
     * @return the unchanged, completed, or next state as described above
     * @throws Exception if creating the ready node fails, including when it already exists
     */
    public static PregelState maybeCreateReadyToSendNode(CuratorFramework curatorFramework, String str, PregelState pregelState, TreeCache treeCache, int i) throws Exception {
        Map<String, ChildData> current = treeCache.getCurrentChildren(barrierPath(str, pregelState));
        if (current == null || countNonReadyChildren(current) != i) {
            return pregelState;
        }
        String nextBarrier = barrierPath(str, pregelState.next());
        Map<String, ChildData> upcoming = treeCache.getCurrentChildren(nextBarrier);
        if (upcoming == null || countNonReadyChildren(upcoming) == 0) {
            return pregelState.complete();
        }
        createReadyNode(curatorFramework, nextBarrier);
        return pregelState.next();
    }

    /**
     * Inspects the cached children of the barrier for {@code pregelState} and, when no
     * children other than {@value #READY} remain, creates a {@value #READY} node under the
     * barrier of {@code pregelState.next()}.
     *
     * @param curatorFramework client used to create the ready node
     * @param str identifier appended to {@link #PREGEL_PATH}
     * @param pregelState the current state
     * @param treeCache cache from which barrier children are read
     * @return {@code pregelState.next()} if the ready node was created; otherwise
     *         {@code pregelState} unchanged (no cache entry, or non-ready children remain)
     * @throws Exception if creating the ready node fails, including when it already exists
     */
    public static PregelState maybeCreateReadyToReceiveNode(CuratorFramework curatorFramework, String str, PregelState pregelState, TreeCache treeCache) throws Exception {
        Map<String, ChildData> current = treeCache.getCurrentChildren(barrierPath(str, pregelState));
        if (current == null || countNonReadyChildren(current) != 0) {
            return pregelState;
        }
        createReadyNode(curatorFramework, barrierPath(str, pregelState.next()));
        return pregelState.next();
    }

    /**
     * Checks whether the {@value #READY} node exists under the barrier for {@code pregelState}.
     *
     * @return {@code true} if the node exists
     * @throws Exception if the existence check fails
     */
    public static boolean isReady(CuratorFramework curatorFramework, String str, PregelState pregelState) throws Exception {
        return hasChild(curatorFramework, str, pregelState, READY);
    }

    /**
     * Checks whether a child named after the Base64-encoded serialized form of {@code k}
     * exists under the barrier for {@code pregelState}.
     *
     * @param k the key to serialize
     * @param serializer serializer applied to {@code k}
     * @return {@code true} if the child exists
     * @throws Exception if the existence check fails
     */
    public static <K> boolean hasChild(CuratorFramework curatorFramework, String str, PregelState pregelState, K k, Serializer<K> serializer) throws Exception {
        return hasChild(curatorFramework, str, pregelState, base64EncodedString(k, serializer));
    }

    /**
     * Checks whether the child {@code str2} exists under the barrier for {@code pregelState}.
     *
     * @param str2 the child node name
     * @return {@code true} if the child exists
     * @throws Exception if the existence check fails
     */
    public static <K> boolean hasChild(CuratorFramework curatorFramework, String str, PregelState pregelState, String str2) throws Exception {
        String childPath = ZKPaths.makePath(barrierPath(str, pregelState), str2);
        return curatorFramework.checkExists().forPath(childPath) != null;
    }

    /**
     * Adds a persistent, empty child named after the Base64-encoded serialized form of
     * {@code k} under the barrier for {@code pregelState}. An existing child is left as is.
     *
     * @throws Exception if the creation fails for a reason other than the node existing
     */
    public static <K> void addChild(CuratorFramework curatorFramework, String str, PregelState pregelState, K k, Serializer<K> serializer) throws Exception {
        addChild(curatorFramework, str, pregelState, base64EncodedString(k, serializer));
    }

    /**
     * Adds the persistent, empty child {@code str2} under the barrier for {@code pregelState}.
     * An existing child is left as is.
     *
     * @throws Exception if the creation fails for a reason other than the node existing
     */
    public static <K> void addChild(CuratorFramework curatorFramework, String str, PregelState pregelState, String str2) throws Exception {
        addChild(curatorFramework, str, pregelState, str2, CreateMode.PERSISTENT);
    }

    /**
     * Adds the empty child {@code str2}, with the given create mode, under the barrier for
     * {@code pregelState}. An existing child is left as is.
     *
     * @throws Exception if the creation fails for a reason other than the node existing
     */
    public static <K> void addChild(CuratorFramework curatorFramework, String str, PregelState pregelState, String str2, CreateMode createMode) throws Exception {
        addChild(curatorFramework, barrierPath(str, pregelState), str2, createMode);
    }

    /**
     * Adds the empty child {@code str2}, with the given create mode, under the parent path
     * {@code str}. An existing child is left as is.
     *
     * @param str the full parent path (not an identifier)
     * @throws Exception if the creation fails for a reason other than the node existing
     */
    public static <K> void addChild(CuratorFramework curatorFramework, String str, String str2, CreateMode createMode) throws Exception {
        addChild(curatorFramework, str, str2, createMode, new byte[0]);
    }

    /**
     * Adds the child {@code str2} with payload {@code bArr} under the parent path {@code str},
     * creating missing parent containers. If the node already exists the call is a no-op and
     * the existing node's data is not modified.
     *
     * @param str the full parent path (not an identifier)
     * @param str2 the child node name
     * @param createMode the ZooKeeper create mode for the new node
     * @param bArr the data to store in the new node
     * @throws Exception if the creation fails for a reason other than the node existing
     */
    public static <K> void addChild(CuratorFramework curatorFramework, String str, String str2, CreateMode createMode, byte[] bArr) throws Exception {
        String childPath = ZKPaths.makePath(str, str2);
        try {
            log.debug("adding child {}", childPath);
            curatorFramework.create().creatingParentContainersIfNeeded().withMode(createMode).forPath(childPath, bArr);
        } catch (KeeperException.NodeExistsException ignored) {
            // Deliberately best-effort: an already-present child is treated as success.
            log.debug("child {} already exists", childPath);
        }
    }

    /**
     * Removes the child named after the Base64-encoded serialized form of {@code k} from the
     * barrier for {@code pregelState}. A missing child is ignored.
     *
     * @throws Exception if the deletion fails for a reason other than the node being absent
     */
    public static <K> void removeChild(CuratorFramework curatorFramework, String str, PregelState pregelState, K k, Serializer<K> serializer) throws Exception {
        removeChild(curatorFramework, str, pregelState, base64EncodedString(k, serializer));
    }

    /**
     * Removes the child {@code str2} from the barrier for {@code pregelState}. A missing child
     * is ignored.
     *
     * @throws Exception if the deletion fails for a reason other than the node being absent
     */
    public static <K> void removeChild(CuratorFramework curatorFramework, String str, PregelState pregelState, String str2) throws Exception {
        removeChild(curatorFramework, barrierPath(str, pregelState), str2);
    }

    /**
     * Removes the child {@code str2} from the parent path {@code str}. A missing child is
     * ignored.
     *
     * @param str the full parent path (not an identifier)
     * @param str2 the child node name
     * @throws Exception if the deletion fails for a reason other than the node being absent
     */
    public static <K> void removeChild(CuratorFramework curatorFramework, String str, String str2) throws Exception {
        String childPath = ZKPaths.makePath(str, str2);
        try {
            log.debug("removing child {}", childPath);
            curatorFramework.delete().forPath(childPath);
        } catch (KeeperException.NoNodeException ignored) {
            // Deliberately best-effort: an already-absent child is treated as success.
            log.debug("child {} does not exist", childPath);
        }
    }

    /**
     * Builds the aggregate path {@code /kafka-graphs/pregel-<str>/aggregates/<i>}.
     *
     * <p>Decompiler note (JADX): access modifiers changed from {@code protected}; the method
     * is kept {@code public} here so existing callers are unaffected.
     *
     * @param str identifier appended to {@link #PREGEL_PATH}
     * @param i the number used as the last path segment
     * @return the aggregate path
     */
    public static String aggregatePath(String str, int i) {
        return ZKPaths.makePath(PREGEL_PATH + str, AGGREGATES, String.valueOf(i));
    }

    /**
     * Builds the barrier path {@code /kafka-graphs/pregel-<str>/barriers/<prefix><superstep>},
     * where the prefix is {@code rcv-} when the state's stage is {@code RECEIVE} and
     * {@code snd-} for any other stage.
     *
     * @param str identifier appended to {@link #PREGEL_PATH}
     * @param pregelState supplies the stage and superstep
     * @return the barrier path
     */
    protected static String barrierPath(String str, PregelState pregelState) {
        String stagePrefix = pregelState.stage() == PregelState.Stage.RECEIVE ? "rcv-" : "snd-";
        return ZKPaths.makePath(PREGEL_PATH + str, BARRIERS, stagePrefix + pregelState.superstep());
    }

    /** Returns the number of entries in {@code children}, not counting the ready marker. */
    private static int countNonReadyChildren(Map<String, ChildData> children) {
        int count = children.size();
        return children.containsKey(READY) ? count - 1 : count;
    }

    /** Creates the ready marker under {@code barrier}, creating parent containers as needed. */
    private static void createReadyNode(CuratorFramework curatorFramework, String barrier) throws Exception {
        String readyPath = ZKPaths.makePath(barrier, READY);
        log.debug("adding ready {}", readyPath);
        curatorFramework.create().creatingParentContainersIfNeeded().forPath(readyPath);
    }

    /**
     * Serializes {@code k} (passing {@code null} as the topic) and Base64-encodes the bytes
     * for use as a child node name.
     *
     * <p>NOTE(review): the basic Base64 alphabet contains {@code '/'}, which is also the
     * ZooKeeper path separator, so some serialized keys may yield a name that spans several
     * path segments. Switching to the URL-safe encoder would change the names of existing
     * znodes, so it is flagged here rather than changed; confirm against callers.
     */
    private static <K> String base64EncodedString(K k, Serializer<K> serializer) {
        byte[] serialized = serializer.serialize(null, k);
        return Base64.getEncoder().encodeToString(serialized);
    }
}
