package org.apache.pulsar.broker.admin.v2;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Api(value = "/non-persistent", description = "Non-Persistent topic admin apis", tags = {"non-persistent topic"})
@Path("/non-persistent")
/* loaded from: input_file:org/apache/pulsar/broker/admin/v2/NonPersistentTopics.class */
public class NonPersistentTopics extends PersistentTopics {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);

    /**
     * REST endpoint returning partitioned-topic metadata.
     *
     * <p>This override only re-declares the JAX-RS / Swagger annotations; every argument is forwarded
     * unchanged to {@code PersistentTopics#getPartitionedMetadata}.
     *
     * @param asyncResponse suspended JAX-RS response, passed through to the parent implementation
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code checkAllowAutoCreation} query parameter
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation("Get partitioned topic metadata.")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate cluster configuration")
    })
    @Override
    public void getPartitionedMetadata(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z,
            @QueryParam("checkAllowAutoCreation")
            @ApiParam("Is check configuration required to automatically create topic")
            @DefaultValue("false") boolean z2) {
        // Nothing non-persistent-specific here: the parent does all the work.
        super.getPartitionedMetadata(asyncResponse, str, str2, str3, z, z2);
    }

    /**
     * REST endpoint returning the internal stats of a topic.
     *
     * <p>After validating the topic name, the method asynchronously validates topic ownership and the
     * {@code GET_STATS} topic operation, looks the topic up through {@code getTopicReference}, and
     * resumes {@code asyncResponse} with the result of {@code Topic#getInternalStats}. Any failure in
     * the chain is passed to {@code resumeAsyncResponseExceptionally}; it is logged first unless
     * {@code isRedirectException} reports it as a redirect.
     *
     * @param asyncResponse suspended JAX-RS response resumed with the stats or with the failure
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     * @param z2 value of the {@code metadata} query parameter; only honoured when
     *           {@code hasSuperUserAccess()} is also true
     */
    @GET
    @Path("{tenant}/{namespace}/{topic}/internalStats")
    @ApiOperation("Get the internal stats for the topic.")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error")
    })
    @Override
    public void getInternalStats(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z,
            @QueryParam("metadata") @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        validateTopicOwnershipAsync(this.topicName, z)
                .thenCompose(ignore -> validateTopicOperationAsync(this.topicName, TopicOperation.GET_STATS))
                .thenCompose(ignore -> {
                    Topic topic = getTopicReference(this.topicName);
                    // Short-circuit: the super-user check only runs when metadata was requested.
                    boolean includeMetadata = z2 && hasSuperUserAccess();
                    return topic.getInternalStats(includeMetadata);
                })
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    if (!isRedirectException(th)) {
                        log.error("[{}] Failed to get internal stats for topic {}",
                                clientAppId(), this.topicName, th);
                    }
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return null;
                });
    }

    /**
     * REST endpoint creating a partitioned topic.
     *
     * <p>Runs the namespace-name, global-namespace-ownership and topic-name validations in that order
     * and then hands over to {@code internalCreatePartitionedTopic}. Any exception thrown
     * synchronously by those calls is logged and reported through
     * {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param i number of partitions (request body)
     * @param z value of the {@code createLocalTopicOnly} query parameter
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(
            value = "Create a partitioned topic.",
            notes = "It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "The tenant/namespace does not exist"),
            @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
            @ApiResponse(code = 409, message = "Partitioned topic already exists"),
            @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    @Override
    public void createPartitionedTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int i,
            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean z) {
        try {
            validateNamespaceName(str, str2);
            validateGlobalNamespaceOwnership();
            validateTopicName(str, str2, str3);
            internalCreatePartitionedTopic(asyncResponse, i, z);
        } catch (Exception failure) {
            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), this.topicName, failure);
            resumeAsyncResponseExceptionally(asyncResponse, failure);
        }
    }

    /**
     * REST endpoint returning aggregated stats for a partitioned non-persistent topic.
     *
     * <p>Flow:
     * <ol>
     *   <li>Validate the partitioned topic name and, for a global topic, the global namespace
     *       ownership.</li>
     *   <li>Fetch the partitioned-topic metadata; zero partitions yields a 404.</li>
     *   <li>Request the stats of every partition through the admin client and wait for all of
     *       them.</li>
     *   <li>Merge the stats of every partition whose request completed normally; partitions whose
     *       request failed are skipped.</li>
     *   <li>If per-partition output was requested but nothing was collected, answer 404 when the
     *       partitioned topic does not exist, otherwise add one empty stats entry keyed by the
     *       topic name.</li>
     * </ol>
     *
     * @param asyncResponse suspended JAX-RS response resumed with the aggregated stats or an error
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code perPartition} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param z3 value of the {@code getPreciseBacklog} query parameter
     * @param z4 value of the {@code subscriptionBacklogSize} query parameter
     * @param z5 value of the {@code getEarliestTimeInBacklog} query parameter
     */
    @GET
    @Path("{tenant}/{namespace}/{topic}/partitioned-stats")
    @ApiOperation(
            value = "Get the stats for the partitioned topic.",
            response = NonPersistentPartitionedTopicStatsImpl.class)
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Namespace or topic does not exist"),
            @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    @Override
    public void getPartitionedStats(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("perPartition") @ApiParam("Get per partition stats") @DefaultValue("true") boolean z,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z2,
            @QueryParam("getPreciseBacklog")
            @ApiParam("If return precise backlog or imprecise backlog")
            @DefaultValue("false") boolean z3,
            @QueryParam("subscriptionBacklogSize")
            @ApiParam("If return backlog size for each subscription, require locking on ledger so be careful not to use when there's heavy traffic.")
            @DefaultValue("false") boolean z4,
            @QueryParam("getEarliestTimeInBacklog")
            @ApiParam("If return the earliest time in backlog")
            @DefaultValue("false") boolean z5) {
        try {
            validatePartitionedTopicName(str, str2, str3);
            if (this.topicName.isGlobal()) {
                try {
                    validateGlobalNamespaceOwnership(this.namespaceName);
                } catch (Exception e) {
                    log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), this.topicName, e);
                    resumeAsyncResponseExceptionally(asyncResponse, e);
                    return;
                }
            }
            getPartitionedTopicMetadataAsync(this.topicName, z2, false).thenAccept(partitionMetadata -> {
                if (partitionMetadata.partitions == 0) {
                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Partitioned Topic not found"));
                    return;
                }
                NonPersistentPartitionedTopicStatsImpl stats =
                        new NonPersistentPartitionedTopicStatsImpl(partitionMetadata);

                // One stats request per partition, kept in partition-index order.
                List<CompletableFuture<?>> statsFutures = new ArrayList<>();
                for (int partition = 0; partition < partitionMetadata.partitions; partition++) {
                    try {
                        statsFutures.add(pulsar().getAdminClient().topics().getStatsAsync(
                                this.topicName.getPartition(partition).toString(), z3, z4, z5));
                    } catch (PulsarServerException e) {
                        asyncResponse.resume(new RestException(e));
                        return;
                    }
                }

                // handle() (not thenAccept) so that a failed partition does not abort the aggregation.
                FutureUtil.waitForAll(statsFutures).handle((ignored, failure) -> {
                    for (int idx = 0; idx < statsFutures.size(); idx++) {
                        CompletableFuture<?> statsFuture = statsFutures.get(idx);
                        if (!statsFuture.isDone() || statsFuture.isCompletedExceptionally()) {
                            continue;
                        }
                        try {
                            NonPersistentTopicStatsImpl partitionStats =
                                    (NonPersistentTopicStatsImpl) statsFuture.get();
                            stats.add(partitionStats);
                            if (z) {
                                stats.getPartitions().put(
                                        this.topicName.getPartition(idx).toString(), partitionStats);
                            }
                        } catch (Exception e) {
                            asyncResponse.resume(new RestException(e));
                            return null;
                        }
                    }
                    if (z && stats.partitions.isEmpty()) {
                        try {
                            boolean exists = namespaceResources().getPartitionedTopicResources()
                                    .partitionedTopicExists(this.topicName);
                            if (!exists) {
                                asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
                                        "Internal topics have not been generated yet"));
                                return null;
                            }
                            stats.getPartitions().put(this.topicName.toString(), new NonPersistentTopicStatsImpl());
                        } catch (Exception e) {
                            asyncResponse.resume(new RestException(e));
                            return null;
                        }
                    }
                    asyncResponse.resume(stats);
                    return null;
                });
            }).exceptionally(th -> {
                log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), this.topicName, th);
                resumeAsyncResponseExceptionally(asyncResponse, th);
                return null;
            });
        } catch (WebApplicationException wae) {
            // Must precede the generic handler: WebApplicationException is itself an Exception, and
            // it already carries the HTTP response to send back.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * REST endpoint unloading a topic.
     *
     * <p>Validates the topic name and delegates to {@code internalUnloadTopic}. A
     * {@link WebApplicationException} thrown synchronously is sent back as-is; any other exception is
     * wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code authoritative} query parameter
     */
    @PUT
    @Path("/{tenant}/{namespace}/{topic}/unload")
    @ApiOperation("Unload a topic")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
            @ApiResponse(code = 401, message = "This operation requires super-user access"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "The tenant/namespace/topic does not exist"),
            @ApiResponse(code = 412, message = "Topic name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    @Override
    public void unloadTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z) {
        try {
            validateTopicName(str, str2, str3);
            internalUnloadTopic(asyncResponse, z);
        } catch (WebApplicationException wae) {
            // Must precede the generic handler: WebApplicationException is itself an Exception.
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * REST endpoint listing the non-persistent topics of a namespace.
     *
     * <p>The namespace's bundle boundaries are turned into bundle names of the form
     * {@code <lower>_<upper>}. For each bundle (or only the one equal to {@code str3} when that
     * parameter is non-blank) the per-bundle topic list is requested through the admin client. Once
     * all requests complete, the lists are concatenated, reduced to names for which
     * {@code TopicName.get(name).isPersistent()} is false, and passed through
     * {@code filterSystemTopic} together with {@code z}.
     *
     * @param asyncResponse suspended JAX-RS response resumed with the topic list or an error
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 optional {@code bundle} query parameter restricting the listing to one bundle
     * @param z value of the {@code includeSystemTopic} query parameter
     */
    @GET
    @Path("/{tenant}/{namespace}")
    @ApiOperation(
            value = "Get the list of non-persistent topics under a namespace.",
            response = String.class,
            responseContainer = "List")
    @ApiResponses({
            @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "The tenant/namespace does not exist"),
            @ApiResponse(code = 412, message = "Namespace name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    @Override
    public void getList(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @QueryParam("bundle") @ApiParam(value = "Specify the bundle name", required = false) String str3,
            @QueryParam("includeSystemTopic") @ApiParam("Include system topic") boolean z) {
        try {
            validateNamespaceName(str, str2);
            if (log.isDebugEnabled()) {
                log.debug("[{}] list of topics on namespace {}", clientAppId(), this.namespaceName);
            }
            validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_TOPICS);
            Policies namespacePolicies = getNamespacePolicies(this.namespaceName);
            validateGlobalNamespaceOwnership(this.namespaceName);

            List<CompletableFuture<List<String>>> bundleListings = new ArrayList<>();
            List<String> boundaries = namespacePolicies.bundles.getBoundaries();
            // Each adjacent pair of boundaries delimits one bundle.
            for (int i = 0; i < boundaries.size() - 1; i++) {
                String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
                if (StringUtils.isNotBlank(str3) && !str3.equals(bundle)) {
                    continue;
                }
                try {
                    bundleListings.add(pulsar().getAdminClient().topics()
                            .getListInBundleAsync(this.namespaceName.toString(), bundle));
                } catch (PulsarServerException e) {
                    log.error("[{}] Failed to get list of topics under namespace {}/{}",
                            clientAppId(), this.namespaceName, bundle, e);
                    asyncResponse.resume(new RestException(e));
                    return;
                }
            }

            FutureUtil.waitForAll(bundleListings).whenComplete((ignored, th) -> {
                if (th != null) {
                    resumeAsyncResponseExceptionally(asyncResponse, th);
                    return;
                }
                List<String> allTopics = new ArrayList<>();
                for (CompletableFuture<List<String>> listing : bundleListings) {
                    // Every future is already complete here, so join() does not block.
                    List<String> topicsInBundle = listing.join();
                    if (topicsInBundle != null) {
                        allTopics.addAll(topicsInBundle);
                    }
                }
                List<String> nonPersistentTopics = allTopics.stream()
                        .filter(name -> !TopicName.get(name).isPersistent())
                        .collect(Collectors.toList());
                asyncResponse.resume(filterSystemTopic(nonPersistentTopics, z));
            });
        } catch (WebApplicationException wae) {
            asyncResponse.resume(wae);
        } catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * REST endpoint listing the non-persistent topics of one namespace bundle.
     *
     * <p>After the synchronous namespace validations, the method asks
     * {@code isBundleOwnedByAnyBroker}. An unowned bundle yields an empty "no content" response.
     * Otherwise bundle ownership is validated and the broker's multi-layer topics map is consulted:
     * the entry for the namespace, then the entry for
     * {@code <namespace>/<bundleRange>}, whose keys are filtered to names for which
     * {@code TopicName.get(name).isPersistent()} is false.
     *
     * <p>Both failure handlers unwrap the throwable with
     * {@code FutureUtil.unwrapCompletionException}; a {@link WebApplicationException} is returned
     * as-is and anything else is wrapped in a {@link RestException}.
     *
     * @param asyncResponse suspended JAX-RS response resumed with the topic list or an error
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 bundle range path segment
     */
    @GET
    @Path("/{tenant}/{namespace}/{bundle}")
    @ApiOperation(
            value = "Get the list of non-persistent topics under a namespace bundle.",
            response = String.class,
            responseContainer = "List")
    @ApiResponses({
            @ApiResponse(code = 401, message = "Don't have permission to manage resources on this tenant"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Namespace doesn't exist"),
            @ApiResponse(code = 412, message = "Namespace name is not valid"),
            @ApiResponse(code = 500, message = "Internal server error"),
            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
    })
    public void getListFromBundle(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("bundle") @ApiParam(value = "Bundle range of a topic", required = true) String str3) {
        validateNamespaceName(str, str2);
        if (log.isDebugEnabled()) {
            log.debug("[{}] list of topics on namespace bundle {}/{}", clientAppId(), this.namespaceName, str3);
        }
        validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_BUNDLE);
        Policies namespacePolicies = getNamespacePolicies(this.namespaceName);
        validateGlobalNamespaceOwnership(this.namespaceName);

        isBundleOwnedByAnyBroker(this.namespaceName, namespacePolicies.bundles, str3).thenAccept(owned -> {
            if (!owned) {
                log.info("[{}] Namespace bundle is not owned by any broker {}/{}",
                        clientAppId(), this.namespaceName, str3);
                asyncResponse.resume(Response.noContent().build());
                return;
            }
            validateNamespaceBundleOwnershipAsync(this.namespaceName, namespacePolicies.bundles, str3, true, true)
                    .thenAccept(namespaceBundle -> {
                        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundlesOfNamespace =
                                pulsar().getBrokerService().getMultiLayerTopicsMap()
                                        .get(this.namespaceName.toString());
                        if (bundlesOfNamespace == null || bundlesOfNamespace.isEmpty()) {
                            asyncResponse.resume(Collections.emptyList());
                            return;
                        }
                        List<String> nonPersistentTopics = new ArrayList<>();
                        String bundleKey = this.namespaceName.toString() + "/" + namespaceBundle.getBundleRange();
                        ConcurrentOpenHashMap<String, Topic> topicsOfBundle = bundlesOfNamespace.get(bundleKey);
                        if (topicsOfBundle != null) {
                            nonPersistentTopics.addAll(topicsOfBundle.keys().stream()
                                    .filter(name -> !TopicName.get(name).isPersistent())
                                    .collect(Collectors.toList()));
                        }
                        asyncResponse.resume(nonPersistentTopics);
                    }).exceptionally(th -> {
                        Throwable cause = FutureUtil.unwrapCompletionException(th);
                        log.error("[{}] Failed to list topics on namespace bundle {}/{}",
                                clientAppId(), this.namespaceName, str3, cause);
                        if (cause instanceof WebApplicationException) {
                            asyncResponse.resume(cause);
                        } else {
                            asyncResponse.resume(new RestException(cause));
                        }
                        return null;
                    });
        }).exceptionally(th -> {
            // Unwrap the same way as the inner handler instead of assuming th.getCause() is non-null.
            Throwable cause = FutureUtil.unwrapCompletionException(th);
            log.error("[{}] Failed to list topics on namespace bundle {}/{}",
                    clientAppId(), this.namespaceName, str3, cause);
            if (cause instanceof WebApplicationException) {
                asyncResponse.resume(cause);
            } else {
                asyncResponse.resume(new RestException(cause));
            }
            return null;
        });
    }

    /**
     * REST endpoint for truncating a topic; always rejected for non-persistent topics.
     *
     * <p>None of the path or query arguments are inspected: the response is immediately resumed
     * with a {@link RestException} carrying the {@code PRECONDITION_FAILED} status code.
     *
     * @param asyncResponse suspended JAX-RS response resumed with the rejection
     * @param str tenant path segment (unused)
     * @param str2 namespace path segment (unused)
     * @param str3 URL-encoded topic path segment (unused)
     * @param z value of the {@code authoritative} query parameter (unused)
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/truncate")
    @ApiOperation(value = "Truncate a topic.", notes = "NonPersistentTopic does not support truncate.")
    @ApiResponses({
            @ApiResponse(code = 412, message = "NonPersistentTopic does not support truncate.")
    })
    @Override
    public void truncateTopic(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z) {
        final int preconditionFailed = Response.Status.PRECONDITION_FAILED.getStatusCode();
        final RestException rejection = new RestException(preconditionFailed, "unsupport truncate");
        asyncResponse.resume(rejection);
    }

    /**
     * Checks that the caller may administer the given topic on this broker.
     *
     * <p>Runs {@code validateAdminAccessForTenant} on the topic's tenant first, then
     * {@code validateTopicOwnership} on the topic.
     *
     * @param topicName topic to validate
     * @param z flag forwarded to {@code validateTopicOwnership}
     */
    protected void validateAdminOperationOnTopic(TopicName topicName, boolean z) {
        final String tenant = topicName.getTenant();
        validateAdminAccessForTenant(tenant);
        validateTopicOwnership(topicName, z);
    }

    /**
     * REST endpoint returning the entry filters of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, then resumes {@code asyncResponse}
     * with the result of {@code internalGetEntryFilters}. Failures are routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response resumed with the entry filters or an error
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code applied} query parameter
     * @param z2 value of the {@code isGlobal} query parameter
     * @param z3 value of the {@code authoritative} query parameter
     */
    @GET
    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
    @ApiOperation("Get entry filters for a topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist")
    })
    @Override
    public void getEntryFilters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") @ApiParam(value = "Specify the tenant", required = true) String str,
            @PathParam("namespace") @ApiParam(value = "Specify the namespace", required = true) String str2,
            @PathParam("topic") @Encoded @ApiParam(value = "Specify topic name", required = true) String str3,
            @QueryParam("applied") @DefaultValue("false") boolean z,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z2,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z3) {
        validateTopicName(str, str2, str3);
        preValidation(z3)
                .thenCompose(ignore -> internalGetEntryFilters(z, z2))
                .thenAccept(asyncResponse::resume)
                .exceptionally(th -> {
                    handleTopicPolicyException("getEntryFilters", th, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint setting the entry filters of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, applies the filters through
     * {@code internalSetEntryFilters}, and on success resumes {@code asyncResponse} with an empty
     * "no content" response. Failures are routed to {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     * @param entryFilters entry filters to apply (request body)
     */
    @POST
    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
    @ApiOperation("Set entry filters for specified topic")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or namespace or topic doesn't exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Override
    public void setEntryFilters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z2,
            @ApiParam("Entry filters for the specified topic") EntryFilters entryFilters) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignore -> internalSetEntryFilters(entryFilters, z))
                .thenAccept(ignore -> asyncResponse.resume(Response.noContent().build()))
                .exceptionally(th -> {
                    handleTopicPolicyException("setEntryFilters", th, asyncResponse);
                    return null;
                });
    }

    /**
     * REST endpoint removing the entry filters of a topic.
     *
     * <p>Validates the topic name, runs {@code preValidation}, removes the filters through
     * {@code internalRemoveEntryFilters}, then logs the removal and resumes {@code asyncResponse}
     * with an empty "no content" response. Failures are routed to
     * {@code handleTopicPolicyException}.
     *
     * @param asyncResponse suspended JAX-RS response
     * @param str tenant path segment
     * @param str2 namespace path segment
     * @param str3 URL-encoded topic path segment
     * @param z value of the {@code isGlobal} query parameter
     * @param z2 value of the {@code authoritative} query parameter
     */
    @DELETE
    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
    @ApiOperation("Remove entry filters for specified topic.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or namespace or topic doesn't exist"),
            @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
            @ApiResponse(code = 409, message = "Concurrent modification")
    })
    @Override
    public void removeEntryFilters(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("tenant") String str,
            @PathParam("namespace") String str2,
            @PathParam("topic") @Encoded String str3,
            @QueryParam("isGlobal") @DefaultValue("false") boolean z,
            @QueryParam("authoritative")
            @ApiParam("Whether leader broker redirected this call to this broker. For internal use.")
            @DefaultValue("false") boolean z2) {
        validateTopicName(str, str2, str3);
        preValidation(z2)
                .thenCompose(ignore -> internalRemoveEntryFilters(z))
                .thenRun(() -> {
                    log.info("[{}] Successfully remove entry filters: tenant={}, namespace={}, topic={}, isGlobal={}",
                            clientAppId(), str, str2, this.topicName.getLocalName(), z);
                    asyncResponse.resume(Response.noContent().build());
                })
                .exceptionally(th -> {
                    handleTopicPolicyException("removeEntryFilters", th, asyncResponse);
                    return null;
                });
    }

    /**
     * Looks up an existing topic on this broker, blocking until the lookup completes.
     *
     * <p>The wait is bounded by {@code config().getMetadataStoreOperationTimeoutSeconds()} seconds.
     *
     * @param topicName topic to look up
     * @return the topic, never {@code null}
     * @throws RestException with status {@code NOT_FOUND} when the lookup yields no topic; wrapping
     *         the interruption or timeout when the wait is cut short; wrapping the underlying
     *         cause when the lookup itself fails
     */
    private Topic getTopicReference(TopicName topicName) {
        try {
            return pulsar().getBrokerService()
                    .getTopicIfExists(topicName.toString())
                    .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Topic not found"));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RestException(e);
        } catch (TimeoutException e) {
            throw new RestException(e);
        } catch (ExecutionException e) {
            throw new RestException(e.getCause());
        }
    }
}
