/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.cache;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.ResourceQuotaCache;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalZooKeeperCacheService {
    private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCacheService.class);
    private static final String MANAGED_LEDGER_ROOT = "/managed-ledgers";
    public static final String OWNER_INFO_ROOT = "/namespace";
    public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
    public static final String AVAILABLE_BOOKIES_ROOT = "/ledgers/available";
    private final ZooKeeperCache cache;
    private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
    private ZooKeeperManagedLedgerCache managedLedgerListCache;
    private ResourceQuotaCache resourceQuotaCache;
    private ZooKeeperDataCache<LocalPolicies> policiesCache;
    private ZooKeeperChildrenCache availableBookiesCache;
    private ConfigurationCacheService configurationCacheService;

    public LocalZooKeeperCacheService(ZooKeeperCache cache, ConfigurationCacheService configurationCacheService) throws PulsarServerException {
        this.cache = cache;
        this.configurationCacheService = configurationCacheService;
        this.initZK();
        this.ownerInfoCache = new ZooKeeperDataCache<NamespaceEphemeralData>(cache){

            @Override
            public NamespaceEphemeralData deserialize(String path, byte[] content) throws Exception {
                return ObjectMapperFactory.getThreadLocal().readValue(content, NamespaceEphemeralData.class);
            }
        };
        this.policiesCache = new ZooKeeperDataCache<LocalPolicies>(cache){

            @Override
            public LocalPolicies deserialize(String path, byte[] content) throws Exception {
                return ObjectMapperFactory.getThreadLocal().readValue(content, LocalPolicies.class);
            }

            @Override
            public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
                return this.getWithStatAsync(path).thenApply(entry -> entry.map(e -> (LocalPolicies)e.getKey()));
            }

            @Override
            public CompletableFuture<Optional<Map.Entry<LocalPolicies, Stat>>> getWithStatAsync(String path) {
                CompletableFuture<Optional<Map.Entry<LocalPolicies, Stat>>> future = new CompletableFuture<Optional<Map.Entry<LocalPolicies, Stat>>>();
                ((CompletableFuture)super.getWithStatAsync(path).thenAccept(result -> {
                    Optional<LocalPolicies> localPolicies = result.map(Map.Entry::getKey);
                    if (localPolicies.isPresent()) {
                        future.complete((Optional<Map.Entry<LocalPolicies, Stat>>)result);
                    } else {
                        ((CompletableFuture)LocalZooKeeperCacheService.this.createPolicies(path, true).thenAccept(p -> {
                            LOG.info("Successfully created local policies for {} -- {}", (Object)path, p);
                            super.getWithStatAsync(path);
                            Stat stat = new Stat();
                            stat.setVersion(-1);
                            future.complete(Optional.of(Maps.immutableEntry(p.orElse(null), stat)));
                        })).exceptionally(ex -> {
                            future.completeExceptionally((Throwable)ex);
                            return null;
                        });
                    }
                })).exceptionally(ex -> {
                    future.completeExceptionally((Throwable)ex);
                    return null;
                });
                return future;
            }
        };
        this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
        this.resourceQuotaCache = new ResourceQuotaCache(cache);
        this.resourceQuotaCache.initZK();
        this.availableBookiesCache = new ZooKeeperChildrenCache(cache, AVAILABLE_BOOKIES_ROOT);
    }

    private void initZK() throws PulsarServerException {
        String[] paths = new String[]{MANAGED_LEDGER_ROOT, OWNER_INFO_ROOT, LOCAL_POLICIES_ROOT};
        try {
            ZooKeeper zk = this.cache.getZooKeeper();
            for (String path : paths) {
                if (this.cache.exists(path)) continue;
                try {
                    ZkUtils.createFullPathOptimistic(zk, path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                catch (KeeperException.NodeExistsException nodeExistsException) {
                    // empty catch block
                }
            }
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
            throw new PulsarServerException(e);
        }
    }

    public CompletableFuture<Optional<LocalPolicies>> createPolicies(String path, boolean readFromGlobal) {
        CompletableFuture<Optional<LocalPolicies>> future = new CompletableFuture<Optional<LocalPolicies>>();
        if (path == null || !path.startsWith(LOCAL_POLICIES_ROOT)) {
            future.completeExceptionally(new IllegalArgumentException("Invalid path of local policies " + path));
            return future;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating local namespace policies for {} - readFromGlobal: {}", (Object)path, (Object)readFromGlobal);
        }
        CompletableFuture<Optional<LocalPolicies>> readFromGlobalFuture = new CompletableFuture<Optional<LocalPolicies>>();
        if (readFromGlobal) {
            String globalPath = PulsarWebResource.joinPath("/admin/policies", path.substring(path.indexOf(LOCAL_POLICIES_ROOT) + LOCAL_POLICIES_ROOT.length() + 1));
            Preconditions.checkNotNull(this.configurationCacheService);
            Preconditions.checkNotNull(this.configurationCacheService.policiesCache());
            Preconditions.checkNotNull(this.configurationCacheService.policiesCache().getAsync(globalPath));
            ((CompletableFuture)this.configurationCacheService.policiesCache().getAsync(globalPath).thenAccept(policies -> {
                if (policies.isPresent()) {
                    LocalPolicies localPolicies = new LocalPolicies();
                    localPolicies.bundles = ((Policies)policies.get()).bundles;
                    readFromGlobalFuture.complete(Optional.of(localPolicies));
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Global policies not found at {}", (Object)globalPath);
                    }
                    readFromGlobalFuture.complete(Optional.empty());
                }
            })).exceptionally(ex -> {
                future.completeExceptionally((Throwable)ex);
                return null;
            });
        } else {
            readFromGlobalFuture.complete(Optional.of(new LocalPolicies()));
        }
        ((CompletableFuture)readFromGlobalFuture.thenAccept(localPolicies -> {
            byte[] content;
            if (!localPolicies.isPresent()) {
                future.complete(Optional.empty());
            }
            try {
                content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies.get());
            }
            catch (Throwable t) {
                future.completeExceptionally(t);
                return;
            }
            ZkUtils.asyncCreateFullPathOptimistic(this.cache.getZooKeeper(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
                if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NODEEXISTS.intValue()) {
                    LOG.info("Successfully copyied bundles data to local zk at {}", (Object)path);
                    future.complete((Optional<LocalPolicies>)localPolicies);
                } else {
                    LOG.error("Failed to create policies for {} in local zookeeper: {}", (Object)path, (Object)KeeperException.Code.get(rc));
                    future.completeExceptionally(new PulsarServerException(KeeperException.create(rc)));
                }
            }, null);
        })).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }

    public ResourceQuotaCache getResourceQuotaCache() {
        return this.resourceQuotaCache;
    }

    public ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache() {
        return this.ownerInfoCache;
    }

    public ZooKeeperDataCache<LocalPolicies> policiesCache() {
        return this.policiesCache;
    }

    public ZooKeeperManagedLedgerCache managedLedgerListCache() {
        return this.managedLedgerListCache;
    }

    public ZooKeeperChildrenCache availableBookiesCache() {
        return this.availableBookiesCache;
    }

    public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
        return this.cache.existsAsync("/managed-ledgers/" + persistentPath, this.cache);
    }
}

