/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.pulsar.common.naming;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.base.Charsets;
import org.apache.pulsar.shade.com.google.common.base.Joiner;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Splitter;
import org.apache.pulsar.shade.com.google.common.collect.BoundType;
import org.apache.pulsar.shade.com.google.common.collect.Range;
import org.apache.pulsar.shade.com.google.common.hash.HashFunction;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NamespaceBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class);
    private final HashFunction hashFunc;
    private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;
    private final PulsarService pulsar;
    private final MetadataCache<Policies> policiesCache;

    public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
        this.hashFunc = hashFunc;
        this.bundlesCache = Caffeine.newBuilder().recordStats().buildAsync(this::loadBundles);
        CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache);
        pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
        this.pulsar = pulsar;
        this.policiesCache = pulsar.getConfigurationMetadataStore().getMetadataCache(Policies.class);
    }

    private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace, Executor executor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Loading cache with bundles for {}", (Object)namespace);
        }
        if (this.pulsar == null) {
            return CompletableFuture.completedFuture(this.getBundles(namespace, Optional.empty()));
        }
        CompletableFuture<NamespaceBundles> future = new CompletableFuture<NamespaceBundles>();
        ((CompletableFuture)this.pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
            if (result.isPresent()) {
                try {
                    future.complete(this.readBundles(namespace, (LocalPolicies)((CacheGetResult)result.get()).getValue(), ((CacheGetResult)result.get()).getStat().getVersion()));
                }
                catch (IOException e) {
                    future.completeExceptionally(e);
                }
            } else {
                ((CompletableFuture)this.copyToLocalPolicies(namespace).thenAccept(b -> future.complete((NamespaceBundles)b))).exceptionally(ex -> {
                    future.completeExceptionally((Throwable)ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }

    private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version) throws IOException {
        NamespaceBundles namespaceBundles = this.getBundles(namespace, Optional.of(Pair.of(localPolicies, version)));
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Get bundles from getLocalZkCacheService: bundles: {}, version: {}", new Object[]{namespace, localPolicies.bundles.getBoundaries() != null ? localPolicies.bundles : "null", namespaceBundles.getVersion()});
        }
        return namespaceBundles;
    }

    private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName namespace) {
        return this.pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace).thenCompose(optPolicies -> {
            if (!optPolicies.isPresent()) {
                return CompletableFuture.completedFuture(this.getBundles(namespace, Optional.empty()));
            }
            Policies policies = (Policies)optPolicies.get();
            LocalPolicies localPolicies = new LocalPolicies(policies.bundles, null, null);
            return this.pulsar.getPulsarResources().getLocalPolicies().createLocalPoliciesAsync(namespace, localPolicies).thenApply(stat -> this.getBundles(namespace, Optional.of(Pair.of(localPolicies, 0L))));
        });
    }

    private void handleMetadataStoreNotification(Notification n) {
        if (LocalPoliciesResources.isLocalPoliciesPath(n.getPath())) {
            try {
                Optional<NamespaceName> namespace = NamespaceName.getIfValid(NamespaceBundleFactory.getNamespaceFromPoliciesPath(n.getPath()));
                if (namespace.isPresent()) {
                    LOG.info("Policy updated for namespace {}, refreshing the bundle cache.", namespace);
                    this.bundlesCache.synchronous().invalidate(namespace.get());
                }
            }
            catch (Exception e) {
                LOG.error("Failed to update the policy change for path {}", (Object)n.getPath(), (Object)e);
            }
        }
    }

    private boolean isOwner(NamespaceBundle nsBundle) {
        if (this.pulsar != null) {
            return this.pulsar.getNamespaceService().getOwnershipCache().getOwnedBundle(nsBundle) != null;
        }
        return false;
    }

    public void invalidateBundleCache(NamespaceName namespace) {
        this.bundlesCache.synchronous().invalidate(namespace);
    }

    public CompletableFuture<NamespaceBundles> getBundlesAsync(NamespaceName nsname) {
        return this.bundlesCache.get(nsname);
    }

    public NamespaceBundles getBundles(NamespaceName nsname) {
        return (NamespaceBundles)this.bundlesCache.synchronous().get(nsname);
    }

    public Optional<NamespaceBundles> getBundlesIfPresent(NamespaceName nsname) {
        return Optional.ofNullable(this.bundlesCache.synchronous().getIfPresent(nsname));
    }

    public NamespaceBundle getBundle(NamespaceName nsname, Range<Long> hashRange) {
        return new NamespaceBundle(nsname, hashRange, this);
    }

    public NamespaceBundle getBundle(String namespace, String bundleRange) {
        Preconditions.checkArgument(bundleRange.contains("_"), "Invalid bundle range");
        String[] boundaries = bundleRange.split("_");
        Long lowerEndpoint = Long.decode(boundaries[0]);
        Long upperEndpoint = Long.decode(boundaries[1]);
        Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint, upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND) ? BoundType.CLOSED : BoundType.OPEN);
        return this.getBundle(NamespaceName.get(namespace), hashRange);
    }

    public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
        return ((NamespaceBundles)this.bundlesCache.synchronous().get(fqnn)).getFullBundle();
    }

    public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) {
        return this.bundlesCache.get(fqnn).thenApply(NamespaceBundles::getFullBundle);
    }

    public long getLongHashCode(String name) {
        return this.hashFunc.hashString(name, Charsets.UTF_8).padToLong();
    }

    public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
        return new NamespaceBundles(nsname, this, Optional.empty(), NamespaceBundles.getPartitions(bundleData));
    }

    private NamespaceBundles getBundles(NamespaceName nsname, Optional<Pair<LocalPolicies, Long>> localPolicies) {
        return new NamespaceBundles(nsname, this, localPolicies);
    }

    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(NamespaceBundle targetBundle, int argNumBundles, Long splitBoundary) {
        Preconditions.checkArgument(this.canSplitBundle(targetBundle), "%s bundle can't be split further", (Object)targetBundle);
        if (splitBoundary != null) {
            Preconditions.checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(), "The given fixed key must between the key range of the %s bundle", (Object)targetBundle);
            argNumBundles = 2;
        }
        Preconditions.checkNotNull(targetBundle, "can't split null bundle");
        Preconditions.checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present");
        NamespaceName nsname = targetBundle.getNamespaceObject();
        int numBundles = argNumBundles;
        return this.bundlesCache.get(nsname).thenApply(sourceBundle -> {
            int lastIndex = sourceBundle.partitions.length - 1;
            long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
            int pos = 0;
            int splitPartition = -1;
            Range<Long> range = targetBundle.getKeyRange();
            for (int i = 0; i < lastIndex; ++i) {
                if (sourceBundle.partitions[i] == range.lowerEndpoint() && range.upperEndpoint() == sourceBundle.partitions[i + 1]) {
                    splitPartition = i;
                    long maxVal = sourceBundle.partitions[i + 1];
                    long minVal = sourceBundle.partitions[i];
                    long segSize = splitBoundary == null ? (maxVal - minVal) / (long)numBundles : splitBoundary - minVal;
                    partitions[pos++] = minVal;
                    long curPartition = minVal + segSize;
                    for (int j = 0; j < numBundles - 1; ++j) {
                        partitions[pos++] = curPartition;
                        curPartition += segSize;
                    }
                    continue;
                }
                partitions[pos++] = sourceBundle.partitions[i];
            }
            partitions[pos] = sourceBundle.partitions[lastIndex];
            if (splitPartition != -1) {
                NamespaceBundles splitNsBundles = new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
                List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition, splitPartition + numBundles);
                return new ImmutablePair<NamespaceBundles, List<NamespaceBundle>>(splitNsBundles, splitBundles);
            }
            return null;
        });
    }

    public boolean canSplitBundle(NamespaceBundle bundle) {
        Range<Long> range = bundle.getKeyRange();
        return range.upperEndpoint() - range.lowerEndpoint() > 1L;
    }

    public static void validateFullRange(SortedSet<String> partitions) {
        Preconditions.checkArgument(partitions.first().equals("0x00000000") && partitions.last().equals("0xffffffff"));
    }

    public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
        return new NamespaceBundleFactory(pulsar, hashFunc);
    }

    public static boolean isFullBundle(String bundleRange) {
        return bundleRange.equals(String.format("%s_%s", "0x00000000", "0xffffffff"));
    }

    public static String getDefaultBundleRange() {
        return String.format("%s_%s", "0x00000000", "0xffffffff");
    }

    public static String getNamespaceFromPoliciesPath(String path) {
        if (path.isEmpty()) {
            return path;
        }
        Iterable<String> splitter = Splitter.on("/").limit(6).split(path);
        Iterator<String> i = splitter.iterator();
        i.next();
        i.next();
        i.next();
        return Joiner.on("/").join(i);
    }
}

