package org.apache.pulsar.broker.admin.impl;

import com.github.zafarkhaja.semver.Version;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.class */
public class PersistentTopicsBase extends AdminResource {
    // Wait time in milliseconds for partitioned-topic metadata changes. The partitioned-topic
    // create and delete paths in this class sleep for the same 1000 ms after touching ZooKeeper.
    protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
    // NOTE(review): presumably the TTL, in minutes, of cached offline-topic stats; usage is not visible here - confirm.
    private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
    // NOTE(review): presumably matched against the client's version/user-agent string to detect old C++ clients - confirm.
    private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
    private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
    // Semantic version 1.21. NOTE(review): despite the "_PREFIX" suffix this is a Version, presumably the
    // minimum supported version of the deprecated client - confirm against validateClientVersion().
    private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Lists the persistent topics of the current namespace, sorted in natural order.
     *
     * <p>The policies node is read first, purely as an existence check for the namespace.
     * Topic names are then read from the managed-ledger list cache, decoded, and turned
     * into fully qualified topic names.
     *
     * @return the sorted topic names; empty when the managed-ledger node does not exist
     * @throws RestException NOT_FOUND when the namespace policies node is missing, or a
     *         wrapped error for any other failure
     */
    public List<String> internalGetList() {
        validateAdminAccessForTenant(this.namespaceName.getTenant());

        // Namespace existence check. The specific NoNode handler must come before the
        // generic one, otherwise the NOT_FOUND mapping is unreachable.
        try {
            policiesCache().get(path(ZkAdminPaths.POLICIES, this.namespaceName.toString()));
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get topic list {}: Namespace does not exist", clientAppId(), this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to get topic list {}", clientAppId(), this.namespaceName, e);
            throw new RestException(e);
        }

