/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.com.github.zafarkhaja.semver.Version;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.shade.org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTopicsBase
extends AdminResource {
    // Class-wide SLF4J logger used by every admin operation in this resource.
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
    // NOTE(review): by its name this is a time-to-live, in minutes, for offline topic stats;
    // the usage is outside this excerpt - confirm before relying on it.
    private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
    // NOTE(review): looks like the version-string prefix identifying legacy C++ clients;
    // presumably consumed by validateClientVersion() - confirm against that method.
    private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
    // Semantic version 1.21 built via Version.forIntegers(major, minor).
    // NOTE(review): despite the "_PREFIX" suffix this is a Version object, not a string prefix.
    private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);

    /**
     * Lists the persistent topics of the current namespace.
     *
     * <p>The caller must be allowed to perform {@code GET_TOPICS} on the namespace and the
     * namespace must exist. Transaction-internal topics are never returned. When
     * {@code bundle} is present, only topics whose owning bundle range equals that value
     * are kept.
     *
     * @param bundle optional bundle range used to narrow the result
     * @return the matching topic names
     * @throws RestException with {@code NOT_FOUND} if the namespace does not exist, or
     *         wrapping any other failure
     */
    protected List<String> internalGetList(Optional<String> bundle) {
        this.validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_TOPICS);
        try {
            boolean namespaceKnown = this.namespaceResources().namespaceExists(this.namespaceName);
            if (!namespaceKnown) {
                throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
            }
        }
        catch (Exception e) {
            // REST errors pass through untouched; everything else is logged and wrapped.
            if (e instanceof RestException) {
                throw (RestException)e;
            }
            log.error("[{}] Failed to get topic list {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
        try {
            List<String> allTopics = this.topicResources().listPersistentTopicsAsync(this.namespaceName).join();
            List<String> visibleTopics = new ArrayList<>();
            for (String candidate : allTopics) {
                TopicName parsed = TopicName.get(candidate);
                if (PulsarService.isTransactionInternalName(parsed)) {
                    continue;
                }
                if (!bundle.isPresent()) {
                    visibleTopics.add(candidate);
                    continue;
                }
                NamespaceBundle owner = this.pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(parsed);
                if (owner != null && bundle.get().equals(owner.getBundleRange())) {
                    visibleTopics.add(candidate);
                }
            }
            return visibleTopics;
        }
        catch (Exception e) {
            log.error("[{}] Failed to get topics list for namespace {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
    }

    /**
     * Returns the partitioned topics of the current namespace for this resource's domain.
     *
     * @return the partitioned topic names
     * @throws RestException with {@code NOT_FOUND} if the namespace does not exist, or
     *         wrapping any failure of the existence check
     */
    protected List<String> internalGetPartitionedTopicList() {
        this.validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_TOPICS);
        try {
            boolean namespaceKnown = this.namespaceResources().namespaceExists(this.namespaceName);
            if (!namespaceKnown) {
                log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", this.clientAppId(), this.namespaceName);
                throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
            }
        }
        catch (Exception e) {
            // A RestException (including the NOT_FOUND above) is propagated as-is.
            if (e instanceof RestException) {
                throw (RestException)e;
            }
            log.error("[{}] Failed to get partitioned topic list for namespace {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }
        TopicDomain topicDomain = TopicDomain.getEnum(this.domain());
        return this.getPartitionedTopicList(topicDomain);
    }

    /**
     * Computes the effective permissions on the current topic.
     *
     * <p>Starts from the namespace-level grants and, for every role that also has
     * topic-level grants, stores the union of both action sets.
     *
     * @return role name mapped to the set of actions granted to it
     * @throws RestException if the caller is not a tenant admin, the namespace policies
     *         are missing, or reading the policies fails
     */
    protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        String topicUri = this.topicName.toString();
        try {
            Policies policies = this.namespaceResources().getPolicies(this.namespaceName).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
            AuthPolicies auth = policies.auth_policies;
            Map<String, Set<AuthAction>> effective = new HashMap<>();
            effective.putAll(auth.getNamespaceAuthentication());
            if (!auth.getTopicAuthentication().containsKey(topicUri)) {
                return effective;
            }
            for (Map.Entry<String, Set<AuthAction>> grant : auth.getTopicAuthentication().get(topicUri).entrySet()) {
                String role = grant.getKey();
                Set<AuthAction> topicLevel = grant.getValue();
                // Roles already known at namespace level get the union of both scopes.
                Set<AuthAction> merged = effective.containsKey(role)
                        ? Sets.union(effective.get(role), topicLevel)
                        : topicLevel;
                effective.put(role, merged);
            }
            return effective;
        }
        catch (Exception e) {
            log.error("[{}] Failed to get permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
    }

    /**
     * Rejects attempts to create a topic whose name is reserved for transaction internals.
     *
     * @param topicName the topic about to be created
     * @throws RestException with {@code BAD_REQUEST} if the name is a transaction-internal name
     */
    protected void validateCreateTopic(TopicName topicName) {
        boolean reservedName = PulsarService.isTransactionInternalName(topicName);
        if (!reservedName) {
            return;
        }
        log.warn("Forbidden to create transaction internal topic: {}", (Object)topicName);
        throw new RestException(Response.Status.BAD_REQUEST, "Cannot create topic in system topic format!");
    }

    /**
     * Checks that the caller has admin access to the topic's tenant and then validates
     * ownership of the current topic.
     *
     * @param authoritative forwarded unchanged to the ownership validation
     */
    public void validateAdminOperationOnTopic(boolean authoritative) {
        TopicName target = this.topicName;
        String tenant = target.getTenant();
        this.validateAdminAccessForTenant(tenant);
        this.validateTopicOwnership(target, authoritative);
    }

    /**
     * Grants {@code actions} to {@code role} on a single topic and blocks until the
     * authorization service has finished.
     *
     * @param topicUri the topic (or partition) receiving the grant
     * @param role the role being granted access
     * @param actions the actions to grant
     * @throws RestException {@code NOT_IMPLEMENTED} when no authorization service is
     *         configured, {@code NOT_FOUND} when the namespace is missing,
     *         {@code CONFLICT} on a concurrent modification, or a wrapper around any
     *         other failure
     */
    private void grantPermissions(TopicName topicUri, String role, Set<AuthAction> actions) {
        try {
            AuthorizationService authService = this.pulsar().getBrokerService().getAuthorizationService();
            if (null == authService) {
                throw new RestException(Response.Status.NOT_IMPLEMENTED, "Authorization is not enabled");
            }
            authService.grantPermissionAsync(topicUri, actions, role, null).get();
            log.info("[{}] Successfully granted access for role {}: {} - topic {}", new Object[]{this.clientAppId(), role, actions, topicUri});
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("[{}] Failed to grant permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof MetadataStoreException.NotFoundException || cause instanceof IllegalArgumentException) {
                log.warn("[{}] Failed to set permissions for topic {}: Namespace does not exist", new Object[]{this.clientAppId(), topicUri, e});
                throw new RestException(Response.Status.NOT_FOUND, "Topic's namespace does not exist");
            }
            if (cause instanceof MetadataStoreException.BadVersionException || cause instanceof IllegalStateException) {
                log.warn("[{}] Failed to set permissions for topic {}: {}", new Object[]{this.clientAppId(), topicUri, cause.getMessage(), e});
                throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
            }
            log.error("[{}] Failed to grant permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
    }

    /**
     * Grants {@code actions} to {@code role} on the current topic and, when the topic is
     * partitioned, on each of its partitions first.
     *
     * @param role the role being granted access
     * @param actions the actions to grant
     */
    protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> actions) {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        this.validatePoliciesReadOnlyAccess();
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(this.topicName, true, false);
        // The loop body never runs for a non-partitioned topic (partitions <= 0).
        for (int partition = 0; partition < metadata.partitions; ++partition) {
            this.grantPermissions(this.topicName.getPartition(partition), role, actions);
        }
        this.grantPermissions(this.topicName, role, actions);
    }

    /**
     * Force-deletes the current topic and waits for the deletion to finish.
     *
     * <p>A topic whose managed ledger is already gone counts as successfully deleted:
     * the condition is logged at info level and the method returns normally.
     *
     * @param authoritative forwarded to the topic ownership validation
     * @param deleteSchema forwarded to the broker's topic deletion
     * @throws RestException wrapping any deletion failure other than "managed ledger not found"
     */
    protected void internalDeleteTopicForcefully(boolean authoritative, boolean deleteSchema) {
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateNamespaceOperation(this.topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC);
        try {
            this.pulsar().getBrokerService().deleteTopic(this.topicName.toString(), true, deleteSchema).get();
        }
        catch (Exception e) {
            if (this.isManagedLedgerNotFoundException(e)) {
                // Nothing left to delete: do not fall through to the error path below.
                log.info("[{}] Topic was already not existing {}", new Object[]{this.clientAppId(), this.topicName, e});
                return;
            }
            log.error("[{}] Failed to delete topic forcefully {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    /**
     * Removes the topic-level grant of {@code role} on {@code topicUri} from the
     * namespace policies.
     *
     * @param topicUri full name of the topic (or partition)
     * @param role the role whose grant is removed
     * @throws RestException {@code PRECONDITION_FAILED} if the role has no topic-level
     *         grant on this topic, or a wrapper around any policy read/write failure
     */
    private void revokePermissions(String topicUri, String role) {
        Policies current;
        try {
            current = this.namespaceResources().getPolicies(this.namespaceName).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
        }
        catch (Exception e) {
            log.error("[{}] Failed to revoke permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
        boolean grantedAtTopicLevel = current.auth_policies.getTopicAuthentication().containsKey(topicUri)
                && current.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role);
        if (!grantedAtTopicLevel) {
            log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", new Object[]{this.clientAppId(), role, topicUri});
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
        }
        try {
            this.namespaceResources().setPolicies(this.namespaceName, updated -> {
                updated.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
                return updated;
            });
            log.info("[{}] Successfully revoke access for role {} - topic {}", new Object[]{this.clientAppId(), role, topicUri});
        }
        catch (Exception e) {
            log.error("[{}] Failed to revoke permissions for topic {}", new Object[]{this.clientAppId(), topicUri, e});
            throw new RestException(e);
        }
    }

    /**
     * Revokes the topic-level grant of {@code role} on the current topic and, when the
     * topic is partitioned, on each of its partitions first.
     *
     * @param role the role whose access is revoked
     */
    protected void internalRevokePermissionsOnTopic(String role) {
        this.validateAdminAccessForTenant(this.namespaceName.getTenant());
        this.validatePoliciesReadOnlyAccess();
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(this.topicName, true, false);
        // The loop body never runs for a non-partitioned topic (partitions <= 0).
        for (int partition = 0; partition < metadata.partitions; ++partition) {
            String partitionName = this.topicName.getPartition(partition).toString();
            this.revokePermissions(partitionName, role);
        }
        this.revokePermissions(this.topicName.toString(), role);
    }

    /**
     * Creates the current topic as a non-partitioned topic.
     *
     * <p>Fails with {@code CONFLICT} if a partitioned topic with the same name has
     * partitions, or if the topic already exists on the broker.
     *
     * @param authoritative forwarded to ownership validation and the metadata lookup
     * @param properties properties passed to the topic creation
     * @throws RestException on validation failure, conflict, or any creation error
     */
    protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
        this.validateNonPartitionTopicName(this.topicName.getLocalName());
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateNamespaceOperation(this.topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC);
        PartitionedTopicMetadata existingMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        boolean partitionedTwinExists = existingMetadata.partitions > 0;
        if (partitionedTwinExists) {
            log.warn("[{}] Partitioned topic with the same name already exists {}", this.clientAppId(), this.topicName);
            throw new RestException(Response.Status.CONFLICT, "This topic already exists");
        }
        try {
            Optional<Topic> alreadyLoaded = this.pulsar().getBrokerService().getTopicIfExists(this.topicName.toString()).get();
            if (alreadyLoaded.isPresent()) {
                log.error("[{}] Topic {} already exists", this.clientAppId(), this.topicName);
                throw new RestException(Response.Status.CONFLICT, "This topic already exists");
            }
            Topic created = this.getOrCreateTopic(this.topicName, properties);
            log.info("[{}] Successfully created non-partitioned topic {}", this.clientAppId(), created);
        }
        catch (RestException e) {
            // Already a proper REST error (e.g. the CONFLICT above): propagate untouched.
            throw e;
        }
        catch (Exception e) {
            log.error("[{}] Failed to create non-partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    /**
     * Changes the partition count of the current partitioned topic, blocking until done.
     *
     * <p>Two paths exist. For a global topic in a replicated namespace the partitions and
     * subscriptions are created locally and, unless {@code updateLocalTopicOnly} is set,
     * the other clusters are updated before the local partitioned-topic metadata is
     * rewritten. Otherwise the partitions are created and the metadata is updated through
     * {@code updatePartitionedTopic}. Every blocking wait is capped at 30 seconds.
     *
     * @param numPartitions the new partition count; must be greater than 0 and, when a
     *        positive broker maximum is configured, not above it
     * @param updateLocalTopicOnly when true, skips the partition-update validation and
     *        the propagation to other clusters
     * @param authoritative forwarded to the topic ownership validation
     * @param force when true, skips the partition-update validation; also forwarded to
     *        {@code updatePartitionedTopic}
     * @throws RestException {@code NOT_ACCEPTABLE} for an invalid partition count,
     *         {@code FORBIDDEN} when the local cluster is not in the namespace's
     *         replication set, or a wrapper around any other failure
     */
    protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly, boolean authoritative, boolean force) {
        int maxPartitions;
        if (numPartitions <= 0) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicPolicyOperation(this.topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
        if (!updateLocalTopicOnly && !force) {
            this.validatePartitionTopicUpdate(this.topicName.getLocalName(), numPartitions);
        }
        // A configured maximum of 0 or less means "no upper bound".
        if ((maxPartitions = this.pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic()) > 0 && numPartitions > maxPartitions) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions);
        }
        // Replicated path: global topic in a namespace that is replicated across clusters.
        if (this.topicName.isGlobal() && this.isNamespaceReplicated(this.topicName.getNamespaceObject())) {
            Set<String> clusters = this.getNamespaceReplicatedClusters(this.topicName.getNamespaceObject());
            if (!clusters.contains(this.pulsar().getConfig().getClusterName())) {
                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", (Object)this.clientAppId(), (Object)this.topicName);
                throw new RestException(Response.Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
            }
            try {
                this.tryCreatePartitionsAsync(numPartitions).get(30L, TimeUnit.SECONDS);
                this.createSubscriptions(this.topicName, numPartitions).get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                // Surface a RestException carried as the cause instead of re-wrapping it.
                if (e.getCause() instanceof RestException) {
                    throw (RestException)e.getCause();
                }
                log.error("[{}] Failed to update partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
                throw new RestException(e);
            }
            if (!updateLocalTopicOnly) {
                // Completed once the remote clusters AND the local metadata have been updated.
                CompletableFuture updatePartition = new CompletableFuture();
                ((CompletableFuture)this.updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
                    try {
                        ((CompletableFuture)this.namespaceResources().getPartitionedTopicResources().updatePartitionedTopicAsync(this.topicName, p -> new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> updatePartition.complete(null))).exceptionally(ex -> {
                            // NOTE(review): ex.getCause() may be null for an unwrapped exception,
                            // which would make completeExceptionally throw - confirm callers always wrap.
                            updatePartition.completeExceptionally(ex.getCause());
                            return null;
                        });
                    }
                    catch (Exception e) {
                        updatePartition.completeExceptionally(e);
                    }
                })).exceptionally(ex -> {
                    // Failure while updating the other clusters: the local metadata is not touched.
                    updatePartition.completeExceptionally((Throwable)ex);
                    return null;
                });
                try {
                    updatePartition.get(30L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}", new Object[]{this.clientAppId(), this.topicName, numPartitions, e});
                    if (e.getCause() instanceof RestException) {
                        throw (RestException)e.getCause();
                    }
                    throw new RestException(e);
                }
            }
            return;
        }
        // Non-replicated path: create the partitions, then update the topic metadata.
        try {
            this.tryCreatePartitionsAsync(numPartitions).get(30L, TimeUnit.SECONDS);
            this.updatePartitionedTopic(this.topicName, numPartitions, force).get(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            if (e.getCause() instanceof RestException) {
                throw (RestException)e.getCause();
            }
            log.error("[{}] Failed to update partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(e);
        }
    }

    /**
     * Creates any partitions of the current partitioned topic that do not exist yet and
     * resumes {@code asyncResponse} with the outcome.
     *
     * <p>On success the response is resumed with {@code 204 No Content}. Every other
     * outcome, including missing metadata, resumes the response with an error so the
     * HTTP request is never left pending.
     *
     * @param asyncResponse the pending REST response to complete
     */
    protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
        this.getPartitionedTopicMetadataAsync(this.topicName, false, false).thenAccept(metadata -> {
            if (metadata == null) {
                // Without metadata there is nothing to create; answer instead of leaving the request hanging.
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Partitioned topic does not exist"));
                return;
            }
            this.tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> asyncResponse.resume(Response.noContent().build())).exceptionally(e -> {
                // Pass the throwable to the logger so the failure cause is not lost.
                log.error("[{}] Failed to create partitions for topic {}", new Object[]{this.clientAppId(), this.topicName, e});
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e);
                return null;
            });
        }).exceptionally(ex -> {
            // Redirects are part of normal operation and are not logged as errors.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to create partitions for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Stores the delayed-delivery settings in the topic policies of the current topic.
     *
     * <p>Passing {@code null} for {@code deliveryPolicies} sets both delayed-delivery
     * fields of the topic policies to {@code null}.
     *
     * @param deliveryPolicies the settings to apply, or {@code null} to clear them
     * @param isGlobal used for the policy lookup and recorded on the stored policies
     * @return a future that completes when the topic policies have been updated
     */
    protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
            policies.setIsGlobal(isGlobal);
            Boolean enabled = null;
            Long tickTimeMillis = null;
            if (deliveryPolicies != null) {
                enabled = Boolean.valueOf(deliveryPolicies.isActive());
                tickTimeMillis = Long.valueOf(deliveryPolicies.getTickTime());
            }
            policies.setDelayedDeliveryEnabled(enabled);
            policies.setDelayedDeliveryTickTimeMillis(tickTimeMillis);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Asks every cluster in {@code clusters} except the local one to update the
     * partition count of the current topic through that cluster's admin client.
     *
     * @param numPartitions the new partition count
     * @param clusters the clusters the namespace is replicated to; may be empty
     * @return a future that completes once all remote updates have completed
     */
    private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
        String localCluster = this.pulsar().getConfig().getClusterName();
        // Clamp the initial capacity: an empty cluster set must not produce a negative size.
        List<CompletableFuture<Void>> results = new ArrayList<>(Math.max(0, clusters.size() - 1));
        for (String cluster : clusters) {
            if (cluster.equals(localCluster)) {
                continue;
            }
            CompletableFuture<Void> remoteUpdate = this.pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
                    .thenApply(clusterDataOp -> this.pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
                    .thenCompose(pulsarAdmin -> pulsarAdmin.topics().updatePartitionedTopicAsync(this.topicName.toString(), numPartitions, true, false));
            results.add(remoteUpdate);
        }
        return FutureUtil.waitForAll(results);
    }

    /**
     * Returns the partitioned-topic metadata of the current topic.
     *
     * <p>When the metadata reports zero partitions and auto-creation is not being
     * checked, the topic must actually exist; otherwise {@code NOT_FOUND} is raised.
     * For topics with more than one partition the client version is validated too.
     *
     * @param authoritative forwarded to the metadata lookup
     * @param checkAllowAutoCreation forwarded to the metadata lookup; when true the
     *        existence check is skipped
     * @return the metadata of the current topic
     * @throws RestException {@code NOT_FOUND} if the topic does not exist, or
     *         {@code INTERNAL_SERVER_ERROR} if the existence check fails
     */
    protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, checkAllowAutoCreation);
        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
            try {
                if (!this.pulsar().getNamespaceService().checkTopicExists(this.topicName).get().booleanValue()) {
                    throw new RestException(Response.Status.NOT_FOUND, (Throwable)new PulsarClientException.NotFoundException("Topic not exist"));
                }
            }
            catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                }
                log.error("Failed to check if topic '{}' exists", (Object)this.topicName, (Object)e);
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
            }
        }
        if (metadata.partitions > 1) {
            this.validateClientVersion();
        }
        return metadata;
    }

    /**
     * Deletes the current partitioned topic asynchronously and resumes
     * {@code asyncResponse} with the outcome.
     *
     * <p>Steps, in order: validate the {@code DELETE_TOPIC} namespace operation, validate
     * topic ownership, fetch the partitioned metadata, optionally delete the schema
     * storage, remove the partitions' authentication policies, delete each partition,
     * and finally delete the partitioned-topic metadata. With fewer than one partition
     * only the final metadata deletion runs.
     *
     * @param asyncResponse the pending REST response to complete
     * @param authoritative forwarded to the ownership validation
     * @param force forwarded to the per-partition deletion
     * @param deleteSchema whether the schema storage is deleted first
     */
    protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) {
        CompletableFuture<Void> validated = this.validateNamespaceOperationAsync(this.topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
                .thenCompose(ignored -> this.validateTopicOwnershipAsync(this.topicName, authoritative));

        CompletableFuture<Void> partitionsRemoved = validated.thenCompose(ignored ->
                this.pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(this.topicName).thenCompose(partitionedMeta -> {
                    int numPartitions = partitionedMeta.partitions;
                    if (numPartitions < 1) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    CompletableFuture<Void> authPoliciesRemoved;
                    if (deleteSchema) {
                        authPoliciesRemoved = this.pulsar().getBrokerService().deleteSchemaStorage(this.topicName.getPartition(0).toString())
                                .thenCompose(unused -> this.internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions));
                    } else {
                        authPoliciesRemoved = this.internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions);
                    }
                    return authPoliciesRemoved.thenCompose(unused -> this.internalRemovePartitionsTopicAsync(numPartitions, force));
                }));

        partitionsRemoved
                .thenCompose(ignored -> this.namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(this.topicName))
                .thenAccept(ignored -> {
                    log.info("[{}] Deleted partitioned topic {}", this.clientAppId(), this.topicName);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(ex -> {
                    // The order of these checks matters: PreconditionFailedException is tested
                    // before the more general PulsarAdminException.
                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                    if (realCause instanceof PulsarAdminException.PreconditionFailedException) {
                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions"));
                        return null;
                    }
                    if (realCause instanceof WebApplicationException) {
                        asyncResponse.resume(realCause);
                        return null;
                    }
                    if (realCause instanceof MetadataStoreException.NotFoundException) {
                        log.warn("Namespace policies of {} not found", (Object)this.topicName.getNamespaceObject());
                        asyncResponse.resume(new RestException(new RestException(Response.Status.NOT_FOUND, "Partitioned topic does not exist")));
                        return null;
                    }
                    if (realCause instanceof PulsarAdminException) {
                        asyncResponse.resume(new RestException((PulsarAdminException)realCause));
                        return null;
                    }
                    if (realCause instanceof MetadataStoreException.BadVersionException) {
                        asyncResponse.resume(new RestException(new RestException(Response.Status.CONFLICT, "Concurrent modification")));
                        return null;
                    }
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Fail to Delete partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, realCause});
                    }
                    asyncResponse.resume(new RestException(realCause));
                    return null;
                });
    }

    /**
     * Deletes partitions {@code 0 .. numPartitions-1} of the current topic through the
     * broker's admin client.
     *
     * <p>A partition that the admin client reports as not found counts as deleted. Any
     * other failure, including failing to obtain the admin client, fails that
     * partition's future.
     *
     * @param numPartitions number of partitions to delete
     * @param force forwarded to each partition deletion
     * @return a future that completes when all partition deletions have completed
     */
    private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartitions, boolean force) {
        List<CompletableFuture<Void>> deletions = new ArrayList<>();
        for (int index = 0; index < numPartitions; ++index) {
            TopicName partition = this.topicName.getPartition(index);
            CompletableFuture<Void> deletion = new CompletableFuture<>();
            try {
                this.pulsar().getAdminClient().topics().deleteAsync(partition.toString(), force).whenComplete((ignored, failure) -> {
                    if (failure == null) {
                        deletion.complete(null);
                        return;
                    }
                    Throwable realCause = FutureUtil.unwrapCompletionException(failure);
                    if (realCause instanceof PulsarAdminException.NotFoundException) {
                        // Already gone: nothing to delete, so this partition is done.
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Partition not found: {}", this.clientAppId(), partition);
                        }
                        deletion.complete(null);
                        return;
                    }
                    log.error("[{}] Failed to delete partition {}", new Object[]{this.clientAppId(), partition, realCause});
                    deletion.completeExceptionally(realCause);
                });
                deletions.add(deletion);
            }
            catch (PulsarServerException ex2) {
                log.error("[{}] Failed to get admin client while delete partition {}", new Object[]{this.clientAppId(), partition, ex2});
                deletions.add(FutureUtil.failedFuture(ex2));
            }
        }
        return FutureUtil.waitForAll(deletions);
    }

    /**
     * Removes the topic-level authentication entries of the partitioned topic and of each
     * of its partitions from the namespace policies.
     *
     * <p>If the namespace policies are not found the operation completes successfully,
     * since there is nothing to remove.
     *
     * @param numPartitions number of partitions whose entries are removed
     * @return a future completed once the policies update finished, or failed with the
     *         unwrapped cause of the update failure
     */
    private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAsync(int numPartitions) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        this.pulsar().getPulsarResources().getNamespaceResources().setPoliciesAsync(this.topicName.getNamespaceObject(), policies -> {
            for (int partition = 0; partition < numPartitions; partition++) {
                policies.auth_policies.getTopicAuthentication().remove(this.topicName.getPartition(partition).toString());
            }
            policies.auth_policies.getTopicAuthentication().remove(this.topicName.toString());
            return policies;
        }).whenComplete((ignored, error) -> {
            if (error == null) {
                log.info("Successfully delete authentication policies for partitioned topic {}", (Object)this.topicName);
                result.complete(null);
                return;
            }
            Throwable cause = FutureUtil.unwrapCompletionException(error);
            if (cause instanceof MetadataStoreException.NotFoundException) {
                log.warn("Namespace policies of {} not found", (Object)this.topicName.getNamespaceObject());
                result.complete(null);
                return;
            }
            log.error("Failed to delete authentication policies for partitioned topic {}", (Object)this.topicName, error);
            result.completeExceptionally(cause);
        });
        return result;
    }

    /**
     * Unloads the current topic and reports the outcome through {@code asyncResponse}.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>For a global topic, global namespace ownership is validated first.</li>
     *   <li>If {@code topicName.isPartitioned()} is true, no partition metadata is fetched:
     *       the name is unloaded either as a transaction coordinator assignment or through
     *       the non-partitioned path.</li>
     *   <li>Otherwise the partitioned-topic metadata is fetched. With at least one partition,
     *       each partition is unloaded through the admin client and the response is resumed
     *       once all unloads finish; with none, the non-partitioned path is used.</li>
     * </ol>
     *
     * @param asyncResponse response to resume with 204 on success or with the failure
     * @param authoritative forwarded to the metadata lookup and the delegated unload paths
     */
    protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
        log.info("[{}] Unloading topic {}", (Object)this.clientAppId(), (Object)this.topicName);
        CompletableFuture<Object> future = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ((CompletableFuture)future.thenAccept(__ -> {
            // isPartitioned() is true here, so the name is handled directly without a metadata lookup.
            if (this.topicName.isPartitioned()) {
                if (EventsTopicNames.checkTopicIsTransactionCoordinatorAssign(this.topicName)) {
                    this.internalUnloadTransactionCoordinatorAsync(asyncResponse, authoritative);
                } else {
                    this.internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
                }
            } else {
                ((CompletableFuture)this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(meta -> {
                    if (meta.partitions > 0) {
                        ArrayList<CompletableFuture<Void>> futures = Lists.newArrayListWithCapacity(meta.partitions);
                        for (int i = 0; i < meta.partitions; ++i) {
                            TopicName topicNamePartition = this.topicName.getPartition(i);
                            try {
                                futures.add(this.pulsar().getAdminClient().topics().unloadAsync(topicNamePartition.toString()));
                                continue;
                            }
                            catch (Exception e) {
                                // Failing to issue one unload aborts the whole request; unloads already issued are not awaited.
                                log.error("[{}] Failed to unload topic {}", new Object[]{this.clientAppId(), topicNamePartition, e});
                                asyncResponse.resume(new RestException(e));
                                return;
                            }
                        }
                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
                            if (exception != null) {
                                // Only the direct cause is inspected; a failure without a cause falls to the generic branch.
                                Throwable th = exception.getCause();
                                if (th instanceof PulsarAdminException.NotFoundException) {
                                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, th.getMessage()));
                                } else if (th instanceof WebApplicationException) {
                                    asyncResponse.resume(th);
                                } else {
                                    log.error("[{}] Failed to unload topic {}", new Object[]{this.clientAppId(), this.topicName, exception});
                                    asyncResponse.resume(new RestException((Throwable)exception));
                                }
                            } else {
                                asyncResponse.resume(Response.noContent().build());
                            }
                            return null;
                        });
                    } else {
                        // No partitions recorded: treat the name as a plain topic.
                        this.internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative);
                    }
                })).exceptionally(ex -> {
                    // Redirects are not logged as errors.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to get partitioned metadata while unloading topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to validate the global namespace ownership while unloading topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Resolves the delayed-delivery policies of the current topic.
     *
     * <p>The topic-level value is used when both the enabled flag and the tick time are set
     * on the topic policies. Otherwise, when {@code applied} is true, the namespace-level
     * policies are used, falling back to the broker configuration if the namespace has none.
     *
     * @param applied  whether to fall back to namespace and broker settings
     * @param isGlobal which topic-policies scope to read
     * @return a future holding the resolved policies, or {@code null} when nothing is set at
     *         topic level and {@code applied} is false
     */
    protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(op -> {
            TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
            if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
                return DelayedDeliveryPolicies.builder().tickTime(topicPolicies.getDelayedDeliveryTickTimeMillis()).active(topicPolicies.getDelayedDeliveryEnabled()).build();
            }
            if (!applied) {
                return null;
            }
            DelayedDeliveryPolicies namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).delayed_delivery_policies;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            // Neither topic nor namespace defines a value: use the broker-wide configuration.
            return DelayedDeliveryPolicies.builder().tickTime(this.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis()).active(this.pulsar().getConfiguration().isDelayedDeliveryEnabled()).build();
        });
    }

    /**
     * Reads the offload policies of the current topic.
     *
     * @param applied  when true, the topic-level value is merged with the namespace-level
     *                 policies and the broker configuration properties
     * @param isGlobal which topic-policies scope to read
     * @return a future holding the topic-level policies (possibly {@code null}) when
     *         {@code applied} is false, otherwise the merged configuration
     */
    protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(op -> {
            OffloadPoliciesImpl topicLevel = op.map(TopicPolicies::getOffloadPolicies).orElse(null);
            if (!applied) {
                return topicLevel;
            }
            OffloadPoliciesImpl namespaceLevel = (OffloadPoliciesImpl)this.getNamespacePolicies((NamespaceName)this.namespaceName).offload_policies;
            return OffloadPoliciesImpl.mergeConfiguration(topicLevel, namespaceLevel, this.pulsar().getConfiguration().getProperties());
        });
    }

    /**
     * Stores the given offload policies on the topic policies, then applies them to the
     * managed ledger configuration of the topic (or of each of its partitions).
     *
     * @param offloadPolicies policies to store; {@code null} is passed through to the
     *                        topic policies and to {@code internalUpdateOffloadPolicies}
     * @param isGlobal        scope flag written to the topic policies
     * @return a future completed once the policies are stored and applied
     */
    protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(op -> {
            TopicPolicies updated = op.orElseGet(TopicPolicies::new);
            updated.setOffloadPolicies(offloadPolicies);
            updated.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, updated);
        }).thenCompose(ignored -> {
            PartitionedTopicMetadata metadata = PersistentTopicsBase.fetchPartitionedTopicMetadata(this.pulsar(), this.topicName);
            if (metadata.partitions <= 0) {
                // Not partitioned: apply directly to the topic itself.
                return this.internalUpdateOffloadPolicies(offloadPolicies, this.topicName);
            }
            List<CompletableFuture<Void>> updates = IntStream.range(0, metadata.partitions)
                    .mapToObj(partition -> this.internalUpdateOffloadPolicies(offloadPolicies, this.topicName.getPartition(partition)))
                    .collect(Collectors.toList());
            return FutureUtil.waitForAll(updates);
        });
    }

    /**
     * Resolves the inactive-topic policies of the current topic.
     *
     * <p>The topic-level value wins when present. Otherwise, when {@code applied} is true,
     * the namespace-level value is used, and failing that a policy built from the broker
     * configuration.
     *
     * @param applied  whether to fall back to namespace and broker settings
     * @param isGlobal which topic-policies scope to read
     * @return a future holding the resolved policies, or {@code null} when nothing is set at
     *         topic level and {@code applied} is false
     */
    protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(op -> {
            InactiveTopicPolicies topicLevel = op.map(TopicPolicies::getInactiveTopicPolicies).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            InactiveTopicPolicies namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).inactive_topic_policies;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return new InactiveTopicPolicies(this.config().getBrokerDeleteInactiveTopicsMode(), this.config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), this.config().isBrokerDeleteInactiveTopicsEnabled());
        });
    }

    /**
     * Stores the given inactive-topic policies on the topic policies of the current topic.
     *
     * @param inactiveTopicPolicies value to store (passed through as-is)
     * @param isGlobal              scope to read from and flag to write on the topic policies
     * @return the future returned by the topic policies service update
     */
    protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            // Start from the stored policies when there are any, otherwise from a blank object.
            TopicPolicies updated = existing.orElseGet(() -> new TopicPolicies());
            updated.setIsGlobal(isGlobal);
            updated.setInactiveTopicPolicies(inactiveTopicPolicies);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, updated);
        });
    }

    /**
     * Applies offload policies to the managed ledger configuration of a loaded persistent topic.
     *
     * <p>Nothing happens when the topic is not present in the broker service or is not
     * persistent. With {@code null} policies the topic is switched to the offloader
     * registered for its namespace, closing the previous topic offloader if it is a
     * different instance. Otherwise a new offloader is created from the policies.
     *
     * @param offloadPolicies policies to apply, or {@code null} to use the namespace offloader
     * @param topicName       topic (or partition) whose managed ledger config is updated
     * @return a future completed once the configuration has been updated; a
     *         {@code PulsarServerException} is rethrown as a {@code RestException}
     */
    private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPoliciesImpl offloadPolicies, TopicName topicName) {
        return this.pulsar().getBrokerService().getTopicIfExists(topicName.toString()).thenAccept(optionalTopic -> {
            if (!(optionalTopic.isPresent() && topicName.isPersistent())) {
                return;
            }
            try {
                PersistentTopic persistentTopic = (PersistentTopic)optionalTopic.get();
                ManagedLedgerConfig ledgerConfig = persistentTopic.getManagedLedger().getConfig();
                LedgerOffloader replacement;
                if (offloadPolicies != null) {
                    replacement = this.pulsar().createManagedLedgerOffloader(offloadPolicies);
                } else {
                    replacement = this.pulsar().getLedgerOffloaderMap().get(topicName.getNamespaceObject());
                    LedgerOffloader current = ledgerConfig.getLedgerOffloader();
                    // Only close an offloader that belongs to the topic; the namespace one is still in use.
                    if (current != null && current != replacement) {
                        current.close();
                    }
                }
                ledgerConfig.setLedgerOffloader(replacement);
                persistentTopic.getManagedLedger().setConfig(ledgerConfig);
            }
            catch (PulsarServerException e) {
                throw new RestException(e);
            }
        });
    }

    /**
     * Resolves the max-unacked-messages-per-subscription limit of the current topic.
     *
     * <p>The topic-level value wins when present. Otherwise, when {@code applied} is true,
     * the namespace-level value is used, and failing that the broker configuration value.
     *
     * @param applied  whether to fall back to namespace and broker settings
     * @param isGlobal which topic-policies scope to read
     * @return a future holding the resolved limit, or {@code null} when nothing is set at
     *         topic level and {@code applied} is false
     */
    protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(op -> {
            Integer topicLevel = op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            Integer namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).max_unacked_messages_per_subscription;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.config().getMaxUnackedMessagesPerSubscription();
        });
    }

    /**
     * Stores the max-unacked-messages-per-subscription limit on the topic policies.
     *
     * @param maxUnackedNum limit to store; {@code null} is stored as-is
     * @param isGlobal      scope to read from and flag to write on the topic policies
     * @return the future returned by the topic policies service update
     * @throws RestException with 412 (thrown synchronously) when the limit is negative
     */
    protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum, boolean isGlobal) {
        boolean negative = maxUnackedNum != null && maxUnackedNum.intValue() < 0;
        if (negative) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "maxUnackedNum must be 0 or more");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies updated = existing.orElseGet(() -> new TopicPolicies());
            updated.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
            updated.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, updated);
        });
    }

    /**
     * Resolves the max-unacked-messages-per-consumer limit of the current topic.
     *
     * <p>The topic-level value wins when present. Otherwise, when {@code applied} is true,
     * the namespace-level value is used, and failing that the broker configuration value.
     *
     * @param applied  whether to fall back to namespace and broker settings
     * @param isGlobal which topic-policies scope to read
     * @return a future holding the resolved limit, or {@code null} when nothing is set at
     *         topic level and {@code applied} is false
     */
    protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(op -> {
            Integer topicLevel = op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            Integer namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).max_unacked_messages_per_consumer;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.config().getMaxUnackedMessagesPerConsumer();
        });
    }

    /**
     * Stores the max-unacked-messages-per-consumer limit on the topic policies.
     *
     * <p>The existing policies are read from the scope selected by {@code isGlobal}, the
     * same way the sibling setters in this class do. Reading them without the scope flag
     * and then marking the result as global would base a global update on a policy object
     * loaded without regard to the requested scope.
     *
     * @param maxUnackedNum limit to store; {@code null} is stored as-is
     * @param isGlobal      scope to read from and flag to write on the topic policies
     * @return the future returned by the topic policies service update
     * @throws RestException with 412 (thrown synchronously) when the limit is negative
     */
    protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum, boolean isGlobal) {
        if (maxUnackedNum != null && maxUnackedNum < 0) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "maxUnackedNum must be 0 or more");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(op -> {
            TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
            topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
            topicPolicies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, topicPolicies);
        });
    }

    /**
     * Stores the deduplication snapshot interval (in seconds) on the topic policies.
     *
     * @param interval interval to store; {@code null} is stored as-is
     * @param isGlobal scope to read from and flag to write on the topic policies
     * @return the future returned by the topic policies service update
     * @throws RestException with 412 (thrown synchronously) when the interval is negative
     */
    protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval, boolean isGlobal) {
        boolean negative = interval != null && interval.intValue() < 0;
        if (negative) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "interval must be 0 or more");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies updated = existing.orElseGet(() -> new TopicPolicies());
            updated.setDeduplicationSnapshotIntervalSeconds(interval);
            updated.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, updated);
        });
    }

    /**
     * Unloads the current topic as a single (non-partitioned) topic.
     *
     * <p>Validates the UNLOAD operation and topic ownership, looks up the topic reference,
     * closes it with {@code close(false)} and resumes the response with 204. Any failure is
     * forwarded to {@code resumeAsyncResponseExceptionally}; redirects are not logged as errors.
     *
     * @param asyncResponse response to resume
     * @param authoritative forwarded to the ownership validation
     */
    private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) {
        this.validateTopicOperationAsync(this.topicName, TopicOperation.UNLOAD)
                .thenCompose(permitted -> this.validateTopicOwnershipAsync(this.topicName, authoritative)
                        .thenCompose(owned -> this.getTopicReferenceAsync(this.topicName))
                        .thenCompose(topic -> topic.close(false))
                        .thenRun(() -> {
                            log.info("[{}] Successfully unloaded topic {}", (Object)this.clientAppId(), (Object)this.topicName);
                            asyncResponse.resume(Response.noContent().build());
                        }))
                .exceptionally(error -> {
                    boolean redirect = PersistentTopicsBase.isRedirectException(error);
                    if (!redirect) {
                        log.error("[{}] Failed to unload topic {}, {}", new Object[]{this.clientAppId(), this.topicName, error});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Unloads the transaction coordinator identified by the partition index of the current topic.
     *
     * <p>Validates the UNLOAD operation and topic ownership, removes the transaction
     * metadata store for that coordinator id and resumes the response with 204. Any failure
     * is forwarded to {@code resumeAsyncResponseExceptionally}; redirects are not logged as errors.
     *
     * @param asyncResponse response to resume
     * @param authoritative forwarded to the ownership validation
     */
    private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) {
        this.validateTopicOperationAsync(this.topicName, TopicOperation.UNLOAD)
                .thenCompose(permitted -> this.validateTopicOwnershipAsync(this.topicName, authoritative)
                        .thenCompose(owned -> this.pulsar().getTransactionMetadataStoreService().removeTransactionMetadataStore(TransactionCoordinatorID.get(this.topicName.getPartitionIndex())))
                        .thenRun(() -> {
                            log.info("[{}] Successfully unloaded tc {}", (Object)this.clientAppId(), (Object)this.topicName.getPartitionIndex());
                            asyncResponse.resume(Response.noContent().build());
                        }))
                .exceptionally(error -> {
                    boolean redirect = PersistentTopicsBase.isRedirectException(error);
                    if (!redirect) {
                        log.error("[{}] Failed to unload tc {},{}", new Object[]{this.clientAppId(), this.topicName.getPartitionIndex(), error});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Deletes the current topic, dispatching on {@code force}.
     *
     * @param authoritative forwarded to the selected delete path
     * @param force         when true the forceful delete path is used, otherwise the regular one
     * @param deleteSchema  forwarded to the selected delete path
     */
    protected void internalDeleteTopic(boolean authoritative, boolean force, boolean deleteSchema) {
        if (!force) {
            this.internalDeleteTopic(authoritative, deleteSchema);
            return;
        }
        this.internalDeleteTopicForcefully(authoritative, deleteSchema);
    }

    /**
     * Deletes the current topic without forcing, blocking until the broker service finishes.
     *
     * <p>The DELETE_TOPIC namespace operation and topic ownership are validated first.
     *
     * @param authoritative forwarded to the ownership validation
     * @param deleteSchema  forwarded to the broker service delete call
     * @throws RestException 412 when the failure cause is a {@code TopicBusyException},
     *                       404 when {@code isManagedLedgerNotFoundException} matches,
     *                       otherwise one wrapping the failure cause
     */
    protected void internalDeleteTopic(boolean authoritative, boolean deleteSchema) {
        this.validateNamespaceOperation(this.topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC);
        this.validateTopicOwnership(this.topicName, authoritative);
        try {
            this.pulsar().getBrokerService().deleteTopic(this.topicName.toString(), false, deleteSchema).get();
            log.info("[{}] Successfully removed topic {}", (Object)this.clientAppId(), (Object)this.topicName);
        }
        catch (Exception failure) {
            Throwable cause = failure.getCause();
            log.error("[{}] Failed to delete topic {}", new Object[]{this.clientAppId(), this.topicName, cause});
            if (cause instanceof BrokerServiceException.TopicBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
            }
            throw this.isManagedLedgerNotFoundException(failure)
                    ? new RestException(Response.Status.NOT_FOUND, "Topic not found")
                    : new RestException(cause);
        }
    }

    /**
     * Collects the subscription names of the current topic and resumes {@code asyncResponse}
     * with them.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>For a global topic, global namespace ownership is validated; topic ownership is
     *       validated in every case.</li>
     *   <li>If {@code topicName.isPartitioned()} is true, the name is handled through the
     *       non-partitioned path without a metadata lookup.</li>
     *   <li>Otherwise the partitioned-topic metadata is fetched. With at least one partition,
     *       subscriptions of the partitions are gathered through the admin client into one
     *       set; for persistent topics only partitions reported as existing are queried.
     *       With no partitions, the non-partitioned path is used.</li>
     * </ol>
     *
     * @param asyncResponse response to resume with the subscription list or the failure
     * @param authoritative forwarded to ownership validation and the metadata lookup
     */
    protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean authoritative) {
        CompletableFuture<Object> future = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ((CompletableFuture)((CompletableFuture)future.thenCompose(__ -> this.validateTopicOwnershipAsync(this.topicName, authoritative))).thenAccept(unused -> {
            // isPartitioned() is true here, so the name is queried directly.
            if (this.topicName.isPartitioned()) {
                this.internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
            } else {
                ((CompletableFuture)this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
                    block7: {
                        if (partitionMetadata.partitions > 0) {
                            try {
                                // Concurrent set: the per-partition callbacks below add to it.
                                Set<String> subscriptions = Collections.newSetFromMap(new ConcurrentHashMap(partitionMetadata.partitions));
                                ArrayList<CompletableFuture<Object>> subscriptionFutures = Lists.newArrayList();
                                if (this.topicName.getDomain() == TopicDomain.persistent) {
                                    // Persistent topics: first find out which partitions exist, then query only those.
                                    ConcurrentHashMap<Integer, CompletableFuture<Boolean>> existsFutures = new ConcurrentHashMap<Integer, CompletableFuture<Boolean>>(partitionMetadata.partitions);
                                    for (int i = 0; i < partitionMetadata.partitions; ++i) {
                                        existsFutures.put(i, this.topicResources().persistentTopicExists(this.topicName.getPartition(i)));
                                    }
                                    ((CompletableFuture)((CompletableFuture)FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values())).thenApply(__ -> existsFutures.entrySet().stream().filter(e -> (Boolean)((CompletableFuture)e.getValue()).join()).map(item -> this.topicName.getPartition((Integer)item.getKey()).toString()).collect(Collectors.toList()))).thenAccept(topics -> {
                                        if (log.isDebugEnabled()) {
                                            log.debug("activeTopics : {}", topics);
                                        }
                                        topics.forEach(topic -> {
                                            try {
                                                CompletableFuture<List<String>> subscriptionsAsync = this.pulsar().getAdminClient().topics().getSubscriptionsAsync((String)topic);
                                                subscriptionFutures.add((CompletableFuture<Object>)subscriptionsAsync.thenApply(subscriptions::addAll));
                                            }
                                            catch (PulsarServerException e) {
                                                throw new RestException(e);
                                            }
                                        });
                                    })).thenAccept(__ -> this.resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures));
                                    // NOTE(review): no exceptionally stage is attached to this chain in the visible code, so a failure
                                    // in the existence checks or in the forEach above does not appear to resume the response — confirm.
                                    break block7;
                                }
                                // Non-persistent topics: query every partition without an existence check.
                                for (int i = 0; i < partitionMetadata.partitions; ++i) {
                                    CompletableFuture<List<String>> subscriptionsAsync = this.pulsar().getAdminClient().topics().getSubscriptionsAsync(this.topicName.getPartition(i).toString());
                                    subscriptionFutures.add((CompletableFuture<Object>)subscriptionsAsync.thenApply(subscriptions::addAll));
                                }
                                this.resumeAsyncResponse(asyncResponse, subscriptions, subscriptionFutures);
                            }
                            catch (Exception e) {
                                log.error("[{}] Failed to get list of subscriptions for {}", new Object[]{this.clientAppId(), this.topicName, e});
                                asyncResponse.resume(e);
                            }
                        } else {
                            // No partitions recorded: treat the name as a plain topic.
                            this.internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
                        }
                    }
                })).exceptionally(ex -> {
                    // Redirects are not logged as errors.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to get partitioned topic metadata while get subscriptions for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to validate the global namespace/topic ownership while get subscriptions for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Waits for all per-partition subscription lookups and resumes the response.
     *
     * <p>On success the response is resumed with a list copy of {@code subscriptions}.
     * On failure the cause is unwrapped before it is classified: a failure delivered by a
     * combined future is typically a {@code CompletionException} around the real error,
     * so testing the raw throwable for {@code PulsarAdminException} would miss the 404
     * mapping and report every failure as a generic error.
     *
     * @param asyncResponse       response to resume
     * @param subscriptions       set the lookups add their results to
     * @param subscriptionFutures the pending lookups to wait for
     */
    private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions, List<CompletableFuture<Object>> subscriptionFutures) {
        FutureUtil.waitForAll(subscriptionFutures).whenComplete((r, ex) -> {
            if (ex == null) {
                asyncResponse.resume(new ArrayList(subscriptions));
                return;
            }
            // Returns the throwable itself when it is not a wrapper, so this is safe either way.
            Throwable realCause = FutureUtil.unwrapCompletionException(ex);
            log.warn("[{}] Failed to get list of subscriptions for {}: {}", new Object[]{this.clientAppId(), this.topicName, realCause.getMessage()});
            if (realCause instanceof PulsarAdminException) {
                PulsarAdminException pae = (PulsarAdminException)realCause;
                if (pae.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
                    return;
                }
                asyncResponse.resume(new RestException(pae));
                return;
            }
            asyncResponse.resume(new RestException(realCause));
        });
    }

    /**
     * Resumes the response with the subscription names of the current topic, read from
     * the topic reference.
     *
     * <p>Topic ownership and the GET_SUBSCRIPTIONS operation are validated first. Any
     * failure is forwarded to {@code resumeAsyncResponseExceptionally}; redirects are not
     * logged as errors.
     *
     * @param asyncResponse response to resume
     * @param authoritative forwarded to the ownership validation
     */
    private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(owned -> this.validateTopicOperationAsync(this.topicName, TopicOperation.GET_SUBSCRIPTIONS))
                .thenCompose(permitted -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topic -> {
                    asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()));
                })
                .exceptionally(error -> {
                    boolean redirect = PersistentTopicsBase.isRedirectException(error);
                    if (!redirect) {
                        log.error("[{}] Failed to get list of subscriptions for {}", new Object[]{this.clientAppId(), this.topicName, error});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, error);
                    return null;
                });
    }

    /**
     * Returns the stats of the current topic, blocking until they are available.
     *
     * <p>Global namespace ownership (for global topics), topic ownership and the GET_STATS
     * operation are validated before the topic reference is looked up.
     *
     * @param authoritative            forwarded to the ownership validation
     * @param getPreciseBacklog        forwarded to {@code asyncGetStats}
     * @param subscriptionBacklogSize  forwarded to {@code asyncGetStats}
     * @param getEarliestTimeInBacklog forwarded to {@code asyncGetStats}
     * @return the topic stats
     * @throws RestException 500 carrying the failure message when the stats future fails
     *                       or the wait is interrupted
     */
    protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.GET_STATS);
        Topic topic = this.getTopicReference(this.topicName);
        try {
            return topic.asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog).get();
        }
        catch (ExecutionException failed) {
            // The interesting message is on the cause, not on the ExecutionException wrapper.
            log.error("[{}] Failed to get stats for {}", new Object[]{this.clientAppId(), this.topicName, failed});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failed.getCause().getMessage());
        }
        catch (InterruptedException interrupted) {
            log.error("[{}] Failed to get stats for {}", new Object[]{this.clientAppId(), this.topicName, interrupted});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, interrupted.getMessage());
        }
    }

    /**
     * Returns the internal stats of the current topic, blocking until they are available.
     *
     * <p>Global namespace ownership (for global topics), topic ownership and the GET_STATS
     * operation are validated first. When {@code metadata} is requested the GET_METADATA
     * operation is validated as well. That check runs outside the {@code try} block so a
     * rejection reaches the caller unchanged instead of being caught by the generic
     * handler and rewritten as a 500.
     *
     * @param authoritative forwarded to the ownership validation
     * @param metadata      whether ledger metadata is included; also requires GET_METADATA
     * @return the internal stats
     * @throws RestException whatever the validations throw, or 500 carrying the failure
     *                       message when retrieving the stats fails
     */
    protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.GET_STATS);
        Topic topic = this.getTopicReference(this.topicName);
        if (metadata) {
            // Kept after the topic lookup (original order) but outside the try below.
            this.validateTopicOperation(this.topicName, TopicOperation.GET_METADATA);
        }
        try {
            return topic.getInternalStats(metadata).get();
        }
        catch (Exception e) {
            log.error("[{}] Failed to get internal stats for {}", new Object[]{this.clientAppId(), this.topicName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e instanceof ExecutionException ? e.getCause().getMessage() : e.getMessage());
        }
    }

    /**
     * Resumes the response with the managed ledger info of the current topic.
     *
     * <p>Flow:
     * <ol>
     *   <li>For a global topic, global namespace ownership is validated first.</li>
     *   <li>If {@code topicName.isPartitioned()} is true, the name is handled through the
     *       non-partitioned path without a metadata lookup.</li>
     *   <li>Otherwise the partitioned-topic metadata is fetched. With at least one partition,
     *       the info of every partition is requested through the admin client, parsed and
     *       collected into a {@code PartitionedManagedLedgerInfo}; with none, the
     *       non-partitioned path is used.</li>
     * </ol>
     *
     * <p>Two failure paths are handled explicitly: a failed partition lookup no longer
     * goes on to parse its (null) response, and the aggregate handler no longer resumes
     * the response with a success payload after it has already resumed it with an error.
     *
     * @param asyncResponse response to resume with the streamed JSON info or the failure
     * @param authoritative forwarded to the metadata lookup
     */
    protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
        CompletableFuture<Void> ownershipCheck;
        if (this.topicName.isGlobal()) {
            ownershipCheck = this.validateGlobalNamespaceOwnershipAsync(this.namespaceName);
        } else {
            ownershipCheck = CompletableFuture.completedFuture(null);
        }
        ownershipCheck.thenAccept(__ -> {
            // isPartitioned() is true here, so the name is queried directly.
            if (this.topicName.isPartitioned()) {
                this.internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
                return;
            }
            this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
                if (partitionMetadata.partitions <= 0) {
                    // No partitions recorded: treat the name as a plain topic.
                    this.internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
                    return;
                }
                List<CompletableFuture<String>> futures = Lists.newArrayListWithCapacity(partitionMetadata.partitions);
                PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo();
                for (int i = 0; i < partitionMetadata.partitions; ++i) {
                    TopicName topicNamePartition = this.topicName.getPartition(i);
                    try {
                        futures.add(this.pulsar().getAdminClient().topics().getInternalInfoAsync(topicNamePartition.toString()).whenComplete((response, throwable) -> {
                            if (throwable != null) {
                                log.error("[{}] Failed to get managed info for {}", new Object[]{this.clientAppId(), topicNamePartition, throwable});
                                asyncResponse.resume(new RestException((Throwable)throwable));
                                // There is no response to parse on failure.
                                return;
                            }
                            try {
                                partitionedManagedLedgerInfo.partitions.put(topicNamePartition.toString(), PersistentTopicsBase.jsonMapper().readValue((String)response, ManagedLedgerInfo.class));
                            }
                            catch (JsonProcessingException ex) {
                                // Best effort: a partition whose info cannot be parsed is logged and left out.
                                log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]", new Object[]{this.clientAppId(), topicNamePartition, response, ex});
                            }
                        }));
                    }
                    catch (PulsarServerException e) {
                        log.error("[{}] Failed to get admin client while get managed info for {}", new Object[]{this.clientAppId(), topicNamePartition, e});
                        throw new RestException(e);
                    }
                }
                FutureUtil.waitForAll(futures).handle((result, exception) -> {
                    if (exception != null) {
                        Throwable t = exception.getCause();
                        if (t instanceof PulsarAdminException.NotFoundException) {
                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                        } else {
                            log.error("[{}] Failed to get managed info for {}", new Object[]{this.clientAppId(), this.topicName, t});
                            asyncResponse.resume(new RestException(t));
                        }
                        // The request has been answered with an error; do not follow up with a success payload.
                        return null;
                    }
                    asyncResponse.resume(output -> PersistentTopicsBase.jsonMapper().writer().writeValue(output, (Object)partitionedManagedLedgerInfo));
                    return null;
                });
            }).exceptionally(ex -> {
                // Redirects are not logged as errors.
                if (!PersistentTopicsBase.isRedirectException(ex)) {
                    log.error("[{}] Failed to get partitioned metadata while get managed info for {}", new Object[]{this.clientAppId(), this.topicName, ex});
                }
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to validate the global namespace ownership while get managed info for {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Looks up the managed-ledger info of this topic and resumes {@code asyncResponse} with it.
     *
     * <p>The request is first checked with
     * {@code validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)}. When the lookup succeeds
     * the info is written to the response as JSON; when the managed-ledger factory reports a failure the
     * exception is handed to the response unchanged. A failure of the validation stage is logged and
     * reported through {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse the suspended response that receives the info or the failure
     */
    protected void internalGetManagedLedgerInfoForNonPartitionedTopic(final AsyncResponse asyncResponse) {
        // Callback handed to the managed-ledger factory; it only talks to asyncResponse.
        final AsyncCallbacks.ManagedLedgerInfoCallback infoCallback = new AsyncCallbacks.ManagedLedgerInfoCallback(){

            @Override
            public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
                asyncResponse.resume(output -> AdminResource.jsonMapper().writer().writeValue(output, (Object)info));
            }

            @Override
            public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
                asyncResponse.resume(exception);
            }
        };
        validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
                .thenAccept(__ -> {
                    final String ledgerName = topicName.getPersistenceNamingEncoding();
                    pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(ledgerName, infoCallback, null);
                })
                .exceptionally(failure -> {
                    log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, failure);
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Requests the stats of every partition of this topic, merges them and resumes
     * {@code asyncResponse} with the aggregate.
     *
     * <p>Responds with 404 when the topic metadata reports zero partitions. Partitions whose stats
     * request did not complete successfully are skipped instead of failing the whole call. When
     * {@code perPartition} is set and no partition contributed stats, the response is 404 unless
     * {@code partitionedTopicExists} reports the topic, in which case a single empty entry keyed by the
     * topic name is returned.
     *
     * @param asyncResponse            the suspended response that receives the stats or the failure
     * @param authoritative            forwarded to the partitioned-metadata lookup
     * @param perPartition             whether each partition's own stats are added to the result
     * @param getPreciseBacklog        forwarded to every per-partition stats request
     * @param subscriptionBacklogSize  forwarded to every per-partition stats request
     * @param getEarliestTimeInBacklog forwarded to every per-partition stats request
     */
    protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize, boolean getEarliestTimeInBacklog) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)).thenAccept(partitionMetadata -> {
            if (partitionMetadata.partitions == 0) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found"));
                return;
            }
            PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
            List<CompletableFuture<TopicStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
            for (int i = 0; i < partitionMetadata.partitions; i++) {
                try {
                    topicStatsFutureList.add(pulsar().getAdminClient().topics().getStatsAsync(topicName.getPartition(i).toString(), getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog));
                } catch (PulsarServerException e) {
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }
            FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
                // The aggregate failure is deliberately ignored: every partition that did answer
                // still contributes, failed ones are simply left out.
                for (int i = 0; i < topicStatsFutureList.size(); i++) {
                    CompletableFuture<TopicStats> statFuture = topicStatsFutureList.get(i);
                    if (!statFuture.isDone() || statFuture.isCompletedExceptionally()) {
                        continue;
                    }
                    try {
                        // Read the completed value once and reuse it for both the aggregate and the map.
                        TopicStats partitionStats = statFuture.get();
                        stats.add(partitionStats);
                        if (perPartition) {
                            stats.getPartitions().put(topicName.getPartition(i).toString(), (TopicStatsImpl) partitionStats);
                        }
                    } catch (Exception e) {
                        asyncResponse.resume(new RestException(e));
                        return null;
                    }
                }
                if (perPartition && stats.partitions.isEmpty()) {
                    try {
                        boolean pathExists = namespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
                        if (!pathExists) {
                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
                            return null;
                        }
                        stats.partitions.put(topicName.toString(), new TopicStatsImpl());
                    } catch (Exception e) {
                        asyncResponse.resume(new RestException(e));
                        return null;
                    }
                }
                asyncResponse.resume(stats);
                return null;
            });
        }).exceptionally(ex -> {
            // Redirects are expected control flow, so only other failures are logged.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Requests the internal stats of every partition of this topic and resumes {@code asyncResponse}
     * with a map keyed by partition name.
     *
     * <p>Responds with 404 when the topic metadata reports zero partitions, and also when none of the
     * per-partition requests completed successfully. Partitions whose request failed are skipped.
     *
     * @param asyncResponse the suspended response that receives the stats or the failure
     * @param authoritative forwarded to the partitioned-metadata lookup
     */
    protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)).thenAccept(partitionMetadata -> {
            if (partitionMetadata.partitions == 0) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found"));
                return;
            }
            PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
            List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = new ArrayList<>(partitionMetadata.partitions);
            for (int i = 0; i < partitionMetadata.partitions; i++) {
                try {
                    topicStatsFutureList.add(pulsar().getAdminClient().topics().getInternalStatsAsync(topicName.getPartition(i).toString(), false));
                } catch (PulsarServerException e) {
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }
            FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
                // The aggregate failure is ignored on purpose; only successfully completed partitions are reported.
                for (int i = 0; i < topicStatsFutureList.size(); i++) {
                    CompletableFuture<PersistentTopicInternalStats> statFuture = topicStatsFutureList.get(i);
                    if (!statFuture.isDone() || statFuture.isCompletedExceptionally()) {
                        continue;
                    }
                    try {
                        stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
                    } catch (Exception e) {
                        asyncResponse.resume(new RestException(e));
                        return null;
                    }
                }
                // Two separate calls so the 404 is dispatched through resume(Throwable). A single
                // conditional argument has static type Object and would select the entity overload
                // resume(Object), handing the exception over as a response body.
                if (stats.partitions.isEmpty()) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
                } else {
                    asyncResponse.resume(stats);
                }
                return null;
            });
        }).exceptionally(ex -> {
            // Redirects are expected control flow, so only other failures are logged.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Deletes subscription {@code subName}, choosing between the regular and the forceful deletion path.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       name of the subscription to delete
     * @param authoritative forwarded unchanged to the selected deletion path
     * @param force         {@code true} to delegate to {@code internalDeleteSubscriptionForcefully},
     *                      {@code false} to delegate to the three-argument {@code internalDeleteSubscription}
     */
    protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) {
        if (!force) {
            internalDeleteSubscription(asyncResponse, subName, authoritative);
            return;
        }
        internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
    }

    /**
     * Deletes subscription {@code subName} from this topic and resumes {@code asyncResponse} with the
     * outcome (204 on success).
     *
     * <p>After the namespace/topic ownership checks, the deletion is either executed locally through
     * {@code internalDeleteSubscriptionForNonPartitionedTopic} (when {@code topicName.isPartitioned()}
     * is true or the metadata reports no partitions) or fanned out to every partition through the admin
     * client. In the fan-out case a not-found failure becomes 404 and a precondition failure becomes
     * 412; any other failure is logged and returned as a {@code RestException}.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       name of the subscription to delete
     * @param authoritative forwarded to the ownership checks and the metadata lookup
     */
    protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
            // NOTE(review): isPartitioned() presumably means the name already denotes one concrete
            // partition, so no metadata lookup is needed — confirm against TopicName.
            if (topicName.isPartitioned()) {
                internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
                return;
            }
            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAcceptAsync(partitionMetadata -> {
                if (partitionMetadata.partitions <= 0) {
                    internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
                    return;
                }
                List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    TopicName topicNamePartition = topicName.getPartition(i);
                    try {
                        futures.add(pulsar().getAdminClient().topics().deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
                    } catch (Exception e) {
                        log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName, e);
                        asyncResponse.resume(new RestException(e));
                        return;
                    }
                }
                FutureUtil.waitForAll(futures).handle((result, exception) -> {
                    if (exception == null) {
                        asyncResponse.resume(Response.noContent().build());
                        return null;
                    }
                    // Unwrap instead of getCause(): a failure that is not wrapped has a null cause,
                    // which would lose the real error.
                    Throwable t = FutureUtil.unwrapCompletionException(exception);
                    if (t instanceof PulsarAdminException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    } else if (t instanceof PulsarAdminException.PreconditionFailedException) {
                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers"));
                    } else {
                        log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, t);
                        asyncResponse.resume(new RestException(t));
                    }
                    return null;
                });
            }, (Executor) pulsar().getExecutor()).exceptionally(ex -> {
                // Redirects are expected control flow, so only other failures are logged.
                if (!PersistentTopicsBase.isRedirectException(ex)) {
                    log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
                }
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Deletes subscription {@code subName} from the topic served by this broker and resumes
     * {@code asyncResponse} with 204 on success.
     *
     * <p>Topic ownership and the {@code UNSUBSCRIBE} operation are validated first. A missing
     * subscription raises a 404 {@code RestException}; a {@code SubscriptionBusyException} cause is
     * reported as 412; every other failure is wrapped in a {@code RestException} built from its cause.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       name of the subscription to delete
     * @param authoritative forwarded to the topic-ownership validation
     */
    private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        validateTopicOwnershipAsync(topicName, authoritative)
                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
                .thenCompose(__ -> {
                    Subscription subscription = getTopicReference(topicName).getSubscription(subName);
                    if (subscription != null) {
                        return subscription.delete();
                    }
                    throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                })
                .thenRun(() -> {
                    log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    final Throwable rootCause = failure.getCause();
                    if (!(rootCause instanceof BrokerServiceException.SubscriptionBusyException)) {
                        // Redirects are expected control flow, so only other failures are logged.
                        if (!PersistentTopicsBase.isRedirectException(failure)) {
                            log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, rootCause);
                        }
                        asyncResponse.resume(new RestException(rootCause));
                        return null;
                    }
                    log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, rootCause);
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers"));
                    return null;
                });
    }

    /**
     * Forcefully deletes subscription {@code subName} from this topic and resumes
     * {@code asyncResponse} with the outcome (204 on success).
     *
     * <p>After the optional global-namespace ownership check, the deletion is either executed locally
     * through {@code internalDeleteSubscriptionForNonPartitionedTopicForcefully} (when
     * {@code topicName.isPartitioned()} is true or the metadata reports no partitions) or fanned out to
     * every partition through the admin client with the force flag set. In the fan-out case a not-found
     * failure becomes 404; any other failure is logged and returned as a {@code RestException}.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       name of the subscription to delete
     * @param authoritative forwarded to the metadata lookup and the local deletion path
     */
    protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenAccept(__ -> {
            // NOTE(review): isPartitioned() presumably means the name already denotes one concrete
            // partition, so no metadata lookup is needed — confirm against TopicName.
            if (topicName.isPartitioned()) {
                internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
                return;
            }
            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
                if (partitionMetadata.partitions <= 0) {
                    internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative);
                    return;
                }
                List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    TopicName topicNamePartition = topicName.getPartition(i);
                    try {
                        futures.add(pulsar().getAdminClient().topics().deleteSubscriptionAsync(topicNamePartition.toString(), subName, true));
                    } catch (Exception e) {
                        log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName, e);
                        asyncResponse.resume(new RestException(e));
                        return;
                    }
                }
                FutureUtil.waitForAll(futures).handle((result, exception) -> {
                    if (exception == null) {
                        asyncResponse.resume(Response.noContent().build());
                        return null;
                    }
                    // Unwrap instead of getCause(): a failure that is not wrapped has a null cause,
                    // which would lose the real error.
                    Throwable t = FutureUtil.unwrapCompletionException(exception);
                    if (t instanceof PulsarAdminException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    } else {
                        log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t);
                        asyncResponse.resume(new RestException(t));
                    }
                    return null;
                });
            }).exceptionally(ex -> {
                // Redirects are expected control flow, so only other failures are logged.
                if (!PersistentTopicsBase.isRedirectException(ex)) {
                    log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex);
                }
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Forcefully deletes subscription {@code subName} from the topic served by this broker and resumes
     * {@code asyncResponse} with 204 on success.
     *
     * <p>Topic ownership and the {@code UNSUBSCRIBE} operation are validated first. A missing
     * subscription raises a 404 {@code RestException}. Failures are reported through
     * {@code resumeAsyncResponseExceptionally} and logged unless they are redirects.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       name of the subscription to delete
     * @param authoritative forwarded to the topic-ownership validation
     */
    private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        validateTopicOwnershipAsync(topicName, authoritative)
                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
                .thenCompose(__ -> {
                    Subscription subscription = getTopicReference(topicName).getSubscription(subName);
                    if (subscription != null) {
                        return subscription.deleteForcefully();
                    }
                    throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                })
                .thenRun(() -> {
                    log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(failure -> {
                    final boolean redirect = PersistentTopicsBase.isRedirectException(failure);
                    if (!redirect) {
                        log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, failure);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Clears the whole backlog of subscription {@code subName} on this topic and resumes
     * {@code asyncResponse} with the outcome (204 on success).
     *
     * <p>After the ownership checks and the {@code SKIP} operation check, the request is either handled
     * locally through {@code internalSkipAllMessagesForNonPartitionedTopicAsync} (when
     * {@code topicName.isPartitioned()} is true or the metadata reports no partitions) or fanned out to
     * every partition through the admin client. In the fan-out case a not-found failure becomes 404;
     * any other failure is logged and returned as a {@code RestException}.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       name of the subscription whose backlog is cleared
     * @param authoritative forwarded to the ownership check and the metadata lookup
     */
    protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName)).thenCompose(__ -> {
            // NOTE(review): isPartitioned() presumably means the name already denotes one concrete
            // partition, so no metadata lookup is needed — confirm against TopicName.
            if (topicName.isPartitioned()) {
                return internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative);
            }
            return getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenCompose(partitionMetadata -> {
                if (partitionMetadata.partitions <= 0) {
                    return internalSkipAllMessagesForNonPartitionedTopicAsync(asyncResponse, subName, authoritative);
                }
                List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
                for (int i = 0; i < partitionMetadata.partitions; i++) {
                    TopicName topicNamePartition = topicName.getPartition(i);
                    try {
                        futures.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(topicNamePartition.toString(), subName));
                    } catch (Exception e) {
                        log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicNamePartition, subName, e);
                        asyncResponse.resume(new RestException(e));
                        // The response is already resumed; complete normally so the outer handler stays quiet.
                        return CompletableFuture.completedFuture(null);
                    }
                }
                return FutureUtil.waitForAll(futures).handle((result, exception) -> {
                    if (exception == null) {
                        asyncResponse.resume(Response.noContent().build());
                        return null;
                    }
                    // Unwrap instead of getCause(): a failure that is not wrapped has a null cause,
                    // which would lose the real error.
                    Throwable t = FutureUtil.unwrapCompletionException(exception);
                    if (t instanceof PulsarAdminException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    } else {
                        log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t);
                        asyncResponse.resume(new RestException(t));
                    }
                    return null;
                });
            });
        }).exceptionally(ex -> {
            // Redirects are expected control flow, so only other failures are logged.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Clears the backlog of {@code subName} on the topic served by this broker.
     *
     * <p>Topic ownership and the {@code SKIP} operation are validated first. A name starting with the
     * topic's replicator prefix is resolved to a replicator, any other name to a subscription; when the
     * target does not exist the response is resumed with 404 and the returned future completes
     * normally. The outcome of the clear operation itself is sent to {@code asyncResponse} by a
     * completion callback (204 on success, a {@code RestException} on failure).
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       subscription (or replicator cursor) name whose backlog is cleared
     * @param authoritative forwarded to the topic-ownership validation
     * @return a future for the whole chain; failures of the topic lookup / clear stage are handled
     *         inside and do not propagate through it
     */
    private CompletableFuture<Void> internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        return validateTopicOwnershipAsync(topicName, authoritative)
                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
                .thenCompose(__ -> getTopicReferenceAsync(topicName).thenCompose(t -> {
                    final PersistentTopic persistentTopic = (PersistentTopic) t;
                    // Shared completion callback: answers the request first, then logs the outcome.
                    final BiConsumer<Void, Throwable> onCleared = (ignored, failure) -> {
                        if (failure == null) {
                            asyncResponse.resume(Response.noContent().build());
                            log.info("[{}] Cleared backlog on {} {}", clientAppId(), topicName, subName);
                            return;
                        }
                        asyncResponse.resume(new RestException(failure));
                        log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, failure);
                    };
                    if (!subName.startsWith(persistentTopic.getReplicatorPrefix())) {
                        // Regular subscription.
                        PersistentSubscription subscription = persistentTopic.getSubscription(subName);
                        if (subscription == null) {
                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                            return CompletableFuture.completedFuture(null);
                        }
                        return subscription.clearBacklog().whenComplete(onCleared);
                    }
                    // Replicator cursor: resolve the replicator from the remote cluster encoded in the name.
                    String cluster = PersistentReplicator.getRemoteCluster(subName);
                    PersistentReplicator replicator = (PersistentReplicator) persistentTopic.getPersistentReplicator(cluster);
                    if (replicator == null) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                        return CompletableFuture.completedFuture(null);
                    }
                    return replicator.clearBacklog().whenComplete(onCleared);
                }).exceptionally(failure -> {
                    if (!PersistentTopicsBase.isRedirectException(failure)) {
                        log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, failure);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                }));
    }

    /**
     * Skips {@code numMessages} messages on subscription {@code subName} of this topic and resumes
     * {@code asyncResponse} with the outcome (204 on success).
     *
     * <p>The {@code SKIP} operation and the ownership checks run first. The request is rejected with
     * 405 when the metadata reports partitions, and with 404 when the topic, the replicator (for names
     * starting with the topic's replicator prefix) or the subscription cannot be found. Failures are
     * reported through {@code resumeAsyncResponseExceptionally} and logged unless they are redirects.
     *
     * @param asyncResponse the suspended response that receives the outcome
     * @param subName       subscription (or replicator cursor) name to skip messages on
     * @param numMessages   number of messages to skip
     * @param authoritative forwarded to the ownership check and the metadata lookup
     */
    protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages, boolean authoritative) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP)).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenCompose(partitionMetadata -> {
            if (partitionMetadata.partitions > 0) {
                String msg = "Skip messages on a partitioned topic is not allowed";
                log.warn("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
                throw new RestException(Response.Status.METHOD_NOT_ALLOWED, msg);
            }
            return getTopicReferenceAsync(topicName).thenCompose(t -> {
                if (t == null) {
                    // Thrown directly: wrapping it in a second RestException only hid the message.
                    throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
                }
                PersistentTopic topic = (PersistentTopic) t;
                CompletableFuture<Void> skipFuture;
                if (subName.startsWith(topic.getReplicatorPrefix())) {
                    String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                    PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                    if (repl == null) {
                        return FutureUtil.failedFuture(new RestException(Response.Status.NOT_FOUND, "Replicator not found"));
                    }
                    skipFuture = repl.skipMessages(numMessages);
                } else {
                    PersistentSubscription sub = topic.getSubscription(subName);
                    if (sub == null) {
                        return FutureUtil.failedFuture(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    }
                    skipFuture = sub.skipMessages(numMessages);
                }
                // Same success handling for the replicator and the subscription path.
                return skipFuture.thenAccept(unused -> {
                    log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
                    asyncResponse.resume(Response.noContent().build());
                });
            });
        })).exceptionally(ex -> {
            // Redirects are expected control flow, so only other failures are logged.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, topicName, subName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Expires messages older than {@code expireTimeInSeconds} for every subscription of this topic and
     * resumes {@code asyncResponse} with the outcome (204 on success in the fan-out case).
     *
     * <p>After the optional global-namespace ownership check and the metadata lookup, the request is
     * either handled locally through
     * {@code internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic} (when
     * {@code topicName.isPartitioned()} is true or the metadata reports no partitions) or fanned out to
     * every partition through the admin client. A fan-out failure is logged and returned as a
     * {@code RestException}.
     *
     * @param asyncResponse       the suspended response that receives the outcome
     * @param expireTimeInSeconds expiry threshold forwarded to every expire request
     * @param authoritative       forwarded to the metadata lookup and the local path
     */
    protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds, boolean authoritative) {
        CompletableFuture<Void> future = topicName.isGlobal() ? validateGlobalNamespaceOwnershipAsync(namespaceName) : CompletableFuture.completedFuture(null);
        future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
            // NOTE(review): isPartitioned() presumably means the name already denotes one concrete
            // partition — confirm against TopicName. Either way both cases take the local path.
            if (topicName.isPartitioned() || partitionMetadata.partitions <= 0) {
                internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(asyncResponse, partitionMetadata, expireTimeInSeconds, authoritative);
                return;
            }
            List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
            for (int i = 0; i < partitionMetadata.partitions; i++) {
                TopicName topicNamePartition = topicName.getPartition(i);
                try {
                    futures.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(topicNamePartition.toString(), expireTimeInSeconds));
                } catch (Exception e) {
                    log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, topicNamePartition, e);
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }
            FutureUtil.waitForAll(futures).handle((result, exception) -> {
                if (exception == null) {
                    asyncResponse.resume(Response.noContent().build());
                    return null;
                }
                // Unwrap instead of getCause(): a failure that is not wrapped has a null cause,
                // which would lose the real error.
                Throwable t = FutureUtil.unwrapCompletionException(exception);
                log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), expireTimeInSeconds, topicName, t);
                asyncResponse.resume(new RestException(t));
                return null;
            });
        })).exceptionally(ex -> {
            // Redirects are expected control flow, so only other failures are logged.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Triggers message expiry for every replicator and every subscription found on the topic
     * addressed by this request, and resumes {@code asyncResponse} once all of them have finished.
     *
     * <p>The topic operation ({@code EXPIRE_MESSAGES}) and topic ownership are validated before the
     * topic reference is looked up. The response is 404 when no topic reference is returned, 405
     * when the topic is not a {@link PersistentTopic}, and 204 (no content) on success.
     *
     * @param asyncResponse       response handle that receives the outcome
     * @param partitionMetadata   metadata passed through to the per-subscription expiry call
     * @param expireTimeInSeconds expiry threshold passed through to the per-subscription expiry call
     * @param authoritative       passed to the ownership validation and the per-subscription expiry call
     */
    private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, PartitionedTopicMetadata partitionMetadata, int expireTimeInSeconds, boolean authoritative) {
        this.validateTopicOperationAsync(this.topicName, TopicOperation.EXPIRE_MESSAGES)
                .thenCompose(ignored -> this.validateTopicOwnershipAsync(this.topicName, authoritative))
                .thenCompose(ignored -> this.getTopicReferenceAsync(this.topicName).thenAccept(topicRef -> {
                    if (topicRef == null) {
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                        return;
                    }
                    if (!(topicRef instanceof PersistentTopic)) {
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages for all subscriptions on a non-persistent topic is not allowed"));
                        return;
                    }
                    PersistentTopic persistentTopic = (PersistentTopic) topicRef;
                    List<CompletableFuture<Void>> expirations = Lists.newArrayListWithCapacity((int) persistentTopic.getReplicators().size());
                    List<String> cursorNames = Lists.newArrayListWithCapacity((int) persistentTopic.getReplicators().size() + (int) persistentTopic.getSubscriptions().size());
                    // Replicator names first, then regular subscription names.
                    cursorNames.addAll(persistentTopic.getReplicators().keys());
                    cursorNames.addAll(persistentTopic.getSubscriptions().keys());
                    for (String cursorName : cursorNames) {
                        try {
                            expirations.add(this.internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, cursorName, expireTimeInSeconds, authoritative));
                        } catch (Exception e) {
                            // A synchronous failure while scheduling aborts the whole request.
                            log.error("[{}] Failed to expire messages for all subscription up to {} on {}", this.clientAppId(), expireTimeInSeconds, this.topicName, e);
                            asyncResponse.resume(new RestException(e));
                            return;
                        }
                    }
                    FutureUtil.waitForAll(expirations).handle((result, exception) -> {
                        if (exception == null) {
                            asyncResponse.resume(Response.noContent().build());
                        } else {
                            Throwable cause = FutureUtil.unwrapCompletionException(exception);
                            log.error("[{}] Failed to expire messages for all subscription up to {} on {}", this.clientAppId(), expireTimeInSeconds, this.topicName, cause);
                            asyncResponse.resume(new RestException(cause));
                        }
                        return null;
                    });
                }))
                .exceptionally(ex -> {
                    // Redirects are expected control flow, so only other failures are logged.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to expire messages for all subscription up to {} on {}", this.clientAppId(), expireTimeInSeconds, this.topicName, ex);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Resets the cursor of subscription {@code subName} to {@code timestamp}, issuing one reset per
     * partition through the admin client when the topic metadata reports partitions.
     *
     * <p>Flow:
     * <ol>
     *   <li>For a global topic, global namespace ownership is validated; a failure is logged and
     *       reported through {@code asyncResponse}.</li>
     *   <li>Topic ownership and the {@code RESET_CURSOR} operation are validated synchronously;
     *       exceptions thrown by these calls propagate to the caller.</li>
     *   <li>When {@code topicName.isPartitioned()} is true, or the fetched metadata reports no
     *       partitions, the work is delegated to
     *       {@link #internalResetCursorForNonPartitionedTopic}. Otherwise {@code resetCursorAsync}
     *       is called for each partition.</li>
     * </ol>
     *
     * <p>Per-partition {@code PreconditionFailedException}s are counted and tolerated: the request
     * fails with 412 only when every partition reported one. Any other per-partition failure fails
     * the request as soon as it is observed.
     *
     * @param asyncResponse response handle that receives the outcome
     * @param subName       subscription whose cursor is reset
     * @param timestamp     target time handed to the reset calls
     * @param authoritative passed to the ownership validation and metadata lookup
     */
    protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            try {
                this.validateGlobalNamespaceOwnership(this.namespaceName);
            } catch (Exception e) {
                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}", this.clientAppId(), this.topicName, subName, timestamp, e.getMessage());
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e);
                return;
            }
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.RESET_CURSOR, subName);
        if (this.topicName.isPartitioned()) {
            this.internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
            return;
        }
        this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
            final int numPartitions = partitionMetadata.partitions;
            if (numPartitions <= 0) {
                this.internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
                return;
            }
            final CompletableFuture<Void> future = new CompletableFuture<>();
            final AtomicInteger count = new AtomicInteger(numPartitions);
            final AtomicInteger failureCount = new AtomicInteger(0);
            final AtomicReference<Throwable> partitionException = new AtomicReference<>();
            for (int i = 0; i < numPartitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    this.pulsar().getAdminClient().topics().resetCursorAsync(topicNamePartition.toString(), subName, timestamp).handle((r, ex) -> {
                        if (ex != null) {
                            if (ex instanceof PulsarAdminException.PreconditionFailedException) {
                                // Remember the failure; it is only reported if every partition fails this way.
                                failureCount.incrementAndGet();
                                partitionException.set(ex);
                            } else {
                                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), topicNamePartition, subName, timestamp, ex);
                                future.completeExceptionally(ex);
                                return null;
                            }
                        }
                        if (count.decrementAndGet() == 0) {
                            future.complete(null);
                        }
                        return null;
                    });
                } catch (Exception e) {
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), topicNamePartition, subName, timestamp, e);
                    future.completeExceptionally(e);
                }
            }
            future.whenComplete((r, ex) -> {
                if (ex != null) {
                    if (ex instanceof PulsarAdminException) {
                        asyncResponse.resume(new RestException((PulsarAdminException) ex));
                    } else {
                        asyncResponse.resume(new RestException(ex));
                    }
                    return;
                }
                if (failureCount.get() == numPartitions) {
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp, partitionException.get());
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, partitionException.get().getMessage()));
                    return;
                }
                if (failureCount.get() > 0) {
                    log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp, partitionException.get());
                }
                asyncResponse.resume(Response.noContent().build());
            });
        }).exceptionally(ex -> {
            // Redirects are expected control flow, so only other failures are logged. The message
            // names the reset-cursor operation (it previously described message expiry).
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] [{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Resets the cursor of subscription {@code subName} on this topic to {@code timestamp} and
     * resumes {@code asyncResponse} with the outcome.
     *
     * <p>Responses produced here:
     * <ul>
     *   <li>204 (no content) once the reset completes;</li>
     *   <li>404 when the topic reference or the subscription is {@code null};</li>
     *   <li>412 when the reset fails with {@code SubscriptionInvalidCursorPosition} or
     *       {@code SubscriptionBusyException};</li>
     *   <li>405 when a synchronous step throws {@code NotAllowedException};</li>
     *   <li>anything else is mapped by {@code resumeAsyncResponseExceptionally}.</li>
     * </ul>
     *
     * @param asyncResponse response handle that receives the outcome
     * @param subName       subscription whose cursor is reset
     * @param timestamp     target time handed to {@code resetCursor}
     * @param authoritative passed to the ownership validation
     */
    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp, boolean authoritative) {
        try {
            this.validateTopicOwnership(this.topicName, authoritative);
            this.validateTopicOperation(this.topicName, TopicOperation.RESET_CURSOR, subName);
            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp);
            // A non-persistent topic fails this cast; the resulting exception is handled by the catch below.
            PersistentTopic topic = (PersistentTopic) this.getTopicReference(this.topicName);
            if (topic == null) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                return;
            }
            PersistentSubscription sub = topic.getSubscription(subName);
            if (sub == null) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                return;
            }
            sub.resetCursor(timestamp).thenRun(() -> {
                log.info("[{}][{}] Reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp);
                asyncResponse.resume(Response.noContent().build());
            }).exceptionally(ex -> {
                Throwable t = ex instanceof CompletionException ? ex.getCause() : ex;
                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp, t);
                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified: " + t.getMessage()));
                } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Failed for Subscription Busy: " + t.getMessage()));
                } else {
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, t);
                }
                return null;
            });
        } catch (Exception e) {
            log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", this.clientAppId(), this.topicName, subName, timestamp, e);
            // Exactly one resume per failure: the 405 mapping and the generic mapping are mutually
            // exclusive (previously the generic path also ran after the 405 had been sent).
            if (e instanceof BrokerServiceException.NotAllowedException) {
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, e.getMessage()));
            } else {
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e);
            }
        }
    }

    /**
     * Creates subscription {@code subscriptionName} positioned at {@code messageId}
     * ({@code MessageId.latest} when {@code messageId} is {@code null}).
     *
     * <p>For a global topic, global namespace ownership is validated first. When
     * {@code topicName.isPartitioned()} is true, or the fetched metadata reports no partitions, the
     * work is delegated to {@link #internalCreateSubscriptionForNonPartitionedTopic}. Otherwise
     * {@code createSubscriptionAsync} is called through the admin client for each partition.
     *
     * <p>A per-partition {@code ConflictException} is tolerated unless every partition failed; any
     * other per-partition failure, or a failure on all partitions, is reported to the caller after
     * all partitions have answered.
     *
     * @param asyncResponse    response handle that receives the outcome
     * @param subscriptionName name of the subscription to create
     * @param messageId        initial position, or {@code null} for {@code MessageId.latest}
     * @param authoritative    passed to the metadata lookup and the non-partitioned path
     * @param replicated       passed to the non-partitioned path
     */
    protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) {
        CompletableFuture<?> namespaceCheck;
        if (this.topicName.isGlobal()) {
            namespaceCheck = this.validateGlobalNamespaceOwnershipAsync(this.namespaceName);
        } else {
            namespaceCheck = CompletableFuture.completedFuture(null);
        }
        namespaceCheck.thenAccept(ignored -> {
            final MessageIdImpl targetMessageId = messageId != null ? messageId : (MessageIdImpl) MessageId.latest;
            log.info("[{}][{}] Creating subscription {} at message id {}", this.clientAppId(), this.topicName, subscriptionName, targetMessageId);
            if (this.topicName.isPartitioned()) {
                this.internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
                return;
            }
            boolean allowAutoTopicCreation = this.pulsar().getBrokerService().isAllowAutoTopicCreation(this.topicName);
            this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
                final int numPartitions = partitionMetadata.partitions;
                if (numPartitions <= 0) {
                    this.internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
                    return;
                }
                final CompletableFuture<Void> allPartitionsDone = new CompletableFuture<>();
                final AtomicInteger pending = new AtomicInteger(numPartitions);
                final AtomicInteger failures = new AtomicInteger(0);
                final AtomicReference<Throwable> reportedFailure = new AtomicReference<>();
                for (int partition = 0; partition < numPartitions; ++partition) {
                    TopicName topicNamePartition = this.topicName.getPartition(partition);
                    try {
                        this.pulsar().getAdminClient().topics().createSubscriptionAsync(topicNamePartition.toString(), subscriptionName, targetMessageId).handle((r, ex) -> {
                            if (ex != null) {
                                // The counter is bumped for every failure, so it stays first in the condition.
                                // The failure is reported when all partitions failed or it is not a conflict.
                                if (failures.incrementAndGet() == numPartitions || !(ex instanceof PulsarAdminException.ConflictException)) {
                                    reportedFailure.set(ex);
                                }
                            }
                            if (pending.decrementAndGet() == 0) {
                                allPartitionsDone.complete(null);
                            }
                            return null;
                        });
                    } catch (Exception e) {
                        log.warn("[{}] [{}] Failed to create subscription {} at message id {}", this.clientAppId(), topicNamePartition, subscriptionName, targetMessageId, e);
                        allPartitionsDone.completeExceptionally(e);
                    }
                }
                allPartitionsDone.whenComplete((r, ex) -> {
                    if (ex != null) {
                        if (ex instanceof PulsarAdminException) {
                            asyncResponse.resume(new RestException((PulsarAdminException) ex));
                        } else {
                            asyncResponse.resume(new RestException(ex));
                        }
                        return;
                    }
                    Throwable partitionFailure = reportedFailure.get();
                    if (partitionFailure == null) {
                        asyncResponse.resume(Response.noContent().build());
                        return;
                    }
                    log.warn("[{}] [{}] Failed to create subscription {} at message id {}", this.clientAppId(), this.topicName, subscriptionName, targetMessageId, partitionFailure);
                    if (partitionFailure instanceof PulsarAdminException) {
                        asyncResponse.resume(new RestException((PulsarAdminException) partitionFailure));
                    } else {
                        asyncResponse.resume(new RestException(partitionFailure));
                    }
                });
            }).exceptionally(ex -> {
                // Redirects are expected control flow, so only other failures are logged.
                if (!PersistentTopicsBase.isRedirectException(ex)) {
                    log.error("[{}] Failed to create subscription {} on topic {}", this.clientAppId(), subscriptionName, this.topicName, ex);
                }
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to create subscription {} on topic {}", this.clientAppId(), subscriptionName, this.topicName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Creates subscription {@code subscriptionName} on this topic and moves its cursor to
     * {@code targetMessageId}.
     *
     * <p>Steps, each running only if the previous one succeeded: validate topic ownership, validate
     * the {@code SUBSCRIBE} operation, obtain the topic (412 if absent), reject an already existing
     * subscription (409), create the subscription, deactivate its cursor, reset the cursor to the
     * ledger/entry of {@code targetMessageId}, and answer with 204 (no content).
     *
     * <p>{@code SubscriptionInvalidCursorPosition} and {@code SubscriptionBusyException} are reported
     * as 412; every other failure is mapped by {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse    response handle that receives the outcome
     * @param subscriptionName name of the subscription to create
     * @param targetMessageId  position the new cursor is reset to
     * @param authoritative    passed to the ownership validation
     * @param replicated       passed to {@code createSubscription}
     */
    private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse, String subscriptionName, MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
        boolean autoCreateAllowed = this.pulsar().getBrokerService().isAllowAutoTopicCreation(this.topicName);
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(ignored -> {
                    this.validateTopicOperation(this.topicName, TopicOperation.SUBSCRIBE);
                    return this.pulsar().getBrokerService().getTopic(this.topicName.toString(), autoCreateAllowed);
                })
                .thenApply(maybeTopic -> {
                    if (!maybeTopic.isPresent()) {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic does not exist and cannot be auto-created");
                    }
                    Topic existingTopic = maybeTopic.get();
                    return existingTopic;
                })
                .thenCompose(topic -> {
                    if (topic.getSubscriptions().containsKey(subscriptionName)) {
                        throw new RestException(Response.Status.CONFLICT, "Subscription already exists for topic");
                    }
                    return topic.createSubscription(subscriptionName, CommandSubscribe.InitialPosition.Latest, replicated);
                })
                .thenCompose(subscription -> {
                    ((PersistentSubscription) subscription).deactivateCursor();
                    return subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()));
                })
                .thenRun(() -> {
                    log.info("[{}][{}] Successfully created subscription {} at message id {}", this.clientAppId(), this.topicName, subscriptionName, targetMessageId);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(ex -> {
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    // Web application exceptions are not logged here.
                    boolean isWebError = cause instanceof WebApplicationException;
                    if (!isWebError) {
                        log.warn("[{}][{}] Failed to create subscription {} at message id {}", this.clientAppId(), this.topicName, subscriptionName, targetMessageId, cause);
                    }
                    if (cause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + cause.getMessage()));
                    } else if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Failed for Subscription Busy: " + cause.getMessage()));
                    } else {
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, cause);
                    }
                    return null;
                });
    }

    /**
     * Resets the cursor of subscription {@code subName} to the position described by
     * {@code messageId} (optionally refined by {@code batchIndex}).
     *
     * <p>For a global topic, global namespace ownership is validated first. The operation is
     * rejected with 405 when {@code topicName.isPartitioned()} is false and the topic metadata
     * reports partitions. After ownership and {@code RESET_CURSOR} validation the topic and
     * subscription are looked up (404 when either is {@code null}), the batch size of the target
     * entry is obtained via {@link #getEntryBatchSize}, the seek position is computed by
     * {@link #calculatePositionAckSet} and the cursor is reset; success is answered with 204.
     *
     * <p>{@code SubscriptionInvalidCursorPosition} and {@code SubscriptionBusyException} raised by
     * the reset are reported as 412. Failures of the batch-size lookup or of the position
     * calculation are unwrapped from {@link CompletionException} and mapped through
     * {@code resumeAsyncResponseExceptionally}, so a {@code RestException} raised there keeps its
     * own status.
     *
     * @param asyncResponse response handle that receives the outcome
     * @param subName       subscription whose cursor is reset
     * @param authoritative passed to the metadata lookup and the ownership validation
     * @param messageId     ledger/entry the cursor is moved to
     * @param isExcluded    passed to {@code calculatePositionAckSet}
     * @param batchIndex    index inside the batch; passed to the batch-size lookup and position calculation
     */
    protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
        CompletableFuture<?> ret = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ret.thenAccept(ignored -> {
            log.info("[{}][{}] received reset cursor on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId);
            if (!this.topicName.isPartitioned() && this.getPartitionedTopicMetadata(this.topicName, authoritative, false).partitions > 0) {
                log.warn("[{}] Not supported operation on partitioned-topic {} {}", this.clientAppId(), this.topicName, subName);
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Reset-cursor at position is not allowed for partitioned-topic"));
                return;
            }
            this.validateTopicOwnershipAsync(this.topicName, authoritative)
                    .thenCompose(unused -> this.validateTopicOperationAsync(this.topicName, TopicOperation.RESET_CURSOR, subName))
                    .thenCompose(unused -> this.getTopicReferenceAsync(this.topicName))
                    .thenAccept(topic -> {
                        if (topic == null) {
                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                            return;
                        }
                        PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
                        if (sub == null) {
                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                            return;
                        }
                        CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
                        this.getEntryBatchSize(batchSizeFuture, (PersistentTopic) topic, messageId, batchIndex);
                        batchSizeFuture.thenAccept(bi -> {
                            PositionImpl seekPosition = this.calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                            sub.resetCursor(seekPosition).thenRun(() -> {
                                log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId);
                                asyncResponse.resume(Response.noContent().build());
                            }).exceptionally(ex -> {
                                Throwable t = ex instanceof CompletionException ? ex.getCause() : ex;
                                log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId, t);
                                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + t.getMessage()));
                                } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
                                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Failed for Subscription Busy: " + t.getMessage()));
                                } else {
                                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, t);
                                }
                                return null;
                            });
                        }).exceptionally(e -> {
                            // The failure arrives wrapped in a CompletionException. Resuming with the
                            // wrapper would hide the status of a RestException produced by the
                            // batch-size lookup (e.g. 404 "Message not found"), so unwrap it first.
                            Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
                            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, cause);
                            return null;
                        });
                    })
                    .exceptionally(ex -> {
                        // Redirects are expected control flow, so only other failures are logged.
                        if (!PersistentTopicsBase.isRedirectException(ex)) {
                            log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId, ex.getCause());
                        }
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
                        return null;
                    });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId, ex.getCause());
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
            return null;
        });
    }

    /**
     * Completes {@code batchSizeFuture} with the number of messages in the batch stored at the
     * ledger/entry of {@code messageId}.
     *
     * <p>The future is completed with {@code 0} when {@code batchIndex} is negative, when the entry
     * read fails, or when the read returns a {@code null} entry. It is completed exceptionally with
     * a {@code RestException} when the metadata cannot be parsed, when a
     * {@link NullPointerException} is raised while issuing the read (404 "Message not found"), or
     * when any other exception is raised while issuing the read.
     *
     * @param batchSizeFuture future that receives the batch size
     * @param topic           topic whose managed ledger is read
     * @param messageId       ledger/entry to read
     * @param batchIndex      requested batch index; a negative value skips the read
     */
    private void getEntryBatchSize(final CompletableFuture<Integer> batchSizeFuture, PersistentTopic topic, MessageIdImpl messageId, int batchIndex) {
        if (batchIndex < 0) {
            batchSizeFuture.complete(0);
            return;
        }
        try {
            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
            PositionImpl entryPosition = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
            managedLedger.asyncReadEntry(entryPosition, new AsyncCallbacks.ReadEntryCallback() {

                @Override
                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                    // An unreadable entry is treated like a non-batched one.
                    batchSizeFuture.complete(0);
                }

                @Override
                public void readEntryComplete(Entry entry, Object ctx) {
                    try {
                        if (entry == null) {
                            batchSizeFuture.complete(0);
                        } else {
                            MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                            batchSizeFuture.complete(metadata.getNumMessagesInBatch());
                        }
                    } catch (Exception e) {
                        batchSizeFuture.completeExceptionally(new RestException(e));
                    } finally {
                        // The entry is released whether or not parsing succeeded.
                        if (entry != null) {
                            entry.release();
                        }
                    }
                }
            }, null);
        } catch (NullPointerException npe) {
            batchSizeFuture.completeExceptionally(new RestException(Response.Status.NOT_FOUND, "Message not found"));
        } catch (Exception exception) {
            log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", this.clientAppId(), messageId.getLedgerId(), messageId.getEntryId(), this.topicName, exception);
            batchSizeFuture.completeExceptionally(new RestException(exception));
        }
    }

    /**
     * Computes the position a cursor should be moved to for the given message id.
     *
     * <p>When {@code batchSize} is not positive the result is the plain ledger/entry position, or
     * the position following it when {@code isExcluded} is true.
     *
     * <p>Otherwise a bit set with bits {@code [0, batchSize)} set is built and its leading bits are
     * cleared: up to and including {@code batchIndex} when {@code isExcluded} is true, up to but
     * excluding {@code batchIndex} otherwise. The remaining bits are attached to the position as a
     * {@code long[]}. If no bit remains in the excluded case the next position is returned; if
     * nothing has to be cleared in the included case the plain position is returned.
     *
     * @param isExcluded whether the entry at {@code batchIndex} itself is skipped
     * @param batchSize  number of messages in the target entry's batch, or a non-positive value
     * @param batchIndex index inside the batch
     * @param messageId  supplies the ledger id and entry id
     * @return the position to seek to
     */
    private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize, int batchIndex, MessageIdImpl messageId) {
        long ledgerId = messageId.getLedgerId();
        long entryId = messageId.getEntryId();
        if (batchSize <= 0) {
            PositionImpl entryPosition = PositionImpl.get(ledgerId, entryId);
            return isExcluded ? entryPosition.getNext() : entryPosition;
        }
        BitSetRecyclable remaining = BitSetRecyclable.create();
        remaining.set(0, batchSize);
        PositionImpl target;
        if (isExcluded) {
            remaining.clear(0, Math.max(batchIndex + 1, 0));
            if (remaining.length() > 0) {
                long[] ackSet = remaining.toLongArray();
                target = PositionImpl.get(ledgerId, entryId, ackSet);
            } else {
                // Every bit was cleared, so continue from the following position.
                target = PositionImpl.get(ledgerId, entryId).getNext();
            }
        } else if (batchIndex - 1 >= 0) {
            remaining.clear(0, batchIndex);
            long[] ackSet = remaining.toLongArray();
            target = PositionImpl.get(ledgerId, entryId, ackSet);
        } else {
            target = PositionImpl.get(ledgerId, entryId);
        }
        remaining.recycle();
        return target;
    }

    /**
     * Reads the entry at {@code ledgerId}/{@code entryId} from this topic's managed ledger and
     * resumes {@code asyncResponse} with the response built from it.
     *
     * <p>Before the read, topic ownership, the {@code PEEK_MESSAGES} operation and (for a global
     * topic) global namespace ownership are validated, in that order. A failed read or an
     * {@link IOException} while building the response is reported as a {@code RestException};
     * failures of the validation chain are mapped by {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse response handle that receives the outcome
     * @param ledgerId      ledger id of the entry to read
     * @param entryId       entry id of the entry to read
     * @param authoritative passed to the ownership validation
     */
    protected void internalGetMessageById(final AsyncResponse asyncResponse, long ledgerId, long entryId, boolean authoritative) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(ignored -> this.validateTopicOperationAsync(this.topicName, TopicOperation.PEEK_MESSAGES))
                .thenCompose(ignored -> {
                    CompletableFuture<Void> namespaceCheck;
                    if (this.topicName.isGlobal()) {
                        namespaceCheck = this.validateGlobalNamespaceOwnershipAsync(this.namespaceName);
                    } else {
                        namespaceCheck = CompletableFuture.completedFuture(null);
                    }
                    return namespaceCheck;
                })
                .thenCompose(ignored -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topic -> {
                    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
                    PositionImpl entryPosition = new PositionImpl(ledgerId, entryId);
                    AsyncCallbacks.ReadEntryCallback onRead = new AsyncCallbacks.ReadEntryCallback() {

                        @Override
                        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                            asyncResponse.resume(new RestException(exception));
                        }

                        @Override
                        public void readEntryComplete(Entry entry, Object ctx) {
                            try {
                                asyncResponse.resume(PersistentTopicsBase.this.generateResponseWithEntry(entry));
                            } catch (IOException exception) {
                                asyncResponse.resume(new RestException(exception));
                            } finally {
                                // The entry is released once the response has been produced.
                                if (entry != null) {
                                    entry.release();
                                }
                            }
                        }
                    };
                    managedLedger.asyncReadEntry(entryPosition, onRead, null);
                })
                .exceptionally(ex -> {
                    // Redirects are expected control flow, so only other failures are logged.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", this.clientAppId(), ledgerId, entryId, this.topicName, ex);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Looks up the message id of the position returned by the managed ledger's
     * {@code asyncFindPosition} for a predicate that matches entries published earlier than
     * {@code timestamp}.
     *
     * <p>Validation happens synchronously: global namespace ownership (global topics only), a 405
     * when {@code topicName.isPartitioned()} is false and the metadata reports partitions, topic
     * ownership, the {@code PEEK_MESSAGES} operation, and a 405 when the topic is not a
     * {@link PersistentTopic}. Any exception raised by these steps is returned as a failed future
     * rather than thrown.
     *
     * @param timestamp     publish-time threshold used by the search predicate
     * @param authoritative passed to the metadata lookup and the ownership validation
     * @return a future holding the found message id, or {@code null} when no position was found;
     *         failed with a {@code WebApplicationException} or {@code RestException} on error
     */
    protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long timestamp, boolean authoritative) {
        try {
            if (this.topicName.isGlobal()) {
                this.validateGlobalNamespaceOwnership(this.namespaceName);
            }
            boolean notAPartitionName = !this.topicName.isPartitioned();
            if (notAPartitionName && this.getPartitionedTopicMetadata(this.topicName, authoritative, false).partitions > 0) {
                throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Get message ID by timestamp on a partitioned topic is not allowed, please try do it on specific topic partition");
            }
            this.validateTopicOwnership(this.topicName, authoritative);
            this.validateTopicOperation(this.topicName, TopicOperation.PEEK_MESSAGES);
            Topic topicRef = this.getTopicReference(this.topicName);
            if (!(topicRef instanceof PersistentTopic)) {
                log.error("[{}] Not supported operation of non-persistent topic {} ", this.clientAppId(), this.topicName);
                throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Get message ID by timestamp on a non-persistent topic is not allowed");
            }
            ManagedLedger managedLedger = ((PersistentTopic) topicRef).getManagedLedger();
            return managedLedger.asyncFindPosition(entry -> {
                try {
                    long publishedAt = Commands.getEntryTimestamp(entry.getDataBuffer());
                    return MessageImpl.isEntryPublishedEarlierThan(publishedAt, timestamp);
                } catch (Exception e) {
                    // An entry that cannot be decoded is treated as not matching.
                    log.error("[{}] Error deserializing message for message position find", this.topicName, e);
                    return false;
                } finally {
                    entry.release();
                }
            }).thenApply(position -> {
                if (position != null) {
                    return new MessageIdImpl(position.getLedgerId(), position.getEntryId(), this.topicName.getPartitionIndex());
                }
                return null;
            });
        } catch (WebApplicationException webError) {
            // Already carries an HTTP status; hand it back unchanged.
            return FutureUtil.failedFuture(webError);
        } catch (Exception unexpected) {
            return FutureUtil.failedFuture(new RestException(unexpected));
        }
    }

    /**
     * Peeks the N-th message of a subscription (or replicator) on this topic and renders it as an HTTP response.
     *
     * @param subName subscription name; a name starting with the topic's replicator prefix is resolved
     *                through {@code getReplicatorReference}, anything else through {@code getSubscriptionReference}
     * @param messagePosition position handed to {@code peekNthMessage}
     * @param authoritative forwarded to the partition-metadata lookup and to the ownership validation
     * @return the response produced by {@code generateResponseWithEntry} for the peeked entry
     * @throws RestException {@code METHOD_NOT_ALLOWED} for partitioned or non-persistent topics,
     *         {@code NOT_FOUND} when a {@link NullPointerException} is raised while peeking or rendering,
     *         or a wrapper around any other failure
     */
    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
        // The metadata lookup is skipped when the topic name already denotes a single partition.
        if (!this.topicName.isPartitioned()
                && this.getPartitionedTopicMetadata(this.topicName, authoritative, false).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.PEEK_MESSAGES);

        // Resolve the topic once so the instanceof check and the cast apply to the same object.
        Topic topicReference = this.getTopicReference(this.topicName);
        if (!(topicReference instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{this.clientAppId(), this.topicName, subName});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a non-persistent topic is not allowed");
        }
        PersistentTopic topic = (PersistentTopic) topicReference;

        boolean isReplicator = subName.startsWith(topic.getReplicatorPrefix());
        PersistentReplicator repl = null;
        PersistentSubscription sub = null;
        if (isReplicator) {
            repl = this.getReplicatorReference(subName, topic);
        } else {
            sub = (PersistentSubscription) this.getSubscriptionReference(subName, topic);
        }

        Entry entry = null;
        try {
            entry = isReplicator
                    ? repl.peekNthMessage(messagePosition).get()
                    : sub.peekNthMessage(messagePosition).get();
            return this.generateResponseWithEntry(entry);
        } catch (NullPointerException npe) {
            // generateResponseWithEntry rejects a null entry via checkNotNull; callers rely on this
            // being reported as 404, so the mapping is kept as-is.
            throw new RestException(Response.Status.NOT_FOUND, "Message not found");
        } catch (Exception exception) {
            if (exception instanceof InterruptedException) {
                // Keep the interrupt visible to the caller's thread instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            log.error("[{}] Failed to peek message at position {} from {} {}", new Object[]{this.clientAppId(), messagePosition, this.topicName, subName, exception});
            throw new RestException(exception);
        } finally {
            if (entry != null) {
                entry.release();
            }
        }
    }

    /**
     * Reads a single message of this topic, addressed relative to the earliest or latest entry, and
     * renders it as an HTTP response.
     *
     * @param initialPosition {@code "earliest"} to count from the first position; any other value
     *                        (including {@code null}, which is treated as {@code "latest"}) counts from the end
     * @param messagePosition 1-based offset; values below 1 are clamped to 1
     * @param authoritative forwarded to the partition-metadata lookup and to the ownership validation
     * @return the response produced by {@code generateResponseWithEntry} for the entry that was read
     * @throws RestException {@code METHOD_NOT_ALLOWED} for partitioned or non-persistent topics, or a
     *         wrapper around any failure while locating or reading the entry
     */
    protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        if (!this.topicName.isPartitioned()
                && this.getPartitionedTopicMetadata(this.topicName, authoritative, false).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Examine messages on a partitioned topic is not allowed, please try examine message on specific topic partition");
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        if (!(this.getTopicReference(this.topicName) instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} ", (Object)this.clientAppId(), (Object)this.topicName);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Examine messages on a non-persistent topic is not allowed");
        }
        if (messagePosition < 1L) {
            messagePosition = 1L;
        }
        if (null == initialPosition) {
            initialPosition = "latest";
        }

        Entry entry = null;
        try {
            PersistentTopic topic = (PersistentTopic) this.getTopicReference(this.topicName);
            long totalMessage = topic.getNumberOfEntries();
            PositionImpl startPosition = topic.getFirstPosition();
            long messageToSkip = initialPosition.equals("earliest") ? messagePosition : totalMessage - messagePosition + 1L;
            final CompletableFuture<Entry> future = new CompletableFuture<>();
            PositionImpl readPosition = topic.getPositionAfterN(startPosition, messageToSkip);
            topic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback(){

                @Override
                public void readEntryComplete(Entry readEntry, Object ctx) {
                    future.complete(readEntry);
                }

                @Override
                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                    future.completeExceptionally(exception);
                }
            }, null);
            entry = future.get();
            return this.generateResponseWithEntry(entry);
        } catch (Exception exception) {
            if (exception instanceof InterruptedException) {
                // Keep the interrupt visible to the caller's thread instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            log.error("[{}] Failed to examine message at position {} from {} due to {}", new Object[]{this.clientAppId(), messagePosition, this.topicName, exception});
            throw new RestException(exception);
        } finally {
            // The response carries its own copy of the payload, so the entry read here must be
            // released, exactly as internalPeekNthMessage does for its entry.
            if (entry != null) {
                entry.release();
            }
        }
    }

    /**
     * Builds an HTTP response for a ledger entry: message metadata is exposed as {@code X-Pulsar-*}
     * headers and the decompressed payload is streamed as the response body.
     *
     * <p>The entry itself is not released here; that remains the caller's responsibility.
     *
     * @param entry the entry to render; must not be {@code null}
     * @return a 200 response whose entity streams the decompressed payload
     * @throws IOException if the payload cannot be decoded
     * @throws NullPointerException if {@code entry} is {@code null} (raised by {@code checkNotNull})
     */
    private Response generateResponseWithEntry(Entry entry) throws IOException {
        PersistentTopicsBase.checkNotNull(entry);
        PositionImpl pos = (PositionImpl)entry.getPosition();
        ByteBuf metadataAndPayload = entry.getDataBuffer();
        // Captured before the metadata is parsed off the buffer; used for the batch-size header below.
        long totalSize = metadataAndPayload.readableBytes();
        BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
        MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

        Response.ResponseBuilder responseBuilder = Response.ok();
        responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
        for (KeyValue keyValue : metadata.getPropertiesList()) {
            responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
        }

        // Broker-side entry metadata (only present when the broker attached it).
        if (brokerEntryMetadata != null) {
            if (brokerEntryMetadata.hasBrokerTimestamp()) {
                responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp", DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp()));
            }
            if (brokerEntryMetadata.hasIndex()) {
                responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex());
            }
        }

        // Timestamps.
        if (metadata.hasPublishTime()) {
            responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
        }
        if (metadata.hasEventTime()) {
            responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
        }
        if (metadata.hasDeliverAtTime()) {
            responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime()));
        }

        // Batch, null-value and chunking information.
        if (metadata.hasNumMessagesInBatch()) {
            responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
            responseBuilder.header("X-Pulsar-batch-size", totalSize - (long)metadata.getSerializedSize());
        }
        if (metadata.hasNullValue()) {
            responseBuilder.header("X-Pulsar-null-value", metadata.isNullValue());
        }
        if (metadata.hasNumChunksFromMsg()) {
            responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
            responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
        }

        // Producer, replication, keying, compression and encryption details.
        responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0);
        if (metadata.hasProducerName()) {
            responseBuilder.header("X-Pulsar-producer-name", metadata.getProducerName());
        }
        if (metadata.hasSequenceId()) {
            responseBuilder.header("X-Pulsar-sequence-id", metadata.getSequenceId());
        }
        if (metadata.hasReplicatedFrom()) {
            responseBuilder.header("X-Pulsar-replicated-from", metadata.getReplicatedFrom());
        }
        for (String replicatedTo : metadata.getReplicateTosList()) {
            responseBuilder.header("X-Pulsar-replicated-to", replicatedTo);
        }
        if (metadata.hasPartitionKey()) {
            responseBuilder.header("X-Pulsar-partition-key", metadata.getPartitionKey());
        }
        if (metadata.hasCompression()) {
            responseBuilder.header("X-Pulsar-compression", (Object)metadata.getCompression());
        }
        if (metadata.hasUncompressedSize()) {
            responseBuilder.header("X-Pulsar-uncompressed-size", metadata.getUncompressedSize());
        }
        if (metadata.hasEncryptionAlgo()) {
            responseBuilder.header("X-Pulsar-encryption-algo", metadata.getEncryptionAlgo());
        }
        for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) {
            responseBuilder.header("X-Pulsar-Base64-encryption-keys", Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
        }
        if (metadata.hasEncryptionParam()) {
            responseBuilder.header("X-Pulsar-Base64-encryption-param", Base64.getEncoder().encodeToString(metadata.getEncryptionParam()));
        }
        if (metadata.hasSchemaVersion()) {
            responseBuilder.header("X-Pulsar-Base64-schema-version", Base64.getEncoder().encodeToString(metadata.getSchemaVersion()));
        }
        if (metadata.hasPartitionKeyB64Encoded()) {
            responseBuilder.header("X-Pulsar-partition-key-b64-encoded", metadata.isPartitionKeyB64Encoded());
        }
        if (metadata.hasOrderingKey()) {
            responseBuilder.header("X-Pulsar-Base64-ordering-key", Base64.getEncoder().encodeToString(metadata.getOrderingKey()));
        }
        if (metadata.hasMarkerType()) {
            responseBuilder.header("X-Pulsar-marker-type", metadata.getMarkerType());
        }
        if (metadata.hasTxnidLeastBits()) {
            responseBuilder.header("X-Pulsar-txnid-least-bits", metadata.getTxnidLeastBits());
        }
        if (metadata.hasTxnidMostBits()) {
            responseBuilder.header("X-Pulsar-txnid-most-bits", metadata.getTxnidMostBits());
        }
        if (metadata.hasHighestSequenceId()) {
            responseBuilder.header("X-Pulsar-highest-sequence-id", metadata.getHighestSequenceId());
        }
        if (metadata.hasUuid()) {
            responseBuilder.header("X-Pulsar-uuid", metadata.getUuid());
        }
        if (metadata.hasNumChunksFromMsg()) {
            responseBuilder.header("X-Pulsar-num-chunks-from-msg", metadata.getNumChunksFromMsg());
        }
        if (metadata.hasTotalChunkMsgSize()) {
            responseBuilder.header("X-Pulsar-total-chunk-msg-size", metadata.getTotalChunkMsgSize());
        }
        if (metadata.hasChunkId()) {
            responseBuilder.header("X-Pulsar-chunk-id", metadata.getChunkId());
        }
        if (metadata.hasNullPartitionKey()) {
            responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
        }

        // Decompress the payload and copy it into a heap buffer that the streaming body owns.
        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
        ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
        final ByteBuf data;
        try {
            int payloadSize = uncompressedPayload.readableBytes();
            data = PulsarByteBufAllocator.DEFAULT.heapBuffer(payloadSize, payloadSize);
            data.writeBytes(uncompressedPayload);
        } finally {
            // Release the decoded buffer even if allocating or copying fails.
            uncompressedPayload.release();
        }
        StreamingOutput stream = output -> {
            try {
                output.write(data.array(), data.arrayOffset(), data.readableBytes());
            } finally {
                // Release the copy even when the client connection fails mid-write.
                data.release();
            }
        };
        return responseBuilder.entity(stream).build();
    }

    /**
     * Returns backlog statistics for this topic as computed by the offline backlog estimator.
     *
     * <p>Cached statistics are returned when they were generated less than 10 minutes ago; otherwise a
     * new estimate is computed, cached on the broker service and returned.
     *
     * @param authoritative not used by this method
     * @return the cached or freshly estimated statistics
     * @throws RestException {@code NOT_FOUND} when the namespace does not exist, or a wrapper around
     *         any other failure
     */
    protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }

        // The policies lookup only serves as an existence check for the namespace.
        try {
            this.namespaceResources().getPolicies(this.namespaceName);
        } catch (MetadataStoreException.NotFoundException e) {
            log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", (Object)this.clientAppId(), (Object)this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to get topic backlog {}", new Object[]{this.clientAppId(), this.namespaceName, e});
            throw new RestException(e);
        }

        try {
            PersistentOfflineTopicStats cachedStats = this.pulsar().getBrokerService().getOfflineTopicStat(this.topicName);
            if (cachedStats != null) {
                long elapsedMs = System.currentTimeMillis() - cachedStats.statGeneratedAt.getTime();
                if (TimeUnit.MINUTES.convert(elapsedMs, TimeUnit.MILLISECONDS) < 10L) {
                    return cachedStats;
                }
            }

            ManagedLedgerConfig config = this.pulsar().getBrokerService().getManagedLedgerConfig(this.topicName).get();
            ManagedLedgerOfflineBacklog estimator = new ManagedLedgerOfflineBacklog(
                    config.getDigestType(), config.getPassword(), this.pulsar().getAdvertisedAddress(), false);
            PersistentOfflineTopicStats freshStats = estimator.estimateUnloadedTopicBacklog(
                    (ManagedLedgerFactoryImpl)this.pulsar().getManagedLedgerFactory(), this.topicName);
            this.pulsar().getBrokerService().cacheOfflineTopicStats(this.topicName, freshStats);
            return freshStats;
        } catch (Exception exception) {
            throw new RestException(exception);
        }
    }

    /**
     * Resolves the backlog quotas of this topic, keyed by quota type.
     *
     * @param applied when {@code true} and the topic has no quota of its own, fall back to the
     *                namespace quota map and, if that is empty too, to {@code namespaceBacklogQuota}
     * @param isGlobal forwarded to the topic-policies lookup
     * @return a future holding the resolved quota map (possibly empty when {@code applied} is false)
     */
    protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> internalGetBacklogQuota(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(maybePolicies -> {
            // Topic-level quotas are stored under the enum constant's name; convert the keys back.
            Map<BacklogQuota.BacklogQuotaType, BacklogQuota> resolved = Maps.newHashMap();
            Map<String, BacklogQuotaImpl> topicLevel = maybePolicies.map(TopicPolicies::getBackLogQuotaMap).orElse(null);
            if (topicLevel != null) {
                for (Map.Entry<String, BacklogQuotaImpl> quotaEntry : topicLevel.entrySet()) {
                    resolved.put(BacklogQuota.BacklogQuotaType.valueOf(quotaEntry.getKey()), quotaEntry.getValue());
                }
            }
            if (!applied || !resolved.isEmpty()) {
                return resolved;
            }

            // NOTE(review): the defaults below are written into the map object taken from the
            // namespace policies - confirm that mutating it is intended.
            resolved = this.getNamespacePolicies(this.namespaceName).backlog_quota_map;
            if (resolved.isEmpty()) {
                for (BacklogQuota.BacklogQuotaType quotaType : BacklogQuota.BacklogQuotaType.values()) {
                    resolved.put(quotaType, this.namespaceBacklogQuota(this.namespaceName, quotaType));
                }
            }
            return resolved;
        });
    }

    /**
     * Asynchronously computes the backlog size of this topic relative to a message id and resumes
     * {@code asyncResponse} with the result.
     *
     * <p>The work runs as three nested stages (global-namespace ownership, partition metadata,
     * topic ownership/authorization/lookup). Each stage has its own {@code exceptionally} handler
     * that logs the failure unless it is a redirect exception and then resumes the response
     * exceptionally.
     *
     * @param asyncResponse response resumed with the size, or with the failure
     * @param messageId position to measure from; a ledger id of {@code -1} requests the managed
     *                  ledger's total size
     * @param authoritative forwarded to the partition-metadata lookup and the ownership validation
     */
    protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
        // Only global topics validate namespace ownership first; other topics start from a completed future.
        CompletableFuture<Object> future = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ((CompletableFuture)future.thenAccept(__ -> ((CompletableFuture)this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
            // A non-partition topic name that resolves to a partitioned topic is rejected.
            if (!this.topicName.isPartitioned() && partitionMetadata.partitions > 0) {
                log.warn("[{}] Not supported calculate backlog size operation on partitioned-topic {}", (Object)this.clientAppId(), (Object)this.topicName);
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "calculate backlog size is not allowed for partitioned-topic"));
            } else {
                ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.validateTopicOwnershipAsync(this.topicName, authoritative).thenCompose(unused -> this.validateTopicOperationAsync(this.topicName, TopicOperation.GET_BACKLOG_SIZE))).thenCompose(unused -> this.getTopicReferenceAsync(this.topicName))).thenAccept(t -> {
                    // The cast happens before the null check; casting null is harmless.
                    PersistentTopic topic = (PersistentTopic)t;
                    PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
                    if (topic == null) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                        return;
                    }
                    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)topic.getManagedLedger();
                    // Ledger id -1 selects the total size; otherwise estimate the backlog from the given position.
                    if (messageId.getLedgerId() == -1L) {
                        asyncResponse.resume(managedLedger.getTotalSize());
                    } else {
                        asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
                    }
                })).exceptionally(ex -> {
                    // Handles failures of ownership validation, authorization, topic lookup and size computation.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to get backlog size for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            // Handles failures of the partition-metadata stage.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to get backlog size for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        }))).exceptionally(ex -> {
            // Handles failures of the initial global-namespace ownership stage.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to validate global namespace ownership to get backlog size for topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Sets or removes a topic-level backlog quota.
     *
     * <p>After the policy-operation and read-only checks, the quota is validated against the retention
     * policies returned by {@code getRetentionPoliciesAsync} and then stored in (or removed from) the
     * topic policies.
     *
     * @param backlogQuotaType quota type to change; {@code null} selects {@code destination_storage}
     * @param backlogQuota new quota, or {@code null} to remove the quota of that type
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the policies are updated; it fails with a
     *         {@code PRECONDITION_FAILED} {@link RestException} when {@code checkBacklogQuota} rejects
     *         the quota against the retention policies
     */
    protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuotaImpl backlogQuota, boolean isGlobal) {
        BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType;
        return this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.BACKLOG, PolicyOperation.WRITE)
                .thenAccept(__ -> this.validatePoliciesReadOnlyAccess())
                .thenCompose(__ -> this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal))
                .thenCompose(op -> {
                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                    return this.getRetentionPoliciesAsync(this.topicName, topicPolicies).thenCompose(retentionPolicies -> {
                        if (!this.checkBacklogQuota(backlogQuota, retentionPolicies)) {
                            log.warn("[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota", (Object)this.clientAppId(), (Object)this.topicName);
                            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Backlog Quota exceeds configured retention quota for topic. Please increase retention quota and retry"));
                        }
                        if (backlogQuota != null) {
                            topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota);
                        } else {
                            topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name());
                        }
                        Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
                        topicPolicies.setIsGlobal(isGlobal);
                        return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, topicPolicies).thenRun(() -> {
                            String renderedMap;
                            try {
                                renderedMap = PersistentTopicsBase.jsonMapper().writeValueAsString(backLogQuotaMap);
                            } catch (JsonProcessingException e) {
                                // The update already succeeded; a rendering problem must not hide that
                                // from the log, so fall back to the map's own string form.
                                renderedMap = String.valueOf(backLogQuotaMap);
                            }
                            log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}", new Object[]{this.clientAppId(), this.namespaceName, this.topicName.getLocalName(), renderedMap});
                        });
                    });
                });
    }

    /**
     * Sets the topic-level replication clusters after validating the requested cluster ids.
     *
     * @param clusterIds clusters to replicate to; {@code "global"} is rejected, and every id must be
     *                   contained in {@code clusters()} and pass the peer-cluster and tenant checks
     * @return a future that completes when the topic policies are updated
     */
    protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
        CompletableFuture<Void> validation = this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
                .thenCompose(__ -> this.validatePoliciesReadOnlyAccessAsync())
                .thenAccept(__ -> {
                    HashSet<String> requested = Sets.newHashSet(clusterIds);
                    if (requested.contains("global")) {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, "Cannot specify global in the list of replication clusters");
                    }
                    Set<String> knownClusters = this.clusters();
                    for (String candidate : requested) {
                        if (!knownClusters.contains(candidate)) {
                            throw new RestException(Response.Status.FORBIDDEN, "Invalid cluster id: " + candidate);
                        }
                        this.validatePeerClusterConflict(candidate, requested);
                        this.validateClusterForTenant(this.namespaceName.getTenant(), candidate);
                    }
                });

        return validation.thenCompose(__ -> this.getTopicPoliciesAsyncWithRetry(this.topicName).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setReplicationClusters(clusterIds);
            return this.pulsar().getTopicPoliciesService()
                    .updateTopicPoliciesAsync(this.topicName, policies)
                    .thenRun(() -> log.info("[{}] Successfully set replication clusters for namespace={}, topic={}, clusters={}", new Object[]{this.clientAppId(), this.namespaceName, this.topicName.getLocalName(), policies.getReplicationClusters()}));
        }));
    }

    /**
     * Removes the topic-level replication clusters by storing {@code null} for them in the topic policies.
     *
     * @return a future that completes when the topic policies are updated
     */
    protected CompletableFuture<Void> internalRemoveReplicationClusters() {
        return this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
                .thenCompose(__ -> this.validatePoliciesReadOnlyAccessAsync())
                .thenCompose(__ -> this.getTopicPoliciesAsyncWithRetry(this.topicName).thenCompose(op -> {
                    TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                    topicPolicies.setReplicationClusters(null);
                    // The log line describes a removal; the earlier text was copied from the setter
                    // and reported it as a "set" with clusters=null.
                    return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, topicPolicies)
                            .thenRun(() -> log.info("[{}] Successfully removed replication clusters for namespace={}, topic={}", new Object[]{this.clientAppId(), this.namespaceName, this.topicName.getLocalName()}));
                }));
    }

    /**
     * Resolves the deduplication setting of this topic.
     *
     * @param applied when {@code true} and no topic-level value exists, fall back to the namespace
     *                policy and then to the broker configuration
     * @param isGlobal forwarded to the topic-policies lookup
     * @return a future holding the resolved flag, or {@code null} when nothing is set at topic level
     *         and {@code applied} is {@code false}
     */
    protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(maybePolicies -> {
            Boolean topicLevel = maybePolicies.map(TopicPolicies::getDeduplicationEnabled).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            Boolean namespaceLevel = this.getNamespacePolicies(this.namespaceName).deduplicationEnabled;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return Boolean.valueOf(this.config().isBrokerDeduplicationEnabled());
        });
    }

    /**
     * Stores the topic-level deduplication flag.
     *
     * @param enabled value to store; passed to {@code setDeduplicationEnabled} unchanged
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the topic policies are updated
     */
    protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            // Start from the stored policies, or from a blank set when the topic has none yet.
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setDeduplicationEnabled(enabled);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Stores the topic-level message TTL.
     *
     * @param ttlInSecond TTL in seconds; {@code null} is stored as-is, negative values are rejected
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the topic policies are updated, or a future failed with a
     *         {@code PRECONDITION_FAILED} {@link RestException} for a negative TTL
     */
    protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond, boolean isGlobal) {
        boolean negativeTtl = ttlInSecond != null && ttlInSecond < 0;
        if (negativeTtl) {
            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Invalid value for message TTL"));
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setMessageTTLInSeconds(ttlInSecond);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService()
                    .updateTopicPoliciesAsync(this.topicName, policies)
                    .thenRun(() -> log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", new Object[]{this.clientAppId(), this.namespaceName, this.topicName.getLocalName(), ttlInSecond}));
        });
    }

    /**
     * Returns the retention policies that apply to a topic: the topic-level ones when present,
     * otherwise the {@code retention_policies} of the topic's namespace.
     *
     * @param topicName topic whose namespace is consulted for the fallback
     * @param topicPolicies topic-level policies to inspect first
     * @return a future holding the retention policies (the namespace value may be {@code null})
     */
    private CompletableFuture<RetentionPolicies> getRetentionPoliciesAsync(TopicName topicName, TopicPolicies topicPolicies) {
        RetentionPolicies topicLevel = topicPolicies.getRetentionPolicies();
        if (topicLevel == null) {
            return this.getNamespacePoliciesAsync(topicName.getNamespaceObject())
                    .thenApply(namespacePolicies -> namespacePolicies.retention_policies);
        }
        return CompletableFuture.completedFuture(topicLevel);
    }

    /**
     * Resolves the retention policies of this topic.
     *
     * @param applied when {@code true} and no topic-level value exists, fall back to the namespace
     *                policy and then to the broker's default retention time and size
     * @param isGlobal forwarded to the topic-policies lookup
     * @return a future holding the resolved policies, or {@code null} when nothing is set at topic
     *         level and {@code applied} is {@code false}
     */
    protected CompletableFuture<RetentionPolicies> internalGetRetention(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(maybePolicies -> {
            RetentionPolicies topicLevel = maybePolicies.map(TopicPolicies::getRetentionPolicies).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            RetentionPolicies namespaceLevel = this.getNamespacePolicies(this.namespaceName).retention_policies;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return new RetentionPolicies(this.config().getDefaultRetentionTimeInMinutes(), this.config().getDefaultRetentionSizeInMB());
        });
    }

    /**
     * Stores topic-level retention policies after checking them against every backlog quota type.
     *
     * <p>For each quota type the topic-level quota is used when present, otherwise the quota from the
     * namespace's {@code backlog_quota_map}.
     *
     * @param retention policies to store; {@code null} makes this a no-op
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the policies are updated, or fails with a
     *         {@code PRECONDITION_FAILED} {@link RestException} when {@code checkBacklogQuota}
     *         rejects the combination
     */
    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention, boolean isGlobal) {
        if (retention == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            for (BacklogQuota.BacklogQuotaType quotaType : BacklogQuota.BacklogQuotaType.values()) {
                BacklogQuota effectiveQuota = policies.getBackLogQuotaMap().get(quotaType.name());
                if (effectiveQuota == null) {
                    Policies namespacePolicies = this.getNamespacePolicies(this.topicName.getNamespaceObject());
                    effectiveQuota = namespacePolicies.backlog_quota_map.get(quotaType);
                }
                if (!this.checkBacklogQuota(effectiveQuota, retention)) {
                    log.warn("[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota", (Object)this.clientAppId(), (Object)this.topicName);
                    return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Retention Quota must exceed configured backlog quota for topic. Please increase retention quota and retry"));
                }
            }
            policies.setRetentionPolicies(retention);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Removes the topic-level retention policies, if the topic has any stored policies.
     *
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the policies are updated, or immediately when the topic
     *         has no policies
     */
    protected CompletableFuture<Void> internalRemoveRetention(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(op -> {
            if (!op.isPresent()) {
                return CompletableFuture.completedFuture(null);
            }
            TopicPolicies topicPolicies = op.get();
            topicPolicies.setRetentionPolicies(null);
            // Record the scope before persisting, as the other isGlobal-aware updaters in this
            // class (e.g. internalRemovePersistence) do.
            topicPolicies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, topicPolicies);
        });
    }

    /**
     * Resolves the persistence policies of this topic.
     *
     * @param applied when {@code true} and no topic-level value exists, fall back to the namespace
     *                policy and then to the broker's managed-ledger defaults
     * @param isGlobal forwarded to the topic-policies lookup
     * @return a future holding the resolved policies, or {@code null} when nothing is set at topic
     *         level and {@code applied} is {@code false}
     */
    protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(maybePolicies -> {
            PersistencePolicies topicLevel = maybePolicies.map(TopicPolicies::getPersistence).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            PersistencePolicies namespaceLevel = this.getNamespacePolicies(this.namespaceName).persistence;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return new PersistencePolicies(
                    this.pulsar().getConfiguration().getManagedLedgerDefaultEnsembleSize(),
                    this.pulsar().getConfiguration().getManagedLedgerDefaultWriteQuorum(),
                    this.pulsar().getConfiguration().getManagedLedgerDefaultAckQuorum(),
                    this.pulsar().getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
        });
    }

    /**
     * Stores topic-level persistence policies.
     *
     * @param persistencePolicies policies to store; checked by {@code validatePersistencePolicies}
     *                            synchronously, before any asynchronous work starts
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the topic policies are updated
     */
    protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies, boolean isGlobal) {
        this.validatePersistencePolicies(persistencePolicies);
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setPersistence(persistencePolicies);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Removes the topic-level persistence policies, if the topic has any stored policies.
     *
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the policies are updated, or immediately when the topic
     *         has no policies
     */
    protected CompletableFuture<Void> internalRemovePersistence(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setPersistence(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            // Nothing stored for this topic, so there is nothing to remove.
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Stores the topic-level maximum message size.
     *
     * @param maxMessageSize size to store; {@code null} is stored as-is, other values must lie
     *                       between 0 and the broker's {@code getMaxMessageSize()} inclusive
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the topic policies are updated
     * @throws RestException {@code PRECONDITION_FAILED}, thrown synchronously, for an out-of-range size
     */
    protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize, boolean isGlobal) {
        boolean outOfRange = maxMessageSize != null
                && (maxMessageSize < 0 || maxMessageSize > this.config().getMaxMessageSize());
        if (outOfRange) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "topic-level maxMessageSize must be greater than or equal to 0 and must be smaller than that in the broker-level");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setMaxMessageSize(maxMessageSize);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Reads the topic-level maximum message size.
     *
     * @param isGlobal forwarded to the topic-policies lookup
     * @return a future holding the configured size, or an empty {@link Optional} when the topic has
     *         no policies or no size set
     */
    protected CompletableFuture<Optional<Integer>> internalGetMaxMessageSize(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal)
                .thenApply(maybePolicies -> maybePolicies.map(policies -> policies.getMaxMessageSize()));
    }

    /**
     * Resolves the maximum number of producers for this topic.
     *
     * @param applied when {@code true} and no topic-level value exists, fall back to the namespace
     *                policy and then to the broker configuration
     * @param isGlobal forwarded to the topic-policies lookup
     * @return a future holding the resolved limit, or {@code null} when nothing is set at topic
     *         level and {@code applied} is {@code false}
     */
    protected CompletableFuture<Integer> internalGetMaxProducers(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(maybePolicies -> {
            Integer topicLevel = maybePolicies.map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            Integer namespaceLevel = this.getNamespacePolicies(this.namespaceName).max_producers_per_topic;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return Integer.valueOf(this.config().getMaxProducersPerTopic());
        });
    }

    /**
     * Stores the topic-level maximum number of producers.
     *
     * @param maxProducers limit to store; {@code null} is stored as-is, negative values are rejected
     * @param isGlobal forwarded to the topic-policies lookup and recorded on the stored policies
     * @return a future that completes when the topic policies are updated
     * @throws RestException {@code PRECONDITION_FAILED}, thrown synchronously, for a negative limit
     */
    protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers, boolean isGlobal) {
        boolean negativeLimit = maxProducers != null && maxProducers < 0;
        if (negativeLimit) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "maxProducers must be 0 or more");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setMaxProducerPerTopic(maxProducers);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Reads the topic-level max-subscriptions limit.
     *
     * @param isGlobal passed to the topic policies lookup
     * @return a future holding the configured value, or an empty {@link Optional} when no policies exist
     *         or the value is unset
     */
    protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal)
                .thenApply(policies -> policies.map(p -> p.getMaxSubscriptionsPerTopic()));
    }

    /**
     * Writes a topic-level max-subscriptions limit into the topic policies.
     *
     * @param maxSubscriptionsPerTopic value to store; {@code null} skips validation and is stored as-is
     * @param isGlobal passed to the policy lookup and recorded on the policies that are written back
     * @return the future produced by the topic policies service update
     * @throws RestException (PRECONDITION_FAILED), thrown synchronously, when the value is negative
     */
    protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic, boolean isGlobal) {
        boolean negative = maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic.intValue() < 0;
        if (negative) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "maxSubscriptionsPerTopic must be 0 or more");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setIsGlobal(isGlobal);
            policies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Resolves the replicator dispatch rate for the topic.
     *
     * @param applied when {@code true} and no topic-level rate is set, fall back to the namespace
     *        policy entry for this broker's cluster name and then to {@code replicatorDispatchRate()}
     * @param isGlobal passed to the topic policies lookup
     * @return a future holding the resolved rate, or {@code null} when nothing is set at topic level
     *         and {@code applied} is {@code false}
     */
    protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(policies -> {
            DispatchRateImpl topicLevel = policies.map(p -> p.getReplicatorDispatchRate()).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            String localCluster = this.pulsar().getConfiguration().getClusterName();
            DispatchRateImpl namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).replicatorDispatchRate.get(localCluster);
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.replicatorDispatchRate();
        });
    }

    /**
     * Writes a topic-level replicator dispatch rate into the topic policies.
     *
     * @param dispatchRate rate to store; no validation is performed here
     * @param isGlobal passed to the policy lookup and recorded on the policies that are written back
     * @return the future produced by the topic policies service update
     */
    protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setIsGlobal(isGlobal);
            policies.setReplicatorDispatchRate(dispatchRate);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Runs the checks that precede a topic-policy operation.
     *
     * <p>Fails with METHOD_NOT_ALLOWED when topic-level policies are disabled and with
     * PRECONDITION_FAILED when the topic name refers to a single partition. Otherwise it validates
     * global namespace ownership (global topics only), checks that the topic exists (NOT_FOUND if not),
     * and validates topic ownership. For a partitioned topic, ownership is checked on partition 0.
     *
     * @param authoritative forwarded to the topic ownership validation
     * @return a future that completes when all checks pass, or fails with the first failing check
     */
    protected CompletableFuture<Void> preValidation(boolean authoritative) {
        if (!this.config().isTopicLevelPoliciesEnabled()) {
            return FutureUtil.failedFuture(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Topic level policies is disabled, to enable the topic level policy and retry."));
        }
        if (this.topicName.isPartitioned()) {
            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Not allowed to set/get topic policy for a partition"));
        }
        CompletableFuture<?> ownershipCheck;
        if (this.topicName.isGlobal()) {
            ownershipCheck = this.validateGlobalNamespaceOwnershipAsync(this.namespaceName);
        } else {
            ownershipCheck = CompletableFuture.completedFuture(null);
        }
        return ownershipCheck
                .thenCompose(ignored -> this.checkTopicExistsAsync(this.topicName))
                .thenCompose(topicExists -> {
                    if (!topicExists.booleanValue()) {
                        throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
                    }
                    return this.getPartitionedTopicMetadataAsync(this.topicName, false, false);
                })
                .thenCompose(metadata -> {
                    // Partitioned topics are validated through their first partition.
                    TopicName ownershipTarget = metadata.partitions > 0
                            ? TopicName.get(this.topicName.toString() + "-partition-" + 0)
                            : this.topicName;
                    return this.validateTopicOwnershipAsync(ownershipTarget, authoritative);
                });
    }

    /**
     * Clears the topic-level max-producers limit.
     *
     * @param isGlobal passed to the policy lookup and recorded on the policies that are written back
     * @return an already-completed future when no topic policies exist, otherwise the future produced
     *         by the topic policies service update
     */
    protected CompletableFuture<Void> internalRemoveMaxProducers(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = (TopicPolicies)existing.get();
                policies.setMaxProducerPerTopic(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            // Nothing stored for this topic, so there is nothing to clear.
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Resolves the max-consumers limit for the topic.
     *
     * @param applied when {@code true} and no topic-level value is set, fall back to the namespace
     *        policy and then to the broker configuration
     * @param isGlobal passed to the topic policies lookup
     * @return a future holding the resolved limit, or {@code null} when nothing is set at topic level
     *         and {@code applied} is {@code false}
     */
    protected CompletableFuture<Integer> internalGetMaxConsumers(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(policies -> {
            Integer topicLevel = policies.map(p -> p.getMaxConsumerPerTopic()).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            Integer namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).max_consumers_per_topic;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.config().getMaxConsumersPerTopic();
        });
    }

    /**
     * Writes a topic-level max-consumers limit into the topic policies.
     *
     * @param maxConsumers value to store; {@code null} skips validation and is stored as-is
     * @param isGlobal passed to the policy lookup and recorded on the policies that are written back
     * @return the future produced by the topic policies service update
     * @throws RestException (PRECONDITION_FAILED), thrown synchronously, when the value is negative
     */
    protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers, boolean isGlobal) {
        boolean negative = maxConsumers != null && maxConsumers.intValue() < 0;
        if (negative) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "maxConsumers must be 0 or more");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.orElseGet(() -> new TopicPolicies());
            policies.setIsGlobal(isGlobal);
            policies.setMaxConsumerPerTopic(maxConsumers);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Clears the topic-level max-consumers limit.
     *
     * @param isGlobal passed to the policy lookup and recorded on the policies that are written back
     * @return an already-completed future when no topic policies exist, otherwise the future produced
     *         by the topic policies service update
     */
    protected CompletableFuture<Void> internalRemoveMaxConsumers(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = (TopicPolicies)existing.get();
                policies.setMaxConsumerPerTopic(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            // Nothing stored for this topic, so there is nothing to clear.
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Terminates a non-partitioned topic and blocks until the terminate call completes.
     *
     * <p>Before terminating, validates global namespace ownership (global topics only), rejects
     * partitioned topics, and validates topic ownership and the TERMINATE operation.
     *
     * @param authoritative forwarded to the partition metadata lookup and the ownership validation
     * @return the message id produced by the topic's {@code terminate()} call
     * @throws RestException with METHOD_NOT_ALLOWED when the topic is partitioned, or wrapping any
     *         failure raised while terminating
     */
    protected MessageId internalTerminate(boolean authoritative) {
        if (this.topicName.isGlobal()) {
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionMetadata = this.getPartitionedTopicMetadata(this.topicName, authoritative, false);
        if (partitionMetadata.partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
        }
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.TERMINATE);
        Topic topic = this.getTopicReference(this.topicName);
        try {
            return ((PersistentTopic)topic).terminate().get();
        }
        catch (Exception exception) {
            if (exception instanceof InterruptedException) {
                // The blocking get() consumed the interrupt; restore the flag so the calling
                // thread can still observe it after this method reports the failure.
                Thread.currentThread().interrupt();
            }
            log.error("[{}] Failed to terminated topic {}", new Object[]{this.clientAppId(), this.topicName, exception});
            throw new RestException(exception);
        }
    }

    /**
     * Terminates every partition of a partitioned topic and answers the request asynchronously.
     *
     * <p>Validates global namespace ownership (global topics only) and the TERMINATE operation, then
     * loads the partition metadata. A topic with zero partitions is rejected with METHOD_NOT_ALLOWED.
     * Otherwise each partition is terminated through the admin client, and once all calls finish the
     * response is resumed with a map of partition index to the returned message id.
     *
     * @param asyncResponse response to resume with the message-id map or with an error
     * @param authoritative forwarded to the partition metadata lookup
     */
    protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
        CompletableFuture<?> ownershipCheck = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ownershipCheck
                .thenCompose(__ -> this.validateTopicOperationAsync(this.topicName, TopicOperation.TERMINATE))
                .thenCompose(unused -> this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false))
                .thenAccept(partitionMetadata -> {
                    if (partitionMetadata.partitions == 0) {
                        String msg = "Termination of a non-partitioned topic is not allowed using partitioned-terminate, please use terminate commands";
                        log.error("[{}] [{}] {}", new Object[]{this.clientAppId(), this.topicName, msg});
                        asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, msg));
                        return;
                    }
                    if (partitionMetadata.partitions > 0) {
                        Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>(partitionMetadata.partitions);
                        List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(partitionMetadata.partitions);
                        for (int i = 0; i < partitionMetadata.partitions; i++) {
                            final int partitionIndex = i;
                            TopicName topicNamePartition = this.topicName.getPartition(i);
                            try {
                                futures.add(this.pulsar().getAdminClient().topics().terminateTopicAsync(topicNamePartition.toString()).whenComplete((messageId, throwable) -> {
                                    if (throwable != null) {
                                        log.error("[{}] Failed to terminate topic {}", new Object[]{this.clientAppId(), topicNamePartition, throwable});
                                        // NOTE(review): this per-partition resume runs before the aggregate handler
                                        // below, so the NOT_FOUND mapping there presumably never reaches the
                                        // client - confirm whether that is intended.
                                        asyncResponse.resume(new RestException((Throwable)throwable));
                                        // No message id exists for a failed partition.
                                        return;
                                    }
                                    // ConcurrentHashMap rejects null values, so only record real ids.
                                    if (messageId != null) {
                                        messageIds.put(partitionIndex, messageId);
                                    }
                                }));
                            }
                            catch (Exception e) {
                                log.error("[{}] Failed to terminate topic {}", new Object[]{this.clientAppId(), topicNamePartition, e});
                                throw new RestException(e);
                            }
                        }
                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
                            if (exception != null) {
                                // Fall back to the exception itself when it carries no cause.
                                Throwable t = exception.getCause() != null ? exception.getCause() : exception;
                                if (t instanceof PulsarAdminException.NotFoundException) {
                                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                                } else {
                                    log.error("[{}] Failed to terminate topic {}", new Object[]{this.clientAppId(), this.topicName, t});
                                    asyncResponse.resume(new RestException(t));
                                }
                                // An error was reported; do not also try to resume with the id map.
                                return null;
                            }
                            asyncResponse.resume(messageIds);
                            return null;
                        });
                    }
                })
                .exceptionally(ex -> {
                    // Single failure handler for the ownership check, the operation check, the
                    // metadata lookup and anything thrown while issuing the partition calls.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to terminate topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Expires messages on a subscription up to the given age and answers the request asynchronously.
     *
     * <p>After the optional global namespace ownership check, the partition metadata is loaded and
     * one of three paths is taken: a topic name that already denotes a partition is handled locally;
     * a partitioned topic fans out one admin-client call per partition; anything else is handled
     * locally as a single topic.
     *
     * @param asyncResponse response to resume with 204 No Content or with an error
     * @param subName subscription (or replicator) name to expire messages on
     * @param expireTimeInSeconds age threshold forwarded to the expiry calls
     * @param authoritative forwarded to the metadata lookup and ownership validation
     */
    protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, String subName, int expireTimeInSeconds, boolean authoritative) {
        CompletableFuture<Object> future = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ((CompletableFuture)future.thenCompose(__ -> this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenCompose(partitionMetadata -> {
            // The request names one specific partition: expire on it directly.
            if (this.topicName.isPartitioned()) {
                return this.internalExpireMessagesByTimestampForSinglePartitionAsync((PartitionedTopicMetadata)partitionMetadata, subName, expireTimeInSeconds, authoritative).thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
            }
            // The request names a partitioned topic: issue one admin call per partition.
            if (partitionMetadata.partitions > 0) {
                return CompletableFuture.completedFuture(null).thenAccept(unused -> {
                    ArrayList<CompletableFuture<Void>> futures = Lists.newArrayList();
                    for (int i = 0; i < partitionMetadata.partitions; ++i) {
                        TopicName topicNamePartition = this.topicName.getPartition(i);
                        try {
                            futures.add(this.pulsar().getAdminClient().topics().expireMessagesAsync(topicNamePartition.toString(), subName, expireTimeInSeconds));
                            continue;
                        }
                        catch (Exception e) {
                            // Failing to issue any call answers the request immediately and stops the fan-out.
                            log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, topicNamePartition, e});
                            asyncResponse.resume(new RestException(e));
                            return;
                        }
                    }
                    // Answer once every partition call has finished; the handler inspects the cause of the failure.
                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
                        if (exception != null) {
                            Throwable t = exception.getCause();
                            if (t instanceof PulsarAdminException.NotFoundException) {
                                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                                return null;
                            }
                            log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, t});
                            asyncResponse.resume(new RestException(t));
                            return null;
                        }
                        asyncResponse.resume(Response.noContent().build());
                        return null;
                    });
                });
            }
            // Non-partitioned topic: expire on it directly.
            return this.internalExpireMessagesByTimestampForSinglePartitionAsync((PartitionedTopicMetadata)partitionMetadata, subName, expireTimeInSeconds, authoritative).thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
        }))).exceptionally(ex -> {
            // Redirects are not logged as errors; every failure still resumes the response.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to expire messages up to {} on {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Expires messages by age on a single topic (a non-partitioned topic or one partition).
     *
     * <p>Validates the EXPIRE_MESSAGES operation and topic ownership, resolves the topic, and then
     * expires through the replicator when {@code subName} starts with the topic's replicator prefix,
     * or through the subscription otherwise.
     *
     * @param partitionMetadata metadata of the topic; used only to reject a partitioned parent topic
     * @param subName subscription or replicator name
     * @param expireTimeInSeconds age threshold passed to {@code expireMessages}
     * @param authoritative forwarded to the ownership validation
     * @return a future that completes normally when expiry was started, or exceptionally with a
     *         {@code RestException} (NOT_FOUND, METHOD_NOT_ALLOWED, CONFLICT) or the unwrapped
     *         failure of a validation step
     */
    private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds, boolean authoritative) {
        // Guard against being called with the parent of a partitioned topic.
        if (!this.topicName.isPartitioned() && partitionMetadata.partitions > 0) {
            String msg = "This method should not be called for partitioned topic";
            return FutureUtil.failedFuture(new IllegalStateException(msg));
        }
        // Completed explicitly from inside the chain below so each exit can pick its own error.
        CompletableFuture<Void> resultFuture = new CompletableFuture<Void>();
        ((CompletableFuture)((CompletableFuture)this.validateTopicOperationAsync(this.topicName, TopicOperation.EXPIRE_MESSAGES).thenCompose(__ -> this.validateTopicOwnershipAsync(this.topicName, authoritative))).thenCompose(__ -> this.getTopicReferenceAsync(this.topicName).thenAccept(t -> {
            boolean issued;
            if (t == null) {
                resultFuture.completeExceptionally(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                return;
            }
            if (!(t instanceof PersistentTopic)) {
                resultFuture.completeExceptionally(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages on a non-persistent topic is not allowed"));
                return;
            }
            PersistentTopic topic = (PersistentTopic)t;
            // Names carrying the replicator prefix address a replicator rather than a subscription.
            if (subName.startsWith(topic.getReplicatorPrefix())) {
                String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                if (repl == null) {
                    resultFuture.completeExceptionally(new RestException(Response.Status.NOT_FOUND, "Replicator not found"));
                    return;
                }
                issued = repl.expireMessages(expireTimeInSeconds);
            } else {
                PersistentSubscription sub = topic.getSubscription(subName);
                if (sub == null) {
                    resultFuture.completeExceptionally(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    return;
                }
                issued = sub.expireMessages(expireTimeInSeconds);
            }
            // A false return from expireMessages is reported to the caller as CONFLICT.
            if (!issued) {
                if (log.isDebugEnabled()) {
                    log.debug("Expire message by timestamp not issued on topic {} for subscription {} due to ongoing message expiration not finished or subscription almost catch up. If it's performed on a partitioned topic operation might succeeded on other partitions, please check stats of individual partition.", (Object)this.topicName, (Object)subName);
                }
                resultFuture.completeExceptionally(new RestException(Response.Status.CONFLICT, "Expire message by timestamp not issued on topic " + this.topicName + " for subscription " + subName + " due to ongoing message expiration not finished or subscription almost catch  up. If it's performed on a partitioned topic operation might succeeded on other partitions, please check stats of individual partition."));
                return;
            }
            log.info("[{}] Message expire started up to {} on {} {}", new Object[]{this.clientAppId(), expireTimeInSeconds, this.topicName, subName});
            // `__` is the result of the preceding validation stage, cast to Void.
            resultFuture.complete((Void)__);
        }))).exceptionally(e -> {
            // Failures of the validation/lookup chain are unwrapped before being handed to the caller.
            resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
            return null;
        });
        return resultFuture;
    }

    /**
     * Expires messages on a subscription up to a given message position and answers asynchronously.
     *
     * <p>Validates global namespace ownership (global topics only), the EXPIRE_MESSAGES operation and
     * topic ownership. The request is rejected with METHOD_NOT_ALLOWED when it targets the parent of a
     * partitioned topic, and with PRECONDITION_FAILED when the partition index of {@code messageId}
     * differs from that of the topic name. Otherwise the work is delegated to
     * {@code internalExpireMessagesNonPartitionedTopicByPosition}.
     *
     * @param asyncResponse response to resume with the outcome
     * @param subName subscription name
     * @param authoritative forwarded to ownership validation and metadata lookup
     * @param messageId position to expire up to
     * @param isExcluded forwarded to the delegate
     * @param batchIndex forwarded to the delegate
     */
    protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative, MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
        CompletableFuture<?> ownershipCheck;
        if (this.topicName.isGlobal()) {
            ownershipCheck = this.validateGlobalNamespaceOwnershipAsync(this.namespaceName);
        } else {
            ownershipCheck = CompletableFuture.completedFuture(null);
        }
        ownershipCheck
                .thenCompose(ignored -> this.validateTopicOperationAsync(this.topicName, TopicOperation.EXPIRE_MESSAGES))
                .thenCompose(ignored -> this.validateTopicOwnershipAsync(this.topicName, authoritative))
                .thenCompose(ignored -> {
                    log.info("[{}][{}] received expire messages on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId);
                    return this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(metadata -> {
                        boolean targetsPartitionedParent = !this.topicName.isPartitioned() && metadata.partitions > 0;
                        if (targetsPartitionedParent) {
                            String reason = "Expire message at position is not supported for partitioned-topic";
                            log.warn("[{}] {} {}({}) {}", this.clientAppId(), reason, this.topicName, messageId, subName);
                            asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, reason));
                        } else if (messageId.getPartitionIndex() != this.topicName.getPartitionIndex()) {
                            String reason = "Invalid parameter for expire message by position, partition index of passed in message position doesn't match partition index for the topic";
                            log.warn("[{}] {} {}({}).", this.clientAppId(), reason, this.topicName, messageId);
                            asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, reason));
                        } else {
                            this.internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName, messageId, isExcluded, batchIndex);
                        }
                    });
                })
                .exceptionally(failure -> {
                    // Redirects are not logged as errors; every failure still resumes the response.
                    if (!PersistentTopicsBase.isRedirectException(failure)) {
                        log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", this.clientAppId(), this.topicName, subName, messageId, failure);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Expires messages up to a position on a single topic and resumes {@code asyncResponse} with the
     * outcome.
     *
     * <p>Resolves the topic, then expires through the replicator when {@code subName} starts with the
     * topic's replicator prefix, or through the subscription otherwise. The target position is
     * computed from the entry batch size obtained via {@code getEntryBatchSize}.
     *
     * <p>Responses: 204 on success; NOT_FOUND when the topic, subscription or replicator is missing;
     * METHOD_NOT_ALLOWED for a topic that is not a {@code PersistentTopic}; CONFLICT when
     * {@code expireMessages} reports that nothing was issued.
     *
     * @param asyncResponse response to resume
     * @param subName subscription or replicator name
     * @param messageId position to expire up to
     * @param isExcluded forwarded to {@code calculatePositionAckSet}
     * @param batchIndex forwarded to {@code getEntryBatchSize} and {@code calculatePositionAckSet}
     * @return a future that completes once the topic lookup stage has been handled; failures are
     *         reported through {@code asyncResponse}, not through this future
     */
    private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosition(AsyncResponse asyncResponse, String subName, MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
        return this.getTopicReferenceAsync(this.topicName).thenAccept(t -> {
            if (t == null) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                return;
            }
            // Same guard as the by-timestamp path: reject instead of failing on the cast.
            if (!(t instanceof PersistentTopic)) {
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages on a non-persistent topic is not allowed"));
                return;
            }
            PersistentTopic topic = (PersistentTopic)t;
            try {
                // Replicator names are resolved through the topic's replicator, as in the
                // by-timestamp path; only plain subscription names require a subscription.
                boolean forReplicator = subName.startsWith(topic.getReplicatorPrefix());
                PersistentSubscription sub = forReplicator ? null : topic.getSubscription(subName);
                if (!forReplicator && sub == null) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    return;
                }
                CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<Integer>();
                this.getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
                batchSizeFuture.thenAccept(bi -> {
                    PositionImpl position = this.calculatePositionAckSet(isExcluded, (int)bi, batchIndex, messageId);
                    try {
                        boolean issued;
                        if (forReplicator) {
                            String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                            PersistentReplicator repl = (PersistentReplicator)topic.getPersistentReplicator(remoteCluster);
                            if (repl == null) {
                                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Replicator not found"));
                                return;
                            }
                            issued = repl.expireMessages(position);
                        } else {
                            issued = sub.expireMessages(position);
                        }
                        if (!issued) {
                            if (log.isDebugEnabled()) {
                                log.debug("Expire message by position not issued on topic {} for subscription {} due to ongoing message expiration not finished or subscription almost catch up.", (Object)this.topicName, (Object)subName);
                            }
                            throw new RestException(Response.Status.CONFLICT, "Expire message by position not issued on topic " + this.topicName + " for subscription " + subName + " due to ongoing message expiration not finished or invalid message position provided.");
                        }
                        log.info("[{}] Message expire started up to {} on {} {}", new Object[]{this.clientAppId(), position, this.topicName, subName});
                    }
                    catch (Exception exception) {
                        log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", new Object[]{this.clientAppId(), position, this.topicName, subName, exception});
                        throw new RestException(exception);
                    }
                    asyncResponse.resume(Response.noContent().build());
                }).exceptionally(e -> {
                    log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", new Object[]{this.clientAppId(), messageId, this.topicName, subName, e});
                    // Use the shared helper, as the other handlers in this method do, rather than
                    // resuming with the raw throwable delivered by the future.
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e);
                    return null;
                });
            }
            catch (Exception e2) {
                log.warn("[{}][{}] Failed to expire messages up to {} on subscription {} to position {}", new Object[]{this.clientAppId(), this.topicName, messageId, subName, messageId, e2});
                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e2);
            }
        }).exceptionally(ex -> {
            // Fall back to the exception itself when it carries no cause, so the helper never
            // receives null.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", new Object[]{this.clientAppId(), this.topicName, subName, messageId, cause});
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, cause);
            return null;
        });
    }

    /**
     * Triggers compaction for the topic and answers the request asynchronously.
     *
     * <p>After the optional global namespace ownership check: a topic name that denotes a single
     * partition is compacted locally; otherwise the partition metadata decides between fanning out one
     * admin-client call per partition and compacting locally as a non-partitioned topic.
     *
     * @param asyncResponse response to resume with 204 No Content or with an error
     * @param authoritative forwarded to the metadata lookup and to the local compaction path
     */
    protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
        log.info("[{}] Trigger compaction on topic {}", (Object)this.clientAppId(), (Object)this.topicName);
        CompletableFuture<Object> future = this.topicName.isGlobal() ? this.validateGlobalNamespaceOwnershipAsync(this.namespaceName) : CompletableFuture.completedFuture(null);
        ((CompletableFuture)future.thenAccept(__ -> {
            // The name already identifies one partition, so it is handled as a single topic.
            if (this.topicName.isPartitioned()) {
                this.internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative);
            } else {
                ((CompletableFuture)this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
                    int numPartitions = partitionMetadata.partitions;
                    if (numPartitions > 0) {
                        // Partitioned topic: one admin-client request per partition.
                        ArrayList<CompletableFuture<Void>> futures = Lists.newArrayListWithCapacity(numPartitions);
                        for (int i = 0; i < numPartitions; ++i) {
                            TopicName topicNamePartition = this.topicName.getPartition(i);
                            try {
                                futures.add(this.pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
                                continue;
                            }
                            catch (Exception e) {
                                // Failing to issue any call answers the request immediately and stops the fan-out.
                                log.error("[{}] Failed to trigger compaction on topic {}", new Object[]{this.clientAppId(), topicNamePartition, e});
                                asyncResponse.resume(new RestException(e));
                                return;
                            }
                        }
                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
                            if (exception != null) {
                                Throwable th = exception.getCause();
                                // Not-found and web-application causes are passed on without an error log.
                                if (th instanceof PulsarAdminException.NotFoundException) {
                                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, th.getMessage()));
                                    return null;
                                }
                                if (th instanceof WebApplicationException) {
                                    asyncResponse.resume(th);
                                    return null;
                                }
                                log.error("[{}] Failed to trigger compaction on topic {}", new Object[]{this.clientAppId(), this.topicName, exception});
                                asyncResponse.resume(new RestException((Throwable)exception));
                                return null;
                            }
                            asyncResponse.resume(Response.noContent().build());
                            return null;
                        });
                    } else {
                        this.internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative);
                    }
                })).exceptionally(ex -> {
                    // Failure of the metadata lookup or of the handling above.
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to trigger compaction on topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
            }
        })).exceptionally(ex -> {
            // Failure of the global namespace ownership validation.
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to validate global namespace ownership to trigger compaction on topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Triggers compaction on a single topic and answers the request asynchronously.
     *
     * <p>Validates topic ownership and the COMPACT operation, resolves the topic and calls
     * {@code triggerCompaction()}. An {@code AlreadyRunningException} is reported as CONFLICT; any
     * other failure is wrapped in a {@code RestException}.
     *
     * @param asyncResponse response to resume with 204 No Content or with an error
     * @param authoritative forwarded to the ownership validation
     */
    protected void internalTriggerCompactionNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(ignored -> this.validateTopicOperationAsync(this.topicName, TopicOperation.COMPACT))
                .thenCompose(ignored -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topicRef -> {
                    try {
                        PersistentTopic persistentTopic = (PersistentTopic)topicRef;
                        persistentTopic.triggerCompaction();
                        asyncResponse.resume(Response.noContent().build());
                    }
                    catch (BrokerServiceException.AlreadyRunningException alreadyRunning) {
                        RestException conflict = new RestException(Response.Status.CONFLICT, alreadyRunning.getMessage());
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, conflict);
                    }
                    catch (Exception failure) {
                        log.error("[{}] Failed to trigger compaction on topic {}", this.clientAppId(), this.topicName, failure);
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, new RestException(failure));
                    }
                })
                .exceptionally(failure -> {
                    // Redirects are not logged as errors; every failure still resumes the response.
                    if (!PersistentTopicsBase.isRedirectException(failure)) {
                        log.error("[{}] Failed to trigger compaction for {}", this.clientAppId(), this.topicName, failure);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Returns the compaction status of the topic after validating ownership and the COMPACT operation.
     *
     * @param authoritative forwarded to the ownership validation
     * @return the value of the topic's {@code compactionStatus()}
     */
    protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.COMPACT);
        Topic topicRef = this.getTopicReference(this.topicName);
        return ((PersistentTopic)topicRef).compactionStatus();
    }

    /**
     * Triggers an offload on the topic up to the given message id and answers asynchronously.
     *
     * <p>Validates topic ownership and the OFFLOAD operation, resolves the topic and calls
     * {@code triggerOffload(messageId)}. An {@code AlreadyRunningException} is reported as CONFLICT;
     * any other failure is wrapped in a {@code RestException}.
     *
     * @param asyncResponse response to resume with 204 No Content or with an error
     * @param authoritative forwarded to the ownership validation
     * @param messageId position passed to {@code triggerOffload}
     */
    protected void internalTriggerOffload(AsyncResponse asyncResponse, boolean authoritative, MessageIdImpl messageId) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(ignored -> this.validateTopicOperationAsync(this.topicName, TopicOperation.OFFLOAD))
                .thenCompose(ignored -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topicRef -> {
                    try {
                        PersistentTopic persistentTopic = (PersistentTopic)topicRef;
                        persistentTopic.triggerOffload(messageId);
                        asyncResponse.resume(Response.noContent().build());
                    }
                    catch (BrokerServiceException.AlreadyRunningException alreadyRunning) {
                        RestException conflict = new RestException(Response.Status.CONFLICT, alreadyRunning.getMessage());
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, conflict);
                    }
                    catch (Exception failure) {
                        log.warn("Unexpected error triggering offload", (Throwable)failure);
                        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, new RestException(failure));
                    }
                })
                .exceptionally(failure -> {
                    // Redirects are not logged as errors; every failure still resumes the response.
                    if (!PersistentTopicsBase.isRedirectException(failure)) {
                        log.error("[{}] Failed to trigger offload for {}", this.clientAppId(), this.topicName, failure);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Asynchronously reports the offload status of this resource's topic.
     *
     * <p>Ownership and the {@code OFFLOAD} operation are validated, the topic is resolved, and the
     * response is resumed with the value of the persistent topic's {@code offloadStatus()}.
     *
     * @param asyncResponse response to resume with the status or the failure
     * @param authoritative forwarded to the topic ownership validation
     */
    protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean authoritative) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(ignored -> this.validateTopicOperationAsync(this.topicName, TopicOperation.OFFLOAD))
                .thenCompose(ignored -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topicRef -> {
                    PersistentTopic persistentTopic = (PersistentTopic)topicRef;
                    OffloadProcessStatus status = persistentTopic.offloadStatus();
                    asyncResponse.resume(status);
                })
                .exceptionally(failure -> {
                    // Failures recognised by isRedirectException are not logged as errors.
                    boolean redirect = PersistentTopicsBase.isRedirectException(failure);
                    if (!redirect) {
                        log.error("[{}] Failed to offload status on topic {}", new Object[]{this.clientAppId(), this.topicName, failure});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Authorizes the caller and then asynchronously fetches the partitioned-topic metadata.
     *
     * <p>Authorization is attempted with {@code checkAuthorization}; if that raises a
     * {@link RestException}, tenant-admin access is tried as a fallback. When the fallback also
     * raises a {@link RestException}, the returned future fails with a
     * {@link PulsarClientException} describing the authorization failure. Any other exception
     * raised during authorization also fails the returned future.
     *
     * @param pulsar             broker service holder used for authorization and metadata lookup
     * @param clientAppId        identifier of the calling client, used for authorization and logging
     * @param originalPrincipal  forwarded to the tenant-admin access check
     * @param authenticationData forwarded to both authorization checks
     * @param topicName          topic whose metadata is requested
     * @return a future completed with the metadata, or failed with the underlying error
     */
    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar, String clientAppId, String originalPrincipal, AuthenticationDataSource authenticationData, TopicName topicName) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<PartitionedTopicMetadata>();
        try {
            try {
                PersistentTopicsBase.checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
            }
            catch (RestException e) {
                // Topic-level authorization failed: fall back to tenant-admin access.
                try {
                    PersistentTopicsBase.validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
                }
                catch (RestException authException) {
                    log.warn("Failed to authorize {} on topic {}", (Object)clientAppId, (Object)topicName);
                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s", clientAppId, topicName, authException.getMessage()));
                }
            }
            catch (Exception ex2) {
                log.warn("Failed to authorize {} on topic {}", new Object[]{clientAppId, topicName, ex2});
                throw ex2;
            }
            PersistentTopicsBase.checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
                    .thenCompose(res -> pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
                    .thenAccept(metadata -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{clientAppId, topicName, metadata.partitions});
                        }
                        metadataFuture.complete(metadata);
                    })
                    .exceptionally(ex -> {
                        // Unwrap the completion wrapper when present. Passing a null cause to
                        // completeExceptionally would throw and leave metadataFuture pending forever.
                        Throwable cause = ex.getCause();
                        metadataFuture.completeExceptionally(cause != null ? cause : ex);
                        return null;
                    });
        }
        catch (Exception ex3) {
            metadataFuture.completeExceptionally(ex3);
        }
        return metadataFuture;
    }

    /**
     * Asynchronously fetches the partitioned-topic metadata without performing any authorization
     * check in this method.
     *
     * @param pulsar    broker service holder used for the replication-cluster check and lookup
     * @param topicName topic whose metadata is requested
     * @return a future completed with the metadata, or failed with the underlying error
     */
    public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTopicMetadataAsync(PulsarService pulsar, TopicName topicName) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<PartitionedTopicMetadata>();
        PersistentTopicsBase.checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
                .thenCompose(res -> pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
                .thenAccept(metadata -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Total number of partitions for topic {} is {}", (Object)topicName, (Object)metadata.partitions);
                    }
                    metadataFuture.complete(metadata);
                })
                .exceptionally(ex -> {
                    // Unwrap the completion wrapper when present. Passing a null cause to
                    // completeExceptionally would throw and leave metadataFuture pending forever.
                    Throwable cause = ex.getCause();
                    metadataFuture.completeExceptionally(cause != null ? cause : ex);
                    return null;
                });
        return metadataFuture;
    }

    /**
     * Resolves an existing topic synchronously.
     *
     * <p>Waits for the lookup for at most the configured metadata-store operation timeout (in
     * seconds). A missing topic produces the {@link RestException} built by
     * {@code topicNotFoundReason}. Any other failure is rethrown as a {@link RestException}
     * carrying the failure's cause when one exists; a {@code NotAllowedException} cause is mapped
     * to {@code BAD_REQUEST}.
     *
     * @param topicName topic to resolve
     * @return the resolved topic
     */
    private Topic getTopicReference(TopicName topicName) {
        try {
            CompletableFuture<Optional<Topic>> lookup = this.pulsar().getBrokerService().getTopicIfExists(topicName.toString());
            long timeoutSeconds = this.pulsar().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
            Optional<Topic> maybeTopic = lookup.get(timeoutSeconds, TimeUnit.SECONDS);
            return maybeTopic.orElseThrow(() -> this.topicNotFoundReason(topicName));
        }
        catch (RestException restFailure) {
            // Already in the shape callers expect; pass it through untouched.
            throw restFailure;
        }
        catch (Exception failure) {
            Throwable cause = failure.getCause();
            if (cause instanceof BrokerServiceException.NotAllowedException) {
                throw new RestException(Response.Status.BAD_REQUEST, cause);
            }
            throw new RestException(cause == null ? failure : cause);
        }
    }

    /**
     * Resolves an existing topic asynchronously.
     *
     * @param topicName topic to resolve
     * @return a future holding the topic when it exists, otherwise the future produced by
     *         {@code topicNotFoundReasonAsync}
     */
    private CompletableFuture<Topic> getTopicReferenceAsync(TopicName topicName) {
        return this.pulsar().getBrokerService().getTopicIfExists(topicName.toString()).thenCompose(maybeTopic -> {
            if (maybeTopic.isPresent()) {
                return CompletableFuture.completedFuture(maybeTopic.get());
            }
            return this.topicNotFoundReasonAsync(topicName);
        });
    }

    /**
     * Builds the {@code NOT_FOUND} exception that best explains why a topic could not be resolved.
     *
     * <p>For a name that is not a partition the message is simply "Topic not found". Otherwise
     * the partitioned-topic metadata and the topic list are consulted to pick a more specific
     * message.
     *
     * @param topicName topic that was not found
     * @return the exception to throw; never thrown by this method itself
     */
    private RestException topicNotFoundReason(TopicName topicName) {
        if (!topicName.isPartitioned()) {
            return new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        TopicName parentName = TopicName.get(topicName.getPartitionedTopicName());
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(parentName, false, false);
        if (metadata == null) {
            return new RestException(Response.Status.NOT_FOUND, String.format("Partitioned Topic not found: %s %s", topicName.toString(), "has no metadata"));
        }
        if (metadata.partitions == 0) {
            return new RestException(Response.Status.NOT_FOUND, String.format("Partitioned Topic not found: %s %s", topicName.toString(), "has zero partitions"));
        }
        boolean listed = this.internalGetList(Optional.empty()).contains(topicName.toString());
        String message = listed ? "Partitioned Topic not found" : "Topic partitions were not yet created";
        return new RestException(Response.Status.NOT_FOUND, message);
    }

    /**
     * Asynchronous counterpart of {@code topicNotFoundReason}: always yields a failed future.
     *
     * <p>The continuation never returns a value; every path throws a {@code NOT_FOUND}
     * {@link RestException} whose message depends on the partitioned-topic metadata and the
     * topic list.
     *
     * @param topicName topic that was not found
     * @return a future that completes exceptionally with the explanatory exception
     */
    private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
        if (!topicName.isPartitioned()) {
            RestException notFound = new RestException(Response.Status.NOT_FOUND, "Topic not found");
            return FutureUtil.failedFuture(notFound);
        }
        TopicName parentName = TopicName.get(topicName.getPartitionedTopicName());
        return this.getPartitionedTopicMetadataAsync(parentName, false, false).thenApply(metadata -> {
            if (metadata == null) {
                throw new RestException(Response.Status.NOT_FOUND, String.format("Partitioned Topic not found: %s %s", topicName.toString(), "has no metadata"));
            }
            if (metadata.partitions == 0) {
                throw new RestException(Response.Status.NOT_FOUND, String.format("Partitioned Topic not found: %s %s", topicName.toString(), "has zero partitions"));
            }
            boolean listed = this.internalGetList(Optional.empty()).contains(topicName.toString());
            String message = listed ? "Partitioned Topic not found" : "Topic partitions were not yet created";
            throw new RestException(Response.Status.NOT_FOUND, message);
        });
    }

    /**
     * Obtains the topic through the broker service, passing {@code true} as the second argument
     * of {@code getTopic}, and blocks until the lookup finishes.
     *
     * @param topicName  topic to obtain
     * @param properties properties forwarded to the broker service
     * @return the topic held by the completed lookup
     */
    private Topic getOrCreateTopic(TopicName topicName, Map<String, String> properties) {
        CompletableFuture<Optional<Topic>> pending = this.pulsar().getBrokerService().getTopic(topicName.toString(), true, properties);
        CompletableFuture<Topic> unwrapped = pending.thenApply(Optional::get);
        return unwrapped.join();
    }

    /**
     * Looks up a subscription on the topic, creating it at the earliest position when absent.
     *
     * @param subName subscription name
     * @param topic   topic that owns the subscription
     * @return the existing or newly created subscription
     * @throws RestException {@code NOT_FOUND} when lookup or creation fails for any reason
     */
    private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
        try {
            Subscription existing = topic.getSubscription(subName);
            Subscription resolved = existing != null
                    ? existing
                    : topic.createSubscription(subName, CommandSubscribe.InitialPosition.Earliest, false).get();
            return PersistentTopicsBase.checkNotNull(resolved);
        }
        catch (Exception failure) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
    }

    /**
     * Looks up the persistent replicator that corresponds to a replicator name.
     *
     * @param replName replicator name from which the remote cluster is derived
     * @param topic    topic that owns the replicator
     * @return the replicator for the derived remote cluster
     * @throws RestException {@code NOT_FOUND} when the replicator is missing or lookup fails
     */
    private PersistentReplicator getReplicatorReference(String replName, PersistentTopic topic) {
        try {
            String cluster = PersistentReplicator.getRemoteCluster(replName);
            PersistentReplicator replicator = (PersistentReplicator)topic.getPersistentReplicator(cluster);
            return PersistentTopicsBase.checkNotNull(replicator);
        }
        catch (Exception failure) {
            throw new RestException(Response.Status.NOT_FOUND, "Replicator not found");
        }
    }

    /**
     * Creates subscriptions for the new partitions and then updates the partition count in the
     * partitioned-topic metadata.
     *
     * <p>If the metadata update fails, a side chain re-reads the current metadata and requests
     * deletion of every partition from the current count up to {@code numPartitions}; failures
     * of that cleanup are only logged. The metadata-update failure itself still fails the result.
     *
     * @param topicName     partitioned topic to update
     * @param numPartitions new partition count
     * @param force         when true, a failure whose cause is a {@code ConflictException} is
     *                      treated as success
     * @return a future completed when the update finished, or failed with the error
     */
    private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) {
        CompletableFuture<Void> outcome = new CompletableFuture<Void>();
        this.createSubscriptions(topicName, numPartitions)
                .thenCompose(ignored -> {
                    CompletableFuture<Void> metadataUpdate = this.namespaceResources().getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
                    // Best-effort cleanup branch; its own result is deliberately not returned.
                    metadataUpdate.exceptionally(updateError -> {
                        this.getPartitionedTopicMetadataAsync(topicName, false, false)
                                .thenAccept(current -> {
                                    for (int idx = current.partitions; idx < numPartitions; ++idx) {
                                        this.topicResources().deletePersistentTopicAsync(topicName.getPartition(idx)).exceptionally(deleteError -> {
                                            log.warn("[{}] Failed to clean up managedLedger {}", new Object[]{this.clientAppId(), topicName, deleteError.getCause()});
                                            return null;
                                        });
                                    }
                                })
                                .exceptionally(cleanupError -> {
                                    log.warn("[{}] Failed to clean up managedLedger", (Object)topicName, cleanupError);
                                    return null;
                                });
                        return null;
                    });
                    return metadataUpdate;
                })
                .thenAccept(ignored -> outcome.complete(null))
                .exceptionally(error -> {
                    boolean tolerated = force && error.getCause() instanceof PulsarAdminException.ConflictException;
                    if (tolerated) {
                        outcome.complete(null);
                    } else {
                        outcome.completeExceptionally(error);
                    }
                    return null;
                });
        return outcome;
    }

    /**
     * Creates, on every partition being added, each durable subscription that exists on
     * partition 0 of the topic.
     *
     * <p>The result fails with {@code CONFLICT} when the topic has fewer than one partition or
     * when {@code numPartitions} is not greater than the existing partition count. When the stats
     * lookup for partition 0 fails with a {@code NotFoundException} cause, the result completes
     * successfully without creating anything.
     *
     * @param topicName     partitioned topic being expanded
     * @param numPartitions target partition count
     * @return a future completed once all subscriptions were created, or failed with the error
     */
    private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        this.pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
            PulsarAdmin admin;
            if (partitionMetadata.partitions < 1) {
                result.completeExceptionally(new RestException(Response.Status.CONFLICT, "Topic is not partitioned topic"));
                return;
            }
            if (partitionMetadata.partitions >= numPartitions) {
                result.completeExceptionally(new RestException(Response.Status.CONFLICT, "number of partitions must be more than existing " + partitionMetadata.partitions));
                return;
            }
            try {
                admin = this.pulsar().getAdminClient();
            }
            catch (PulsarServerException e1) {
                result.completeExceptionally(e1);
                return;
            }
            admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
                List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<CompletableFuture<Void>>();
                stats.getSubscriptions().entrySet().forEach(e -> {
                    String subscription = (String)e.getKey();
                    SubscriptionStats ss = (SubscriptionStats)e.getValue();
                    // Only durable subscriptions are replicated onto the new partitions.
                    if (!ss.isDurable()) {
                        return;
                    }
                    for (int i = partitionMetadata.partitions; i < numPartitions; ++i) {
                        String topicNamePartition = topicName.getPartition(i).toString();
                        subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition, subscription, MessageId.latest));
                    }
                });
                FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
                    log.info("[{}] Successfully created new partitions {}", (Object)this.clientAppId(), (Object)topicName);
                    result.complete(null);
                }).exceptionally(ex -> {
                    log.warn("[{}] Failed to create subscriptions on new partitions for {}", new Object[]{this.clientAppId(), topicName, ex});
                    result.completeExceptionally(ex);
                    return null;
                });
            }).exceptionally(ex -> {
                if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
                    // Stats for partition 0 were not found: nothing to copy.
                    result.complete(null);
                } else {
                    log.warn("[{}] Failed to get list of subscriptions of {}", new Object[]{this.clientAppId(), topicName.getPartition(0), ex});
                    result.completeExceptionally(ex);
                }
                return null;
            });
        }).exceptionally(ex -> {
            // Pass the exception as the trailing argument so the failure reason is logged too.
            log.warn("[{}] Failed to get partition metadata for {}", new Object[]{this.clientAppId(), topicName.toString(), ex});
            result.completeExceptionally(ex);
            return null;
        });
        return result;
    }

    /**
     * Rejects requests from deprecated client libraries that are older than the least supported
     * version, when the client-library version check is enabled in the broker configuration.
     *
     * <p>The version is read from the {@code User-Agent} header. A blank header is rejected.
     * Only user agents containing {@code DEPRECATED_CLIENT_VERSION_PREFIX} are version-checked;
     * a version that cannot be parsed is logged and let through.
     *
     * @throws RestException {@code METHOD_NOT_ALLOWED} when the header is blank or the parsed
     *                       version is older than {@code LEAST_SUPPORTED_CLIENT_VERSION_PREFIX}
     */
    private void validateClientVersion() {
        if (!this.pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
            return;
        }
        String userAgent = this.httpRequest.getHeader("User-Agent");
        if (StringUtils.isBlank(userAgent)) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
        }
        if (!userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
            return;
        }
        try {
            String[] tokens = userAgent.split(DEPRECATED_CLIENT_VERSION_PREFIX);
            String[] splits = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null;
            if (splits != null && splits.length > 1) {
                long leastMajor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion();
                long leastMinor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion();
                int clientMajor = Integer.parseInt(splits[0]);
                // Compare (major, minor) as an ordered pair: the minor only matters when the
                // majors are equal, so a newer major with a small minor is not rejected.
                boolean unsupported = clientMajor < leastMajor;
                if (!unsupported) {
                    int clientMinor = Integer.parseInt(splits[1]);
                    unsupported = clientMajor == leastMajor && clientMinor < leastMinor;
                }
                if (unsupported) {
                    throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version " + userAgent + " is not supported");
                }
            }
        }
        catch (RestException re) {
            throw re;
        }
        catch (Exception e) {
            // An unparseable version is tolerated; keep the reason in the log.
            log.warn("[{}] Failed to parse version {} ", new Object[]{this.clientAppId(), userAgent, e});
        }
    }

    /**
     * Verifies that growing a partitioned topic will not collide with an already existing topic
     * whose name looks like one of the partitions to be added.
     *
     * <p>Every existing topic whose name starts with {@code <partitioned-name>-partition-} and
     * whose remainder is a number between the current partition count and
     * {@code numberOfPartition} (both inclusive) is treated as a conflict.
     *
     * @param topicName         local name of the partitioned topic being updated
     * @param numberOfPartition requested new partition count
     * @throws RestException {@code PRECONDITION_FAILED} when a conflicting topic exists
     */
    private void validatePartitionTopicUpdate(String topicName, int numberOfPartition) {
        List<String> existingTopicList = this.internalGetList(Optional.empty());
        TopicName partitionTopicName = TopicName.get(this.domain(), this.namespaceName, topicName);
        PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(partitionTopicName, false, false);
        int oldPartition = metadata.partitions;
        String prefix = partitionTopicName.getPartitionedTopicName() + "-partition-";
        for (String exsitingTopicName : existingTopicList) {
            if (!exsitingTopicName.startsWith(prefix)) continue;
            long suffix;
            try {
                // Cut right after the matched prefix. Searching for the first "-partition-"
                // would pick up an earlier occurrence inside the tenant, namespace or topic
                // name and silently skip the check.
                suffix = Long.parseLong(exsitingTopicName.substring(prefix.length()));
            }
            catch (NumberFormatException ignored) {
                // Remainder is not numeric, so the name cannot clash with a partition index.
                continue;
            }
            if (suffix < (long)oldPartition || suffix > (long)numberOfPartition) continue;
            log.warn("[{}] Already have non partition topic {} which contains partition suffix '-partition-' and end with numeric value smaller than the new number of partition. Update of partitioned topic {} could cause conflict.", new Object[]{this.clientAppId(), exsitingTopicName, topicName});
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Already have non partition topic " + exsitingTopicName + " which contains partition suffix '-partition-' and end with numeric value and end with numeric value smaller than the new number of partition. Update of partitioned topic " + topicName + " could cause conflict.");
        }
    }

    /**
     * Verifies that a non-partitioned topic name does not look like a partition of a partitioned
     * topic in a way that could collide with it.
     *
     * <p>Names without {@code -partition-}, or whose text after the last {@code -partition-} is
     * not a number, are accepted. Otherwise the name is rejected when the base topic has no
     * partitions, or when the numeric suffix is greater than or equal to its partition count.
     *
     * @param topicName local name of the topic to be created
     * @throws RestException {@code PRECONDITION_FAILED} when the name is rejected
     */
    private void validateNonPartitionTopicName(String topicName) {
        if (!topicName.contains("-partition-")) {
            return;
        }
        try {
            // Use the LAST marker: a base name may itself contain "-partition-", and taking the
            // first occurrence would make the numeric parse fail and bypass the check.
            int partitionIndex = topicName.lastIndexOf("-partition-");
            long suffix = Long.parseLong(topicName.substring(partitionIndex + "-partition-".length()));
            TopicName partitionTopicName = TopicName.get(this.domain(), this.namespaceName, topicName.substring(0, partitionIndex));
            PartitionedTopicMetadata metadata = this.getPartitionedTopicMetadata(partitionTopicName, false, false);
            if (metadata.partitions > 0 && suffix >= (long)metadata.partitions) {
                log.warn("[{}] Can't create topic {} with \"-partition-\" followed by a number smaller then number of partition of partitioned topic {}.", new Object[]{this.clientAppId(), topicName, partitionTopicName.getLocalName()});
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Can't create topic " + topicName + " with \"-partition-\" followed by a number smaller then number of partition of partitioned topic " + partitionTopicName.getLocalName());
            }
            if (metadata.partitions == 0) {
                log.warn("[{}] Can't create topic {} with \"-partition-\" followed by numeric value if there isn't a partitioned topic {} created.", new Object[]{this.clientAppId(), topicName, partitionTopicName.getLocalName()});
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Can't create topic " + topicName + " with \"-partition-\" followed by numeric value if there isn't a partitioned topic " + partitionTopicName.getLocalName() + " created.");
            }
        }
        catch (NumberFormatException ignored) {
            // Text after the marker is not numeric, so the name is allowed.
        }
    }

    /**
     * Asynchronously resumes the response with the last message id of this resource's topic.
     *
     * <p>Ownership and the {@code PEEK_MESSAGES} operation are validated before the topic is
     * resolved. A topic that is not a {@link PersistentTopic} is rejected with
     * {@code METHOD_NOT_ALLOWED}; a failure while reading the last message id is reported as
     * {@code INTERNAL_SERVER_ERROR} carrying the failure's message.
     *
     * @param asyncResponse response to resume with the message id or the failure
     * @param authoritative forwarded to the topic ownership validation
     */
    protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean authoritative) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(ignored -> this.validateTopicOperationAsync(this.topicName, TopicOperation.PEEK_MESSAGES))
                .thenCompose(ignored -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topicRef -> {
                    if (topicRef == null) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                        return;
                    }
                    if (topicRef instanceof PersistentTopic) {
                        topicRef.getLastMessageId().whenComplete((lastId, failure) -> {
                            if (failure == null) {
                                asyncResponse.resume(lastId);
                            } else {
                                asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage()));
                            }
                        });
                        return;
                    }
                    log.error("[{}] Not supported operation of non-persistent topic {}", (Object)this.clientAppId(), (Object)this.topicName);
                    asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "GetLastMessageId on a non-persistent topic is not allowed"));
                })
                .exceptionally(failure -> {
                    // Failures recognised by isRedirectException are not logged as errors.
                    boolean redirect = PersistentTopicsBase.isRedirectException(failure);
                    if (!redirect) {
                        log.error("[{}] Failed to get last messageId {}", new Object[]{this.clientAppId(), this.topicName, failure});
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
                    return null;
                });
    }

    /**
     * Reads the topic-level dispatch rate.
     *
     * @param applied  when true and no topic-level value is set, fall back to the namespace
     *                 policy for the local cluster and then to {@code dispatchRate()}
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the resolved rate, or {@code null} when nothing is set and
     *         {@code applied} is false
     */
    protected CompletableFuture<DispatchRateImpl> internalGetDispatchRate(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(policies -> {
            DispatchRateImpl topicLevel = policies.map(TopicPolicies::getDispatchRate).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            String cluster = this.pulsar().getConfiguration().getClusterName();
            DispatchRateImpl namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).topicDispatchRate.get(cluster);
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.dispatchRate();
        });
    }

    /**
     * Stores a topic-level dispatch rate, creating the topic policies when none exist yet.
     *
     * @param dispatchRate rate to store; {@code null} makes this call a no-op
     * @param isGlobal     forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalSetDispatchRate(DispatchRateImpl dispatchRate, boolean isGlobal) {
        if (dispatchRate == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setDispatchRate(dispatchRate);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Clears the topic-level dispatch rate; does nothing when the topic has no policies.
     *
     * @param isGlobal forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalRemoveDispatchRate(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setDispatchRate(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Reads the topic-level subscription dispatch rate.
     *
     * @param applied  when true and no topic-level value is set, fall back to the namespace
     *                 policy for the local cluster and then to {@code subscriptionDispatchRate()}
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the resolved rate, or {@code null} when nothing is set and
     *         {@code applied} is false
     */
    protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(policies -> {
            DispatchRate topicLevel = policies.map(TopicPolicies::getSubscriptionDispatchRate).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            String cluster = this.pulsar().getConfiguration().getClusterName();
            DispatchRateImpl namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).subscriptionDispatchRate.get(cluster);
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.subscriptionDispatchRate();
        });
    }

    /**
     * Stores a topic-level subscription dispatch rate, creating the topic policies when none
     * exist yet.
     *
     * @param dispatchRate rate to store; {@code null} makes this call a no-op
     * @param isGlobal     forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate, boolean isGlobal) {
        if (dispatchRate == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setSubscriptionDispatchRate(dispatchRate);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Clears the topic-level subscription dispatch rate; does nothing when the topic has no
     * policies.
     *
     * @param isGlobal forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setSubscriptionDispatchRate(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Reads the topic-level max-consumers-per-subscription setting.
     *
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the value, empty when the topic has no policies or no value is set
     */
    protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal)
                .thenApply(policies -> policies.map(found -> found.getMaxConsumersPerSubscription()));
    }

    /**
     * Stores the topic-level max-consumers-per-subscription setting, creating the topic policies
     * when none exist yet.
     *
     * @param maxConsumersPerSubscription value to store; may be {@code null}
     * @param isGlobal                    forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     * @throws RestException {@code PRECONDITION_FAILED}, thrown synchronously, for a negative value
     */
    protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription, boolean isGlobal) {
        boolean negative = maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0;
        if (negative) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Clears the topic-level max-consumers-per-subscription setting; does nothing when the topic
     * has no policies.
     *
     * @param isGlobal forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setMaxConsumersPerSubscription(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Reads the topic-level compaction threshold.
     *
     * @param applied  when true and no topic-level value is set, fall back to the namespace
     *                 policy and then to the broker configuration value
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the resolved threshold, or {@code null} when nothing is set and
     *         {@code applied} is false
     */
    protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(policies -> {
            Long topicLevel = policies.map(TopicPolicies::getCompactionThreshold).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            Long namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).compaction_threshold;
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.pulsar().getConfiguration().getBrokerServiceCompactionThresholdInBytes();
        });
    }

    /**
     * Stores the topic-level compaction threshold, creating the topic policies when none exist
     * yet.
     *
     * @param compactionThreshold value to store; may be {@code null}
     * @param isGlobal            forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     * @throws RestException {@code PRECONDITION_FAILED}, thrown synchronously, for a negative value
     */
    protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold, boolean isGlobal) {
        boolean negative = compactionThreshold != null && compactionThreshold < 0L;
        if (negative) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setCompactionThreshold(compactionThreshold);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Clears the topic-level compaction threshold; does nothing when the topic has no policies.
     *
     * @param isGlobal forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalRemoveCompactionThreshold(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setCompactionThreshold(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Reads the topic-level publish rate.
     *
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the rate, empty when the topic has no policies or no rate is set
     */
    protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal)
                .thenApply(policies -> policies.map(found -> found.getPublishRate()));
    }

    /**
     * Stores a topic-level publish rate, creating the topic policies when none exist yet.
     *
     * @param publishRate rate to store; {@code null} makes this call a no-op
     * @param isGlobal    forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate, boolean isGlobal) {
        if (publishRate == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setPublishRate(publishRate);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Reads the subscription types enabled at topic level.
     *
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the list, empty when the topic has no policies or no list is set
     */
    protected CompletableFuture<Optional<List<CommandSubscribe.SubType>>> internalGetSubscriptionTypesEnabled(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal)
                .thenApply(policies -> policies.map(found -> found.getSubscriptionTypesEnabled()));
    }

    /**
     * Stores the subscription types enabled at topic level, creating the topic policies when
     * none exist yet.
     *
     * <p>Each client-side {@link SubscriptionType} is converted to the {@code SubType} constant
     * of the same name before it is stored.
     *
     * @param subscriptionTypesEnabled types to enable
     * @param isGlobal                 forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(Set<SubscriptionType> subscriptionTypesEnabled, boolean isGlobal) {
        List<CommandSubscribe.SubType> converted = new ArrayList<CommandSubscribe.SubType>();
        for (SubscriptionType type : subscriptionTypesEnabled) {
            converted.add(CommandSubscribe.SubType.valueOf(type.name()));
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setSubscriptionTypesEnabled(converted);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Resets the topic-level enabled subscription types to an empty list; does nothing when the
     * topic has no policies.
     *
     * @param isGlobal forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalRemoveSubscriptionTypesEnabled(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setSubscriptionTypesEnabled(Lists.newArrayList());
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Clears the topic-level publish rate; does nothing when the topic has no policies.
     *
     * @param isGlobal forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalRemovePublishRate(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setPublishRate(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Reads the topic-level subscribe rate.
     *
     * @param applied  when true and no topic-level value is set, fall back to the namespace
     *                 policy for the local cluster and then to {@code subscribeRate()}
     * @param isGlobal forwarded to the topic policies lookup
     * @return a future with the resolved rate, or {@code null} when nothing is set and
     *         {@code applied} is false
     */
    protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied, boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenApply(policies -> {
            SubscribeRate topicLevel = policies.map(TopicPolicies::getSubscribeRate).orElse(null);
            if (topicLevel != null) {
                return topicLevel;
            }
            if (!applied) {
                return null;
            }
            String cluster = this.pulsar().getConfiguration().getClusterName();
            SubscribeRate namespaceLevel = this.getNamespacePolicies((NamespaceName)this.namespaceName).clusterSubscribeRate.get(cluster);
            if (namespaceLevel != null) {
                return namespaceLevel;
            }
            return this.subscribeRate();
        });
    }

    /**
     * Stores a topic-level subscribe rate, creating the topic policies when none exist yet.
     *
     * @param subscribeRate rate to store; {@code null} makes this call a no-op
     * @param isGlobal      forwarded to the lookup and recorded on the stored policies
     * @return a future completed when the policies update finished
     */
    protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate, boolean isGlobal) {
        if (subscribeRate == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            TopicPolicies policies = existing.isPresent() ? existing.get() : new TopicPolicies();
            policies.setSubscribeRate(subscribeRate);
            policies.setIsGlobal(isGlobal);
            return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
        });
    }

    /**
     * Clears the subscribe rate stored in this topic's policies.
     *
     * @param isGlobal passed to the policy lookup and written back onto the updated policies
     * @return a future for the policy update; completes immediately with {@code null}
     *         when the topic has no policies
     */
    protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal) {
        return this.getTopicPoliciesAsyncWithRetry(this.topicName, isGlobal).thenCompose(existing -> {
            if (existing.isPresent()) {
                TopicPolicies policies = existing.get();
                policies.setSubscribeRate(null);
                policies.setIsGlobal(isGlobal);
                return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
            }
            // Nothing stored for this topic, so there is nothing to clear.
            return CompletableFuture.completedFuture(null);
        });
    }

    /**
     * Reports a failed topic-policy operation to the caller.
     *
     * <p>The throwable's cause is used as the effective error. If the throwable has no cause
     * (it was not wrapped), the throwable itself is used, so the response is never resumed
     * with {@code null}. Errors are logged unless the effective error is a
     * {@code WebApplicationException} whose response status is 307.
     *
     * @param methodName    name of the operation, used only in the log message
     * @param thr           the failure delivered by the asynchronous pipeline
     * @param asyncResponse the pending response to resume with the failure
     */
    protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
        // Futures usually deliver a wrapper around the real failure, but a directly failed
        // future hands over the raw exception, whose getCause() may be null.
        Throwable cause = thr.getCause() != null ? thr.getCause() : thr;
        boolean isRedirect = cause instanceof WebApplicationException
                && ((WebApplicationException)cause).getResponse().getStatus() == 307;
        if (!isRedirect) {
            log.error("[{}] Failed to perform {} on topic {}", new Object[]{this.clientAppId(), methodName, this.topicName, cause});
        }
        PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, cause);
    }

    /**
     * Truncates the topic identified by {@code topicName}, treating it as a single
     * (non-partitioned) topic.
     *
     * <p>Tenant admin access and topic ownership are validated first; any failure there is
     * logged and resumed on the response. When truncation succeeds the response is resumed with
     * a {@code RestException} carrying the NO_CONTENT status code and reason phrase; when it
     * fails the response is resumed with the failure.
     *
     * @param asyncResponse the pending response to resume
     * @param authoritative passed to the topic ownership validation
     */
    protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
        final Topic target;
        try {
            this.validateAdminAccessForTenant(this.topicName.getTenant());
            this.validateTopicOwnership(this.topicName, authoritative);
            target = this.getTopicReference(this.topicName);
        }
        catch (Exception validationFailure) {
            log.error("[{}] Failed to truncate topic {}", this.clientAppId(), this.topicName, validationFailure);
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, validationFailure);
            return;
        }
        target.truncate()
                .thenAccept(ignored -> {
                    RestException noContent = new RestException(Response.Status.NO_CONTENT.getStatusCode(), Response.Status.NO_CONTENT.getReasonPhrase());
                    asyncResponse.resume(noContent);
                })
                .exceptionally(failure -> {
                    asyncResponse.resume(failure);
                    return null;
                });
    }

    /**
     * Truncates the topic identified by {@code topicName}.
     *
     * <p>If {@code topicName.isPartitioned()} is true the topic is truncated directly, without
     * a metadata lookup. Otherwise the partitioned-topic metadata is fetched: with one or more
     * partitions, a truncate request is issued per partition through the admin client and the
     * response is resumed once all of them finish; with zero partitions the topic is truncated
     * directly.
     *
     * @param asyncResponse the pending response to resume with the outcome
     * @param authoritative passed to the metadata lookup and to the direct truncation path
     */
    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
        if (this.topicName.isPartitioned()) {
            this.internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
            return;
        }
        // thenAccept (not whenComplete) so this callback only runs when the metadata lookup
        // succeeded; a failed lookup goes straight to the exceptionally handler below instead
        // of dereferencing a null metadata object.
        this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(meta -> {
            if (meta.partitions <= 0) {
                this.internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
                return;
            }
            List<CompletableFuture<Void>> futures = new ArrayList<>(meta.partitions);
            for (int i = 0; i < meta.partitions; ++i) {
                TopicName topicNamePartition = this.topicName.getPartition(i);
                try {
                    futures.add(this.pulsar().getAdminClient().topics().truncateAsync(topicNamePartition.toString()));
                }
                catch (Exception e) {
                    log.error("[{}] Failed to truncate topic {}", new Object[]{this.clientAppId(), topicNamePartition, e});
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }
            FutureUtil.waitForAll(futures).handle((result, exception) -> {
                if (exception == null) {
                    asyncResponse.resume(Response.noContent().build());
                    return null;
                }
                Throwable th = exception.getCause();
                if (th instanceof PulsarAdminException.NotFoundException) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, th.getMessage()));
                } else if (th instanceof WebApplicationException) {
                    asyncResponse.resume(th);
                } else {
                    log.error("[{}] Failed to truncate topic {}", new Object[]{this.clientAppId(), this.topicName, exception});
                    asyncResponse.resume(new RestException((Throwable)exception));
                }
                return null;
            });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to truncate topic {}", new Object[]{this.clientAppId(), this.topicName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Enables or disables the replicated state of a subscription on this topic.
     *
     * <p>The request is rejected with METHOD_NOT_ALLOWED when the topic is not persistent or not
     * global. After the topic operation and global namespace ownership are validated, the change
     * is applied directly when {@code topicName.isPartitioned()} is true or the metadata reports
     * zero partitions; otherwise it is sent to every partition through the admin client and the
     * response is resumed once all requests finish.
     *
     * @param asyncResponse the pending response to resume with the outcome
     * @param subName       the subscription to change
     * @param authoritative passed to the metadata lookup and to the direct update path
     * @param enabled       the replicated status to apply
     */
    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) {
        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", this.clientAppId(), enabled, this.topicName, subName);
        if (!this.topicName.isPersistent()) {
            asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Cannot enable/disable replicated subscriptions on non-persistent topics"));
            return;
        }
        if (!this.topicName.isGlobal()) {
            asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Cannot enable/disable replicated subscriptions on non-global topics"));
            return;
        }
        CompletableFuture<Void> validated = this.validateTopicOperationAsync(this.topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName)
                .thenCompose(__ -> this.validateGlobalNamespaceOwnershipAsync(this.namespaceName));
        CompletableFuture<Void> outcome;
        if (this.topicName.isPartitioned()) {
            outcome = validated.thenAccept(__ -> this.internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, enabled));
        } else {
            outcome = validated
                    .thenCompose(__ -> this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false))
                    .thenAccept(metadata -> {
                        if (metadata.partitions <= 0) {
                            this.internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative, enabled);
                            return;
                        }
                        List<CompletableFuture<Void>> partitionFutures = new ArrayList<>();
                        for (int index = 0; index < metadata.partitions; ++index) {
                            TopicName partitionName = this.topicName.getPartition(index);
                            try {
                                partitionFutures.add(this.pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(partitionName.toString(), subName, enabled));
                            }
                            catch (Exception e) {
                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", this.clientAppId(), enabled, partitionName, subName, e);
                                PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e);
                                return;
                            }
                        }
                        FutureUtil.waitForAll(partitionFutures).handle((ignored, failure) -> {
                            if (failure == null) {
                                asyncResponse.resume(Response.noContent().build());
                                return null;
                            }
                            Throwable rootCause = failure.getCause();
                            if (rootCause instanceof PulsarAdminException.NotFoundException) {
                                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic or subscription not found"));
                            } else if (rootCause instanceof PulsarAdminException.PreconditionFailedException) {
                                asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Cannot enable/disable replicated subscriptions on non-global topics"));
                            } else {
                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", this.clientAppId(), enabled, this.topicName, subName, rootCause);
                                asyncResponse.resume(new RestException(rootCause));
                            }
                            return null;
                        });
                    });
        }
        outcome.exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", this.clientAppId(), enabled, this.topicName, subName, ex);
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Applies the replicated status to a subscription of the topic owned by this broker.
     *
     * <p>After ownership validation the topic reference is fetched. The response is resumed with
     * NOT_FOUND when the topic or subscription is missing, METHOD_NOT_ALLOWED when either is not
     * a persistent implementation, INTERNAL_SERVER_ERROR when {@code setReplicated} returns
     * {@code false}, and no-content on success.
     *
     * @param asyncResponse the pending response to resume with the outcome
     * @param subName       the subscription to change
     * @param authoritative passed to the topic ownership validation
     * @param enabled       the replicated status to apply
     */
    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean enabled) {
        this.validateTopicOwnershipAsync(this.topicName, authoritative)
                .thenCompose(__ -> this.getTopicReferenceAsync(this.topicName))
                .thenAccept(topic -> {
                    if (topic == null) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                        return;
                    }
                    Subscription subscription = topic.getSubscription(subName);
                    if (subscription == null) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                        return;
                    }
                    boolean bothPersistent = topic instanceof PersistentTopic && subscription instanceof PersistentSubscription;
                    if (!bothPersistent) {
                        asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Cannot enable/disable replicated subscriptions on non-persistent topics"));
                        return;
                    }
                    boolean updated = ((PersistentSubscription)subscription).setReplicated(enabled);
                    if (!updated) {
                        asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to update cursor properties"));
                        return;
                    }
                    ((PersistentTopic)topic).checkReplicatedSubscriptionControllerState();
                    log.info("[{}] Changed replicated subscription status to {} - {} {}", this.clientAppId(), enabled, this.topicName, subName);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(ex -> {
                    if (!PersistentTopicsBase.isRedirectException(ex)) {
                        log.error("[{}] Failed to set replicated subscription status on {} {}", this.clientAppId(), this.topicName, subName, ex);
                    }
                    PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
                    return null;
                });
    }

    /**
     * Reports the replicated status of a subscription on this topic.
     *
     * <p>The request is rejected with METHOD_NOT_ALLOWED when the topic is not persistent or not
     * global. After the topic operation is validated, the status is read directly when
     * {@code topicName.isPartitioned()} is true or the metadata reports zero partitions.
     * Otherwise every partition is queried through the admin client and the merged results are
     * returned as one map once all queries succeed.
     *
     * <p>Per-partition failures are logged but do not resume the response themselves. The
     * aggregate handler resumes it exactly once, mapping not-found to NOT_FOUND and
     * precondition-failed to PRECONDITION_FAILED.
     *
     * @param asyncResponse the pending response to resume with the status map or an error
     * @param subName       the subscription to inspect
     * @param authoritative passed to the metadata lookup and to the direct read path
     */
    protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        log.info("[{}] Attempting to get replicated subscription status on {} {}", new Object[]{this.clientAppId(), this.topicName, subName});
        if (!this.topicName.isPersistent()) {
            asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Cannot get replicated subscriptions on non-persistent topics"));
            return;
        }
        if (!this.topicName.isGlobal()) {
            asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Cannot get replicated subscriptions on non-global topics"));
            return;
        }
        try {
            this.validateTopicOperation(this.topicName, TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS, subName);
        }
        catch (Exception e) {
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, e);
            return;
        }
        if (this.topicName.isPartitioned()) {
            this.internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative);
            return;
        }
        this.getPartitionedTopicMetadataAsync(this.topicName, authoritative, false).thenAccept(partitionMetadata -> {
            if (partitionMetadata.partitions <= 0) {
                this.internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative);
                return;
            }
            List<CompletableFuture<Map<String, Boolean>>> futures = new ArrayList<>(partitionMetadata.partitions);
            // Partition callbacks may complete on different threads, so guard the shared map.
            Map<String, Boolean> status = Collections.synchronizedMap(new HashMap<>());
            for (int i = 0; i < partitionMetadata.partitions; ++i) {
                TopicName partition = this.topicName.getPartition(i);
                try {
                    futures.add(this.pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(partition.toString(), subName).whenComplete((response, throwable) -> {
                        if (throwable != null) {
                            // Only log here; the aggregate handler below decides the HTTP
                            // status, so it must be the one that resumes the response.
                            log.error("[{}] Failed to get replicated subscriptions on {} {}", new Object[]{this.clientAppId(), partition, subName, throwable});
                            return;
                        }
                        status.putAll(response);
                    }));
                }
                catch (Exception e) {
                    log.warn("[{}] Failed to get replicated subscription status on {} {}", new Object[]{this.clientAppId(), partition, subName, e});
                    throw new RestException(e);
                }
            }
            FutureUtil.waitForAll(futures).handle((result, exception) -> {
                if (exception != null) {
                    // Prefer the wrapped cause, but never pass null onwards.
                    Throwable t = exception.getCause() != null ? exception.getCause() : exception;
                    if (t instanceof PulsarAdminException.NotFoundException) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic or subscription not found"));
                    } else if (t instanceof PulsarAdminException.PreconditionFailedException) {
                        asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Cannot get replicated subscriptions on non-global topics"));
                    } else {
                        log.error("[{}] Failed to get replicated subscription status on {} {}", new Object[]{this.clientAppId(), this.topicName, subName, t});
                        asyncResponse.resume(new RestException(t));
                    }
                    // Do not fall through to the success path after reporting an error.
                    return null;
                }
                // Hand the response a plain HashMap snapshot, as the callers received before.
                asyncResponse.resume(new HashMap<>(status));
                return null;
            });
        }).exceptionally(ex -> {
            if (!PersistentTopicsBase.isRedirectException(ex)) {
                log.error("[{}] Failed to get replicated subscription status on {} {}", new Object[]{this.clientAppId(), this.topicName, subName, ex});
            }
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    /**
     * Reads the replicated status of a subscription from the topic owned by this broker.
     *
     * <p>The response is resumed with NOT_FOUND when the topic or subscription is missing,
     * METHOD_NOT_ALLOWED when either is not a persistent implementation, and otherwise with a
     * single-entry map from the topic name to {@code sub.isReplicated()}. Any exception raised
     * along the way is logged and resumed on the response.
     *
     * @param asyncResponse the pending response to resume with the outcome
     * @param subName       the subscription to inspect
     * @param authoritative passed to the topic ownership validation
     */
    private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, boolean authoritative) {
        try {
            this.validateTopicOwnership(this.topicName, authoritative);
            Topic target = this.getTopicReference(this.topicName);
            if (target == null) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
                return;
            }
            Subscription subscription = target.getSubscription(subName);
            if (subscription == null) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                return;
            }
            boolean bothPersistent = target instanceof PersistentTopic && subscription instanceof PersistentSubscription;
            if (!bothPersistent) {
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, "Cannot get replicated subscriptions on non-persistent topics"));
                return;
            }
            HashMap<String, Boolean> statusByTopic = new HashMap<>();
            statusByTopic.put(this.topicName.toString(), subscription.isReplicated());
            asyncResponse.resume(statusByTopic);
        }
        catch (Exception failure) {
            log.error("[{}] Failed to get replicated subscription status on {} {}", this.clientAppId(), this.topicName, subName, failure);
            PersistentTopicsBase.resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * Resolves the schema compatibility strategy for this topic.
     *
     * <p>With {@code applied} set, the result of {@code getSchemaCompatibilityStrategyAsync()} is
     * returned as is. Otherwise the READ policy operation is validated and the strategy stored
     * in the topic policies is returned, or {@code null} when there are no topic policies or the
     * stored strategy is reported as undefined.
     *
     * @param applied whether to return the applied strategy instead of the topic-level one
     * @return a future yielding the strategy, possibly {@code null}
     */
    protected CompletableFuture<SchemaCompatibilityStrategy> internalGetSchemaCompatibilityStrategy(boolean applied) {
        if (applied) {
            return this.getSchemaCompatibilityStrategyAsync();
        }
        return this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ)
                .thenCompose(ignored -> this.getTopicPoliciesAsyncWithRetry(this.topicName).thenApply(policies -> {
                    if (policies.isPresent()) {
                        SchemaCompatibilityStrategy configured = policies.get().getSchemaCompatibilityStrategy();
                        if (!SchemaCompatibilityStrategy.isUndefined(configured)) {
                            return configured;
                        }
                    }
                    return null;
                }));
    }

    /**
     * Stores the schema compatibility strategy in this topic's policies after validating the
     * WRITE policy operation, creating a new {@code TopicPolicies} instance when none exists.
     *
     * @param strategy the strategy to store; {@code UNDEFINED} is stored as {@code null}
     * @return a future for the policy update
     */
    protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
        // UNDEFINED is persisted as "no strategy" rather than as an explicit value.
        SchemaCompatibilityStrategy toStore = strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
        return this.validateTopicPolicyOperationAsync(this.topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE)
                .thenCompose(__ -> this.getTopicPoliciesAsyncWithRetry(this.topicName).thenCompose(existing -> {
                    TopicPolicies policies = existing.orElseGet(TopicPolicies::new);
                    policies.setSchemaCompatibilityStrategy(toStore);
                    return this.pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(this.topicName, policies);
                }));
    }
}

