package org.apache.pulsar.broker.admin;

import com.github.zafarkhaja.semver.Version;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javassist.compiler.TokenId;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator;
import org.apache.pulsar.shade.io.netty.handler.codec.http.HttpHeaders;
import org.apache.pulsar.shade.org.apache.commons.cli.HelpFormatter;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({HttpHeaders.Values.APPLICATION_JSON})
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = {"persistent topic"})
@Path("/persistent")
/* loaded from: input_file:org/apache/pulsar/broker/admin/PersistentTopics.class */
public class PersistentTopics extends AdminResource {
    protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
    private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
    private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
    private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}")
    @ApiOperation(value = "Get the list of destinations under a namespace.", response = String.class, responseContainer = "List")
    public List<String> getList(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        try {
            policiesCache().get(path(ConfigurationCacheService.POLICIES, str, str2, str3));
            ArrayList newArrayList = Lists.newArrayList();
            try {
                for (String str4 : managedLedgerListCache().get(String.format("/managed-ledgers/%s/%s/%s/%s", str, str2, str3, domain()))) {
                    if (domain().equals(DestinationDomain.persistent.toString())) {
                        newArrayList.add(DestinationName.get(domain(), str, str2, str3, Codec.decode(str4)).toString());
                    }
                }
            } catch (KeeperException.NoNodeException unused) {
            } catch (Exception e) {
                log.error("[{}] Failed to get destination list for namespace {}/{}/{}", new Object[]{clientAppId(), str, str2, str3, e});
                throw new RestException(e);
            }
            newArrayList.sort(null);
            return newArrayList;
        } catch (KeeperException.NoNodeException unused2) {
            log.warn("[{}] Failed to get topic list {}/{}/{}: Namespace does not exist", new Object[]{clientAppId(), str, str2, str3});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to get topic list {}/{}/{}", new Object[]{clientAppId(), str, str2, str3, e2});
            throw new RestException(e2);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v23, types: [java.util.List] */
    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}/partitioned")
    @ApiOperation(value = "Get the list of partitioned topics under a namespace.", response = String.class, responseContainer = "List")
    public List<String> getPartitionedTopicList(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3) {
        validateAdminAccessOnProperty(str);
        try {
            policiesCache().get(path(ConfigurationCacheService.POLICIES, str, str2, str3));
            ArrayList newArrayList = Lists.newArrayList();
            try {
                newArrayList = (List) globalZk().getChildren(path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, str, str2, str3, domain()), false).stream().map(str4 -> {
                    return String.format("persistent://%s/%s/%s/%s", str, str2, str3, Codec.decode(str4));
                }).collect(Collectors.toList());
            } catch (KeeperException.NoNodeException unused) {
            } catch (Exception e) {
                log.error("[{}] Failed to get partitioned topic list for namespace {}/{}/{}", new Object[]{clientAppId(), str, str2, str3, e});
                throw new RestException(e);
            }
            newArrayList.sort(null);
            return newArrayList;
        } catch (KeeperException.NoNodeException unused2) {
            log.warn("[{}] Failed to get partitioned topic list {}/{}/{}: Namespace does not exist", new Object[]{clientAppId(), str, str2, str3});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to get partitioned topic list for namespace {}/{}/{}", new Object[]{clientAppId(), str, str2, str3, e2});
            throw new RestException(e2);
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/permissions")
    @ApiOperation(value = "Get permissions on a destination.", notes = "Retrieve the effective permissions for a destination. These permissions are defined by the permissions set at thenamespace level combined (union) with any eventual specific permission set on the destination.")
    public Map<String, Set<AuthAction>> getPermissionsOnDestination(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4) {
        String decode = Codec.decode(str4);
        validateAdminAccessOnProperty(str);
        String destinationName = DestinationName.get(domain(), str, str2, str3, decode).toString();
        try {
            Policies orElseThrow = policiesCache().get(path(ConfigurationCacheService.POLICIES, str, str2, str3)).orElseThrow(() -> {
                return new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
            });
            TreeMap newTreeMap = Maps.newTreeMap();
            AuthPolicies authPolicies = orElseThrow.auth_policies;
            for (String str5 : authPolicies.namespace_auth.keySet()) {
                newTreeMap.put(str5, authPolicies.namespace_auth.get(str5));
            }
            if (authPolicies.destination_auth.containsKey(destinationName)) {
                for (Map.Entry<String, Set<AuthAction>> entry : authPolicies.destination_auth.get(destinationName).entrySet()) {
                    String key = entry.getKey();
                    Set<AuthAction> value = entry.getValue();
                    if (newTreeMap.containsKey(key)) {
                        newTreeMap.put(key, Sets.union((Set) newTreeMap.get(key), value));
                    } else {
                        newTreeMap.put(key, value);
                    }
                }
            }
            return newTreeMap;
        } catch (Exception e) {
            log.error("[{}] Failed to get permissions for destination {}", new Object[]{clientAppId(), destinationName, e});
            throw new RestException(e);
        }
    }

    protected void validateAdminAndClientPermission(DestinationName destinationName) {
        try {
            validateAdminAccessOnProperty(destinationName.getProperty());
        } catch (Exception unused) {
            try {
                checkAuthorization(pulsar(), destinationName, clientAppId());
            } catch (Exception e) {
                log.warn("Unexpected error while authorizing request. destination={}, role={}. Error: {}", new Object[]{destinationName, clientAppId(), e.getMessage(), e});
                throw new RestException(e);
            } catch (RestException e2) {
                throw e2;
            }
        }
    }

    protected void validateAdminOperationOnDestination(DestinationName destinationName, boolean z) {
        validateAdminAccessOnProperty(destinationName.getProperty());
        validateDestinationOwnership(destinationName, z);
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist"), @ApiResponse(code = 409, message = "Concurrent modification")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/permissions/{role}")
    @ApiOperation("Grant a new permission to a role on a single destination.")
    @POST
    public void grantPermissionsOnDestination(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("role") String str5, Set<AuthAction> set) {
        String decode = Codec.decode(str4);
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        String destinationName = DestinationName.get(domain(), str, str2, str3, decode).toString();
        try {
            Stat stat = new Stat();
            Policies policies = (Policies) jsonMapper().readValue(globalZk().getData(path(ConfigurationCacheService.POLICIES, str, str2, str3), (Watcher) null, stat), Policies.class);
            if (!policies.auth_policies.destination_auth.containsKey(destinationName)) {
                policies.auth_policies.destination_auth.put(destinationName, new TreeMap());
            }
            policies.auth_policies.destination_auth.get(destinationName).put(str5, set);
            globalZk().setData(path(ConfigurationCacheService.POLICIES, str, str2, str3), jsonMapper().writeValueAsBytes(policies), stat.getVersion());
            policiesCache().invalidate(path(ConfigurationCacheService.POLICIES, str, str2, str3));
            log.info("[{}] Successfully granted access for role {}: {} - destination {}", new Object[]{clientAppId(), str5, set, destinationName});
        } catch (KeeperException.NoNodeException unused) {
            log.warn("[{}] Failed to grant permissions on destination {}: Namespace does not exist", clientAppId(), destinationName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to grant permissions for destination {}", new Object[]{clientAppId(), destinationName, e});
            throw new RestException(e);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace doesn't exist"), @ApiResponse(code = TokenId.NULL, message = "Permissions are not set at the destination level")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/permissions/{role}")
    @DELETE
    @ApiOperation(value = "Revoke permissions on a destination.", notes = "Revoke permissions to a role on a single destination. If the permission was not set at the destinationlevel, but rather at the namespace level, this operation will return an error (HTTP status code 412).")
    public void revokePermissionsOnDestination(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("role") String str5) {
        String decode = Codec.decode(str4);
        validateAdminAccessOnProperty(str);
        validatePoliciesReadOnlyAccess();
        String destinationName = DestinationName.get(domain(), str, str2, str3, decode).toString();
        Stat stat = new Stat();
        try {
            Policies policies = (Policies) jsonMapper().readValue(globalZk().getData(path(ConfigurationCacheService.POLICIES, str, str2, str3), (Watcher) null, stat), Policies.class);
            if (!policies.auth_policies.destination_auth.containsKey(destinationName) || !policies.auth_policies.destination_auth.get(destinationName).containsKey(str5)) {
                log.warn("[{}] Failed to revoke permission from role {} on destination: Not set at destination level", new Object[]{clientAppId(), str5, destinationName});
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Permissions are not set at the destination level");
            }
            policies.auth_policies.destination_auth.get(destinationName).remove(str5);
            try {
                String path = path(ConfigurationCacheService.POLICIES, str, str2, str3);
                globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), stat.getVersion());
                policiesCache().invalidate(path);
                globalZkCache().invalidate(path);
                log.info("[{}] Successfully revoke access for role {} - destination {}", new Object[]{clientAppId(), str5, destinationName});
            } catch (Exception e) {
                log.error("[{}] Failed to revoke permissions for destination {}", new Object[]{clientAppId(), destinationName, e});
                throw new RestException(e);
            }
        } catch (KeeperException.NoNodeException unused) {
            log.warn("[{}] Failed to revoke permissions on destination {}: Namespace does not exist", clientAppId(), destinationName);
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to revoke permissions for destination {}", new Object[]{clientAppId(), destinationName, e2});
            throw new RestException(e2);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic already exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
    @PUT
    public void createPartitionedTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        validateAdminAccessOnProperty(destinationName.getProperty());
        if (i <= 1) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
        }
        try {
            zkCreateOptimistic(path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, str, str2, str3, domain(), destinationName.getEncodedLocalName()), jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(i)));
            Thread.sleep(1000L);
            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), destinationName);
        } catch (KeeperException.NodeExistsException unused) {
            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), destinationName);
            throw new RestException(Response.Status.CONFLICT, "Partitioned topic already exist");
        } catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{clientAppId(), destinationName, e});
            throw new RestException(e);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 409, message = "Partitioned topic does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
    @POST
    public void updatePartitionedTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, int i) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        validateAdminAccessOnProperty(destinationName.getProperty());
        if (destinationName.isGlobal()) {
            log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", clientAppId(), destinationName);
            throw new RestException(Response.Status.FORBIDDEN, "Update forbidden on global namespace");
        }
        if (i <= 1) {
            throw new RestException(Response.Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
        }
        try {
            updatePartitionedTopic(destinationName, i).get();
        } catch (Exception e) {
            if (e.getCause() instanceof RestException) {
                throw ((RestException) e.getCause());
            }
            log.error("[{}] Failed to update partitioned topic {}", new Object[]{clientAppId(), destinationName, e.getCause()});
            throw new RestException(e.getCause());
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
    @ApiOperation("Get partitioned topic metadata.")
    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, Codec.decode(str4), z);
        if (partitionedTopicMetadata.partitions > 1) {
            validateClientVersion();
        }
        return partitionedTopicMetadata;
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Partitioned topic does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
    @DELETE
    @ApiOperation(value = "Delete a partitioned topic.", notes = "It will also delete all the partitions of the topic if it exists.")
    public void deletePartitionedTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        validateAdminAccessOnProperty(destinationName.getProperty());
        int i = getPartitionedTopicMetadata(str, str2, str3, decode, z).partitions;
        if (i > 0) {
            CompletableFuture completableFuture = new CompletableFuture();
            AtomicInteger atomicInteger = new AtomicInteger(i);
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    DestinationName partition = destinationName.getPartition(i2);
                    pulsar().getAdminClient().persistentTopics().deleteAsync(partition.toString()).whenComplete((r11, th) -> {
                        if (th == null) {
                            log.info("[{}] Deleted partition {}", clientAppId(), partition);
                        } else if (!(th instanceof PulsarAdminException.NotFoundException)) {
                            completableFuture.completeExceptionally(th);
                            log.error("[{}] Failed to delete partition {}", new Object[]{clientAppId(), partition, th});
                            return;
                        } else if (log.isDebugEnabled()) {
                            log.debug("[{}] Partition not found: {}", clientAppId(), partition);
                        }
                        if (atomicInteger.decrementAndGet() == 0) {
                            completableFuture.complete(null);
                        }
                    });
                } catch (Exception e) {
                    Throwable cause = e.getCause();
                    if (!(cause instanceof PulsarAdminException.PreconditionFailedException)) {
                        throw new RestException(cause);
                    }
                    throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
                }
            }
            completableFuture.get();
        }
        String path = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, str, str2, str3, domain(), destinationName.getEncodedLocalName());
        try {
            globalZk().delete(path, -1);
            globalZkCache().invalidate(path);
            Thread.sleep(1000L);
            log.info("[{}] Deleted partitioned topic {}", clientAppId(), destinationName);
        } catch (KeeperException.NoNodeException unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned topic does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to delete partitioned topic {}", new Object[]{clientAppId(), destinationName, e2});
            throw new RestException(e2);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/unload")
    @ApiOperation("Unload a topic")
    @PUT
    public void unloadTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        log.info("[{}] Unloading topic {}/{}/{}/{}", new Object[]{clientAppId(), str, str2, str3, str4});
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        unloadTopic(destinationName, z);
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = TokenId.NULL, message = "Topic has active producers/subscriptions")})
    @Path("/{property}/{cluster}/{namespace}/{destination}")
    @DELETE
    @ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if there's any active subscription or producer connected to the it.")
    public void deleteTopic(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        validateAdminOperationOnDestination(destinationName, z);
        Topic topicReference = getTopicReference(destinationName);
        if (destinationName.isGlobal()) {
            log.error("[{}] Delete topic is forbidden on global namespace {}", clientAppId(), destinationName);
            throw new RestException(Response.Status.FORBIDDEN, "Delete forbidden on global namespace");
        }
        try {
            topicReference.delete().get();
            log.info("[{}] Successfully removed topic {}", clientAppId(), destinationName);
        } catch (Exception e) {
            Throwable cause = e.getCause();
            log.error("[{}] Failed to get delete topic {}", new Object[]{clientAppId(), destinationName, cause});
            if (!(cause instanceof BrokerServiceException.TopicBusyException)) {
                throw new RestException(cause);
            }
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscriptions")
    @ApiOperation("Get the list of persistent subscriptions for a given topic.")
    public List<String> getSubscriptions(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        ArrayList newArrayList = Lists.newArrayList();
        if (getPartitionedTopicMetadata(str, str2, str3, decode, z).partitions > 0) {
            try {
                newArrayList.addAll(pulsar().getAdminClient().persistentTopics().getSubscriptions(destinationName.getPartition(0).toString()));
            } catch (Exception e) {
                throw new RestException(e);
            }
        } else {
            validateAdminOperationOnDestination(destinationName, z);
            try {
                getTopicReference(destinationName).getSubscriptions().forEach((str5, subscription) -> {
                    newArrayList.add(str5);
                });
            } catch (Exception e2) {
                log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), destinationName);
                throw new RestException(e2);
            }
        }
        return newArrayList;
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("{property}/{cluster}/{namespace}/{destination}/stats")
    @ApiOperation("Get the stats for the topic.")
    public PersistentTopicStats getStats(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        validateAdminAndClientPermission(destinationName);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        validateDestinationOwnership(destinationName, z);
        return getTopicReference(destinationName).getStats();
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("{property}/{cluster}/{namespace}/{destination}/internalStats")
    @ApiOperation("Get the internal stats for the topic.")
    public PersistentTopicInternalStats getInternalStats(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        validateAdminAndClientPermission(destinationName);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        validateDestinationOwnership(destinationName, z);
        return getTopicReference(destinationName).getInternalStats();
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("{property}/{cluster}/{namespace}/{destination}/internal-info")
    @ApiOperation("Get the internal stats for the topic.")
    public void getManagedLedgerInfo(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @Suspended final AsyncResponse asyncResponse) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, Codec.decode(str4));
        validateAdminAccessOnProperty(destinationName.getProperty());
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        pulsar().getManagedLedgerFactory().asyncGetManagedLedgerInfo(destinationName.getPersistenceNamingEncoding(), new AsyncCallbacks.ManagedLedgerInfoCallback() { // from class: org.apache.pulsar.broker.admin.PersistentTopics.1
            @Override // org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback
            public void getInfoComplete(ManagedLedgerInfo managedLedgerInfo, Object obj) {
                asyncResponse.resume(outputStream -> {
                    PersistentTopics.jsonMapper().writer().writeValue(outputStream, managedLedgerInfo);
                });
            }

            @Override // org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback
            public void getInfoFailed(ManagedLedgerException managedLedgerException, Object obj) {
                asyncResponse.resume(managedLedgerException);
            }
        }, null);
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("{property}/{cluster}/{namespace}/{destination}/partitioned-stats")
    @ApiOperation("Get the stats for the partitioned topic.")
    public PartitionedTopicStats getPartitionedStats(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, decode, z);
        if (partitionedTopicMetadata.partitions == 0) {
            throw new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found");
        }
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        PartitionedTopicStats partitionedTopicStats = new PartitionedTopicStats(partitionedTopicMetadata);
        for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
            try {
                PersistentTopicStats stats = pulsar().getAdminClient().persistentTopics().getStats(destinationName.getPartition(i).toString());
                partitionedTopicStats.add(stats);
                partitionedTopicStats.partitions.put(destinationName.getPartition(i).toString(), stats);
            } catch (Exception e) {
                throw new RestException(e);
            }
        }
        return partitionedTopicStats;
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = TokenId.NULL, message = "Subscription has active consumers")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}")
    @DELETE
    @ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.")
    public void deleteSubscription(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, decode, z);
        if (partitionedTopicMetadata.partitions > 0) {
            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
                try {
                    pulsar().getAdminClient().persistentTopics().deleteSubscription(destinationName.getPartition(i).toString(), str5);
                } catch (Exception e) {
                    if (e instanceof PulsarAdminException.NotFoundException) {
                        throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
                    }
                    if (e instanceof PulsarAdminException.PreconditionFailedException) {
                        throw new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
                    }
                    log.error("[{}] Failed to delete subscription {} {}", new Object[]{clientAppId(), destinationName, str5, e});
                    throw new RestException(e);
                }
            }
            return;
        }
        validateAdminOperationOnDestination(destinationName, z);
        try {
            Subscription subscription = getTopicReference(destinationName).getSubscription(str5);
            Preconditions.checkNotNull(subscription);
            subscription.delete().get();
            log.info("[{}][{}] Deleted subscription {}", new Object[]{clientAppId(), destinationName, str5});
        } catch (Exception e2) {
            Throwable cause = e2.getCause();
            if (e2 instanceof NullPointerException) {
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            }
            if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
            }
            log.error("[{}] Failed to delete subscription {} {}", new Object[]{clientAppId(), destinationName, str5, e2});
            throw new RestException(cause);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = TokenId.DoubleConstant, message = "Operation not allowed on non-persistent topic"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip_all")
    @ApiOperation(value = "Skip all messages on a topic subscription.", notes = "Completely clears the backlog on the subscription.")
    @POST
    public void skipAllMessages(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, decode, z);
        if (partitionedTopicMetadata.partitions > 0) {
            for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
                try {
                    pulsar().getAdminClient().persistentTopics().skipAllMessages(destinationName.getPartition(i).toString(), str5);
                } catch (Exception e) {
                    throw new RestException(e);
                }
            }
            return;
        }
        validateAdminOperationOnDestination(destinationName, z);
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
        try {
            if (str5.startsWith(persistentTopic.replicatorPrefix)) {
                PersistentReplicator persistentReplicator = (PersistentReplicator) persistentTopic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str5));
                Preconditions.checkNotNull(persistentReplicator);
                persistentReplicator.clearBacklog().get();
            } else {
                PersistentSubscription subscription = persistentTopic.getSubscription(str5);
                Preconditions.checkNotNull(subscription);
                subscription.clearBacklog().get();
            }
            log.info("[{}] Cleared backlog on {} {}", new Object[]{clientAppId(), destinationName, str5});
        } catch (NullPointerException unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        } catch (Exception e2) {
            log.error("[{}] Failed to skip all messages {} {}", new Object[]{clientAppId(), destinationName, str5, e2});
            throw new RestException(e2);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip/{numMessages}")
    @ApiOperation("Skip messages on a topic subscription.")
    @POST
    public void skipMessages(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @PathParam("numMessages") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        if (getPartitionedTopicMetadata(str, str2, str3, decode, z).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
        }
        validateAdminOperationOnDestination(destinationName, z);
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
        try {
            if (str5.startsWith(persistentTopic.replicatorPrefix)) {
                PersistentReplicator persistentReplicator = (PersistentReplicator) persistentTopic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str5));
                Preconditions.checkNotNull(persistentReplicator);
                persistentReplicator.skipMessages(i).get();
            } else {
                PersistentSubscription subscription = persistentTopic.getSubscription(str5);
                Preconditions.checkNotNull(subscription);
                subscription.skipMessages(i).get();
            }
            log.info("[{}] Skipped {} messages on {} {}", new Object[]{clientAppId(), Integer.valueOf(i), destinationName, str5});
        } catch (NullPointerException unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        } catch (Exception e) {
            log.error("[{}] Failed to skip {} messages {} {}", new Object[]{clientAppId(), Integer.valueOf(i), destinationName, str5, e});
            throw new RestException(e);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
    @ApiOperation("Expire messages on a topic subscription.")
    @POST
    public void expireTopicMessages(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @PathParam("expireTimeInSeconds") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        expireMessages(str, str2, str3, Codec.decode(str4), str5, i, z);
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic or subscription does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/all_subscription/expireMessages/{expireTimeInSeconds}")
    @ApiOperation("Expire messages on all subscriptions of topic.")
    @POST
    public void expireMessagesForAllSubscriptions(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("expireTimeInSeconds") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, decode, z);
        if (partitionedTopicMetadata.partitions <= 0) {
            validateAdminOperationOnDestination(destinationName, z);
            PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
            persistentTopic.getReplicators().forEach((str5, replicator) -> {
                expireMessages(str, str2, str3, decode, str5, i, z);
            });
            persistentTopic.getSubscriptions().forEach((str6, persistentSubscription) -> {
                expireMessages(str, str2, str3, decode, str6, i, z);
            });
            return;
        }
        for (int i2 = 0; i2 < partitionedTopicMetadata.partitions; i2++) {
            try {
                pulsar().getAdminClient().persistentTopics().expireMessagesForAllSubscriptions(destinationName.getPartition(i2).toString(), i);
            } catch (Exception e) {
                log.error("[{}] Failed to expire messages up to {} on {} {}", new Object[]{clientAppId(), Integer.valueOf(i), destinationName, e});
                throw new RestException(e);
            }
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor/{timestamp}")
    @ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
    @POST
    public void resetCursor(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @PathParam("timestamp") long j, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, decode, z);
        if (partitionedTopicMetadata.partitions > 0) {
            int i = partitionedTopicMetadata.partitions;
            int i2 = 0;
            PulsarAdminException.PreconditionFailedException preconditionFailedException = null;
            for (int i3 = 0; i3 < i; i3++) {
                try {
                    pulsar().getAdminClient().persistentTopics().resetCursor(destinationName.getPartition(i3).toString(), str5, j);
                } catch (PulsarAdminException.PreconditionFailedException e) {
                    i2 = 0 + 1;
                    preconditionFailedException = e;
                } catch (Exception e2) {
                    log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{clientAppId(), destinationName, str5, Long.valueOf(j), e2});
                    throw new RestException(e2);
                }
            }
            if (i2 == i) {
                log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{clientAppId(), destinationName, str5, Long.valueOf(j), preconditionFailedException});
                throw new RestException(Response.Status.PRECONDITION_FAILED, preconditionFailedException.getMessage());
            }
            if (i2 > 0) {
                log.warn("[{}][{}] partial errors for reset cursor on subscription {} to time {} - ", new Object[]{clientAppId(), decode, str5, Long.valueOf(j), preconditionFailedException});
                return;
            }
            return;
        }
        validateAdminOperationOnDestination(destinationName, z);
        log.info("[{}][{}] received reset cursor on subscription {} to time {}", new Object[]{clientAppId(), decode, str5, Long.valueOf(j)});
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
        if (persistentTopic == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        try {
            PersistentSubscription subscription = persistentTopic.getSubscription(str5);
            Preconditions.checkNotNull(subscription);
            subscription.resetCursor(j).get();
            log.info("[{}][{}] reset cursor on subscription {} to time {}", new Object[]{clientAppId(), destinationName, str5, Long.valueOf(j)});
        } catch (Exception e3) {
            Throwable cause = e3.getCause();
            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", new Object[]{clientAppId(), destinationName, str5, Long.valueOf(j), e3});
            if (e3 instanceof NullPointerException) {
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            }
            if (e3 instanceof BrokerServiceException.NotAllowedException) {
                throw new RestException(Response.Status.METHOD_NOT_ALLOWED, e3.getMessage());
            }
            if (!(cause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition)) {
                throw new RestException(e3);
            }
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for timestamp specified -" + cause.getMessage());
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = TokenId.DoubleConstant, message = "Not supported for partitioned topics")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor")
    @ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
    @POST
    public void resetCursorOnPosition(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @QueryParam("authoritative") @DefaultValue("false") boolean z, MessageIdImpl messageIdImpl) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        log.info("[{}][{}] received reset cursor on subscription {} to position {}", new Object[]{clientAppId(), decode, str5, messageIdImpl});
        if (getPartitionedTopicMetadata(str, str2, str3, decode, z).partitions > 0) {
            log.warn("[{}] Not supported operation on partitioned-topic {} {}", new Object[]{clientAppId(), destinationName, str5});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Reset-cursor at position is not allowed for partitioned-topic");
        }
        validateAdminOperationOnDestination(destinationName, z);
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
        if (persistentTopic == null) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
        try {
            PersistentSubscription subscription = persistentTopic.getSubscription(str5);
            Preconditions.checkNotNull(subscription);
            subscription.resetCursor(PositionImpl.get(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId())).get();
            log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", new Object[]{clientAppId(), destinationName, str5, messageIdImpl});
        } catch (Exception e) {
            Throwable cause = e.getCause();
            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", new Object[]{clientAppId(), destinationName, str5, messageIdImpl, e});
            if (e instanceof NullPointerException) {
                throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
            }
            if (!(cause instanceof BrokerServiceException.SubscriptionInvalidCursorPosition)) {
                throw new RestException(e);
            }
            throw new RestException(Response.Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + cause.getMessage());
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/position/{messagePosition}")
    @ApiOperation("Peek nth message on a topic subscription.")
    public Response peekNthMessage(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @PathParam("subName") String str5, @PathParam("messagePosition") int i, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        if (getPartitionedTopicMetadata(str, str2, str3, decode, z).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
        }
        validateAdminOperationOnDestination(destinationName, z);
        if (!(getTopicReference(destinationName) instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{clientAppId(), destinationName, str5});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Skip messages on a non-persistent topic is not allowed");
        }
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
        PersistentReplicator persistentReplicator = null;
        PersistentSubscription persistentSubscription = null;
        Entry entry = null;
        if (str5.startsWith(persistentTopic.replicatorPrefix)) {
            persistentReplicator = getReplicatorReference(str5, persistentTopic);
        } else {
            persistentSubscription = (PersistentSubscription) getSubscriptionReference(str5, persistentTopic);
        }
        try {
            try {
                entry = str5.startsWith(persistentTopic.replicatorPrefix) ? persistentReplicator.peekNthMessage(i).get() : persistentSubscription.peekNthMessage(i).get();
                Preconditions.checkNotNull(entry);
                PositionImpl positionImpl = (PositionImpl) entry.getPosition();
                ByteBuf dataBuffer = entry.getDataBuffer();
                PulsarApi.MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(dataBuffer);
                Response.ResponseBuilder ok = Response.ok();
                ok.header("X-Pulsar-Message-ID", positionImpl.toString());
                for (PulsarApi.KeyValue keyValue : parseMessageMetadata.getPropertiesList()) {
                    ok.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
                }
                if (parseMessageMetadata.hasPublishTime()) {
                    ok.header("X-Pulsar-publish-time", DateFormatter.format(parseMessageMetadata.getPublishTime()));
                }
                if (parseMessageMetadata.hasEventTime()) {
                    ok.header("X-Pulsar-event-time", DateFormatter.format(parseMessageMetadata.getEventTime()));
                }
                if (parseMessageMetadata.hasNumMessagesInBatch()) {
                    ok.header("X-Pulsar-num-batch-message", Integer.valueOf(parseMessageMetadata.getNumMessagesInBatch()));
                }
                ByteBuf decode2 = CompressionCodecProvider.getCompressionCodec(parseMessageMetadata.getCompression()).decode(dataBuffer, parseMessageMetadata.getUncompressedSize());
                final ByteBuf heapBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(decode2.readableBytes(), decode2.readableBytes());
                heapBuffer.writeBytes(decode2);
                decode2.release();
                Response build = ok.entity(new StreamingOutput() { // from class: org.apache.pulsar.broker.admin.PersistentTopics.2
                    public void write(OutputStream outputStream) throws IOException, WebApplicationException {
                        outputStream.write(heapBuffer.array(), heapBuffer.arrayOffset(), heapBuffer.readableBytes());
                        heapBuffer.release();
                    }
                }).build();
                if (entry != null) {
                    entry.release();
                }
                return build;
            } catch (NullPointerException unused) {
                throw new RestException(Response.Status.NOT_FOUND, "Message not found");
            } catch (Exception e) {
                log.error("[{}] Failed to get message at position {} from {} {}", new Object[]{clientAppId(), Integer.valueOf(i), destinationName, str5, e});
                throw new RestException(e);
            }
        } catch (Throwable th) {
            if (entry != null) {
                entry.release();
            }
            throw th;
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Namespace does not exist")})
    @Path("{property}/{cluster}/{namespace}/{destination}/backlog")
    @ApiOperation("Get estimated backlog for offline topic.")
    public PersistentOfflineTopicStats getBacklog(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        validateAdminAccessOnProperty(str);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        try {
            policiesCache().get(path(ConfigurationCacheService.POLICIES, str, str2, str3));
            DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
            try {
                PersistentOfflineTopicStats offlineTopicStat = pulsar().getBrokerService().getOfflineTopicStat(destinationName);
                if (offlineTopicStat != null && TimeUnit.MINUTES.convert(System.currentTimeMillis() - offlineTopicStat.statGeneratedAt.getTime(), TimeUnit.MILLISECONDS) < 10) {
                    return offlineTopicStat;
                }
                ManagedLedgerConfig managedLedgerConfig = pulsar().getBrokerService().getManagedLedgerConfig(destinationName).get();
                PersistentOfflineTopicStats estimateUnloadedTopicBacklog = new ManagedLedgerOfflineBacklog(managedLedgerConfig.getDigestType(), managedLedgerConfig.getPassword(), pulsar().getAdvertisedAddress(), false).estimateUnloadedTopicBacklog((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(), destinationName);
                pulsar().getBrokerService().cacheOfflineTopicStats(destinationName, estimateUnloadedTopicBacklog);
                return estimateUnloadedTopicBacklog;
            } catch (Exception e) {
                throw new RestException(e);
            }
        } catch (KeeperException.NoNodeException unused) {
            log.warn("[{}] Failed to get topic backlog {}/{}/{}: Namespace does not exist", new Object[]{clientAppId(), str, str2, str3});
            throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
        } catch (Exception e2) {
            log.error("[{}] Failed to get topic backlog {}/{}/{}", new Object[]{clientAppId(), str, str2, str3, e2});
            throw new RestException(e2);
        }
    }

    @ApiResponses({@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = TokenId.DoubleConstant, message = "Operation not allowed on non-persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist")})
    @Path("/{property}/{cluster}/{namespace}/{destination}/terminate")
    @ApiOperation("Terminate a topic. A topic that is terminated will not accept any more messages to be published and will let consumer to drain existing messages in backlog")
    @POST
    public MessageId terminate(@PathParam("property") String str, @PathParam("cluster") String str2, @PathParam("namespace") String str3, @PathParam("destination") @Encoded String str4, @QueryParam("authoritative") @DefaultValue("false") boolean z) {
        String decode = Codec.decode(str4);
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, decode);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        if (getPartitionedTopicMetadata(str, str2, str3, decode, z).partitions > 0) {
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
        }
        validateAdminOperationOnDestination(destinationName, z);
        try {
            return ((PersistentTopic) getTopicReference(destinationName)).terminate().get();
        } catch (Exception e) {
            log.error("[{}] Failed to terminated topic {}", new Object[]{clientAppId(), destinationName, e});
            throw new RestException(e);
        }
    }

    public void expireMessages(String str, String str2, String str3, String str4, String str5, int i, boolean z) {
        DestinationName destinationName = DestinationName.get(domain(), str, str2, str3, str4);
        if (str2.equals(Namespaces.GLOBAL_CLUSTER)) {
            validateGlobalNamespaceOwnership(new NamespaceName(str, str2, str3));
        }
        PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(str, str2, str3, str4, z);
        if (partitionedTopicMetadata.partitions > 0) {
            for (int i2 = 0; i2 < partitionedTopicMetadata.partitions; i2++) {
                try {
                    pulsar().getAdminClient().persistentTopics().expireMessages(destinationName.getPartition(i2).toString(), str5, i);
                } catch (Exception e) {
                    throw new RestException(e);
                }
            }
            return;
        }
        validateAdminOperationOnDestination(destinationName, z);
        if (!(getTopicReference(destinationName) instanceof PersistentTopic)) {
            log.error("[{}] Not supported operation of non-persistent topic {} {}", new Object[]{clientAppId(), destinationName, str5});
            throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Expire messages on a non-persistent topic is not allowed");
        }
        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(destinationName);
        try {
            if (str5.startsWith(persistentTopic.replicatorPrefix)) {
                PersistentReplicator persistentReplicator = (PersistentReplicator) persistentTopic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str5));
                Preconditions.checkNotNull(persistentReplicator);
                persistentReplicator.expireMessages(i);
            } else {
                PersistentSubscription subscription = persistentTopic.getSubscription(str5);
                Preconditions.checkNotNull(subscription);
                subscription.expireMessages(i);
            }
            log.info("[{}] Message expire started up to {} on {} {}", new Object[]{clientAppId(), Integer.valueOf(i), destinationName, str5});
        } catch (NullPointerException unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        } catch (Exception e2) {
            log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", new Object[]{clientAppId(), Integer.valueOf(i), destinationName, str5, e2});
            throw new RestException(e2);
        }
    }

    public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsarService, String str, DestinationName destinationName) {
        CompletableFuture<PartitionedTopicMetadata> completableFuture = new CompletableFuture<>();
        try {
            try {
                try {
                    checkAuthorization(pulsarService, destinationName, str);
                } catch (RestException unused) {
                    try {
                        validateAdminAccessOnProperty(pulsarService, str, destinationName.getProperty());
                    } catch (RestException e) {
                        log.warn("Failed to authorize {} on cluster {}", str, destinationName.toString());
                        throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s", str, destinationName.toString(), e.getMessage()));
                    }
                }
                String path = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, destinationName.getProperty(), destinationName.getCluster(), destinationName.getNamespacePortion(), "persistent", destinationName.getEncodedLocalName());
                checkLocalOrGetPeerReplicationCluster(pulsarService, destinationName.getNamespaceObject()).thenCompose(clusterData -> {
                    return fetchPartitionedTopicMetadataAsync(pulsarService, path);
                }).thenAccept((Consumer<? super U>) partitionedTopicMetadata -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Total number of partitions for topic {} is {}", new Object[]{str, destinationName, Integer.valueOf(partitionedTopicMetadata.partitions)});
                    }
                    completableFuture.complete(partitionedTopicMetadata);
                }).exceptionally(th -> {
                    completableFuture.completeExceptionally(th.getCause());
                    return null;
                });
            } catch (Exception e2) {
                log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", new Object[]{str, destinationName.toString(), e2.getMessage(), e2});
                throw e2;
            }
        } catch (Exception e3) {
            completableFuture.completeExceptionally(e3);
        }
        return completableFuture;
    }

    private Topic getTopicReference(DestinationName destinationName) {
        try {
            Topic topicReference = pulsar().getBrokerService().getTopicReference(destinationName.toString());
            Preconditions.checkNotNull(topicReference);
            return topicReference;
        } catch (Exception unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
        }
    }

    private Subscription getSubscriptionReference(String str, PersistentTopic persistentTopic) {
        try {
            return (Subscription) Preconditions.checkNotNull(persistentTopic.getSubscription(str));
        } catch (Exception unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Subscription not found");
        }
    }

    private PersistentReplicator getReplicatorReference(String str, PersistentTopic persistentTopic) {
        try {
            return (PersistentReplicator) Preconditions.checkNotNull((PersistentReplicator) persistentTopic.getPersistentReplicator(PersistentReplicator.getRemoteCluster(str)));
        } catch (Exception unused) {
            throw new RestException(Response.Status.NOT_FOUND, "Replicator not found");
        }
    }

    private CompletableFuture<Void> updatePartitionedTopic(DestinationName destinationName, int i) {
        String path = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, destinationName.getProperty(), destinationName.getCluster(), destinationName.getNamespacePortion(), domain(), destinationName.getEncodedLocalName());
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        createSubscriptions(destinationName, i).thenAccept(r11 -> {
            try {
                globalZk().setData(path, jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(i)), -1, (i2, str, obj, stat) -> {
                    if (i2 == KeeperException.Code.OK.intValue()) {
                        completableFuture.complete(null);
                    } else {
                        completableFuture.completeExceptionally(KeeperException.create(KeeperException.Code.get(i2), "failed to create update partitions"));
                    }
                }, null);
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        }).exceptionally(th -> {
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    private CompletableFuture<Void> createSubscriptions(DestinationName destinationName, int i) {
        String path = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, destinationName.getProperty(), destinationName.getCluster(), destinationName.getNamespacePortion(), domain(), destinationName.getEncodedLocalName());
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        fetchPartitionedTopicMetadataAsync(pulsar(), path).thenAccept(partitionedTopicMetadata -> {
            if (partitionedTopicMetadata.partitions <= 1) {
                completableFuture.completeExceptionally(new RestException(Response.Status.CONFLICT, "Topic is not partitioned topic"));
            } else {
                if (partitionedTopicMetadata.partitions >= i) {
                    completableFuture.completeExceptionally(new RestException(Response.Status.CONFLICT, "number of partitions must be more than existing " + partitionedTopicMetadata.partitions));
                    return;
                }
                final String persistenceNamingEncoding = destinationName.getPartition(1).getPersistenceNamingEncoding();
                final Set newConcurrentHashSet = Sets.newConcurrentHashSet();
                ((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(persistenceNamingEncoding, new MetaStore.MetaStoreCallback<List<String>>() { // from class: org.apache.pulsar.broker.admin.PersistentTopics.3
                    @Override // org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback
                    public void operationComplete(List<String> list, MetaStore.Stat stat) {
                        ArrayList newArrayList = Lists.newArrayList();
                        Set set = newConcurrentHashSet;
                        PartitionedTopicMetadata partitionedTopicMetadata = partitionedTopicMetadata;
                        int i2 = i;
                        DestinationName destinationName2 = destinationName;
                        list.forEach(str -> {
                            String decode = Codec.decode(str);
                            for (int i3 = partitionedTopicMetadata.partitions; i3 < i2; i3++) {
                                String destinationName3 = destinationName2.getPartition(i3).toString();
                                CompletableFuture completableFuture2 = new CompletableFuture();
                                PersistentTopics.this.pulsar().getBrokerService().getTopic(destinationName3).handle((topic, th) -> {
                                    set.add(topic);
                                    if (th == null) {
                                        topic.createSubscription(decode).handle((subscription, th) -> {
                                            if (th != null) {
                                                PersistentTopics.log.warn("[{}] Failed to create subsciption {} {}", new Object[]{PersistentTopics.this.clientAppId(), destinationName3, decode});
                                                completableFuture2.completeExceptionally(th);
                                                return null;
                                            }
                                            PersistentTopics.log.info("[{}] Successfully created subsciption {} {}", new Object[]{PersistentTopics.this.clientAppId(), destinationName3, decode});
                                            completableFuture2.complete(null);
                                            return null;
                                        });
                                        return null;
                                    }
                                    PersistentTopics.log.warn("[{}] Failed to create topic {}", PersistentTopics.this.clientAppId(), destinationName3);
                                    completableFuture2.completeExceptionally(th);
                                    return null;
                                });
                                newArrayList.add(completableFuture2);
                            }
                        });
                        CompletableFuture waitForAll = FutureUtil.waitForAll(newArrayList);
                        DestinationName destinationName3 = destinationName;
                        CompletableFuture completableFuture2 = completableFuture;
                        Set set2 = newConcurrentHashSet;
                        waitForAll.handle((r10, th) -> {
                            FutureUtil.waitForAll((List) set2.stream().map(topic -> {
                                return topic.close();
                            }).collect(Collectors.toList())).handle((r9, th) -> {
                                if (th != null) {
                                    PersistentTopics.log.warn("Failed to close newly created partitioned topics for {} ", destinationName3, th);
                                }
                                if (th != null) {
                                    completableFuture2.completeExceptionally(th);
                                    return null;
                                }
                                PersistentTopics.log.info("[{}] Successfully created new partitions {}", PersistentTopics.this.clientAppId(), destinationName3.toString());
                                completableFuture2.complete(null);
                                return null;
                            });
                            return null;
                        });
                    }

                    @Override // org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback
                    public void operationFailed(ManagedLedgerException.MetaStoreException metaStoreException) {
                        PersistentTopics.log.warn("[{}] Failed to get list of cursors of {}", PersistentTopics.this.clientAppId(), persistenceNamingEncoding);
                        completableFuture.completeExceptionally(metaStoreException);
                    }
                });
            }
        }).exceptionally(th -> {
            log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), destinationName.toString());
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void unloadTopic(DestinationName destinationName, boolean z) {
        validateSuperUserAccess();
        validateDestinationOwnership(destinationName, z);
        try {
            getTopicReference(destinationName).close().get();
            log.info("[{}] Successfully unloaded topic {}", clientAppId(), destinationName);
        } catch (NullPointerException unused) {
            log.error("[{}] topic {} not found", clientAppId(), destinationName);
            throw new RestException(Response.Status.NOT_FOUND, "Topic does not exist");
        } catch (Exception e) {
            log.error("[{}] Failed to unload topic {}, {}", new Object[]{clientAppId(), destinationName, e.getCause().getMessage(), e});
            throw new RestException(e.getCause());
        }
    }

    private void validateClientVersion() {
        if (pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
            String header = this.httpRequest.getHeader("User-Agent");
            if (StringUtils.isBlank(header)) {
                throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version in user-agent is not present");
            }
            if (header.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
                try {
                    String[] split = header.split(DEPRECATED_CLIENT_VERSION_PREFIX);
                    String[] split2 = split.length > 1 ? split[1].split(HelpFormatter.DEFAULT_OPT_PREFIX)[0].trim().split("\\.") : null;
                    if (split2 == null || split2.length <= 1) {
                        return;
                    }
                    if (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(split2[0]) || LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > Integer.parseInt(split2[1])) {
                        throw new RestException(Response.Status.METHOD_NOT_ALLOWED, "Client lib is not compatible to access partitioned metadata: version " + header + " is not supported");
                    }
                } catch (Exception unused) {
                    log.warn("[{}] Failed to parse version {} ", clientAppId(), header);
                } catch (RestException e) {
                    throw e;
                }
            }
        }
    }
}
