/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.v1;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.swagger.annotations.Api;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.javax.ws.rs.DefaultValue;
import org.apache.pulsar.shade.javax.ws.rs.Encoded;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.PUT;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.Produces;
import org.apache.pulsar.shade.javax.ws.rs.QueryParam;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/non-persistent")
@Produces(value={"application/json"})
@Api(value="/non-persistent", description="Non-Persistent topic admin apis", tags={"non-persistent topic"}, hidden=true)
public class NonPersistentTopics
extends PersistentTopics {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);

    @Override
    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{topic}/partitions")
    @ApiOperation(hidden=true, value="Get partitioned topic metadata.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission")})
    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @QueryParam(value="checkAllowAutoCreation") @DefaultValue(value="false") boolean checkAllowAutoCreation) {
        this.validateTopicName(property, cluster, namespace, encodedTopic);
        return this.getPartitionedTopicMetadata(this.topicName, authoritative, checkAllowAutoCreation);
    }

    @Override
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{topic}/stats")
    @ApiOperation(hidden=true, value="Get the stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public NonPersistentTopicStats getStats(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @QueryParam(value="getPreciseBacklog") @DefaultValue(value="false") boolean getPreciseBacklog) {
        this.validateTopicName(property, cluster, namespace, encodedTopic);
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.GET_STATS);
        Topic topic = this.getTopicReference(this.topicName);
        return ((NonPersistentTopic)topic).getStats(getPreciseBacklog, false, false);
    }

    @Override
    @GET
    @Path(value="{property}/{cluster}/{namespace}/{topic}/internalStats")
    @ApiOperation(hidden=true, value="Get the internal stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public PersistentTopicInternalStats getInternalStats(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @QueryParam(value="metadata") @DefaultValue(value="false") boolean metadata) {
        this.validateTopicName(property, cluster, namespace, encodedTopic);
        this.validateTopicOwnership(this.topicName, authoritative);
        this.validateTopicOperation(this.topicName, TopicOperation.GET_STATS);
        Topic topic = this.getTopicReference(this.topicName);
        try {
            boolean includeMetadata = metadata && this.hasSuperUserAccess();
            return topic.getInternalStats(includeMetadata).get();
        }
        catch (Exception e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e instanceof ExecutionException ? e.getCause().getMessage() : e.getMessage());
        }
    }

    @Override
    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{topic}/partitions")
    @ApiOperation(hidden=true, value="Create a partitioned topic.", notes="It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=406, message="The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code=409, message="Partitioned topic already exist")})
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, int numPartitions, @QueryParam(value="createLocalTopicOnly") @DefaultValue(value="false") boolean createLocalTopicOnly) {
        try {
            this.validateTopicName(property, cluster, namespace, encodedTopic);
            this.internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
        }
        catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            NonPersistentTopics.resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    @Override
    @PUT
    @Path(value="/{property}/{cluster}/{namespace}/{topic}/unload")
    @ApiOperation(hidden=true, value="Unload a topic")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Topic does not exist")})
    public void unloadTopic(@Suspended AsyncResponse asyncResponse, @PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            this.validateTopicName(property, cluster, namespace, encodedTopic);
            this.internalUnloadTopic(asyncResponse, authoritative);
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume(wae);
        }
        catch (Exception e) {
            asyncResponse.resume(new RestException(e));
        }
    }

    @Override
    @GET
    @Path(value="/{property}/{cluster}/{namespace}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public void getList(@Suspended AsyncResponse asyncResponse, @PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @QueryParam(value="bundle") String nsBundle) {
        log.info("[{}] list of topics on namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace});
        Policies policies = null;
        NamespaceName nsName = null;
        try {
            this.validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_TOPICS);
            policies = this.getNamespacePolicies(property, cluster, namespace);
            nsName = NamespaceName.get(property, cluster, namespace);
            if (!cluster.equals("global")) {
                this.validateClusterOwnership(cluster);
                this.validateClusterForTenant(property, cluster);
            } else {
                this.validateGlobalNamespaceOwnership(nsName);
            }
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume(wae);
            return;
        }
        catch (Exception e) {
            asyncResponse.resume(new RestException(e));
            return;
        }
        ArrayList<CompletableFuture<List<String>>> futures = Lists.newArrayList();
        List<String> boundaries = policies.bundles.getBoundaries();
        for (int i = 0; i < boundaries.size() - 1; ++i) {
            String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
            if (StringUtils.isNotBlank(nsBundle) && !nsBundle.equals(bundle)) continue;
            try {
                futures.add(this.pulsar().getAdminClient().nonPersistentTopics().getListInBundleAsync(nsName.toString(), bundle));
                continue;
            }
            catch (PulsarServerException e) {
                log.error("[{}] Failed to get list of topics under namespace {}/{}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, bundle, e});
                asyncResponse.resume(new RestException(e));
                return;
            }
        }
        ArrayList topics = Lists.newArrayList();
        FutureUtil.waitForAll(futures).handle((result, exception) -> {
            for (int i = 0; i < futures.size(); ++i) {
                try {
                    if (!((CompletableFuture)futures.get(i)).isDone() || ((CompletableFuture)futures.get(i)).get() == null) continue;
                    topics.addAll((Collection)((CompletableFuture)futures.get(i)).get());
                    continue;
                }
                catch (InterruptedException | ExecutionException e) {
                    log.error("[{}] Failed to get list of topics under namespace {}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, e});
                    asyncResponse.resume(new RestException(e instanceof ExecutionException ? e.getCause() : e));
                    return null;
                }
            }
            asyncResponse.resume(topics);
            return null;
        });
    }

    @GET
    @Path(value="/{property}/{cluster}/{namespace}/{bundle}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace bundle.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist")})
    public List<String> getListFromBundle(@PathParam(value="property") String property, @PathParam(value="cluster") String cluster, @PathParam(value="namespace") String namespace, @PathParam(value="bundle") String bundleRange) {
        log.info("[{}] list of topics on namespace bundle {}/{}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, bundleRange});
        this.validateNamespaceOperation(this.namespaceName, NamespaceOperation.GET_BUNDLE);
        Policies policies = this.getNamespacePolicies(property, cluster, namespace);
        if (!cluster.equals("global")) {
            this.validateClusterOwnership(cluster);
            this.validateClusterForTenant(property, cluster);
        } else {
            this.validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
        }
        NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
        try {
            if (!this.isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange).get(this.pulsar().getConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS).booleanValue()) {
                log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", new Object[]{this.clientAppId(), property, cluster, namespace, bundleRange});
                return null;
            }
            NamespaceBundle nsBundle = this.validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
            ArrayList<String> topicList = Lists.newArrayList();
            this.pulsar().getBrokerService().forEachTopic(topic -> {
                TopicName topicName = TopicName.get(topic.getName());
                if (nsBundle.includes(topicName)) {
                    topicList.add(topic.getName());
                }
            });
            return topicList;
        }
        catch (WebApplicationException wae) {
            throw wae;
        }
        catch (Exception e) {
            log.error("[{}] Failed to unload namespace bundle {}/{}", new Object[]{this.clientAppId(), fqnn.toString(), bundleRange, e});
            throw new RestException(e);
        }
    }

    private Topic getTopicReference(TopicName topicName) {
        try {
            return this.pulsar().getBrokerService().getTopicIfExists(topicName.toString()).get(this.config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS).orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Topic not found"));
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
        catch (InterruptedException | TimeoutException e) {
            throw new RestException(e);
        }
    }
}