        // Kept outside the existence-check try so that a failure here is logged and
        // wrapped exactly once.
        List<String> topics = Lists.newArrayList();
        try {
            String ledgersPath = String.format("/managed-ledgers/%s/%s", this.namespaceName.toString(), domain());
            for (String encodedName : managedLedgerListCache().get(ledgersPath)) {
                if (domain().equals(TopicDomain.persistent.toString())) {
                    topics.add(TopicName.get(domain(), this.namespaceName, Codec.decode(encodedName)).toString());
                }
            }
        } catch (KeeperException.NoNodeException ignored) {
            // A missing managed-ledger node is deliberately not an error: return what was collected (nothing).
        } catch (Exception e) {
            log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), this.namespaceName, e);
            throw new RestException(e);
        }

        topics.sort(null);
        return topics;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Lists the partitioned topics of the current namespace for the current domain.
     *
     * @return the partitioned topic names as returned by {@code getPartitionedTopicList}
     * @throws RestException NOT_FOUND when the namespace policies node is missing, or a
     *         wrapped error for any other failure
     */
    public List<String> internalGetPartitionedTopicList() {
        validateAdminAccessForTenant(this.namespaceName.getTenant());
        try {
            // Reading the policies node doubles as the namespace existence check.
            policiesCache().get(path(ZkAdminPaths.POLICIES, this.namespaceName.toString()));
            return getPartitionedTopicList(TopicDomain.getEnum(domain()));
        } catch (KeeperException.NoNodeException e) {
            // Must precede the generic handler, otherwise this branch can never run.
            log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(), this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), this.namespaceName, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Computes the effective permissions on the current topic: the namespace-level grants
     * merged with the topic-level grants (per role, the union of both action sets).
     *
     * @return a sorted map from role to the set of granted actions
     * @throws RestException NOT_FOUND when the namespace policies are absent, or a wrapped
     *         error for any other failure
     */
    public Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
        validateAdminAccessForTenant(this.namespaceName.getTenant());
        String topicUri = this.topicName.toString();
        try {
            Policies policies = (Policies) policiesCache().get(path(ZkAdminPaths.POLICIES, this.namespaceName.toString())).orElseThrow(() -> {
                return new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
            });
            AuthPolicies auth = policies.auth_policies;

            // Start from the namespace-level grants ...
            Map<String, Set<AuthAction>> permissions = Maps.newTreeMap();
            permissions.putAll(auth.namespace_auth);

            // ... then layer the topic-level grants on top, widening roles present in both.
            Map<String, Set<AuthAction>> topicLevel = auth.destination_auth.get(topicUri);
            if (topicLevel != null) {
                for (Map.Entry<String, Set<AuthAction>> grant : topicLevel.entrySet()) {
                    String role = grant.getKey();
                    Set<AuthAction> actions = grant.getValue();
                    Set<AuthAction> inherited = permissions.get(role);
                    permissions.put(role, inherited != null ? Sets.union(inherited, actions) : actions);
                }
            }
            return permissions;
        } catch (RestException e) {
            // Keep the explicit NOT_FOUND status instead of re-wrapping it below.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicUri, e);
            throw new RestException(e);
        }
    }

    /**
     * Authorizes the caller either as a tenant admin or, failing that, through the
     * topic-level {@code checkAuthorization} check.
     *
     * @throws RestException the authorization failure as raised by {@code checkAuthorization},
     *         or a wrapped error when that check fails unexpectedly
     */
    protected void validateAdminAndClientPermission() {
        try {
            validateAdminAccessForTenant(this.topicName.getTenant());
        } catch (Exception adminCheckFailure) {
            // Not a tenant admin: fall back to the client-level authorization check.
            try {
                checkAuthorization(pulsar(), this.topicName, clientAppId(), clientAuthData());
            } catch (RestException e) {
                // Must be caught before Exception so the original status is propagated untouched.
                throw e;
            } catch (Exception e) {
                log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", this.topicName, clientAppId(), e.getMessage(), e);
                throw new RestException(e);
            }
        }
    }

    /**
     * Checks that the caller is an admin of the topic's tenant and then validates
     * ownership of the topic.
     *
     * @param z forwarded unchanged to {@code validateTopicOwnership}
     */
    public void validateAdminOperationOnTopic(boolean z) {
        String tenant = this.topicName.getTenant();
        validateAdminAccessForTenant(tenant);
        validateTopicOwnership(this.topicName, z);
    }

    /**
     * Validates topic ownership, then authorizes the caller as a tenant admin or, if that
     * fails, as a consumer of the given subscription.
     *
     * @param str the subscription name used for the consumer-level check
     * @param z forwarded unchanged to {@code validateTopicOwnership}
     */
    protected void validateAdminAccessForSubscriber(String str, boolean z) {
        validateTopicOwnership(this.topicName, z);
        try {
            validateAdminAccessForTenant(this.topicName.getTenant());
            return;
        } catch (Exception adminCheckFailure) {
            // Admin access denied; the subscriber-level check below decides.
            if (log.isDebugEnabled()) {
                log.debug("[{}] failed to validate admin access for {}", this.topicName, clientAppId());
            }
        }
        validateAdminAccessForSubscriber(str);
    }

    /**
     * Verifies through the broker's authorization service that the caller may consume
     * from the current topic with the given subscription.
     *
     * @param str the subscription name
     * @throws RestException UNAUTHORIZED when the caller may not consume, or a wrapped error
     *         when the authorization service itself fails
     */
    private void validateAdminAccessForSubscriber(String str) {
        boolean authorized;
        try {
            authorized = pulsar().getBrokerService().getAuthorizationService().canConsume(this.topicName, clientAppId(), clientAuthData(), str);
        } catch (RestException e) {
            // Propagate REST errors as-is rather than wrapping them a second time.
            throw e;
        } catch (Exception e) {
            log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", this.topicName, clientAppId(), e.getMessage(), e);
            throw new RestException(e);
        }
        if (!authorized) {
            // Raised outside the try so the UNAUTHORIZED status cannot be swallowed by a generic handler.
            log.warn("[{}] Subscriber {} is not authorized to access api", this.topicName, clientAppId());
            throw new RestException(Response.Status.UNAUTHORIZED, String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Grants a role a set of actions on the current topic by rewriting the namespace
     * policies node with a version-checked write, then invalidating the policies cache.
     *
     * @param str the role receiving the permissions
     * @param set the actions to grant; replaces any previous topic-level grant for the role
     * @throws RestException NOT_FOUND when the namespace policies node is missing, CONFLICT
     *         when the node changed between read and write, or a wrapped error otherwise
     */
    public void internalGrantPermissionsOnTopic(String str, Set<AuthAction> set) {
        validateAdminAccessForTenant(this.namespaceName.getTenant());
        validatePoliciesReadOnlyAccess();
        String topicUri = this.topicName.toString();
        try {
            String policiesPath = path(ZkAdminPaths.POLICIES, this.namespaceName.toString());
            Stat nodeStat = new Stat();
            Policies policies = (Policies) jsonMapper().readValue(globalZk().getData(policiesPath, (Watcher) null, nodeStat), Policies.class);

            policies.auth_policies.destination_auth.computeIfAbsent(topicUri, key -> new TreeMap<>()).put(str, set);

            // Writing with the version read above makes a concurrent update fail with BadVersion.
            globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
            policiesCache().invalidate(policiesPath);
            log.info("[{}] Successfully granted access for role {}: {} - topic {}", clientAppId(), str, set, topicUri);
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to grant permissions on topic {}: Namespace does not exist", clientAppId(), topicUri);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (KeeperException.BadVersionException e) {
            // Must precede the generic handler, otherwise the CONFLICT mapping is unreachable.
            log.warn("[{}] Failed to grant permissions on topic {}: concurrent modification", clientAppId(), topicUri);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (Exception e) {
            log.error("[{}] Failed to grant permissions for topic {}", clientAppId(), topicUri, e);
            throw new RestException(e);
        }
    }

    /**
     * Deletes the current topic via {@code deleteForcefully()}, blocking until it completes.
     *
     * @param z forwarded unchanged to {@code validateAdminOperationOnTopic}
     * @throws RestException as raised while resolving the topic, or a wrapped error when the
     *         deletion itself fails
     */
    protected void internalDeleteTopicForcefully(boolean z) {
        validateAdminOperationOnTopic(z);
        try {
            getTopicReference(this.topicName).deleteForcefully().get();
        } catch (RestException e) {
            // A REST error from resolving the topic is not a delete failure: propagate it unchanged,
            // as internalDeleteTopic does by resolving the topic outside of its try block.
            throw e;
        } catch (Exception e) {
            log.error("[{}] Failed to delete topic forcefully {}", clientAppId(), this.topicName, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Revokes a role's topic-level permissions on the current topic.
     *
     * <p>Runs in three phases so each failure maps to one status: read the namespace
     * policies, check that the role has a topic-level grant, then write the updated
     * policies back with a version check and invalidate the caches.
     *
     * @param str the role whose topic-level grant is removed
     * @throws RestException NOT_FOUND when the namespace policies node is missing,
     *         PRECONDITION_FAILED when the role has no topic-level grant, CONFLICT when the
     *         node changed between read and write, or a wrapped error otherwise
     */
    public void internalRevokePermissionsOnTopic(String str) {
        validateAdminAccessForTenant(this.namespaceName.getTenant());
        validatePoliciesReadOnlyAccess();
        String topicUri = this.topicName.toString();
        Stat nodeStat = new Stat();

        // Phase 1: read the current policies.
        Policies policies;
        try {
            policies = (Policies) jsonMapper().readValue(globalZk().getData(path(ZkAdminPaths.POLICIES, this.namespaceName.toString()), (Watcher) null, nodeStat), Policies.class);
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to revoke permissions on topic {}: Namespace does not exist", clientAppId(), topicUri);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
            throw new RestException(e);
        }

        // Phase 2: precondition check, outside any try so the status is not re-wrapped.
        Map<String, ?> topicPermissions = policies.auth_policies.destination_auth.get(topicUri);
        if (topicPermissions == null || !topicPermissions.containsKey(str)) {
            log.warn("[{}] Failed to revoke permission from role {} on topic {}: Not set at topic level", clientAppId(), str, topicUri);
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
        }
        topicPermissions.remove(str);

        // Phase 3: version-checked write, then drop the cached copies.
        try {
            String policiesPath = path(ZkAdminPaths.POLICIES, this.namespaceName.toString());
            globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
            policiesCache().invalidate(policiesPath);
            globalZkCache().invalidate(policiesPath);
            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), str, topicUri);
        } catch (KeeperException.BadVersionException e) {
            // The version check belongs to the write, so this is where a conflict can surface.
            log.warn("[{}] Failed to revoke permissions on topic {}: concurrent modification", clientAppId(), topicUri);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (Exception e) {
            log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the metadata node for a partitioned topic with the given partition count.
     *
     * @param i the number of partitions; must be greater than 0
     * @throws RestException NOT_ACCEPTABLE for a non-positive partition count, CONFLICT when a
     *         topic or partitioned topic with this name already exists or the node was
     *         modified concurrently, or a wrapped error for any other failure
     */
    public void internalCreatePartitionedTopic(int i) {
        validateAdminAccessForTenant(this.topicName.getTenant());
        if (i <= 0) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
        }

        // Look up existing topics first. Only the lookup is guarded, so the CONFLICT below
        // is not caught and re-wrapped by a generic handler.
        boolean topicExists;
        try {
            topicExists = pulsar().getNamespaceService().getListOfTopics(this.topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL).join().contains(this.topicName.toString());
        } catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), this.topicName, e);
            throw new RestException(e);
        }
        if (topicExists) {
            log.warn("[{}] Failed to create already existing topic {}", clientAppId(), this.topicName);
            throw new RestException(Response.Status.CONFLICT, "This topic already exists");
        }

        try {
            zkCreateOptimistic(ZkAdminPaths.partitionedTopicPath(this.topicName), jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(i)));
            // Same value as PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS.
            // NOTE(review): presumably gives other brokers time to observe the new node - confirm.
            Thread.sleep(1000L);
            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), this.topicName);
        } catch (KeeperException.NodeExistsException e) {
            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), this.topicName);
            throw new RestException(Response.Status.CONFLICT, "Partitioned topic already exists");
        } catch (KeeperException.BadVersionException e) {
            // Must precede the generic handler, otherwise the CONFLICT mapping is unreachable.
            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(), this.topicName);
            throw new RestException(Response.Status.CONFLICT, "Concurrent modification");
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the calling thread before reporting the failure.
            Thread.currentThread().interrupt();
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), this.topicName, e);
            throw new RestException(e);
        } catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), this.topicName, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates (or fetches, if already present) the current non-partitioned topic.
     *
     * @param z forwarded unchanged to {@code validateTopicOwnership}
     * @throws RestException wrapping any failure of {@code getOrCreateTopic}
     */
    public void internalCreateNonPartitionedTopic(boolean z) {
        validateAdminAccessForTenant(this.topicName.getTenant());
        boolean global = this.topicName.isGlobal();
        if (global) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        validateTopicOwnership(this.topicName, z);
        try {
            // The topic is created as part of evaluating the log arguments.
            log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), getOrCreateTopic(this.topicName));
        } catch (Exception failure) {
            log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), this.topicName, failure);
            throw new RestException(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Updates the partition count of the current partitioned topic, blocking until the
     * update completes.
     *
     * @param i the new number of partitions; must be greater than 0
     * @throws RestException FORBIDDEN for a global topic in a replicated namespace,
     *         NOT_ACCEPTABLE for a non-positive count, the RestException that caused the
     *         update to fail, or a wrapped error otherwise
     */
    public void internalUpdatePartitionedTopic(int i) {
        validateAdminAccessForTenant(this.topicName.getTenant());
        boolean replicatedGlobalTopic = this.topicName.isGlobal() && isNamespaceReplicated(this.topicName.getNamespaceObject());
        if (replicatedGlobalTopic) {
            log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", clientAppId(), this.topicName);
            throw new RestException(Response.Status.FORBIDDEN, "Update forbidden on global namespace");
        }
        if (i <= 0) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
        }
        try {
            updatePartitionedTopic(this.topicName, i).get();
        } catch (Exception failure) {
            // Surface a REST error carried as the cause instead of wrapping it again.
            Throwable cause = failure.getCause();
            if (cause instanceof RestException) {
                throw (RestException) cause;
            }
            log.error("[{}] Failed to update partitioned topic {}", clientAppId(), this.topicName, failure);
            throw new RestException(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Fetches the partitioned-topic metadata of the current topic and, when it reports more
     * than one partition, additionally runs {@code validateClientVersion()}.
     *
     * @param z forwarded unchanged to {@code getPartitionedTopicMetadata}
     * @param z2 forwarded unchanged to {@code getPartitionedTopicMetadata}
     * @return the metadata returned by {@code getPartitionedTopicMetadata}
     */
    public PartitionedTopicMetadata internalGetPartitionedMetadata(boolean z, boolean z2) {
        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(this.topicName, z, z2);
        boolean multiPartition = metadata.partitions > 1;
        if (multiPartition) {
            validateClientVersion();
        }
        return metadata;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Deletes every partition of the current partitioned topic through the admin client and,
     * once all partitions are gone, removes the partitioned-topic metadata node.
     *
     * <p>A partition reported as not found counts as deleted. Any other partition failure
     * fails the whole operation. The outcome is delivered through {@code asyncResponse}.
     *
     * @param asyncResponse resumed with a no-content response on success or a RestException on failure
     * @param z forwarded unchanged to {@code getPartitionedTopicMetadata}
     * @param z2 forwarded unchanged to the admin client's {@code deleteAsync}
     */
    public void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean z, boolean z2) {
        validateAdminAccessForTenant(this.topicName.getTenant());
        CompletableFuture<Void> partitionsDeleted = new CompletableFuture<>();
        int numPartitions = getPartitionedTopicMetadata(this.topicName, z, false).partitions;
        if (numPartitions <= 0) {
            partitionsDeleted.complete(null);
        } else {
            // Counts down as partitions finish; the last one completes the aggregate future.
            AtomicInteger remaining = new AtomicInteger(numPartitions);
            for (int index = 0; index < numPartitions; index++) {
                TopicName partitionName = this.topicName.getPartition(index);
                try {
                    pulsar().getAdminClient().topics().deleteAsync(partitionName.toString(), z2).whenComplete((ignoredResult, error) -> {
                        if (error != null && !(error instanceof PulsarAdminException.NotFoundException)) {
                            log.error("[{}] Failed to delete partition {}", clientAppId(), partitionName, error);
                            partitionsDeleted.completeExceptionally(error);
                            return;
                        }
                        if (error == null) {
                            log.info("[{}] Deleted partition {}", clientAppId(), partitionName);
                        } else if (log.isDebugEnabled()) {
                            log.debug("[{}] Partition not found: {}", clientAppId(), partitionName);
                        }
                        if (remaining.decrementAndGet() == 0) {
                            partitionsDeleted.complete(null);
                        }
                    });
                } catch (Exception submitFailure) {
                    log.error("[{}] Failed to delete partition {}", clientAppId(), partitionName, submitFailure);
                    partitionsDeleted.completeExceptionally(submitFailure);
                }
            }
        }
        partitionsDeleted.whenComplete((ignoredResult, failure) -> {
            // Translate a partition failure into the matching REST error.
            if (failure instanceof PulsarAdminException.PreconditionFailedException) {
                asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions"));
                return;
            }
            if (failure instanceof PulsarAdminException) {
                asyncResponse.resume(new RestException((PulsarAdminException) failure));
                return;
            }
            if (failure != null) {
                asyncResponse.resume(new RestException(failure));
                return;
            }
            // All partitions are gone: remove the partitioned-topic metadata node itself.
            String metadataPath = path("partitioned-topics", this.namespaceName.toString(), domain(), this.topicName.getEncodedLocalName());
            try {
                globalZk().delete(metadataPath, -1);
                globalZkCache().invalidate(metadataPath);
                Thread.sleep(1000L);
                log.info("[{}] Deleted partitioned topic {}", clientAppId(), this.topicName);
                asyncResponse.resume(Response.noContent().build());
            } catch (KeeperException.NoNodeException missing) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Partitioned topic does not exist"));
            } catch (KeeperException.BadVersionException conflict) {
                log.warn("[{}] Failed to delete partitioned topic {}: concurrent modification", clientAppId(), this.topicName);
                asyncResponse.resume(new RestException(Response.Status.CONFLICT, "Concurrent modification"));
            } catch (Exception unexpected) {
                log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), this.topicName, unexpected);
                asyncResponse.resume(new RestException(unexpected));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unloads the current topic, validating global-namespace ownership first when the topic
     * is global.
     *
     * @param z forwarded unchanged to {@code unloadTopic}
     */
    public void internalUnloadTopic(boolean z) {
        log.info("[{}] Unloading topic {}", clientAppId(), this.topicName);
        boolean global = this.topicName.isGlobal();
        if (global) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        unloadTopic(this.topicName, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Deletes the current topic, either forcefully or through the regular delete path.
     *
     * @param z forwarded unchanged to the selected delete implementation
     * @param z2 {@code true} to delete forcefully, {@code false} for the regular delete
     */
    public void internalDeleteTopic(boolean z, boolean z2) {
        if (!z2) {
            internalDeleteTopic(z);
            return;
        }
        internalDeleteTopicForcefully(z);
    }

    /**
     * Deletes the current topic, refusing to do so while it is replicated.
     *
     * @param z forwarded unchanged to {@code validateAdminOperationOnTopic}
     * @throws RestException FORBIDDEN when the topic is replicated, PRECONDITION_FAILED when
     *         the deletion fails with a TopicBusyException, or a wrapped error otherwise
     */
    protected void internalDeleteTopic(boolean z) {
        validateAdminOperationOnTopic(z);
        Topic topicReference = getTopicReference(this.topicName);
        if (topicReference.isReplicated()) {
            List<String> clusters = topicReference.getReplicators().keys();
            log.error("[{}] Delete forbidden topic {} is replicated on clusters {}", clientAppId(), this.topicName, clusters);
            throw new RestException(Response.Status.FORBIDDEN, "Delete forbidden topic is replicated on clusters " + clusters);
        }
        try {
            topicReference.delete().get();
            log.info("[{}] Successfully removed topic {}", clientAppId(), this.topicName);
        } catch (Exception e) {
            // Not every failure carries a cause (e.g. an interruption), so fall back to the
            // exception itself rather than logging and wrapping null.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error("[{}] Failed to get delete topic {}", clientAppId(), this.topicName, cause);
            if (cause instanceof BrokerServiceException.TopicBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
            }
            throw new RestException(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resumes {@code asyncResponse} with the subscription names of the current topic.
     *
     * <p>For a partitioned topic the names are fetched asynchronously from partition 0 via
     * the admin client. For a non-partitioned topic they are read from the local topic
     * reference.
     *
     * @param asyncResponse resumed with the list of subscription names, or with an error
     * @param z forwarded unchanged to the metadata lookup and the admin validation
     */
    public void internalGetSubscriptions(AsyncResponse asyncResponse, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        List<String> subscriptions = Lists.newArrayList();
        boolean partitioned = getPartitionedTopicMetadata(this.topicName, z, false).partitions > 0;

        if (!partitioned) {
            validateAdminOperationOnTopic(z);
            try {
                getTopicReference(this.topicName).getSubscriptions().forEach((name, ignoredSubscription) -> {
                    subscriptions.add(name);
                });
                asyncResponse.resume(subscriptions);
            } catch (Exception failure) {
                log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), this.topicName, failure);
                asyncResponse.resume(new RestException(failure));
            }
            return;
        }

        try {
            String firstPartition = this.topicName.getPartition(0).toString();
            pulsar().getAdminClient().topics().getSubscriptionsAsync(firstPartition).whenComplete((names, error) -> {
                if (error == null) {
                    subscriptions.addAll(names);
                    asyncResponse.resume(subscriptions);
                    return;
                }
                log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(), this.topicName, error.getMessage());
                if (error instanceof PulsarAdminException) {
                    PulsarAdminException adminError = (PulsarAdminException) error;
                    if (adminError.getStatusCode() != Response.Status.NOT_FOUND.getStatusCode()) {
                        asyncResponse.resume(new RestException(adminError));
                    } else {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
                    }
                } else {
                    asyncResponse.resume(new RestException(error));
                }
            });
        } catch (Exception failure) {
            log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), this.topicName, failure);
            asyncResponse.resume(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the stats of the current topic after authorization, global-namespace and
     * ownership checks.
     *
     * @param z forwarded unchanged to {@code validateTopicOwnership}
     * @return the stats reported by the topic reference
     */
    public TopicStats internalGetStats(boolean z) {
        validateAdminAndClientPermission();
        boolean global = this.topicName.isGlobal();
        if (global) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        validateTopicOwnership(this.topicName, z);
        // NOTE(review): "mo100getStats" looks like a decompiler-generated method name - confirm the real accessor.
        return getTopicReference(this.topicName).mo100getStats();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the internal stats of the current topic after authorization, global-namespace
     * and ownership checks.
     *
     * @param z forwarded unchanged to {@code validateTopicOwnership}
     * @return the internal stats reported by the topic reference
     */
    public PersistentTopicInternalStats internalGetInternalStats(boolean z) {
        validateAdminAndClientPermission();
        boolean global = this.topicName.isGlobal();
        if (global) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        validateTopicOwnership(this.topicName, z);
        return getTopicReference(this.topicName).getInternalStats();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asynchronously fetches the managed-ledger info of the current topic and streams it
     * to the client as JSON.
     *
     * @param asyncResponse resumed with a {@link StreamingOutput} that writes the info as JSON,
     *        or with the {@link ManagedLedgerException} when the lookup fails
     */
    public void internalGetManagedLedgerInfo(final AsyncResponse asyncResponse) {
        validateAdminAccessForTenant(this.topicName.getTenant());
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        String ledgerName = this.topicName.getPersistenceNamingEncoding();
        pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(ledgerName, new AsyncCallbacks.ManagedLedgerInfoCallback() {
            @Override
            public void getInfoComplete(ManagedLedgerInfo managedLedgerInfo, Object obj) {
                // resume() takes an Object, so the lambda needs an explicit functional-interface
                // target type; without the cast it has none.
                asyncResponse.resume((StreamingOutput) outputStream -> {
                    AdminResource.jsonMapper().writer().writeValue(outputStream, managedLedgerInfo);
                });
            }

            @Override
            public void getInfoFailed(ManagedLedgerException managedLedgerException, Object obj) {
                asyncResponse.resume(managedLedgerException);
            }
        }, (Object) null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aggregates the stats of all partitions of the current partitioned topic and resumes
     * {@code asyncResponse} with the result.
     *
     * <p>Stats are requested per partition through the admin client. Partitions whose request
     * failed are left out of the aggregate. When per-partition stats were requested but none
     * could be collected, the response is NOT_FOUND if the partitioned-topic path is missing,
     * otherwise a single empty entry keyed by the topic name is added.
     *
     * @param asyncResponse resumed with the aggregated stats or a RestException
     * @param z forwarded unchanged to {@code getPartitionedTopicMetadata}
     * @param z2 whether to include the individual per-partition stats in the result
     * @throws RestException NOT_FOUND when the topic has no partitions
     */
    public void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean z, boolean z2) {
        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(this.topicName, z, false);
        if (metadata.partitions == 0) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicStats aggregate = new PartitionedTopicStats(metadata);
        List<CompletableFuture<TopicStats>> partitionFutures = Lists.newArrayList();
        for (int partition = 0; partition < metadata.partitions; partition++) {
            try {
                String partitionName = this.topicName.getPartition(partition).toString();
                partitionFutures.add(pulsar().getAdminClient().topics().getStatsAsync(partitionName));
            } catch (PulsarServerException e) {
                asyncResponse.resume(new RestException((Throwable) e));
                return;
            }
        }
        FutureUtil.waitForAll(partitionFutures).handle((ignoredResult, ignoredError) -> {
            for (int index = 0; index < partitionFutures.size(); index++) {
                CompletableFuture<TopicStats> future = partitionFutures.get(index);
                // Only successfully completed partitions contribute to the aggregate.
                if (!future.isDone() || future.isCompletedExceptionally()) {
                    continue;
                }
                try {
                    TopicStats stats = future.get();
                    aggregate.add(stats);
                    if (z2) {
                        aggregate.partitions.put(this.topicName.getPartition(index).toString(), stats);
                    }
                } catch (Exception e) {
                    asyncResponse.resume(new RestException(e));
                    return null;
                }
            }
            if (z2 && aggregate.partitions.isEmpty()) {
                try {
                    boolean metadataExists = zkPathExists(ZkAdminPaths.partitionedTopicPath(this.topicName));
                    if (!metadataExists) {
                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
                        return null;
                    }
                    aggregate.partitions.put(this.topicName.toString(), new TopicStats());
                } catch (KeeperException | InterruptedException e) {
                    asyncResponse.resume(new RestException((Throwable) e));
                    return null;
                }
            }
            asyncResponse.resume(aggregate);
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Gathers the internal stats of every partition of this topic and resumes {@code asyncResponse} with them.
     *
     * <p>One {@code getInternalStatsAsync} request is issued per partition through the admin client. Once all
     * requests have finished, the result of each successfully completed request is stored under its partition
     * name; partitions whose request failed are left out. If no partition produced a result, the response is
     * failed with 404.
     *
     * @param asyncResponse response handle resumed with the collected stats or with a failure
     * @param z forwarded unchanged to {@code getPartitionedTopicMetadata}
     * @throws RestException with 404 when the topic metadata reports zero partitions
     */
    public void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean z) {
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(this.topicName, z, false);
        if (partitionedTopicMetadata.partitions == 0) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicInternalStats partitionedTopicInternalStats = new PartitionedTopicInternalStats(partitionedTopicMetadata);
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
            try {
                newArrayList.add(pulsar().getAdminClient().topics().getInternalStatsAsync(this.topicName.getPartition(i).toString()));
            } catch (PulsarServerException e) {
                asyncResponse.resume(new RestException((Throwable) e));
                return;
            }
        }
        FutureUtil.waitForAll(newArrayList).handle((r10, th) -> {
            for (int i2 = 0; i2 < newArrayList.size(); i2++) {
                CompletableFuture completableFuture = (CompletableFuture) newArrayList.get(i2);
                // Only partitions whose request succeeded contribute to the result.
                if (completableFuture.isDone() && !completableFuture.isCompletedExceptionally()) {
                    try {
                        partitionedTopicInternalStats.partitions.put(this.topicName.getPartition(i2).toString(), completableFuture.get());
                    } catch (Exception e2) {
                        asyncResponse.resume(new RestException(e2));
                        return null;
                    }
                }
            }
            // Keep the two outcomes in separate statements: a single conditional expression has static type
            // Object and would bind to resume(Object), handing the exception over as a response entity
            // instead of as a failure.
            if (partitionedTopicInternalStats.partitions.isEmpty()) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Internal topics have not been generated yet"));
            } else {
                asyncResponse.resume(partitionedTopicInternalStats);
            }
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Deletes subscription {@code str} from this topic and reports the outcome through {@code asyncResponse}.
     *
     * <p>When the topic metadata reports one or more partitions, the delete is issued for every partition via
     * the admin client and the response is resumed once all of those requests have finished. Otherwise the
     * subscription is looked up on the local topic reference and deleted, blocking until the delete completes.
     *
     * <p>Outcomes: 204 on success, 404 "Subscription not found", 412 when the subscription still has
     * connected consumers; a {@link WebApplicationException} raised during the local delete is passed through
     * unchanged, and any other failure is wrapped in a {@code RestException}.
     *
     * @param asyncResponse response handle resumed with the result or with the failure
     * @param str name of the subscription to delete
     * @param z forwarded unchanged to the metadata lookup and to the access check
     */
    public void internalDeleteSubscription(AsyncResponse asyncResponse, String str, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(this.topicName, z, false);
        if (partitionedTopicMetadata.partitions > 0) {
            ArrayList newArrayList = Lists.newArrayList();
            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
                TopicName partition = this.topicName.getPartition(i);
                try {
                    newArrayList.add(pulsar().getAdminClient().topics().deleteSubscriptionAsync(partition.toString(), str));
                } catch (Exception e) {
                    log.error("[{}] Failed to delete subscription {} {}", clientAppId(), partition, str, e);
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }
            FutureUtil.waitForAll(newArrayList).handle((r10, th) -> {
                if (th == null) {
                    asyncResponse.resume(Response.noContent().build());
                    return null;
                }
                // Classify by the wrapped cause when there is one; otherwise by the failure itself, so a
                // null cause is never handed to RestException.
                Throwable cause = th.getCause() != null ? th.getCause() : th;
                if (cause instanceof PulsarAdminException.NotFoundException) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                    return null;
                }
                if (cause instanceof PulsarAdminException.PreconditionFailedException) {
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers"));
                    return null;
                }
                log.error("[{}] Failed to delete subscription {} {}", clientAppId(), this.topicName, str, cause);
                asyncResponse.resume(new RestException(cause));
                return null;
            });
            return;
        }
        validateAdminAccessForSubscriber(str, z);
        try {
            Subscription subscription = getTopicReference(this.topicName).getSubscription(str);
            // A missing subscription surfaces as a NullPointerException, mapped to 404 below.
            Preconditions.checkNotNull(subscription);
            subscription.delete().get();
            log.info("[{}][{}] Deleted subscription {}", clientAppId(), this.topicName, str);
            asyncResponse.resume(Response.noContent().build());
        } catch (Exception e2) {
            if (e2 instanceof InterruptedException) {
                // Preserve the interrupt for the calling thread.
                Thread.currentThread().interrupt();
            }
            Throwable cause = e2.getCause();
            if (e2 instanceof NullPointerException) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
            } else if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
                asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers"));
            } else if (e2 instanceof WebApplicationException) {
                // Already carries its own HTTP response; pass it through instead of re-wrapping it.
                asyncResponse.resume(e2);
            } else {
                log.error("[{}] Failed to delete subscription {} {}", clientAppId(), this.topicName, str, e2);
                // Exceptions that were not wrapped have no cause; report the exception itself then.
                asyncResponse.resume(new RestException(cause != null ? cause : e2));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Clears the entire backlog of {@code str} on this topic and resumes {@code asyncResponse} with the result.
     *
     * <p>For a topic whose metadata reports partitions, {@code skipAllMessagesAsync} is issued for each
     * partition through the admin client. Otherwise the backlog is cleared on the local topic reference: on
     * the replicator when {@code str} starts with the topic's replicator prefix, on the subscription
     * otherwise.
     *
     * @param asyncResponse response handle resumed with 204 or with the failure
     * @param str subscription name, or a replicator cursor name carrying the replicator prefix
     * @param z forwarded unchanged to the metadata lookup and to the access check
     */
    public void internalSkipAllMessages(AsyncResponse asyncResponse, String str, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(this.topicName, z, false);
        if (metadata.partitions <= 0) {
            // Non-partitioned topic: operate on the local topic reference.
            validateAdminAccessForSubscriber(str, z);
            PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
            BiConsumer<? super Void, ? super Throwable> onCleared = (unused, error) -> {
                if (error == null) {
                    asyncResponse.resume(Response.noContent().build());
                    log.info("[{}] Cleared backlog on {} {}", clientAppId(), this.topicName, str);
                    return;
                }
                asyncResponse.resume(new RestException(error));
                log.error("[{}] Failed to skip all messages {} {}", clientAppId(), this.topicName, str, error);
            };
            try {
                if (!str.startsWith(topic.getReplicatorPrefix())) {
                    PersistentSubscription target = topic.getSubscription(str);
                    Preconditions.checkNotNull(target);
                    target.clearBacklog().whenComplete(onCleared);
                } else {
                    PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str));
                    Preconditions.checkNotNull(replicator);
                    replicator.clearBacklog().whenComplete(onCleared);
                }
            } catch (Exception e) {
                // A NullPointerException here means the subscription/replicator lookup came back empty.
                RestException failure;
                if (e instanceof NullPointerException) {
                    failure = new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                } else {
                    failure = new RestException(e);
                }
                asyncResponse.resume(failure);
            }
            return;
        }

        // Partitioned topic: fan the request out to every partition.
        ArrayList partitionRequests = Lists.newArrayList();
        int partitionIndex = 0;
        while (partitionIndex < metadata.partitions) {
            TopicName partition = this.topicName.getPartition(partitionIndex);
            try {
                partitionRequests.add(pulsar().getAdminClient().topics().skipAllMessagesAsync(partition.toString(), str));
            } catch (Exception e) {
                log.error("[{}] Failed to skip all messages {} {}", clientAppId(), partition, str, e);
                asyncResponse.resume(new RestException(e));
                return;
            }
            partitionIndex++;
        }
        FutureUtil.waitForAll(partitionRequests).handle((unused, error) -> {
            if (error != null) {
                Throwable rootCause = error.getCause();
                if (rootCause instanceof PulsarAdminException.NotFoundException) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                } else {
                    log.error("[{}] Failed to skip all messages {} {}", clientAppId(), this.topicName, str, rootCause);
                    asyncResponse.resume(new RestException(rootCause));
                }
                return null;
            }
            asyncResponse.resume(Response.noContent().build());
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Skips {@code i} messages on {@code str} for this non-partitioned topic, blocking until done.
     *
     * <p>The skip is applied to the replicator when {@code str} starts with the topic's replicator prefix,
     * and to the subscription of that name otherwise.
     *
     * @param str subscription name, or a replicator cursor name carrying the replicator prefix
     * @param i number of messages to skip
     * @param z forwarded unchanged to the metadata lookup and to the access check
     * @throws RestException 405 for a partitioned topic, 404 when a {@link NullPointerException} is raised
     *         (e.g. the subscription lookup returned null), otherwise wrapping the underlying failure
     */
    public void internalSkipMessages(String str, int i, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        int partitionCount = getPartitionedTopicMetadata(this.topicName, z, false).partitions;
        if (partitionCount > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
        }
        validateAdminAccessForSubscriber(str, z);
        PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
        try {
            if (!str.startsWith(topic.getReplicatorPrefix())) {
                PersistentSubscription target = topic.getSubscription(str);
                Preconditions.checkNotNull(target);
                target.skipMessages(i).get();
            } else {
                PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str));
                Preconditions.checkNotNull(replicator);
                replicator.skipMessages(i).get();
            }
            log.info("[{}] Skipped {} messages on {} {}", clientAppId(), i, this.topicName, str);
        } catch (NullPointerException missing) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        } catch (Exception failure) {
            log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), i, this.topicName, str, failure);
            throw new RestException(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Expires messages for every replicator and subscription of this topic and resumes {@code asyncResponse}.
     *
     * <p>For a topic whose metadata reports partitions, {@code expireMessagesForAllSubscriptionsAsync} is
     * issued per partition through the admin client. Otherwise
     * {@code internalExpireMessagesForSinglePartition} is invoked for each replicator and each subscription
     * of the local topic; when several of those calls fail, the failure recorded last is the one reported.
     *
     * @param asyncResponse response handle resumed with 204 or with the failure
     * @param i expiry argument handed on to the per-partition / per-subscription calls
     * @param z forwarded unchanged to the metadata lookup, the access check and the per-subscription calls
     */
    public void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int i, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(this.topicName, z, false);
        if (metadata.partitions <= 0) {
            // Non-partitioned topic: walk the replicators, then the subscriptions.
            validateAdminOperationOnTopic(z);
            PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
            AtomicReference<Throwable> lastFailure = new AtomicReference<>();
            topic.getReplicators().forEach((replicatorName, unusedReplicator) -> {
                try {
                    internalExpireMessagesForSinglePartition(replicatorName, i, z);
                } catch (Throwable error) {
                    lastFailure.set(error);
                }
            });
            topic.getSubscriptions().forEach((subscriptionName, unusedSubscription) -> {
                try {
                    internalExpireMessagesForSinglePartition(subscriptionName, i, z);
                } catch (Throwable error) {
                    lastFailure.set(error);
                }
            });
            Throwable failure = lastFailure.get();
            if (failure == null) {
                asyncResponse.resume(Response.noContent().build());
            } else if (failure instanceof WebApplicationException) {
                asyncResponse.resume((WebApplicationException) failure);
            } else {
                asyncResponse.resume(new RestException(failure));
            }
            return;
        }

        // Partitioned topic: fan the request out to every partition.
        ArrayList partitionRequests = Lists.newArrayList();
        int partitionIndex = 0;
        while (partitionIndex < metadata.partitions) {
            TopicName partition = this.topicName.getPartition(partitionIndex);
            try {
                partitionRequests.add(pulsar().getAdminClient().topics().expireMessagesForAllSubscriptionsAsync(partition.toString(), i));
            } catch (Exception e) {
                log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), i, partition, e);
                asyncResponse.resume(new RestException(e));
                return;
            }
            partitionIndex++;
        }
        FutureUtil.waitForAll(partitionRequests).handle((unused, error) -> {
            if (error != null) {
                Throwable rootCause = error.getCause();
                log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), i, this.topicName, rootCause);
                asyncResponse.resume(new RestException(rootCause));
                return null;
            }
            asyncResponse.resume(Response.noContent().build());
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the cursor of subscription {@code str} to timestamp {@code j} and resumes {@code asyncResponse}.
     *
     * <p>For a topic whose metadata reports partitions, {@code resetCursorAsync} is issued per partition.
     * Precondition failures are counted rather than treated as fatal: the overall request fails with 412 only
     * when every partition reported one, and succeeds (with a warning logged) when only some did. Any other
     * partition failure fails the whole request.
     *
     * <p>For a non-partitioned topic the cursor is reset on the local subscription, blocking until done.
     * Outcomes: 204, 404 (topic or subscription not found), 405 (reset not allowed), 412 (no position for the
     * timestamp), or the wrapped failure.
     *
     * @param asyncResponse response handle resumed with the result or with the failure
     * @param str subscription name
     * @param j timestamp to reset the cursor to
     * @param z forwarded unchanged to the metadata lookup and to the access check
     */
    public void internalResetCursor(AsyncResponse asyncResponse, String str, long j, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        int i = getPartitionedTopicMetadata(this.topicName, z, false).partitions;
        if (i > 0) {
            CompletableFuture<Void> allPartitionsDone = new CompletableFuture<>();
            AtomicInteger remaining = new AtomicInteger(i);
            AtomicInteger preconditionFailures = new AtomicInteger(0);
            AtomicReference<Throwable> lastPreconditionFailure = new AtomicReference<>();
            for (int i2 = 0; i2 < i; i2++) {
                TopicName partition = this.topicName.getPartition(i2);
                try {
                    pulsar().getAdminClient().topics().resetCursorAsync(partition.toString(), str, j).handle((r17, th) -> {
                        if (th != null) {
                            if (!(th instanceof PulsarAdminException.PreconditionFailedException)) {
                                // Anything other than a precondition failure aborts the whole request.
                                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), partition, str, j, th);
                                allPartitionsDone.completeExceptionally(th);
                                return null;
                            }
                            preconditionFailures.incrementAndGet();
                            lastPreconditionFailure.set(th);
                        }
                        // The last partition to report completes the aggregate future.
                        if (remaining.decrementAndGet() == 0) {
                            allPartitionsDone.complete(null);
                        }
                        return null;
                    });
                } catch (Exception e) {
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), partition, str, j, e);
                    allPartitionsDone.completeExceptionally(e);
                }
            }
            allPartitionsDone.whenComplete((r16, th2) -> {
                if (th2 != null) {
                    if (th2 instanceof PulsarAdminException) {
                        asyncResponse.resume(new RestException((PulsarAdminException) th2));
                    } else {
                        asyncResponse.resume(new RestException(th2));
                    }
                    return;
                }
                if (preconditionFailures.get() == i) {
                    // Every partition rejected the reset.
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), this.topicName, str, j, lastPreconditionFailure.get());
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, lastPreconditionFailure.get().getMessage()));
                } else {
                    if (preconditionFailures.get() > 0) {
                        log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}", clientAppId(), this.topicName, str, j, lastPreconditionFailure.get());
                    }
                    asyncResponse.resume(Response.noContent().build());
                }
            });
            return;
        }
        validateAdminAccessForSubscriber(str, z);
        log.info("[{}] [{}] Received reset cursor on subscription {} to time {}", clientAppId(), this.topicName, str, j);
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(this.topicName);
        if (persistentTopic == null) {
            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Topic not found"));
            return;
        }
        try {
            PersistentSubscription subscription = persistentTopic.getSubscription(str);
            // A missing subscription surfaces as a NullPointerException, mapped to 404 below.
            Preconditions.checkNotNull(subscription);
            subscription.resetCursor(j).get();
            log.info("[{}] [{}] Reset cursor on subscription {} to time {}", clientAppId(), this.topicName, str, j);
            asyncResponse.resume(Response.noContent().build());
        } catch (Exception e2) {
            if (e2 instanceof InterruptedException) {
                // Preserve the interrupt for the calling thread.
                Thread.currentThread().interrupt();
            }
            Throwable cause = e2.getCause();
            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), this.topicName, str, j, e2);
            if (e2 instanceof NullPointerException) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                return;
            }
            // Failures reported through the future arrive wrapped by get(), so the not-allowed case has to
            // be recognised on the cause as well as on the exception itself.
            if (cause instanceof BrokerServiceException.NotAllowedException) {
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, cause.getMessage()));
            } else if (e2 instanceof BrokerServiceException.NotAllowedException) {
                asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, e2.getMessage()));
            } else if (cause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified -" + cause.getMessage()));
            } else {
                asyncResponse.resume(new RestException(e2));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates subscription {@code str} positioned at {@code messageIdImpl} and resumes {@code asyncResponse}.
     *
     * <p>A null {@code messageIdImpl} is replaced by {@code MessageId.earliest}. For a topic whose metadata
     * reports partitions, {@code createSubscriptionAsync} is issued per partition; a conflict on a partition
     * is only reported when every partition failed, while any other failure is always reported. For a
     * non-partitioned topic the subscription is created on the local topic, its cursor deactivated and then
     * reset to the requested position.
     *
     * @param asyncResponse response handle resumed with 204 or with the failure
     * @param str subscription name
     * @param messageIdImpl starting position, or null for the earliest position
     * @param z forwarded unchanged to the metadata lookup and to the access check
     * @param z2 forwarded unchanged as the last argument of {@code createSubscription}
     */
    public void internalCreateSubscription(AsyncResponse asyncResponse, String str, MessageIdImpl messageIdImpl, boolean z, boolean z2) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        MessageIdImpl startPosition = messageIdImpl != null ? messageIdImpl : (MessageIdImpl) MessageId.earliest;
        log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), this.topicName, str, startPosition);
        int i = getPartitionedTopicMetadata(this.topicName, z, false).partitions;
        if (i <= 0) {
            // Non-partitioned topic: create the subscription on the local topic.
            validateAdminAccessForSubscriber(str, z);
            PersistentTopic topic = (PersistentTopic) getOrCreateTopic(this.topicName);
            if (topic.getSubscriptions().containsKey(str)) {
                asyncResponse.resume(new RestException(Response.Status.CONFLICT, "Subscription already exists for topic"));
                return;
            }
            try {
                PersistentSubscription created = (PersistentSubscription) topic.createSubscription(str, PulsarApi.CommandSubscribe.InitialPosition.Latest, z2).get();
                created.deactivateCursor();
                created.resetCursor((Position) PositionImpl.get(startPosition.getLedgerId(), startPosition.getEntryId())).get();
                log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), this.topicName, str, startPosition);
                asyncResponse.resume(Response.noContent().build());
            } catch (Throwable failure) {
                Throwable rootCause = failure.getCause();
                log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), this.topicName, str, startPosition, failure);
                if (!(rootCause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition)) {
                    asyncResponse.resume(new RestException(failure));
                } else {
                    asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + rootCause.getMessage()));
                }
            }
            return;
        }

        // Partitioned topic: fan the request out and aggregate the per-partition outcomes.
        CompletableFuture<Void> allPartitionsDone = new CompletableFuture<>();
        AtomicInteger remaining = new AtomicInteger(i);
        AtomicInteger failedPartitions = new AtomicInteger(0);
        AtomicReference<Throwable> reportedFailure = new AtomicReference<>();
        for (int partitionIndex = 0; partitionIndex < i; partitionIndex++) {
            TopicName partition = this.topicName.getPartition(partitionIndex);
            try {
                pulsar().getAdminClient().topics().createSubscriptionAsync(partition.toString(), str, startPosition).handle((unused, error) -> {
                    if (error != null) {
                        boolean everyPartitionFailed = failedPartitions.incrementAndGet() == i;
                        if (everyPartitionFailed || !(error instanceof PulsarAdminException.ConflictException)) {
                            reportedFailure.set(error);
                        }
                    }
                    if (remaining.decrementAndGet() == 0) {
                        allPartitionsDone.complete(null);
                    }
                    return null;
                });
            } catch (Exception e) {
                log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), partition, str, startPosition, e);
                allPartitionsDone.completeExceptionally(e);
            }
        }
        allPartitionsDone.whenComplete((unused, error) -> {
            if (error != null) {
                if (error instanceof PulsarAdminException) {
                    asyncResponse.resume(new RestException((PulsarAdminException) error));
                } else {
                    asyncResponse.resume(new RestException(error));
                }
                return;
            }
            if (reportedFailure.get() == null) {
                asyncResponse.resume(Response.noContent().build());
                return;
            }
            log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), this.topicName, str, startPosition, reportedFailure.get());
            if (reportedFailure.get() instanceof PulsarAdminException) {
                asyncResponse.resume(new RestException((PulsarAdminException) reportedFailure.get()));
            } else {
                asyncResponse.resume(new RestException(reportedFailure.get()));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the cursor of subscription {@code str} to the position described by {@code messageIdImpl},
     * blocking until the reset has completed. Only non-partitioned topics are supported.
     *
     * @param str subscription name
     * @param z forwarded unchanged to the metadata lookup and to the access check
     * @param messageIdImpl message id whose ledger id and entry id form the target position
     * @throws RestException 405 for a partitioned topic, 404 when the topic reference is null or a
     *         {@link NullPointerException} is raised during the reset, 412 when the position is invalid,
     *         otherwise wrapping the underlying failure
     */
    public void internalResetCursorOnPosition(String str, boolean z, MessageIdImpl messageIdImpl) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), this.topicName, str, messageIdImpl);
        int partitionCount = getPartitionedTopicMetadata(this.topicName, z, false).partitions;
        if (partitionCount > 0) {
            log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), this.topicName, str);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Reset-cursor at position is not allowed for partitioned-topic");
        }
        validateAdminAccessForSubscriber(str, z);
        PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
        if (topic == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        try {
            PersistentSubscription target = topic.getSubscription(str);
            Preconditions.checkNotNull(target);
            Position requested = (Position) PositionImpl.get(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId());
            target.resetCursor(requested).get();
            log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(), this.topicName, str, messageIdImpl);
        } catch (Exception failure) {
            Throwable rootCause = failure.getCause();
            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), this.topicName, str, messageIdImpl, failure);
            if (failure instanceof NullPointerException) {
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            }
            if (rootCause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + rootCause.getMessage());
            }
            throw new RestException(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Peeks the {@code i}-th message of {@code str} on this non-partitioned persistent topic and returns it
     * as an HTTP response.
     *
     * <p>The message is read from the replicator when {@code str} starts with the topic's replicator prefix,
     * and from the subscription otherwise. Message id, properties and (when present) publish time, event time
     * and batch size are exposed as {@code X-Pulsar-*} headers; the decompressed payload is streamed as the
     * entity.
     *
     * @param str subscription name, or a replicator cursor name carrying the replicator prefix
     * @param i position of the message to peek, handed to {@code peekNthMessage}
     * @param z forwarded unchanged to the metadata lookup and to the access check
     * @return response carrying the message headers and payload
     * @throws RestException 405 for a partitioned or non-persistent topic, 404 "Message not found" when a
     *         {@link NullPointerException} is raised while reading (e.g. no entry was returned), otherwise
     *         wrapping the underlying failure
     */
    public Response internalPeekNthMessage(String str, int i, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        if (getPartitionedTopicMetadata(this.topicName, z, false).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
        }
        validateAdminAccessForSubscriber(str, z);
        // Look the topic up once so the type check and the cast see the same reference.
        Topic topicReference = getTopicReference(this.topicName);
        if (!(topicReference instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), this.topicName, str);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a non-persistent topic is not allowed");
        }
        PersistentTopic persistentTopic = (PersistentTopic) topicReference;
        boolean isReplicator = str.startsWith(persistentTopic.getReplicatorPrefix());
        PersistentReplicator persistentReplicator = null;
        PersistentSubscription persistentSubscription = null;
        if (isReplicator) {
            persistentReplicator = getReplicatorReference(str, persistentTopic);
        } else {
            persistentSubscription = (PersistentSubscription) getSubscriptionReference(str, persistentTopic);
        }
        Entry entry = null;
        try {
            entry = isReplicator ? persistentReplicator.peekNthMessage(i).get() : persistentSubscription.peekNthMessage(i).get();
            Preconditions.checkNotNull(entry);
            Position position = entry.getPosition();
            ByteBuf dataBuffer = entry.getDataBuffer();
            PulsarApi.MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(dataBuffer);
            Response.ResponseBuilder ok = Response.ok();
            ok.header("X-Pulsar-Message-ID", position.toString());
            for (PulsarApi.KeyValue keyValue : parseMessageMetadata.getPropertiesList()) {
                ok.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
            }
            if (parseMessageMetadata.hasPublishTime()) {
                ok.header("X-Pulsar-publish-time", DateFormatter.format(parseMessageMetadata.getPublishTime()));
            }
            if (parseMessageMetadata.hasEventTime()) {
                ok.header("X-Pulsar-event-time", DateFormatter.format(parseMessageMetadata.getEventTime()));
            }
            if (parseMessageMetadata.hasNumMessagesInBatch()) {
                ok.header("X-Pulsar-num-batch-message", Integer.valueOf(parseMessageMetadata.getNumMessagesInBatch()));
            }
            ByteBuf decode = CompressionCodecProvider.getCompressionCodec(parseMessageMetadata.getCompression()).decode(dataBuffer, parseMessageMetadata.getUncompressedSize());
            // Copy the payload into a heap buffer because the stream below reads it through array().
            final ByteBuf heapBuffer = PulsarByteBufAllocator.DEFAULT.heapBuffer(decode.readableBytes(), decode.readableBytes());
            try {
                heapBuffer.writeBytes(decode);
            } finally {
                // The decoded buffer is no longer needed whether or not the copy succeeded.
                decode.release();
            }
            return ok.entity(new StreamingOutput() { // from class: org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.2
                @Override
                public void write(OutputStream outputStream) throws IOException, WebApplicationException {
                    try {
                        outputStream.write(heapBuffer.array(), heapBuffer.arrayOffset(), heapBuffer.readableBytes());
                    } finally {
                        // Release even when the client connection fails mid-write.
                        heapBuffer.release();
                    }
                }
            }).build();
        } catch (NullPointerException e) {
            // Must precede the generic handler, otherwise the 404 can never be produced.
            throw new RestException(Response.Status.NOT_FOUND, "Message not found");
        } catch (Exception e) {
            log.error("[{}] Failed to get message at position {} from {} {}", clientAppId(), i, this.topicName, str, e);
            throw new RestException(e);
        } finally {
            if (entry != null) {
                entry.release();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the offline backlog stats of this topic.
     *
     * <p>The namespace policies are read first so that an unknown namespace yields 404. A cached stat is
     * returned when it was generated less than 10 minutes ago; otherwise the backlog is estimated through
     * {@code ManagedLedgerOfflineBacklog}, stored in the broker service cache and returned.
     *
     * @param z not used by this method
     * @return the cached or freshly estimated offline stats
     * @throws RestException 404 when the namespace does not exist, otherwise wrapping the underlying failure
     */
    public PersistentOfflineTopicStats internalGetBacklog(boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        // Step 1: namespace existence check. Kept in its own try block so that failures of the estimation
        // below are not caught a second time and reported as namespace errors.
        try {
            policiesCache().get(path(ZkAdminPaths.POLICIES, this.namespaceName.toString()));
        } catch (KeeperException.NoNodeException e) {
            log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(), this.namespaceName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to get topic backlog {}", clientAppId(), this.namespaceName, e);
            throw new RestException(e);
        }

        // Step 2: serve from cache while fresh, otherwise estimate and cache.
        final long cachedStatTtlMinutes = 10;
        try {
            PersistentOfflineTopicStats offlineTopicStat = pulsar().getBrokerService().getOfflineTopicStat(this.topicName);
            if (offlineTopicStat != null) {
                long ageMillis = System.currentTimeMillis() - offlineTopicStat.statGeneratedAt.getTime();
                if (TimeUnit.MINUTES.convert(ageMillis, TimeUnit.MILLISECONDS) < cachedStatTtlMinutes) {
                    return offlineTopicStat;
                }
            }
            ManagedLedgerConfig managedLedgerConfig = pulsar().getBrokerService().getManagedLedgerConfig(this.topicName).get();
            PersistentOfflineTopicStats estimateUnloadedTopicBacklog = new ManagedLedgerOfflineBacklog(managedLedgerConfig.getDigestType(), managedLedgerConfig.getPassword(), pulsar().getAdvertisedAddress(), false).estimateUnloadedTopicBacklog(pulsar().getManagedLedgerFactory(), this.topicName);
            pulsar().getBrokerService().cacheOfflineTopicStats(this.topicName, estimateUnloadedTopicBacklog);
            return estimateUnloadedTopicBacklog;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for the calling thread.
                Thread.currentThread().interrupt();
            }
            log.error("[{}] Failed to get topic backlog {}", clientAppId(), this.topicName, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Terminates this topic and returns the message id produced by {@code PersistentTopic.terminate()}.
     *
     * <p>Partitioned topics and non-persistent topics are rejected with METHOD_NOT_ALLOWED. The topic is
     * resolved before the try block so that a lookup failure (for example 404) propagates unchanged instead
     * of being logged and wrapped as a termination failure.
     *
     * @param z forwarded to {@code getPartitionedTopicMetadata} and {@code validateAdminOperationOnTopic}
     *          (presumably the "authoritative" flag; confirm against callers)
     * @return the result of the terminate future
     * @throws RestException when the topic is partitioned, is not persistent, cannot be found, or the
     *         termination itself fails
     */
    public MessageId internalTerminate(boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        if (getPartitionedTopicMetadata(this.topicName, z, false).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
        }
        validateAdminOperationOnTopic(z);
        Topic topic = getTopicReference(this.topicName);
        // Explicit type check instead of a blind cast, matching the guard used by the other
        // persistent-only operations in this class.
        if (!(topic instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), this.topicName);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a non-persistent topic is not allowed");
        }
        try {
            return ((PersistentTopic) topic).terminate().get();
        } catch (Exception e) {
            log.error("[{}] Failed to terminate topic {}", clientAppId(), this.topicName, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Expires messages for a subscription and reports the outcome through {@code asyncResponse}.
     *
     * <p>For a topic without partitions the work is delegated to
     * {@link #internalExpireMessagesForSinglePartition}. For a partitioned topic one
     * {@code expireMessagesAsync} request is issued per partition through the admin client and the response
     * is resumed once all of them have finished.
     *
     * @param asyncResponse response resumed with 204 on success or with the failure otherwise
     * @param str subscription name passed to the expire calls
     * @param i expiry threshold passed to the expire calls (unit not visible here; presumably seconds)
     * @param z forwarded to the metadata lookup and the single-partition path
     */
    public void internalExpireMessages(AsyncResponse asyncResponse, String str, int i, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(this.topicName, z, false);
        if (partitionedTopicMetadata.partitions <= 0) {
            try {
                internalExpireMessagesForSinglePartition(str, i, z);
                asyncResponse.resume(Response.noContent().build());
            } catch (WebApplicationException e) {
                asyncResponse.resume(e);
            } catch (Exception e) {
                asyncResponse.resume(new RestException(e));
            }
            return;
        }

        List<CompletableFuture<Void>> partitionFutures = Lists.newArrayList();
        for (int index = 0; index < partitionedTopicMetadata.partitions; index++) {
            TopicName partition = this.topicName.getPartition(index);
            try {
                partitionFutures.add(pulsar().getAdminClient().topics().expireMessagesAsync(partition.toString(), str, i));
            } catch (Exception e) {
                log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), i, partition, e);
                asyncResponse.resume(new RestException(e));
                return;
            }
        }

        FutureUtil.waitForAll(partitionFutures).handle((ignored, failure) -> {
            if (failure == null) {
                asyncResponse.resume(Response.noContent().build());
                return null;
            }
            // A failure is not guaranteed to carry a cause; fall back to the failure itself so that a
            // null is never handed to RestException.
            Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
            if (cause instanceof PulsarAdminException.NotFoundException) {
                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Subscription not found"));
                return null;
            }
            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(), i, this.topicName, cause);
            asyncResponse.resume(new RestException(cause));
            return null;
        });
    }

    /**
     * Expires messages on a non-partitioned persistent topic, either for a replicator (when the name starts
     * with the topic's replicator prefix) or for a regular subscription.
     *
     * <p>The topic is resolved once and reused for both the type check and the operation, so the two can
     * never observe different lookup results.
     *
     * @param str subscription name, or replicator name carrying the replicator prefix
     * @param i expiry threshold handed to {@code expireMessages}
     * @param z forwarded to the metadata lookup and to {@code validateAdminAccessForSubscriber}
     * @throws IllegalStateException when the topic turns out to be partitioned
     * @throws RestException METHOD_NOT_ALLOWED for non-persistent topics, NOT_FOUND when a
     *         NullPointerException is raised while resolving or expiring (missing subscription/replicator),
     *         or wrapping any other failure
     */
    private void internalExpireMessagesForSinglePartition(String str, int i, boolean z) {
        if (this.topicName.isGlobal()) {
            validateGlobalNamespaceOwnership(this.namespaceName);
        }
        if (getPartitionedTopicMetadata(this.topicName, z, false).partitions > 0) {
            log.error("[{}] {} {} {}", clientAppId(), "This method should not be called for partitioned topic", this.topicName, str);
            throw new IllegalStateException("This method should not be called for partitioned topic");
        }
        validateAdminAccessForSubscriber(str, z);
        Topic topic = getTopicReference(this.topicName);
        if (!(topic instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), this.topicName, str);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages on a non-persistent topic is not allowed");
        }
        PersistentTopic persistentTopic = (PersistentTopic) topic;
        try {
            if (str.startsWith(persistentTopic.getReplicatorPrefix())) {
                PersistentReplicator persistentReplicator = (PersistentReplicator) persistentTopic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str));
                Preconditions.checkNotNull(persistentReplicator);
                persistentReplicator.expireMessages(i);
            } else {
                PersistentSubscription subscription = persistentTopic.getSubscription(str);
                Preconditions.checkNotNull(subscription);
                subscription.expireMessages(i);
            }
            log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), i, this.topicName, str);
        } catch (NullPointerException e) {
            // checkNotNull signals a missing subscription/replicator with an NPE; callers expect 404 here.
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        } catch (Exception e) {
            log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(), i, this.topicName, str, e);
            throw new RestException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts a compaction run on this topic.
     *
     * @param z forwarded to {@code validateAdminOperationOnTopic}
     * @throws RestException with CONFLICT when a compaction is already running, or wrapping any other
     *         failure (including a failed topic lookup or cast)
     */
    public void internalTriggerCompaction(boolean z) {
        validateAdminOperationOnTopic(z);
        try {
            PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
            topic.triggerCompaction();
        } catch (BrokerServiceException.AlreadyRunningException alreadyRunning) {
            throw new RestException(Response.Status.CONFLICT, alreadyRunning.getMessage());
        } catch (Exception failure) {
            throw new RestException(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports the compaction status of this topic.
     *
     * @param z forwarded to {@code validateAdminOperationOnTopic}
     * @return the value of {@code PersistentTopic.compactionStatus()}
     */
    public LongRunningProcessStatus internalCompactionStatus(boolean z) {
        validateAdminOperationOnTopic(z);
        PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
        return topic.compactionStatus();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts an offload on this topic for the given message id.
     *
     * @param z forwarded to {@code validateAdminOperationOnTopic}
     * @param messageIdImpl message id handed to {@code PersistentTopic.triggerOffload}
     * @throws RestException with CONFLICT when an offload is already running, or wrapping any other failure
     */
    public void internalTriggerOffload(boolean z, MessageIdImpl messageIdImpl) {
        validateAdminOperationOnTopic(z);
        try {
            PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
            topic.triggerOffload(messageIdImpl);
        } catch (BrokerServiceException.AlreadyRunningException alreadyRunning) {
            throw new RestException(Response.Status.CONFLICT, alreadyRunning.getMessage());
        } catch (Exception failure) {
            log.warn("Unexpected error triggering offload", failure);
            throw new RestException(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports the offload status of this topic.
     *
     * @param z forwarded to {@code validateAdminOperationOnTopic}
     * @return the value of {@code PersistentTopic.offloadStatus()}
     */
    public OffloadProcessStatus internalOffloadStatus(boolean z) {
        validateAdminOperationOnTopic(z);
        PersistentTopic topic = (PersistentTopic) getTopicReference(this.topicName);
        return topic.offloadStatus();
    }

    /**
     * Asynchronously resolves the partitioned-topic metadata of {@code topicName} after authorizing the
     * caller.
     *
     * <p>Authorization first goes through {@code checkAuthorization}. When that is rejected with a
     * {@code RestException}, tenant-admin access is tried as a fallback; if the fallback is rejected too,
     * the returned future fails with a {@code PulsarClientException}. Any other authorization error fails
     * the future with the original exception.
     *
     * @param pulsarService broker service handle passed to the authorization and metadata helpers
     * @param str client application id (used for authorization and logging)
     * @param str2 passed to {@code validateAdminAccessForTenant} (presumably the original principal; confirm)
     * @param authenticationDataSource authentication data passed to {@code checkAuthorization}
     * @param topicName topic whose metadata is requested
     * @return a future completed with the metadata, or completed exceptionally on any failure; this method
     *         itself does not throw for those failures
     */
    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsarService, String str, String str2, AuthenticationDataSource authenticationDataSource, TopicName topicName) {
        CompletableFuture<PartitionedTopicMetadata> completableFuture = new CompletableFuture<>();
        try {
            try {
                checkAuthorization(pulsarService, topicName, str, authenticationDataSource);
            } catch (RestException authorizationRejected) {
                // The specific RestException handler must come before the generic one, otherwise the
                // tenant-admin fallback below can never run.
                try {
                    validateAdminAccessForTenant(pulsarService, str, str2, topicName.getTenant());
                } catch (RestException tenantRejected) {
                    log.warn("Failed to authorize {} on cluster {}", str, topicName.toString());
                    throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s", str, topicName.toString(), tenantRejected.getMessage()));
                }
            } catch (Exception unexpected) {
                log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", str, topicName.toString(), unexpected.getMessage(), unexpected);
                throw unexpected;
            }
            String path = path("partitioned-topics", topicName.getNamespace(), topicName.getDomain().toString(), topicName.getEncodedLocalName());
            checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
                    .thenCompose(clusterData -> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsarService, path, topicName))
                    .thenAccept(partitionedTopicMetadata -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Total number of partitions for topic {} is {}", str, topicName, partitionedTopicMetadata.partitions);
                        }
                        completableFuture.complete(partitionedTopicMetadata);
                    })
                    .exceptionally(failure -> {
                        // completeExceptionally rejects null, so fall back to the failure itself when it
                        // has no cause.
                        Throwable cause = failure.getCause();
                        completableFuture.completeExceptionally(cause != null ? cause : failure);
                        return null;
                    });
        } catch (Exception e) {
            completableFuture.completeExceptionally(e);
        }
        return completableFuture;
    }

    /**
     * Looks up an existing topic through the broker service, waiting at most the configured ZooKeeper
     * operation timeout.
     *
     * @param topicName topic to resolve
     * @return the topic; never {@code null}, because an absent topic is turned into an exception
     * @throws RestException the exception built by {@link #topicNotFoundReason} when the topic does not
     *         exist, or a wrapper around any other lookup failure (timeout, interruption, execution error)
     */
    private Topic getTopicReference(TopicName topicName) {
        try {
            return pulsar().getBrokerService()
                    .getTopicIfExists(topicName.toString())
                    .get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
                    .orElseThrow(() -> topicNotFoundReason(topicName));
        } catch (RestException e) {
            // Must precede the generic handler so the not-found response is rethrown unchanged.
            throw e;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack before reporting the failure.
            Thread.currentThread().interrupt();
            throw new RestException(e);
        } catch (Exception e) {
            throw new RestException(e);
        }
    }

    /**
     * Builds the NOT_FOUND exception that best explains why {@code topicName} could not be resolved.
     *
     * <p>The message distinguishes a plain missing topic, a partition whose parent has no metadata or zero
     * partitions, a partition that is absent from {@code internalGetList()}, and the remaining
     * partitioned case.
     *
     * @param topicName topic that was not found
     * @return a {@code RestException} with status NOT_FOUND; it is returned, not thrown
     */
    private RestException topicNotFoundReason(TopicName topicName) {
        if (!topicName.isPartitioned()) {
            return new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }

        TopicName parentTopic = TopicName.get(topicName.getPartitionedTopicName());
        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(parentTopic, false, false);
        if (metadata == null || metadata.partitions == 0) {
            String detail;
            if (metadata == null) {
                detail = "has no metadata";
            } else {
                detail = "has zero partitions";
            }
            return new RestException(Response.Status.NOT_FOUND, String.format("Partitioned Topic not found: %s %s", topicName.toString(), detail));
        }

        if (internalGetList().contains(topicName.toString())) {
            return new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        return new RestException(Response.Status.NOT_FOUND, "Topic partitions were not yet created");
    }

    /**
     * Resolves a topic through {@code BrokerService.getTopic(name, true)} and blocks until the lookup
     * future completes.
     *
     * @param topicName topic to resolve
     * @return the topic unwrapped from the lookup result
     */
    private Topic getOrCreateTopic(TopicName topicName) {
        final String fullName = topicName.toString();
        return (Topic) pulsar().getBrokerService()
                .getTopic(fullName, true)
                .thenApply(lookupResult -> lookupResult.get())
                .join();
    }

    /**
     * Fetches a subscription of the given topic.
     *
     * @param str subscription name
     * @param persistentTopic topic that owns the subscription
     * @return the subscription, never {@code null}
     * @throws RestException with NOT_FOUND when the subscription is absent or the lookup raises any exception
     */
    private Subscription getSubscriptionReference(String str, PersistentTopic persistentTopic) {
        try {
            PersistentSubscription found = persistentTopic.getSubscription(str);
            return (Subscription) Preconditions.checkNotNull(found);
        } catch (Exception lookupFailure) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
    }

    /**
     * Fetches the replicator of the given topic for the remote cluster encoded in {@code str}.
     *
     * @param str replicator name from which the remote cluster is extracted
     * @param persistentTopic topic that owns the replicator
     * @return the replicator, never {@code null}
     * @throws RestException with NOT_FOUND when the replicator is absent or the lookup raises any exception
     */
    private PersistentReplicator getReplicatorReference(String str, PersistentTopic persistentTopic) {
        try {
            String remoteCluster = PersistentReplicator.getRemoteCluster(str);
            PersistentReplicator found = (PersistentReplicator) persistentTopic.getPersistentReplicator(remoteCluster);
            return (PersistentReplicator) Preconditions.checkNotNull(found);
        } catch (Exception lookupFailure) {
            throw new RestException(Response.Status.NOT_FOUND, "Replicator not found");
        }
    }

    /**
     * Creates the subscriptions required on the new partitions and then writes the new partition count to
     * the partitioned-topic metadata node in the global ZooKeeper.
     *
     * @param topicName partitioned topic being updated
     * @param i new number of partitions
     * @return a future completed when the metadata write returns OK, or completed exceptionally when
     *         subscription creation, serialization or the ZooKeeper write fails
     */
    private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int i) {
        final String metadataPath = ZkAdminPaths.partitionedTopicPath(topicName);
        final CompletableFuture<Void> updateResult = new CompletableFuture<>();

        createSubscriptions(topicName, i)
                .thenAccept(ignored -> {
                    try {
                        // Version -1 is passed to setData; the callback translates the result code into
                        // the outcome of the returned future.
                        globalZk().setData(metadataPath, jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(i)), -1, (rc, zkPath, ctx, stat) -> {
                            if (rc != KeeperException.Code.OK.intValue()) {
                                updateResult.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), "failed to create update partitions"));
                                return;
                            }
                            updateResult.complete(null);
                        }, (Object) null);
                    } catch (Exception writeFailure) {
                        updateResult.completeExceptionally(writeFailure);
                    }
                })
                .exceptionally(failure -> {
                    updateResult.completeExceptionally(failure);
                    return null;
                });

        return updateResult;
    }

    /**
     * Creates, on every newly added partition, each subscription that currently exists on partition 0.
     *
     * <p>The returned future completes only after all subscription-creation requests for all subscriptions
     * have finished. It completes immediately when partition 0 has no subscriptions, or when the stats
     * lookup for partition 0 fails with {@code NotFoundException}.
     *
     * @param topicName partitioned topic being expanded
     * @param i requested number of partitions; must exceed the current count
     * @return a future completed with {@code null} on success, or exceptionally with a CONFLICT
     *         {@code RestException} when the topic has at most one partition or {@code i} is not larger
     *         than the current partition count, or with the underlying failure otherwise
     */
    private CompletableFuture<Void> createSubscriptions(TopicName topicName, int i) {
        String path = path("partitioned-topics", topicName.getPersistenceNamingEncoding());
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        fetchPartitionedTopicMetadataAsync(pulsar(), path).thenAccept(partitionedTopicMetadata -> {
            if (partitionedTopicMetadata.partitions <= 1) {
                completableFuture.completeExceptionally(new RestException(Response.Status.CONFLICT, "Topic is not partitioned topic"));
                return;
            }
            if (partitionedTopicMetadata.partitions >= i) {
                completableFuture.completeExceptionally(new RestException(Response.Status.CONFLICT, "number of partitions must be more than existing " + partitionedTopicMetadata.partitions));
                return;
            }
            try {
                PulsarAdmin adminClient = pulsar().getAdminClient();
                adminClient.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(topicStats -> {
                    // Gather the requests of every subscription into one list so the result is completed
                    // exactly once, after all of them are done.
                    List<CompletableFuture<Void>> pending = new ArrayList<>();
                    for (String subscription : topicStats.subscriptions.keySet()) {
                        for (int partition = partitionedTopicMetadata.partitions; partition < i; partition++) {
                            pending.add(adminClient.topics().createSubscriptionAsync(topicName.getPartition(partition).toString(), subscription, MessageId.latest));
                        }
                    }
                    if (pending.isEmpty()) {
                        // No subscriptions to replicate: without this the caller would wait forever.
                        completableFuture.complete(null);
                        return;
                    }
                    FutureUtil.waitForAll(pending).thenRun(() -> {
                        log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
                        completableFuture.complete(null);
                    }).exceptionally(createFailure -> {
                        log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, createFailure);
                        completableFuture.completeExceptionally(createFailure);
                        return null;
                    });
                }).exceptionally(statsFailure -> {
                    if (statsFailure.getCause() instanceof PulsarAdminException.NotFoundException) {
                        // Stats for partition 0 were not found, so there is nothing to copy.
                        completableFuture.complete(null);
                        return null;
                    }
                    log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), topicName.getPartition(0), statsFailure);
                    completableFuture.completeExceptionally(statsFailure);
                    return null;
                });
            } catch (PulsarServerException e) {
                completableFuture.completeExceptionally(e);
            }
        }).exceptionally(metadataFailure -> {
            log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), topicName.toString(), metadataFailure);
            completableFuture.completeExceptionally(metadataFailure);
            return null;
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Unloads a topic by closing it and waiting for the close future.
     *
     * @param topicName topic to unload
     * @param z forwarded to {@code validateTopicOwnership}
     * @throws RestException with NOT_FOUND when a NullPointerException is raised during the unload, or
     *         wrapping any other failure
     */
    public void unloadTopic(TopicName topicName, boolean z) {
        validateSuperUserAccess();
        validateTopicOwnership(topicName, z);
        try {
            Topic topic = getTopicReference(topicName);
            topic.close().get();
            log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
        } catch (NullPointerException missing) {
            log.error("[{}] topic {} not found", clientAppId(), topicName);
            throw new RestException(Response.Status.NOT_FOUND, "Topic does not exist");
        } catch (Exception failure) {
            log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, failure.getMessage(), failure);
            throw new RestException(failure);
        }
    }

    /**
     * Rejects requests from client libraries older than {@code LEAST_SUPPORTED_CLIENT_VERSION_PREFIX}, when
     * the client-library version check is enabled in the broker configuration.
     *
     * <p>Only user agents containing {@code DEPRECATED_CLIENT_VERSION_PREFIX} are inspected. A version that
     * cannot be parsed is logged and accepted.
     *
     * @throws RestException with METHOD_NOT_ALLOWED when the User-Agent header is blank, or when the parsed
     *         {@code major.minor} version is lower than the least supported version
     */
    private void validateClientVersion() {
        if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
            return;
        }
        String header = this.httpRequest.getHeader("User-Agent");
        if (StringUtils.isBlank(header)) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
        }
        if (!header.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
            return;
        }
        try {
            String[] tokens = header.split(DEPRECATED_CLIENT_VERSION_PREFIX);
            String[] versionParts = tokens.length > 1 ? tokens[1].split("-")[0].trim().split("\\.") : null;
            if (versionParts != null && versionParts.length > 1) {
                int leastMajor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion();
                int leastMinor = LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion();
                int clientMajor = Integer.parseInt(versionParts[0]);
                // Compare major first and consult minor only on a major tie; testing the two components
                // independently would reject a newer major release that has a small minor number.
                boolean tooOld = clientMajor < leastMajor
                        || (clientMajor == leastMajor && Integer.parseInt(versionParts[1]) < leastMinor);
                if (tooOld) {
                    throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version " + header + " is not supported");
                }
            }
        } catch (RestException e) {
            // Must precede the generic handler; otherwise the rejection would be swallowed as a parse
            // warning.
            throw e;
        } catch (Exception e) {
            log.warn("[{}] Failed to parse version {} ", clientAppId(), header);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the id of the last message of this persistent topic.
     *
     * <p>The topic is resolved once and reused for both the type check and the read, so the two can never
     * observe different lookup results.
     *
     * @param z forwarded to {@code validateAdminOperationOnTopic}
     * @return a {@code MessageIdImpl} built from the ledger id and entry id of the topic's last position and
     *         the partition index derived from the topic name
     * @throws RestException with METHOD_NOT_ALLOWED when the topic is not persistent, or whatever the topic
     *         lookup raises
     */
    public MessageId internalGetLastMessageId(boolean z) {
        validateAdminOperationOnTopic(z);
        Topic topic = getTopicReference(this.topicName);
        if (!(topic instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), this.topicName);
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "GetLastMessageId on a non-persistent topic is not allowed");
        }
        PersistentTopic persistentTopic = (PersistentTopic) topic;
        PositionImpl lastMessageId = persistentTopic.getLastMessageId();
        return new MessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), TopicName.getPartitionIndex(persistentTopic.getName()));
    }
}
