/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.pulsar.common.naming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.base.Charsets;
import org.apache.pulsar.shade.com.google.common.base.Joiner;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Splitter;
import org.apache.pulsar.shade.com.google.common.collect.BoundType;
import org.apache.pulsar.shade.com.google.common.collect.Range;
import org.apache.pulsar.shade.com.google.common.hash.HashFunction;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates {@link NamespaceBundle}/{@link NamespaceBundles} instances and caches the
 * {@link NamespaceBundles} of each namespace.
 *
 * <p>Cache entries are loaded asynchronously from the local-policies cache (path
 * {@code /admin/local-policies/<namespace>}) and are invalidated whenever a local-policies
 * update notification is received for that namespace.
 */
public class NamespaceBundleFactory
implements ZooKeeperCacheListener<LocalPolicies> {
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);
    /** Root path under which the local policies of each namespace are stored. */
    private static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
    /** Textual lower boundary of the full hash range. */
    private static final String FIRST_BOUNDARY = "0x00000000";
    /** Textual upper boundary of the full hash range. */
    private static final String LAST_BOUNDARY = "0xffffffff";
    private final HashFunction hashFunc;
    private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
    private final PulsarService pulsar;

    /**
     * Builds the factory, its bundles cache, and (when a {@code pulsar} service is supplied)
     * registers the listeners that invalidate cache entries on local-policies changes.
     *
     * @param pulsar   the broker service used to read local policies; may be {@code null}, in
     *                 which case every namespace resolves to the single full-range bundle and no
     *                 listeners are registered
     * @param hashFunc the hash function used by {@link #getLongHashCode(String)}
     */
    public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
        this.hashFunc = hashFunc;
        // Assigned before the cache is built so the loader helper can safely read the field.
        this.pulsar = pulsar;
        this.bundlesCache = Caffeine.newBuilder().recordStats().buildAsync((namespace, executor) -> this.loadBundles(namespace));
        CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);
        // The loader tolerates a null PulsarService, so the listener registration must as well;
        // previously the first registration dereferenced `pulsar` unconditionally.
        if (pulsar != null) {
            // Local policies contain the namespace bundles: drop the cached entry when they change.
            pulsar.getLocalZkCacheService().policiesCache().registerListener((path, data, stat) -> {
                String[] paths = path.split(LOCAL_POLICIES_ROOT + "/");
                if (paths.length == 2) {
                    this.invalidateBundleCache(NamespaceName.get(paths[1]));
                }
            });
            if (pulsar.getConfigurationCache() != null) {
                pulsar.getLocalZkCacheService().policiesCache().registerListener(this);
            }
        }
    }

    /**
     * Cache loader: resolves the bundles of {@code namespace} from its local policies.
     *
     * <p>When no {@link PulsarService} (or no configuration cache) is available the full-range
     * bundle is returned immediately. Otherwise the local policies are read asynchronously; the
     * bundles version is the version of the policies node, or {@code -1} when the node is absent.
     */
    private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace) {
        String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Loading cache with bundles for {}", namespace);
        }
        if (this.pulsar == null || this.pulsar.getConfigurationCache() == null) {
            return CompletableFuture.completedFuture(this.getBundles(namespace, null));
        }
        CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
        this.pulsar.getLocalZkCacheService().policiesCache().getWithStatAsync(path).thenAccept(result -> {
            BundlesData bundlesData = result.map(Map.Entry::getKey).map(p -> p.bundles).orElse(null);
            long version = result.map(Map.Entry::getValue).map(Stat::getVersion).orElse(-1);
            NamespaceBundles namespaceBundles = this.getBundles(namespace, bundlesData, version);
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Get bundles from getLocalZkCacheService: path: {},  bundles: {}, version: {}", namespace, path, bundlesData != null && bundlesData.boundaries != null ? bundlesData.toString() : "null", namespaceBundles.getVersion());
            }
            future.complete(namespaceBundles);
        }).exceptionally(ex -> {
            // Covers both a failed read and an exception thrown while building the bundles.
            future.completeExceptionally(ex);
            return null;
        });
        return future;
    }

    /**
     * Invalidates the cached bundles of the namespace whose local policies changed.
     *
     * @param path the local-policies path of the updated namespace
     * @param data the updated policies (unused)
     * @param stat the node stat (unused)
     */
    @Override
    public void onUpdate(String path, LocalPolicies data, Stat stat) {
        NamespaceName namespace = NamespaceName.get(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path));
        try {
            LOG.info("Policy updated for namespace {}, refreshing the bundle cache.", namespace);
            this.invalidateBundleCache(namespace);
        }
        catch (Exception e) {
            LOG.error("Failed to update the policy change for ns {}", namespace, e);
        }
    }

    /**
     * Tells whether this broker currently holds {@code nsBundle} in its ownership cache.
     * Always {@code false} when no {@link PulsarService} is available.
     */
    private boolean isOwner(NamespaceBundle nsBundle) {
        if (this.pulsar == null) {
            return false;
        }
        return this.pulsar.getNamespaceService().getOwnershipCache().getOwnedBundle(nsBundle) != null;
    }

    /**
     * Removes the cached bundles of {@code namespace}; the next lookup reloads them.
     *
     * @param namespace the namespace whose cache entry is dropped
     */
    public void invalidateBundleCache(NamespaceName namespace) {
        this.bundlesCache.synchronous().invalidate(namespace);
    }

    /**
     * Returns the bundles of {@code nsname}, loading them asynchronously if not cached.
     *
     * @param nsname the namespace to look up
     * @return a future completed with the namespace bundles
     */
    public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName nsname) {
        return this.bundlesCache.get(nsname);
    }

    /**
     * Returns the bundles of {@code nsname}, blocking until they are loaded.
     *
     * @param nsname the namespace to look up
     * @return the namespace bundles
     * @throws Exception declared for API compatibility
     */
    public NamespaceBundles getBundles(NamespaceName nsname) throws Exception {
        return this.bundlesCache.synchronous().get(nsname);
    }

    /**
     * Returns the bundles of {@code nsname} only if they are already cached; never triggers a load.
     *
     * @param nsname the namespace to look up
     * @return the cached bundles, or an empty optional
     * @throws Exception declared for API compatibility
     */
    public Optional<NamespaceBundles> getBundlesIfPresent(NamespaceName nsname) throws Exception {
        return Optional.ofNullable(this.bundlesCache.synchronous().getIfPresent(nsname));
    }

    /**
     * Creates a bundle of {@code nsname} covering {@code hashRange}.
     *
     * @param nsname    the owning namespace
     * @param hashRange the hash range covered by the bundle
     * @return a new bundle bound to this factory
     */
    public NamespaceBundle getBundle(NamespaceName nsname, Range<Long> hashRange) {
        return new NamespaceBundle(nsname, hashRange, this);
    }

    /**
     * Creates a bundle from a textual range of the form {@code <lower>_<upper>}, where each
     * endpoint is parsed with {@link Long#decode(String)}.
     *
     * <p>The lower endpoint is always closed. The upper endpoint is closed only when it equals
     * {@link NamespaceBundles#FULL_UPPER_BOUND}, and open otherwise.
     *
     * @param namespace   the namespace name
     * @param bundleRange the textual range, e.g. {@code 0x00000000_0xffffffff}
     * @return the bundle covering that range
     * @throws IllegalArgumentException if {@code bundleRange} does not contain two
     *         {@code _}-separated endpoints or an endpoint is not a decodable number
     */
    public NamespaceBundle getBundle(String namespace, String bundleRange) {
        Preconditions.checkArgument(bundleRange.contains("_"), "Invalid bundle range");
        String[] boundaries = bundleRange.split("_");
        // "_" or "0x0_" contain the separator but split into fewer than two parts.
        Preconditions.checkArgument(boundaries.length >= 2, "Invalid bundle range");
        Long lowerEndpoint = Long.decode(boundaries[0]);
        Long upperEndpoint = Long.decode(boundaries[1]);
        BoundType upperBoundType = upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND) ? BoundType.CLOSED : BoundType.OPEN;
        Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint, upperBoundType);
        return this.getBundle(NamespaceName.get(namespace), hashRange);
    }

    /**
     * Returns the full bundle of {@code fqnn}, blocking until its bundles are loaded.
     *
     * @param fqnn the namespace to look up
     * @return the result of {@link NamespaceBundles#getFullBundle()} for that namespace
     * @throws Exception declared for API compatibility
     */
    public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
        return this.bundlesCache.synchronous().get(fqnn).getFullBundle();
    }

    /**
     * Asynchronous variant of {@link #getFullBundle(NamespaceName)}.
     *
     * @param fqnn the namespace to look up
     * @return a future completed with the namespace's full bundle
     * @throws Exception declared for API compatibility
     */
    public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) throws Exception {
        return this.bundlesCache.get(fqnn).thenApply(NamespaceBundles::getFullBundle);
    }

    /**
     * Hashes {@code name} (encoded as UTF-8) with the configured hash function.
     *
     * @param name the string to hash
     * @return the hash padded to a {@code long}
     */
    public long getLongHashCode(String name) {
        return this.hashFunc.hashString(name, Charsets.UTF_8).padToLong();
    }

    /**
     * Builds the bundles of {@code nsname} from {@code bundleData} with version {@code -1}.
     *
     * @param nsname     the owning namespace
     * @param bundleData the bundle boundaries, or {@code null} for the single full-range bundle
     * @return the namespace bundles
     */
    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
        return this.getBundles(nsname, bundleData, -1L);
    }

    /**
     * Builds the bundles of {@code nsname} from textual boundaries.
     *
     * @param nsname     the owning namespace
     * @param bundleData the bundle boundaries, each decodable by {@link Long#decode(String)};
     *                   {@code null} yields the two boundaries of the full range
     * @param version    the version to attach to the resulting bundles
     * @return the namespace bundles
     */
    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData, long version) {
        long[] partitions;
        if (bundleData == null) {
            partitions = new long[]{Long.decode(FIRST_BOUNDARY), Long.decode(LAST_BOUNDARY)};
        } else {
            // NOTE(review): a non-null bundleData with null `boundaries` throws NPE here;
            // confirm whether that case should fall back to the full range instead.
            List<String> boundaries = bundleData.boundaries;
            partitions = new long[boundaries.size()];
            for (int i = 0; i < partitions.length; ++i) {
                partitions[i] = Long.decode(boundaries.get(i));
            }
        }
        return new NamespaceBundles(nsname, partitions, this, version);
    }

    /**
     * Converts bundles back to their textual boundary form ({@code 0x%08x} per boundary).
     *
     * @param bundles the bundles to convert, or {@code null}
     * @return the boundaries data; a default-constructed {@link BundlesData} when
     *         {@code bundles} is {@code null}
     * @throws Exception declared for API compatibility
     */
    public static BundlesData getBundlesData(NamespaceBundles bundles) throws Exception {
        if (bundles == null) {
            return new BundlesData();
        }
        List<String> boundaries = Arrays.stream(bundles.partitions).boxed().map(p -> String.format("0x%08x", p)).collect(Collectors.toList());
        return new BundlesData(boundaries);
    }

    /**
     * Splits {@code targetBundle} into {@code numBundles} bundles within the currently cached
     * bundles of its namespace.
     *
     * <p>Without a {@code splitBoundary} the range is divided into segments of size
     * {@code (upper - lower) / numBundles}. With a {@code splitBoundary} the bundle is split in
     * two at exactly that key and {@code numBundles} is ignored.
     *
     * @param targetBundle  the bundle to split; must not be {@code null}
     * @param numBundles    the number of bundles to split into
     * @param splitBoundary optional fixed split key, strictly inside the bundle's key range
     * @return the new namespace bundles paired with the bundles produced by the split, or
     *         {@code null} if the target range is not found among the cached bundles
     * @throws NullPointerException     if {@code targetBundle} or its namespace is {@code null}
     * @throws IllegalArgumentException if the bundle cannot be split further or
     *                                  {@code splitBoundary} is outside the bundle's range
     */
    public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitBoundary) {
        // Null checks come first: the range checks below dereference targetBundle.
        Preconditions.checkNotNull(targetBundle, "can't split null bundle");
        Preconditions.checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
        Preconditions.checkArgument(this.canSplitBundle(targetBundle), "%s bundle can't be split further", (Object)targetBundle);
        if (splitBoundary != null) {
            Preconditions.checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(), "The given fixed key must between the key range of the %s bundle", (Object)targetBundle);
            numBundles = 2;
        }
        NamespaceName nsname = targetBundle.getNamespaceObject();
        NamespaceBundles sourceBundle = this.bundlesCache.synchronous().get(nsname);
        int lastIndex = sourceBundle.partitions.length - 1;
        // Splitting one bundle into numBundles adds (numBundles - 1) boundaries.
        long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
        int pos = 0;
        int splitPartition = -1;
        Range<Long> range = targetBundle.getKeyRange();
        long targetLower = range.lowerEndpoint();
        long targetUpper = range.upperEndpoint();
        for (int i = 0; i < lastIndex; ++i) {
            long minVal = sourceBundle.partitions[i];
            long maxVal = sourceBundle.partitions[i + 1];
            if (minVal != targetLower || maxVal != targetUpper) {
                // Not the bundle being split: keep its lower boundary as-is.
                partitions[pos++] = minVal;
                continue;
            }
            splitPartition = i;
            long segSize = splitBoundary == null ? (maxVal - minVal) / (long)numBundles : splitBoundary - minVal;
            partitions[pos++] = minVal;
            long curPartition = minVal + segSize;
            for (int j = 0; j < numBundles - 1; ++j) {
                partitions[pos++] = curPartition;
                curPartition += segSize;
            }
        }
        partitions[pos] = sourceBundle.partitions[lastIndex];
        if (splitPartition == -1) {
            // Target range does not match any current bundle of the namespace.
            return null;
        }
        NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname, partitions, this, sourceBundle.getVersion());
        List<NamespaceBundle> splittedBundles = splittedNsBundles.getBundles().subList(splitPartition, splitPartition + numBundles);
        return new ImmutablePair<NamespaceBundles, List<NamespaceBundle>>(splittedNsBundles, splittedBundles);
    }

    /**
     * Tells whether {@code bundle} spans more than one key and can therefore be split.
     *
     * @param bundle the bundle to check
     * @return {@code true} if the distance between its endpoints is greater than 1
     */
    public boolean canSplitBundle(NamespaceBundle bundle) {
        Range<Long> range = bundle.getKeyRange();
        return range.upperEndpoint() - range.lowerEndpoint() > 1L;
    }

    /**
     * Verifies that the sorted boundaries start at {@code 0x00000000} and end at
     * {@code 0xffffffff}.
     *
     * @param partitions the sorted textual boundaries
     * @throws IllegalArgumentException if the first or last boundary does not match
     */
    public static void validateFullRange(SortedSet<String> partitions) {
        Preconditions.checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY));
    }

    /**
     * Static factory equivalent to the public constructor.
     *
     * @param pulsar   the broker service, see the constructor
     * @param hashFunc the hash function, see the constructor
     * @return a new factory
     */
    public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
        return new NamespaceBundleFactory(pulsar, hashFunc);
    }

    /**
     * Tells whether {@code bundleRange} is the textual full range returned by
     * {@link #getDefaultBundleRange()}.
     *
     * @param bundleRange the textual range to test; {@code null} yields {@code false}
     * @return {@code true} if it equals {@code 0x00000000_0xffffffff}
     */
    public static boolean isFullBundle(String bundleRange) {
        return NamespaceBundleFactory.getDefaultBundleRange().equals(bundleRange);
    }

    /**
     * Returns the textual full range, {@code 0x00000000_0xffffffff}.
     *
     * @return the default bundle range
     */
    public static String getDefaultBundleRange() {
        return String.format("%s_%s", FIRST_BOUNDARY, LAST_BOUNDARY);
    }

    /**
     * Extracts the namespace portion of a local-policies path by dropping its first three
     * {@code /}-separated segments (for {@code /admin/local-policies/tenant/ns} these are the
     * empty leading segment, {@code admin} and {@code local-policies}).
     *
     * <p>The three segments are skipped unconditionally, so a non-empty path with fewer than
     * three segments makes the underlying iterator fail.
     *
     * @param path the local-policies path; returned unchanged when empty
     * @return the remaining segments joined with {@code /}
     */
    public static String getNamespaceFromPoliciesPath(String path) {
        if (path.isEmpty()) {
            return path;
        }
        // At most six parts: three prefix segments plus up to three namespace segments.
        Iterator<String> segments = Splitter.on("/").limit(6).split(path).iterator();
        segments.next();
        segments.next();
        segments.next();
        return Joiner.on("/").join(segments);
    }
}

