/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.web;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BookieResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.CacheLoader;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.BoundType;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Range;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.javax.servlet.ServletContext;
import org.apache.pulsar.shade.javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Context;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.shade.javax.ws.rs.core.UriInfo;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PulsarWebResource {
    private static final Logger log = LoggerFactory.getLogger(PulsarWebResource.class);
    private static final LoadingCache<String, PulsarServiceNameResolver> SERVICE_NAME_RESOLVER_CACHE = Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(5L)).build(new CacheLoader<String, PulsarServiceNameResolver>(){

        @Override
        public @Nullable PulsarServiceNameResolver load(@NonNull String serviceUrl) throws Exception {
            PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver();
            serviceNameResolver.updateServiceUrl(serviceUrl);
            return serviceNameResolver;
        }
    });
    static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
    @Context
    protected ServletContext servletContext;
    @Context
    protected HttpServletRequest httpRequest;
    @Context
    protected UriInfo uri;
    private PulsarService pulsar;

    protected PulsarService pulsar() {
        if (this.pulsar == null) {
            this.pulsar = (PulsarService)this.servletContext.getAttribute("pulsar");
        }
        return this.pulsar;
    }

    protected ServiceConfiguration config() {
        return this.pulsar().getConfiguration();
    }

    public static String splitPath(String source, int slice) {
        return PolicyPath.splitPath(source, slice);
    }

    public AuthenticationParameters authParams() {
        return AuthenticationParameters.builder().originalPrincipal(this.originalPrincipal()).clientRole(this.clientAppId()).clientAuthenticationDataSource(this.clientAuthData()).build();
    }

    public String clientAppId() {
        return (String)this.httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
    }

    public String originalPrincipal() {
        return this.httpRequest.getHeader(ORIGINAL_PRINCIPAL_HEADER);
    }

    public AuthenticationDataHttps clientAuthData() {
        return (AuthenticationDataHttps)this.httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
    }

    public boolean isRequestHttps() {
        return "https".equalsIgnoreCase(this.httpRequest.getScheme());
    }

    public static boolean isClientAuthenticated(String appId) {
        return appId != null;
    }

    private void validateOriginalPrincipal(String authenticatedPrincipal, String originalPrincipal) {
        if (!this.pulsar.getBrokerService().getAuthorizationService().isValidOriginalPrincipal(authenticatedPrincipal, originalPrincipal, this.clientAuthData())) {
            throw new RestException(Response.Status.UNAUTHORIZED, "Invalid combination of Original principal cannot be empty if the request is via proxy.");
        }
    }

    protected boolean hasSuperUserAccess() {
        try {
            this.validateSuperUserAccess();
        }
        catch (Exception e) {
            return false;
        }
        return true;
    }

    public CompletableFuture<Void> validateSuperUserAccessAsync() {
        if (!this.config().isAuthenticationEnabled() || !this.config().isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(null);
        }
        String appId = this.clientAppId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", new Object[]{this.uri.getRequestUri(), PulsarWebResource.isClientAuthenticated(appId), appId});
        }
        String originalPrincipal = this.originalPrincipal();
        this.validateOriginalPrincipal(appId, originalPrincipal);
        if (this.pulsar.getConfiguration().getProxyRoles().contains(appId) || StringUtils.isNotBlank(this.originalPrincipal())) {
            BrokerService brokerService = this.pulsar.getBrokerService();
            return ((CompletableFuture)brokerService.getAuthorizationService().isSuperUser(appId, this.clientAuthData()).thenCompose(proxyAuthorizationSuccess -> {
                if (!proxyAuthorizationSuccess.booleanValue()) {
                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Proxy not authorized for super-user operation (proxy:%s)", appId));
                }
                return this.pulsar.getBrokerService().getAuthorizationService().isSuperUser(originalPrincipal, this.clientAuthData());
            })).thenAccept(originalPrincipalAuthorizationSuccess -> {
                if (!originalPrincipalAuthorizationSuccess.booleanValue()) {
                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Original principal not authorized for super-user operation (original:%s)", originalPrincipal));
                }
                if (log.isDebugEnabled()) {
                    log.debug("Successfully authorized {} (proxied by {}) as super-user", (Object)originalPrincipal, (Object)appId);
                }
            });
        }
        return this.pulsar.getBrokerService().getAuthorizationService().isSuperUser(appId, this.clientAuthData()).thenAccept(proxyAuthorizationSuccess -> {
            if (!proxyAuthorizationSuccess.booleanValue()) {
                throw new RestException(Response.Status.UNAUTHORIZED, "This operation requires super-user access");
            }
        });
    }

    public void validateSuperUserAccess() {
        this.sync(this::validateSuperUserAccessAsync);
    }

    protected void validateAdminAccessForTenant(String tenant) {
        try {
            this.validateAdminAccessForTenant(this.pulsar(), this.clientAppId(), this.originalPrincipal(), tenant, this.clientAuthData(), this.config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (RestException e) {
            throw e;
        }
        catch (Exception e) {
            log.error("Failed to get tenant admin data for tenant {}", (Object)tenant);
            throw new RestException(e);
        }
    }

    protected void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId, String originalPrincipal, String tenant, AuthenticationDataSource authenticationData, long timeout, TimeUnit unit) {
        try {
            this.validateAdminAccessForTenantAsync(pulsar, clientAppId, originalPrincipal, tenant, authenticationData).get(timeout, unit);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            Throwable realCause = FutureUtil.unwrapCompletionException(e);
            if (realCause instanceof WebApplicationException) {
                throw (WebApplicationException)realCause;
            }
            throw new RestException(realCause);
        }
    }

    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String tenant) {
        return this.validateAdminAccessForTenantAsync(this.pulsar(), this.clientAppId(), this.originalPrincipal(), tenant, this.clientAuthData());
    }

    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(PulsarService pulsar, String clientAppId, String originalPrincipal, String tenant, AuthenticationDataSource authenticationData) {
        if (log.isDebugEnabled()) {
            log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", new Object[]{tenant, PulsarWebResource.isClientAuthenticated(clientAppId), clientAppId});
        }
        return pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant).thenCompose(tenantInfoOptional -> {
            if (!tenantInfoOptional.isPresent()) {
                throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
            }
            TenantInfo tenantInfo = (TenantInfo)tenantInfoOptional.get();
            if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
                if (!PulsarWebResource.isClientAuthenticated(clientAppId)) {
                    throw new RestException(Response.Status.FORBIDDEN, "Need to authenticate to perform the request");
                }
                this.validateOriginalPrincipal(clientAppId, originalPrincipal);
                if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId) || StringUtils.isNotBlank(originalPrincipal)) {
                    AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
                    return authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).thenCompose(isTenantAdmin -> {
                        String debugMsg = "Successfully authorized {} (proxied by {}) on tenant {}";
                        if (!isTenantAdmin.booleanValue()) {
                            return authorizationService.isSuperUser(clientAppId, authenticationData).thenCombine(authorizationService.isSuperUser(originalPrincipal, authenticationData), (proxyAuthorized, originalPrincipalAuthorized) -> {
                                if (!proxyAuthorized.booleanValue() || !originalPrincipalAuthorized.booleanValue()) {
                                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Proxy not authorized to access resource (proxy:%s,original:%s)", clientAppId, originalPrincipal));
                                }
                                if (log.isDebugEnabled()) {
                                    log.debug(debugMsg, new Object[]{originalPrincipal, clientAppId, tenant});
                                }
                                return null;
                            });
                        }
                        if (log.isDebugEnabled()) {
                            log.debug(debugMsg, new Object[]{originalPrincipal, clientAppId, tenant});
                        }
                        return CompletableFuture.completedFuture(null);
                    });
                }
                return ((CompletableFuture)pulsar.getBrokerService().getAuthorizationService().isSuperUser(clientAppId, authenticationData).thenCompose(isSuperUser -> {
                    if (!isSuperUser.booleanValue()) {
                        return pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData);
                    }
                    return CompletableFuture.completedFuture(true);
                })).thenAccept(authorized -> {
                    if (!authorized.booleanValue()) {
                        throw new RestException(Response.Status.UNAUTHORIZED, "Don't have permission to administrate resources on this tenant");
                    }
                    log.debug("Successfully authorized {} on tenant {}", (Object)clientAppId, (Object)tenant);
                });
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    protected void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
        try {
            Sets.SetView<String> conflictPeerClusters;
            ClusterData clusterData = this.clusterResources().getCluster(clusterName).orElseThrow(() -> new RestException(Response.Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName));
            LinkedHashSet<String> peerClusters = clusterData.getPeerClusterNames();
            if (peerClusters != null && !peerClusters.isEmpty() && !(conflictPeerClusters = Sets.intersection(peerClusters, replicationClusters)).isEmpty()) {
                log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", new Object[]{this.clientAppId(), clusterName, conflictPeerClusters});
                throw new RestException(Response.Status.CONFLICT, String.format("%s's peer-clusters %s can't be part of replication-clusters %s", clusterName, conflictPeerClusters, replicationClusters));
            }
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception e) {
            log.warn("[{}] Failed to get cluster-data for {}", new Object[]{this.clientAppId(), clusterName, e});
        }
    }

    protected void validateClusterForTenant(String tenant, String cluster) {
        TenantInfo tenantInfo;
        try {
            tenantInfo = this.pulsar().getPulsarResources().getTenantResources().getTenant(tenant).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
        }
        catch (RestException e) {
            log.warn("Failed to get tenant admin data for tenant {}", (Object)tenant);
            throw e;
        }
        catch (Exception e) {
            log.error("Failed to get tenant admin data for tenant {}", (Object)tenant, (Object)e);
            throw new RestException(e);
        }
        if (!tenantInfo.getAllowedClusters().contains(cluster)) {
            String msg = String.format("Cluster [%s] is not in the list of allowed clusters list for tenant [%s]", cluster, tenant);
            log.info(msg);
            throw new RestException(Response.Status.FORBIDDEN, msg);
        }
        log.info("Successfully validated clusters on tenant [{}]", (Object)tenant);
    }

    protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster) {
        return PulsarWebResource.getClusterDataIfDifferentCluster(this.pulsar(), cluster, this.clientAppId()).thenAccept(differentClusterData -> {
            if (differentClusterData != null) {
                try {
                    URI redirect = this.getRedirectionUrl((ClusterData)differentClusterData);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Redirecting the rest call to {}: cluster={}", new Object[]{this.clientAppId(), redirect, cluster});
                    }
                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
                }
                catch (MalformedURLException ex) {
                    throw new RestException(ex);
                }
            }
        });
    }

    protected void validateClusterOwnership(String cluster) throws WebApplicationException {
        this.sync(() -> this.validateClusterOwnershipAsync(cluster));
    }

    private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException {
        try {
            PulsarServiceNameResolver serviceNameResolver = this.isRequestHttps() && this.pulsar.getConfiguration().getWebServicePortTls().isPresent() && StringUtils.isNotBlank(differentClusterData.getServiceUrlTls()) ? SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrlTls()) : SERVICE_NAME_RESOLVER_CACHE.get(differentClusterData.getServiceUrl());
            URL webUrl = new URL(serviceNameResolver.resolveHostUri().toString());
            return UriBuilder.fromUri(this.uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build(new Object[0]);
        }
        catch (Exception exception) {
            if (exception.getCause() != null && exception.getCause() instanceof PulsarClientException.InvalidServiceURL) {
                throw new MalformedURLException(exception.getMessage());
            }
            throw exception;
        }
    }

    protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar, String cluster, String clientAppId) {
        CompletableFuture<ClusterData> clusterDataFuture = new CompletableFuture<ClusterData>();
        if (PulsarWebResource.isValidCluster(pulsar, cluster) || pulsar.getConfiguration().getClusterName().equals(cluster)) {
            clusterDataFuture.complete(null);
            return clusterDataFuture;
        }
        pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster).whenComplete((clusterDataResult, ex) -> {
            if (ex != null) {
                clusterDataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
                return;
            }
            if (clusterDataResult.isPresent()) {
                clusterDataFuture.complete((ClusterData)clusterDataResult.get());
            } else {
                log.warn("[{}] Cluster does not exist: requested={}", (Object)clientAppId, (Object)cluster);
                clusterDataFuture.completeExceptionally(new RestException(Response.Status.NOT_FOUND, "Cluster does not exist: cluster=" + cluster));
            }
        });
        return clusterDataFuture;
    }

    static boolean isValidCluster(PulsarService pulsarService, String cluster) {
        if (cluster == null || "global".equals(cluster)) {
            return true;
        }
        return !pulsarService.getConfiguration().isAuthorizationEnabled();
    }

    protected void validateBundleOwnership(String tenant, String cluster, String namespace, boolean authoritative, boolean readOnly, NamespaceBundle bundle) {
        NamespaceName fqnn = NamespaceName.get(tenant, cluster, namespace);
        try {
            this.validateBundleOwnership(bundle, authoritative, readOnly);
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception oe) {
            log.debug("Failed to find owner for namespace {}", (Object)fqnn, (Object)oe);
            throw new RestException(oe);
        }
    }

    protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, BundlesData bundles, String bundleRange) {
        try {
            Preconditions.checkArgument(bundleRange.contains("_"), "Invalid bundle range: " + bundleRange);
            String[] boundaries = bundleRange.split("_");
            Long lowerEndpoint = Long.decode(boundaries[0]);
            Long upperEndpoint = Long.decode(boundaries[1]);
            Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint, upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND) ? BoundType.CLOSED : BoundType.OPEN);
            NamespaceBundle nsBundle = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn, hashRange);
            NamespaceBundles nsBundles = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn, bundles);
            nsBundles.validateBundle(nsBundle);
            return nsBundle;
        }
        catch (IllegalArgumentException e) {
            log.error("[{}] Invalid bundle range {}/{}, {}", new Object[]{this.clientAppId(), fqnn.toString(), bundleRange, e.getMessage()});
            throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
        }
        catch (Exception e) {
            log.error("[{}] Failed to validate namespace bundle {}/{}", new Object[]{this.clientAppId(), fqnn.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles, String bundleRange) {
        NamespaceBundle nsBundle = this.validateNamespaceBundleRange(fqnn, bundles, bundleRange);
        NamespaceService nsService = this.pulsar().getNamespaceService();
        LookupOptions options = LookupOptions.builder().authoritative(false).requestHttps(this.isRequestHttps()).readOnly(true).loadTopicsInBundle(false).build();
        try {
            return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent());
        }
        catch (Exception e) {
            log.error("Failed to check whether namespace bundle is owned {}/{}", new Object[]{fqnn.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles, String bundleRange, boolean authoritative, boolean readOnly) {
        try {
            NamespaceBundle nsBundle = this.validateNamespaceBundleRange(fqnn, bundles, bundleRange);
            this.validateBundleOwnership(nsBundle, authoritative, readOnly);
            return nsBundle;
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            log.error("[{}] Failed to validate namespace bundle {}/{}", new Object[]{this.clientAppId(), fqnn.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritative, boolean readOnly) throws Exception {
        NamespaceService nsService = this.pulsar().getNamespaceService();
        try {
            LookupOptions options = LookupOptions.builder().authoritative(authoritative).requestHttps(this.isRequestHttps()).readOnly(readOnly).loadTopicsInBundle(false).build();
            Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
            if (webUrl == null || !webUrl.isPresent()) {
                log.warn("Unable to get web service url");
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle.toString());
            }
            if (!nsService.isServiceUnitOwned(bundle)) {
                boolean newAuthoritative = this.isLeaderBroker();
                URI redirect = UriBuilder.fromUri(this.uri.getRequestUri()).host(webUrl.get().getHost()).port(webUrl.get().getPort()).replaceQueryParam("authoritative", newAuthoritative).build(new Object[0]);
                log.debug("{} is not a service unit owned", (Object)bundle);
                log.debug("Redirecting the rest call to {}", (Object)redirect);
                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
            }
        }
        catch (TimeoutException te) {
            String msg = String.format("Finding owner for ServiceUnit %s timed out", bundle);
            log.error(msg, (Throwable)te);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, msg);
        }
        catch (IllegalArgumentException iae) {
            log.debug("Failed to find owner for ServiceUnit {}", (Object)bundle, (Object)iae);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "ServiceUnit format is not expected. ServiceUnit " + bundle);
        }
        catch (IllegalStateException ise) {
            log.debug("Failed to find owner for ServiceUnit {}", (Object)bundle, (Object)ise);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "ServiceUnit bundle is actived. ServiceUnit " + bundle);
        }
        catch (NullPointerException e) {
            log.warn("Unable to get web service url");
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Failed to find ownership for ServiceUnit:" + bundle);
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
    }

    protected void validateTopicOwnership(TopicName topicName, boolean authoritative) {
        this.sync(() -> this.validateTopicOwnershipAsync(topicName, authoritative));
    }

    protected CompletableFuture<Void> validateTopicOwnershipAsync(TopicName topicName, boolean authoritative) {
        NamespaceService nsService = this.pulsar().getNamespaceService();
        LookupOptions options = LookupOptions.builder().authoritative(authoritative).requestHttps(this.isRequestHttps()).readOnly(false).loadTopicsInBundle(false).build();
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)nsService.getWebServiceUrlAsync(topicName, options).thenApply(webUrl -> {
            if (webUrl == null || !webUrl.isPresent()) {
                log.info("Unable to get web service url");
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Failed to find ownership for topic:" + topicName);
            }
            return (URL)webUrl.get();
        })).thenCompose(webUrl -> nsService.isServiceUnitOwnedAsync(topicName).thenApply(isTopicOwned -> Pair.of(webUrl, isTopicOwned)))).thenAccept(pair -> {
            URL webUrl = (URL)pair.getLeft();
            boolean isTopicOwned = (Boolean)pair.getRight();
            if (!isTopicOwned) {
                boolean newAuthoritative = PulsarWebResource.isLeaderBroker(this.pulsar());
                URI redirect = UriBuilder.fromUri(this.uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).replaceQueryParam("authoritative", newAuthoritative).build(new Object[0]);
                if (log.isDebugEnabled()) {
                    log.debug("Redirecting the rest call to {}", (Object)redirect);
                }
                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
            }
        })).exceptionally(ex -> {
            if (ex.getCause() instanceof IllegalArgumentException || ex.getCause() instanceof IllegalStateException) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to find owner for topic: {}", (Object)topicName, ex);
                }
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Can't find owner for topic " + topicName);
            }
            if (ex.getCause() instanceof WebApplicationException) {
                throw (WebApplicationException)ex.getCause();
            }
            throw new RestException(ex.getCause());
        });
    }

    protected void validateGlobalNamespaceOwnership(NamespaceName namespace) {
        int timeout = this.pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        try {
            ClusterDataImpl peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(this.pulsar(), namespace).get(timeout, TimeUnit.SECONDS);
            if (peerClusterData != null) {
                URI redirect = this.getRedirectionUrl(peerClusterData);
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Redirecting the rest call to {}: cluster={}", new Object[]{this.clientAppId(), redirect, peerClusterData});
                }
                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
            }
        }
        catch (InterruptedException e) {
            log.warn("Time-out {} sec while validating policy on {} ", (Object)timeout, (Object)namespace);
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, String.format("Failed to validate global cluster configuration : ns=%s  emsg=%s", namespace, e.getMessage()));
        }
        catch (WebApplicationException e) {
            throw e;
        }
        catch (Exception e) {
            if (e.getCause() instanceof WebApplicationException) {
                throw (WebApplicationException)e.getCause();
            }
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, String.format("Failed to validate global cluster configuration : ns=%s  emsg=%s", namespace, e.getMessage()));
        }
    }

    protected CompletableFuture<Void> validateGlobalNamespaceOwnershipAsync(NamespaceName namespace) {
        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(this.pulsar(), namespace).thenAccept(peerClusterData -> {
            if (peerClusterData != null) {
                try {
                    URI redirect = this.getRedirectionUrl((ClusterData)peerClusterData);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Redirecting the rest call to {}: cluster={}", new Object[]{this.clientAppId(), redirect, peerClusterData});
                    }
                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
                }
                catch (MalformedURLException mue) {
                    throw new RestException(Response.Status.SERVICE_UNAVAILABLE, String.format("Failed to validate global cluster configuration : ns=%s  emsg=%s", namespace, mue.getMessage()));
                }
            }
        });
    }

    public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) {
        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false);
    }

    public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace, boolean allowDeletedNamespace) {
        if (!namespace.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        NamespaceName heartbeatNamespace = pulsarService.getHeartbeatNamespaceV2();
        if (namespace.equals(heartbeatNamespace)) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<ClusterDataImpl> validationFuture = new CompletableFuture<ClusterDataImpl>();
        String localCluster = pulsarService.getConfiguration().getClusterName();
        ((CompletableFuture)pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace).thenAccept(policiesResult -> {
            if (policiesResult.isPresent()) {
                Policies policies = (Policies)policiesResult.get();
                if (!allowDeletedNamespace && policies.deleted) {
                    String msg = String.format("Namespace %s is deleted", namespace.toString());
                    log.warn(msg);
                    validationFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, "Namespace is deleted"));
                } else if (policies.replication_clusters.isEmpty()) {
                    String msg = String.format("Namespace does not have any clusters configured : local_cluster=%s ns=%s", localCluster, namespace.toString());
                    log.warn(msg);
                    validationFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, msg));
                } else if (!policies.replication_clusters.contains(localCluster)) {
                    ((CompletableFuture)PulsarWebResource.getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters).thenAccept(ownerPeerCluster -> {
                        if (ownerPeerCluster != null) {
                            validationFuture.complete((ClusterDataImpl)ownerPeerCluster);
                        } else {
                            String msg = String.format("Namespace missing local cluster name in clusters list: local_cluster=%s ns=%s clusters=%s", localCluster, namespace.toString(), policies.replication_clusters);
                            log.warn(msg);
                            validationFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, msg));
                        }
                    })).exceptionally(ex -> {
                        Throwable cause = FutureUtil.unwrapCompletionException(ex);
                        validationFuture.completeExceptionally(new RestException(cause));
                        return null;
                    });
                } else {
                    validationFuture.complete(null);
                }
            } else {
                String msg = String.format("Namespace %s not found", namespace.toString());
                log.warn(msg);
                validationFuture.completeExceptionally(new RestException(Response.Status.NOT_FOUND, "Namespace not found"));
            }
        })).exceptionally(ex -> {
            Throwable cause = FutureUtil.unwrapCompletionException(ex);
            String msg = String.format("Failed to validate global cluster configuration : cluster=%s ns=%s  emsg=%s", localCluster, namespace, cause.getMessage());
            log.error(msg);
            validationFuture.completeExceptionally(new RestException(cause));
            return null;
        });
        return validationFuture;
    }

    private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar, Set<String> replicationClusters) {
        String currentCluster = pulsar.getConfiguration().getClusterName();
        if (replicationClusters == null || replicationClusters.isEmpty() || StringUtils.isBlank(currentCluster)) {
            return CompletableFuture.completedFuture(null);
        }
        return ((CompletableFuture)pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster).thenCompose(cluster -> {
            if (!cluster.isPresent() || ((ClusterData)cluster.get()).getPeerClusterNames() == null) {
                return CompletableFuture.completedFuture(null);
            }
            for (String peerCluster : ((ClusterData)cluster.get()).getPeerClusterNames()) {
                if (!replicationClusters.contains(peerCluster)) continue;
                return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster).thenApply(ret -> {
                    if (!ret.isPresent()) {
                        throw new RestException(Response.Status.NOT_FOUND, "Peer cluster " + peerCluster + " data not found");
                    }
                    return (ClusterDataImpl)ret.get();
                });
            }
            return CompletableFuture.completedFuture(null);
        })).exceptionally(ex -> {
            log.error("Failed to get peer-cluster {}-{}", (Object)currentCluster, (Object)ex.getMessage());
            throw FutureUtil.wrapToCompletionException(ex);
        });
    }

    protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName, String role, AuthenticationDataSource authenticationData) {
        if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(null);
        }
        return pulsarService.getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, null, role, authenticationData).thenAccept(allow -> {
            if (!allow.booleanValue()) {
                log.warn("[{}] Role {} is not allowed to lookup topic", (Object)topicName, (Object)role);
                throw new RestException(Response.Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
            }
        });
    }

    public void setPulsar(PulsarService pulsar) {
        this.pulsar = pulsar;
    }

    protected boolean isLeaderBroker() {
        return PulsarWebResource.isLeaderBroker(this.pulsar());
    }

    protected static boolean isLeaderBroker(PulsarService pulsar) {
        return pulsar.getLeaderElectionService().isLeader();
    }

    public void validateTenantOperation(String tenant, TenantOperation operation) {
        this.sync(() -> this.validateTenantOperationAsync(tenant, operation));
    }

    public CompletableFuture<Void> validateTenantOperationAsync(String tenant, TenantOperation operation) {
        if (this.pulsar().getConfiguration().isAuthenticationEnabled() && this.pulsar().getBrokerService().isAuthorizationEnabled()) {
            if (!PulsarWebResource.isClientAuthenticated(this.clientAppId())) {
                return FutureUtil.failedFuture(new RestException(Response.Status.UNAUTHORIZED, "Need to authenticate to perform the request"));
            }
            return this.pulsar().getBrokerService().getAuthorizationService().allowTenantOperationAsync(tenant, operation, this.originalPrincipal(), this.clientAppId(), this.clientAuthData()).thenAccept(isAuthorized -> {
                if (!isAuthorized.booleanValue()) {
                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Unauthorized to validateTenantOperation for originalPrincipal [%s] and clientAppId [%s] about operation [%s] on tenant [%s]", this.originalPrincipal(), this.clientAppId(), operation.toString(), tenant));
                }
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    public void validateNamespaceOperation(NamespaceName namespaceName, NamespaceOperation operation) {
        this.sync(() -> this.validateNamespaceOperationAsync(namespaceName, operation));
    }

    public CompletableFuture<Void> validateNamespaceOperationAsync(NamespaceName namespaceName, NamespaceOperation operation) {
        if (this.pulsar().getConfiguration().isAuthenticationEnabled() && this.pulsar().getBrokerService().isAuthorizationEnabled()) {
            if (!PulsarWebResource.isClientAuthenticated(this.clientAppId())) {
                return FutureUtil.failedFuture(new RestException(Response.Status.FORBIDDEN, "Need to authenticate to perform the request"));
            }
            return this.pulsar().getBrokerService().getAuthorizationService().allowNamespaceOperationAsync(namespaceName, operation, this.originalPrincipal(), this.clientAppId(), this.clientAuthData()).thenAccept(isAuthorized -> {
                if (!isAuthorized.booleanValue()) {
                    throw new RestException(Response.Status.FORBIDDEN, String.format("Unauthorized to validateNamespaceOperation for operation [%s] on namespace [%s]", operation.toString(), namespaceName));
                }
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
        this.sync(() -> this.validateNamespacePolicyOperationAsync(namespaceName, policy, operation));
    }

    public CompletableFuture<Void> validateNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {
        if (this.pulsar().getConfiguration().isAuthenticationEnabled() && this.pulsar().getBrokerService().isAuthorizationEnabled()) {
            if (!PulsarWebResource.isClientAuthenticated(this.clientAppId())) {
                return FutureUtil.failedFuture(new RestException(Response.Status.FORBIDDEN, "Need to authenticate to perform the request"));
            }
            return this.pulsar().getBrokerService().getAuthorizationService().allowNamespacePolicyOperationAsync(namespaceName, policy, operation, this.originalPrincipal(), this.clientAppId(), this.clientAuthData()).thenAccept(isAuthorized -> {
                if (!isAuthorized.booleanValue()) {
                    throw new RestException(Response.Status.FORBIDDEN, String.format("Unauthorized to validateNamespacePolicyOperation for operation [%s] on namespace [%s] on policy [%s]", operation.toString(), namespaceName, policy.toString()));
                }
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    protected PulsarResources getPulsarResources() {
        return this.pulsar().getPulsarResources();
    }

    protected TenantResources tenantResources() {
        return this.pulsar().getPulsarResources().getTenantResources();
    }

    protected ClusterResources clusterResources() {
        return this.pulsar().getPulsarResources().getClusterResources();
    }

    protected BookieResources bookieResources() {
        return this.pulsar().getPulsarResources().getBookieResources();
    }

    protected TopicResources topicResources() {
        return this.pulsar().getPulsarResources().getTopicResources();
    }

    protected NamespaceResources namespaceResources() {
        return this.pulsar().getPulsarResources().getNamespaceResources();
    }

    protected ResourceGroupResources resourceGroupResources() {
        return this.pulsar().getPulsarResources().getResourcegroupResources();
    }

    protected LocalPoliciesResources getLocalPolicies() {
        return this.pulsar().getPulsarResources().getLocalPolicies();
    }

    protected NamespaceResources.IsolationPolicyResources namespaceIsolationPolicies() {
        return this.namespaceResources().getIsolationPolicies();
    }

    protected DynamicConfigurationResources dynamicConfigurationResources() {
        return this.pulsar().getPulsarResources().getDynamicConfigResources();
    }

    public static ObjectMapper jsonMapper() {
        return ObjectMapperFactory.getThreadLocal();
    }

    public void validatePoliciesReadOnlyAccess() {
        try {
            if (this.namespaceResources().getPoliciesReadOnly()) {
                log.debug("Policies are read-only. Broker cannot do read-write operations");
                throw new RestException(Response.Status.FORBIDDEN, "Broker is forbidden to do read-write operations");
            }
        }
        catch (Exception e) {
            log.warn("Unable to fetch read-only policy config ", (Throwable)e);
            throw new RestException(e);
        }
    }

    protected CompletableFuture<Void> hasActiveNamespace(String tenant) {
        return this.tenantResources().hasActiveNamespace(tenant);
    }

    protected void validateClusterExists(String cluster) {
        try {
            if (!this.clusterResources().clusterExists(cluster)) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
            }
        }
        catch (Exception e) {
            throw new RestException(e);
        }
    }

    protected CompletableFuture<Void> canUpdateCluster(String tenant, Set<String> oldClusters, Set<String> newClusters) {
        ArrayList activeNamespaceFuture = Lists.newArrayList();
        for (String cluster : oldClusters) {
            if ("global".equals(cluster) || newClusters.contains(cluster)) continue;
            CompletableFuture checkNs = new CompletableFuture();
            activeNamespaceFuture.add(checkNs);
            this.tenantResources().getActiveNamespaces(tenant, cluster).whenComplete((activeNamespaces, ex) -> {
                if (ex != null) {
                    log.warn("Failed to get namespaces under {}-{}, {}", new Object[]{tenant, cluster, ex.getCause().getMessage()});
                    checkNs.completeExceptionally(ex.getCause());
                    return;
                }
                if (activeNamespaces.size() > 0) {
                    log.warn("{}/{} Active-namespaces {}", new Object[]{tenant, cluster, activeNamespaces});
                    checkNs.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED, "Active namespaces"));
                    return;
                }
                checkNs.complete(null);
            });
        }
        return FutureUtil.waitForAll(activeNamespaceFuture);
    }

    protected void validateBrokerName(String broker) {
        String brokerUrl = String.format("http://%s", broker);
        String brokerUrlTls = String.format("https://%s", broker);
        if (!brokerUrl.equals(this.pulsar().getSafeWebServiceAddress()) && !brokerUrlTls.equals(this.pulsar().getWebServiceAddressTls())) {
            String[] parts = broker.split(":");
            Preconditions.checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
            String host = parts[0];
            int port = Integer.parseInt(parts[1]);
            URI redirect = UriBuilder.fromUri(this.uri.getRequestUri()).host(host).port(port).build(new Object[0]);
            log.debug("[{}] Redirecting the rest call to {}: broker={}", new Object[]{this.clientAppId(), redirect, broker});
            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
        }
    }

    public void validateTopicPolicyOperation(TopicName topicName, PolicyName policy, PolicyOperation operation) {
        this.sync(() -> this.validateTopicPolicyOperationAsync(topicName, policy, operation));
    }

    public CompletableFuture<Void> validateTopicPolicyOperationAsync(TopicName topicName, PolicyName policy, PolicyOperation operation) {
        if (this.pulsar().getConfiguration().isAuthenticationEnabled() && this.pulsar().getBrokerService().isAuthorizationEnabled()) {
            if (!PulsarWebResource.isClientAuthenticated(this.clientAppId())) {
                return FutureUtil.failedFuture(new RestException(Response.Status.FORBIDDEN, "Need to authenticate to perform the request"));
            }
            return this.pulsar().getBrokerService().getAuthorizationService().allowTopicPolicyOperationAsync(topicName, policy, operation, this.originalPrincipal(), this.clientAppId(), this.clientAuthData()).thenAccept(isAuthorized -> {
                if (!isAuthorized.booleanValue()) {
                    throw new RestException(Response.Status.FORBIDDEN, String.format("Unauthorized to validateTopicPolicyOperation for operation [%s] on topic [%s] on policy [%s]", operation.toString(), topicName, policy.toString()));
                }
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    public void validateTopicOperation(TopicName topicName, TopicOperation operation) {
        this.validateTopicOperation(topicName, operation, null);
    }

    public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) {
        this.sync(() -> this.validateTopicOperationAsync(topicName, operation, subscription));
    }

    public CompletableFuture<Void> validateTopicOperationAsync(TopicName topicName, TopicOperation operation) {
        return this.validateTopicOperationAsync(topicName, operation, null);
    }

    public CompletableFuture<Void> validateTopicOperationAsync(TopicName topicName, TopicOperation operation, String subscription) {
        if (this.pulsar().getConfiguration().isAuthenticationEnabled() && this.pulsar().getBrokerService().isAuthorizationEnabled()) {
            if (!PulsarWebResource.isClientAuthenticated(this.clientAppId())) {
                return FutureUtil.failedFuture(new RestException(Response.Status.UNAUTHORIZED, "Need to authenticate to perform the request"));
            }
            AuthenticationDataHttps authData = this.clientAuthData();
            authData.setSubscription(subscription);
            return this.pulsar().getBrokerService().getAuthorizationService().allowTopicOperationAsync(topicName, operation, this.originalPrincipal(), this.clientAppId(), authData).thenAccept(isAuthorized -> {
                if (!isAuthorized.booleanValue()) {
                    throw new RestException(Response.Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for operation [%s] on topic [%s]", operation.toString(), topicName));
                }
            });
        }
        return CompletableFuture.completedFuture(null);
    }

    public <T> T sync(Supplier<CompletableFuture<T>> supplier) {
        try {
            return supplier.get().get(this.config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (ExecutionException | TimeoutException ex) {
            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
            if (realCause instanceof WebApplicationException) {
                throw (WebApplicationException)realCause;
            }
            throw new RestException(realCause);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RestException(ex);
        }
    }
}

