package org.apache.distributedlog.service.placement;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.TreeSet;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.service.placement.PlacementStateManager;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/placement/ZKPlacementStateManager.class */
/**
 * A {@link PlacementStateManager} that keeps the placement state in ZooKeeper.
 *
 * <p>All state lives under a single znode, {@code <namespace path>/.server-load}:
 * <ul>
 *   <li>the data of that znode is an 8-byte timestamp written on every save, and</li>
 *   <li>each child znode holds the serialized {@link ServerLoad} of one server; the child name
 *       is the server name with every {@code "/"} replaced by {@code "--"}.</li>
 * </ul>
 *
 * <p>{@link #watch} sets a data watch on the parent znode, so each save (which rewrites the
 * timestamp) triggers the registered callback.
 */
public class ZKPlacementStateManager implements PlacementStateManager {
    private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
    private static final String SERVER_LOAD_DIR = "/.server-load";
    private final String serverLoadPath;
    private final ZooKeeperClient zkClient;
    /** Whether a watch is currently registered; only accessed while holding this object's monitor. */
    private boolean watching = false;

    /**
     * Builds a manager whose ZooKeeper client is derived from the given DistributedLog URI.
     *
     * @param uri the DistributedLog namespace URI; its path is the parent of the server-load znode
     * @param distributedLogConfiguration configuration passed to the ZooKeeper client builder
     * @param statsLogger stats logger; the client reports under the "placement_state_manager" scope
     */
    public ZKPlacementStateManager(URI uri, DistributedLogConfiguration distributedLogConfiguration, StatsLogger statsLogger) {
        String zKServersFromDLUri = BKNamespaceDriver.getZKServersFromDLUri(uri);
        this.zkClient = BKNamespaceDriver.createZKClientBuilder(
                String.format("ZKPlacementStateManager-%s", zKServersFromDLUri),
                distributedLogConfiguration,
                zKServersFromDLUri,
                statsLogger.scope("placement_state_manager")).build();
        this.serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
    }

    /**
     * Encodes the current wall-clock time as the 8-byte payload stored on the server-load znode.
     */
    private static byte[] currentTimestampBytes() {
        return ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
    }

    /**
     * Creates the server-load znode (and any missing parents) with the given data. Losing the
     * creation race to another client is not an error and is only logged at debug level.
     */
    private void createServerLoadPathIfNoExists(byte[] data) throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
        try {
            Utils.zkCreateFullPathOptimistic(this.zkClient, this.serverLoadPath, data, this.zkClient.getDefaultACL(), CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            logger.debug("the server load path {} is already created by others", this.serverLoadPath, e);
        }
    }

    /**
     * Clears the {@code watching} flag and registers the watch again. Synchronized so that the
     * flag is never written outside the monitor, including from the ZooKeeper watcher callback.
     */
    private synchronized void rewatch(PlacementStateManager.PlacementCallback placementCallback) {
        this.watching = false;
        watch(placementCallback);
    }

    /**
     * Persists the given server loads in a single ZooKeeper transaction: the timestamp on the
     * parent znode is refreshed, existing servers are overwritten, new servers are created and
     * children that are no longer present in {@code treeSet} are deleted.
     *
     * @param treeSet the complete set of server loads to store
     * @throws PlacementStateManager.StateManagerSaveException if ZooKeeper access or serialization
     *         fails, or the calling thread is interrupted (the interrupt status is restored)
     */
    @Override
    public void saveOwnership(TreeSet<ServerLoad> treeSet) throws PlacementStateManager.StateManagerSaveException {
        logger.info("saving ownership");
        try {
            ZooKeeper zooKeeper = this.zkClient.get();
            byte[] timestamp = currentTimestampBytes();
            if (zooKeeper.exists(this.serverLoadPath, false) == null) {
                createServerLoadPathIfNoExists(timestamp);
            }
            Transaction transaction = zooKeeper.transaction();
            // Starts as every server currently stored; whatever is left after the loop is stale.
            HashSet<String> staleServers = new HashSet<>(zooKeeper.getChildren(this.serverLoadPath, false));
            transaction.setData(this.serverLoadPath, timestamp, -1);
            for (ServerLoad serverLoad : treeSet) {
                String zkName = serverToZkFormat(serverLoad.getServer());
                String path = serverPath(zkName);
                if (staleServers.remove(zkName)) {
                    transaction.setData(path, serverLoad.serialize(), -1);
                } else {
                    transaction.create(path, serverLoad.serialize(), this.zkClient.getDefaultACL(), CreateMode.PERSISTENT);
                }
            }
            for (String staleServer : staleServers) {
                transaction.delete(serverPath(staleServer), -1);
            }
            transaction.commit();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new PlacementStateManager.StateManagerSaveException(e);
        } catch (IOException | KeeperException e) {
            throw new PlacementStateManager.StateManagerSaveException(e);
        }
    }

