/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiParam;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.javax.ws.rs.DELETE;
import org.apache.pulsar.shade.javax.ws.rs.DefaultValue;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.POST;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.QueryParam;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokersBase
extends AdminResource {
    private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
    public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";

    @GET
    @Path(value="/{cluster}")
    @ApiOperation(value="Get the list of active brokers (web service addresses) in the cluster.If authorization is not enabled, any cluster name is valid.", response=String.class, responseContainer="Set")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve this cluster"), @ApiResponse(code=401, message="Authentication required"), @ApiResponse(code=403, message="This operation requires super-user access"), @ApiResponse(code=404, message="Cluster does not exist: cluster={clustername}")})
    public void getActiveBrokers(@Suspended AsyncResponse asyncResponse, @PathParam(value="cluster") String cluster) {
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.validateSuperUserAccessAsync().thenCompose(__ -> this.validateClusterOwnershipAsync(cluster))).thenCompose(__ -> this.pulsar().getLoadManager().get().getAvailableBrokersAsync())).thenAccept(activeBrokers -> {
            LOG.info("[{}] Successfully to get active brokers, cluster={}", (Object)this.clientAppId(), (Object)cluster);
            asyncResponse.resume(activeBrokers);
        })).exceptionally(ex -> {
            if (!BrokersBase.isRedirectException(ex)) {
                LOG.error("[{}] Fail to get active brokers, cluster={}", new Object[]{this.clientAppId(), cluster, ex});
            }
            BrokersBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    @GET
    @Path(value="/leaderBroker")
    @ApiOperation(value="Get the information of the leader broker.", response=BrokerInfo.class)
    @ApiResponses(value={@ApiResponse(code=401, message="Authentication required"), @ApiResponse(code=403, message="This operation requires super-user access"), @ApiResponse(code=404, message="Leader broker not found")})
    public void getLeaderBroker(@Suspended AsyncResponse asyncResponse) {
        ((CompletableFuture)this.validateSuperUserAccessAsync().thenAccept(__ -> {
            LeaderBroker leaderBroker = this.pulsar().getLeaderElectionService().getCurrentLeader().orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Couldn't find leader broker"));
            BrokerInfo brokerInfo = BrokerInfo.builder().serviceUrl(leaderBroker.getServiceUrl()).build();
            LOG.info("[{}] Successfully to get the information of the leader broker.", (Object)this.clientAppId());
            asyncResponse.resume(brokerInfo);
        })).exceptionally(ex -> {
            LOG.error("[{}] Failed to get the information of the leader broker.", (Object)this.clientAppId(), ex);
            BrokersBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    @GET
    @Path(value="/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
    @ApiOperation(value="Get the list of namespaces served by the specific broker", response=NamespaceOwnershipStatus.class, responseContainer="Map")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the cluster"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Cluster doesn't exist")})
    public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam(value="clusterName") String cluster, @PathParam(value="broker-webserviceurl") String broker) throws Exception {
        this.validateSuperUserAccess();
        this.validateClusterOwnership(cluster);
        this.validateBrokerName(broker);
        try {
            return this.pulsar().getNamespaceService().getOwnedNameSpacesStatus();
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", new Object[]{this.clientAppId(), cluster, broker});
            throw new RestException(e);
        }
    }

    @POST
    @Path(value="/configuration/{configName}/{configValue}")
    @ApiOperation(value="Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=204, message="Service configuration updated successfully"), @ApiResponse(code=403, message="You don't have admin permission to update service-configuration"), @ApiResponse(code=404, message="Configuration not found"), @ApiResponse(code=412, message="Invalid dynamic-config value"), @ApiResponse(code=500, message="Internal server error")})
    public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse, @PathParam(value="configName") String configName, @PathParam(value="configValue") String configValue) {
        ((CompletableFuture)((CompletableFuture)this.validateSuperUserAccessAsync().thenCompose(__ -> this.persistDynamicConfigurationAsync(configName, configValue))).thenAccept(__ -> {
            LOG.info("[{}] Updated Service configuration {}/{}", new Object[]{this.clientAppId(), configName, configValue});
            asyncResponse.resume(Response.ok().build());
        })).exceptionally(ex -> {
            LOG.error("[{}] Failed to update configuration {}/{}", new Object[]{this.clientAppId(), configName, configValue, ex});
            BrokersBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    @DELETE
    @Path(value="/configuration/{configName}")
    @ApiOperation(value="Delete dynamic ServiceConfiguration into metadata only. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=204, message="Service configuration updated successfully"), @ApiResponse(code=403, message="You don't have admin permission to update service-configuration"), @ApiResponse(code=412, message="Invalid dynamic-config value"), @ApiResponse(code=500, message="Internal server error")})
    public void deleteDynamicConfiguration(@Suspended AsyncResponse asyncResponse, @PathParam(value="configName") String configName) {
        ((CompletableFuture)((CompletableFuture)this.validateSuperUserAccessAsync().thenCompose(__ -> this.internalDeleteDynamicConfigurationOnMetadataAsync(configName))).thenAccept(__ -> {
            LOG.info("[{}] Successfully to delete dynamic configuration {}", (Object)this.clientAppId(), (Object)configName);
            asyncResponse.resume(Response.ok().build());
        })).exceptionally(ex -> {
            LOG.error("[{}] Failed to delete dynamic configuration {}", new Object[]{this.clientAppId(), configName, ex});
            BrokersBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    @GET
    @Path(value="/configuration/values")
    @ApiOperation(value="Get value of all dynamic configurations' value overridden on local config")
    @ApiResponses(value={@ApiResponse(code=403, message="You don't have admin permission to view configuration"), @ApiResponse(code=404, message="Configuration not found"), @ApiResponse(code=500, message="Internal server error")})
    public Map<String, String> getAllDynamicConfigurations() throws Exception {
        this.validateSuperUserAccess();
        try {
            return this.dynamicConfigurationResources().getDynamicConfiguration().orElseGet(Collections::emptyMap);
        }
        catch (RestException e) {
            LOG.error("[{}] couldn't find any configuration in zk {}", new Object[]{this.clientAppId(), e.getMessage(), e});
            throw e;
        }
        catch (Exception e) {
            LOG.error("[{}] Failed to retrieve configuration from zk {}", new Object[]{this.clientAppId(), e.getMessage(), e});
            throw new RestException(e);
        }
    }

    @GET
    @Path(value="/configuration")
    @ApiOperation(value="Get all updatable dynamic configurations's name")
    @ApiResponses(value={@ApiResponse(code=403, message="You don't have admin permission to get configuration")})
    public List<String> getDynamicConfigurationName() {
        this.validateSuperUserAccess();
        return BrokerService.getDynamicConfiguration();
    }

    @GET
    @Path(value="/configuration/runtime")
    @ApiOperation(value="Get all runtime configurations. This operation requires Pulsar super-user privileges.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public Map<String, String> getRuntimeConfiguration() {
        this.validateSuperUserAccess();
        return this.pulsar().getBrokerService().getRuntimeConfiguration();
    }

    private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(String configName, String configValue) {
        if (!BrokerService.validateDynamicConfiguration(configName, configValue)) {
            return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, " Invalid dynamic-config value"));
        }
        if (BrokerService.isDynamicConfiguration(configName)) {
            return this.dynamicConfigurationResources().setDynamicConfigurationWithCreateAsync(old -> {
                Map configurationMap = old.orElseGet(Maps::newHashMap);
                configurationMap.put(configName, configValue);
                return configurationMap;
            });
        }
        return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, "Can't update non-dynamic configuration"));
    }

    @GET
    @Path(value="/internal-configuration")
    @ApiOperation(value="Get the internal configuration data", response=InternalConfigurationData.class)
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission")})
    public InternalConfigurationData getInternalConfigurationData() {
        this.validateSuperUserAccess();
        return this.pulsar().getInternalConfigurationData();
    }

    @GET
    @Path(value="/backlog-quota-check")
    @ApiOperation(value="An REST endpoint to trigger backlogQuotaCheck")
    @ApiResponses(value={@ApiResponse(code=200, message="Everything is OK"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=500, message="Internal server error")})
    public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
        this.validateSuperUserAccess();
        this.pulsar().getBrokerService().executor().execute(() -> {
            try {
                this.pulsar().getBrokerService().monitorBacklogQuota();
                asyncResponse.resume(Response.noContent().build());
            }
            catch (Exception e) {
                LOG.error("trigger backlogQuotaCheck fail", (Throwable)e);
                asyncResponse.resume(new RestException(e));
            }
        });
    }

    @GET
    @Path(value="/ready")
    @ApiOperation(value="Check if the broker is fully initialized")
    @ApiResponses(value={@ApiResponse(code=200, message="Broker is ready"), @ApiResponse(code=500, message="Broker is not ready")})
    public void isReady(@Suspended AsyncResponse asyncResponse) {
        if (this.pulsar().getState() == PulsarService.State.Started) {
            asyncResponse.resume(Response.ok("ok").build());
        } else {
            asyncResponse.resume(Response.serverError().build());
        }
    }

    @GET
    @Path(value="/health")
    @ApiOperation(value="Run a healthCheck against the broker")
    @ApiResponses(value={@ApiResponse(code=200, message="Everything is OK"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Cluster doesn't exist"), @ApiResponse(code=500, message="Internal server error")})
    @ApiParam(value="Topic Version")
    public void healthCheck(@Suspended AsyncResponse asyncResponse, @QueryParam(value="topicVersion") TopicVersion topicVersion) {
        ((CompletableFuture)((CompletableFuture)this.validateSuperUserAccessAsync().thenCompose(__ -> this.internalRunHealthCheck(topicVersion))).thenAccept(__ -> {
            LOG.info("[{}] Successfully run health check.", (Object)this.clientAppId());
            asyncResponse.resume("ok");
        })).exceptionally(ex -> {
            LOG.error("[{}] Fail to run health check.", (Object)this.clientAppId(), ex);
            BrokersBase.resumeAsyncResponseExceptionally(asyncResponse, ex);
            return null;
        });
    }

    private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
        NamespaceName namespaceName = topicVersion == TopicVersion.V2 ? NamespaceService.getHeartbeatNamespaceV2(this.pulsar().getAdvertisedAddress(), this.pulsar().getConfiguration()) : NamespaceService.getHeartbeatNamespace(this.pulsar().getAdvertisedAddress(), this.pulsar().getConfiguration());
        String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
        LOG.info("[{}] Running healthCheck with topic={}", (Object)this.clientAppId(), (Object)topicName);
        String messageStr = UUID.randomUUID().toString();
        String subscriptionName = "healthCheck-" + messageStr;
        return this.pulsar().getBrokerService().getTopic(topicName, true).thenCompose(topicOptional -> {
            PulsarClient client;
            if (!topicOptional.isPresent()) {
                LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", (Object)this.clientAppId(), (Object)topicName);
                throw new RestException(Response.Status.NOT_FOUND, String.format("Topic [%s] not found after create.", topicName));
            }
            try {
                client = this.pulsar().getClient();
            }
            catch (PulsarServerException e) {
                LOG.error("[{}] Fail to run health check while get client.", (Object)this.clientAppId());
                throw new RestException(e);
            }
            CompletableFuture resultFuture = new CompletableFuture();
            ((CompletableFuture)client.newProducer(Schema.STRING).topic(topicName).createAsync().thenCompose(producer -> ((CompletableFuture)client.newReader(Schema.STRING).topic(topicName).subscriptionName(subscriptionName).startMessageId(MessageId.latest).createAsync().exceptionally(createException -> {
                producer.closeAsync().exceptionally(ex -> {
                    LOG.error("[{}] Close producer fail while heath check.", (Object)this.clientAppId());
                    return null;
                });
                throw FutureUtil.wrapToCompletionException(createException);
            })).thenCompose(reader -> ((CompletableFuture)producer.sendAsync(messageStr).thenCompose(__ -> this.healthCheckRecursiveReadNext((Reader<String>)reader, messageStr))).whenComplete((__, ex) -> this.closeAndReCheck((Producer<String>)producer, (Reader<String>)reader, (Topic)topicOptional.get(), subscriptionName).whenComplete((unused, innerEx) -> {
                if (ex != null) {
                    resultFuture.completeExceptionally((Throwable)ex);
                } else {
                    resultFuture.complete(null);
                }
            }))))).exceptionally(ex -> {
                resultFuture.completeExceptionally((Throwable)ex);
                return null;
            });
            return resultFuture;
        });
    }

    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader, Topic topic, String subscriptionName) {
        CompletableFuture<Void> producerFuture = producer.closeAsync();
        CompletableFuture<Void> readerFuture = reader.closeAsync();
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(2);
        futures.add(producerFuture);
        futures.add(readerFuture);
        return FutureUtil.waitForAll(Collections.unmodifiableList(futures)).exceptionally(closeException -> {
            if (readerFuture.isCompletedExceptionally()) {
                LOG.error("[{}] Close reader fail while heath check.", (Object)this.clientAppId());
                Subscription subscription = topic.getSubscription(subscriptionName);
                if (subscription != null) {
                    LOG.warn("[{}] Force delete subscription {} when it still exists after the reader is closed.", (Object)this.clientAppId(), (Object)subscription);
                    subscription.deleteForcefully().exceptionally(ex -> {
                        LOG.error("[{}] Force delete subscription fail while health check", (Object)this.clientAppId(), ex);
                        return null;
                    });
                }
            } else {
                LOG.error("[{}] Close producer fail while heath check.", (Object)this.clientAppId());
            }
            return null;
        });
    }

    private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
        return reader.readNextAsync().thenCompose(msg -> {
            if (!Objects.equals(content, msg.getValue())) {
                return this.healthCheckRecursiveReadNext(reader, content);
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) {
        if (!BrokerService.isDynamicConfiguration(configName)) {
            throw new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
        }
        return this.dynamicConfigurationResources().setDynamicConfigurationAsync(old -> {
            if (old != null) {
                old.remove(configName);
            }
            return old;
        });
    }

    @GET
    @Path(value="/version")
    @ApiOperation(value="Get version of current broker")
    @ApiResponses(value={@ApiResponse(code=200, message="Everything is OK"), @ApiResponse(code=500, message="Internal server error")})
    public String version() throws Exception {
        return PulsarVersion.getVersion();
    }

    @POST
    @Path(value="/shutdown")
    @ApiOperation(value="Shutdown broker gracefully.")
    @ApiResponses(value={@ApiResponse(code=204, message="Execute shutdown command successfully"), @ApiResponse(code=403, message="You don't have admin permission to update service-configuration"), @ApiResponse(code=500, message="Internal server error")})
    public void shutDownBrokerGracefully(@ApiParam(name="maxConcurrentUnloadPerSec", value="if the value absent(value=0) means no concurrent limitation.") @QueryParam(value="maxConcurrentUnloadPerSec") int maxConcurrentUnloadPerSec, @QueryParam(value="forcedTerminateTopic") @DefaultValue(value="true") boolean forcedTerminateTopic) {
        this.validateSuperUserAccess();
        this.doShutDownBrokerGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
    }

    private void doShutDownBrokerGracefully(int maxConcurrentUnloadPerSec, boolean forcedTerminateTopic) {
        this.pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
        this.pulsar().closeAsync();
    }
}