    /**
     * Reads and deserializes the load of every server stored under the server-load znode.
     *
     * @return the stored server loads
     * @throws PlacementStateManager.StateManagerLoadException if ZooKeeper access or
     *         deserialization fails, or the calling thread is interrupted (the interrupt status
     *         is restored)
     */
    @Override
    public TreeSet<ServerLoad> loadOwnership() throws PlacementStateManager.StateManagerLoadException {
        TreeSet<ServerLoad> ownerships = new TreeSet<>();
        try {
            ZooKeeper zooKeeper = this.zkClient.get();
            for (String server : zooKeeper.getChildren(this.serverLoadPath, false)) {
                byte[] data = zooKeeper.getData(serverPath(server), false, new Stat());
                ownerships.add(ServerLoad.deserialize(data));
            }
            return ownerships;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new PlacementStateManager.StateManagerLoadException(e);
        } catch (IOException | KeeperException e) {
            throw new PlacementStateManager.StateManagerLoadException(e);
        }
    }

    /**
     * Registers a data watch on the server-load znode. When the watch fires, the current
     * ownership is loaded and handed to the callback, then the watch is registered again
     * (ZooKeeper watches are one-time triggers). Calling this while a watch is already
     * registered is a no-op.
     *
     * <p>If the znode does not exist yet it is created and the watch is retried. Any other
     * failure is logged and retried as well.
     *
     * <p>NOTE(review): retries are immediate and recursive, so a persistent failure (e.g. an
     * unreachable ensemble) keeps recursing without backoff — consider a bounded/delayed retry.
     *
     * @param placementCallback receives the freshly loaded ownership each time the watch fires
     */
    @Override
    public synchronized void watch(final PlacementStateManager.PlacementCallback placementCallback) {
        if (this.watching) {
            return;
        }
        this.watching = true;
        try {
            try {
                this.zkClient.get().getData(this.serverLoadPath, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        try {
                            placementCallback.callback(ZKPlacementStateManager.this.loadOwnership());
                        } catch (PlacementStateManager.StateManagerLoadException e) {
                            ZKPlacementStateManager.logger.error("Watch of Ownership failed", e);
                        } finally {
                            // Re-arm the watch whether or not the callback succeeded.
                            ZKPlacementStateManager.this.rewatch(placementCallback);
                        }
                    }
                }, new Stat());
            } catch (KeeperException.NoNodeException e) {
                createServerLoadPathIfNoExists(currentTimestampBytes());
                rewatch(placementCallback);
            }
        } catch (InterruptedException e) {
            logger.error("Watch of Ownership failed", e);
            // The exception cleared the interrupt flag, so the retry can proceed; restore the
            // flag afterwards so the caller still observes the interruption.
            rewatch(placementCallback);
            Thread.currentThread().interrupt();
        } catch (ZooKeeperClient.ZooKeeperConnectionException | KeeperException e) {
            logger.error("Watch of Ownership failed", e);
            rewatch(placementCallback);
        }
    }

    /**
     * Returns the full znode path for a server name that is already in ZooKeeper format.
     *
     * @param str the znode-safe server name
     * @return the server-load path followed by {@code "/"} and {@code str}
     */
    public String serverPath(String str) {
        return String.format("%s/%s", this.serverLoadPath, str);
    }

    /**
     * Converts a server name into a single znode name by replacing every {@code "/"} with
     * {@code "--"}.
     *
     * @param str the server name
     * @return the znode-safe form of {@code str}
     */
    protected String serverToZkFormat(String str) {
        return str.replace("/", "--");
    }

    /**
     * Reverses {@link #serverToZkFormat(String)} by replacing every {@code "--"} with
     * {@code "/"}.
     *
     * @param str the znode-safe server name
     * @return {@code str} with each {@code "--"} turned back into {@code "/"}
     */
    protected String zkFormatToServer(String str) {
        return str.replace("--", "/");
    }
}
