package org.apache.pulsar.broker.admin.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.javax.ws.rs.DELETE;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.POST;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/BrokersBase.class */
public class BrokersBase extends AdminResource {
    private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
    private int serviceConfigZkVersion = -1;

    /**
     * Returns the children registered under {@link LoadManager#LOADBALANCE_BROKERS_ROOT} in the
     * local ZooKeeper cache, which this endpoint exposes as the set of active brokers.
     *
     * <p>Super-user access and cluster ownership are validated before the lookup is attempted.
     *
     * @param str cluster name taken from the {@code cluster} path segment
     * @return the child node names found under the load-balance brokers root
     * @throws RestException wrapping any exception raised while reading the children
     */
    @Path("/{cluster}")
    @GET
    @ApiOperation(
            value = "Get the list of active brokers (web service addresses) in the cluster.If authorization is not enabled, any cluster name is valid.",
            response = String.class,
            responseContainer = "Set")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve this cluster"),
            @ApiResponse(code = 401, message = "Authentication required"),
            @ApiResponse(code = 403, message = "This operation requires super-user access"),
            @ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}")
    })
    public Set<String> getActiveBrokers(@PathParam("cluster") String str) throws Exception {
        validateSuperUserAccess();
        validateClusterOwnership(str);

        final Set<String> activeBrokers;
        try {
            activeBrokers = pulsar().getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);
        } catch (Exception lookupFailure) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), str, lookupFailure);
            throw new RestException(lookupFailure);
        }
        return activeBrokers;
    }

    /**
     * Returns the namespace ownership status reported by this broker's namespace service.
     *
     * <p>Super-user access, cluster ownership and the broker name are validated first; the
     * validators are inherited helpers whose exact behaviour is not visible in this class.
     *
     * @param str  cluster name taken from the {@code clusterName} path segment
     * @param str2 broker web-service URL taken from the {@code broker-webserviceurl} path segment
     * @return ownership status keyed by name, as returned by
     *         {@code NamespaceService#getOwnedNameSpacesStatus()}
     * @throws RestException wrapping any exception raised while collecting the status
     */
    @Path("/{clusterName}/{broker-webserviceurl}/ownedNamespaces")
    @GET
    @ApiOperation(
            value = "Get the list of namespaces served by the specific broker",
            response = NamespaceOwnershipStatus.class,
            responseContainer = "Map")
    @ApiResponses({
            @ApiResponse(code = 307, message = "Current broker doesn't serve the cluster"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Cluster doesn't exist")
    })
    public Map<String, NamespaceOwnershipStatus> getOwnedNamespaces(@PathParam("clusterName") String str, @PathParam("broker-webserviceurl") String str2) throws Exception {
        validateSuperUserAccess();
        validateClusterOwnership(str);
        validateBrokerName(str2);
        try {
            return pulsar().getNamespaceService().getOwnedNameSpacesStatus();
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the stack trace and cause reach the
            // log; previously only the identifiers were logged and the failure reason was lost.
            LOG.error("[{}] Failed to get the namespace ownership status. cluster={}, broker={}", clientAppId(), str, str2, e);
            throw new RestException(e);
        }
    }

    /**
     * REST entry point that stores a dynamic configuration value in ZooKeeper.
     *
     * <p>After the super-user check passes, the work is delegated to
     * {@code updateDynamicConfigurationOnZk}.
     *
     * @param str  configuration name taken from the {@code configName} path segment
     * @param str2 configuration value taken from the {@code configValue} path segment
     */
    @Path("/configuration/{configName}/{configValue}")
    @POST
    @ApiOperation("Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @ApiResponses({
            @ApiResponse(code = 204, message = "Service configuration updated successfully"),
            @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
            @ApiResponse(code = 404, message = "Configuration not found"),
            @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
            @ApiResponse(code = 500, message = "Internal server error")
    })
    public void updateDynamicConfiguration(
            @PathParam("configName") String str,
            @PathParam("configValue") String str2) throws Exception {
        // Authorization first; nothing is written unless the caller is a super-user.
        this.validateSuperUserAccess();
        this.updateDynamicConfigurationOnZk(str, str2);
    }

    /**
     * REST entry point that removes a dynamic configuration entry from ZooKeeper.
     *
     * <p>After the super-user check passes, the work is delegated to
     * {@code deleteDynamicConfigurationOnZk}.
     *
     * @param str configuration name taken from the {@code configName} path segment
     */
    @DELETE
    @Path("/configuration/{configName}")
    @ApiOperation("Delete dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
    @ApiResponses({
            @ApiResponse(code = 204, message = "Service configuration updated successfully"),
            @ApiResponse(code = 403, message = "You don't have admin permission to update service-configuration"),
            @ApiResponse(code = 412, message = "Invalid dynamic-config value"),
            @ApiResponse(code = 500, message = "Internal server error")
    })
    public void deleteDynamicConfiguration(
            @PathParam("configName") String str) throws Exception {
        // Authorization first; nothing is removed unless the caller is a super-user.
        this.validateSuperUserAccess();
        this.deleteDynamicConfigurationOnZk(str);
    }

    /**
     * Returns the dynamic configuration overrides stored at
     * {@link BrokerService#BROKER_SERVICE_CONFIGURATION_PATH}, read through the broker's
     * dynamic-configuration cache.
     *
     * @return the name/value pairs found at the configuration path
     * @throws RestException with {@code NOT_FOUND} when the cache yields no value for the path,
     *                       or wrapping any other exception raised during the lookup
     */
    @Path("/configuration/values")
    @GET
    @ApiOperation("Get value of all dynamic configurations' value overridden on local config")
    @ApiResponses({
            @ApiResponse(code = 404, message = "Configuration not found"),
            @ApiResponse(code = 500, message = "Internal server error")
    })
    public Map<String, String> getAllDynamicConfigurations() throws Exception {
        try {
            ZooKeeperDataCache<Map<String, String>> configCache =
                    pulsar().getBrokerService().getDynamicConfigurationCache();
            return configCache
                    .get(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH)
                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Couldn't find configuration in zk"));
        } catch (RestException notFound) {
            // Raised by the orElseThrow above; log it and hand it back unchanged.
            LOG.error("[{}] couldn't find any configuration in zk {}", clientAppId(), notFound.getMessage(), notFound);
            throw notFound;
        } catch (Exception lookupFailure) {
            LOG.error("[{}] Failed to retrieve configuration from zk {}", clientAppId(), lookupFailure.getMessage(), lookupFailure);
            throw new RestException(lookupFailure);
        }
    }

    /**
     * Returns the names of the configurations that can be updated dynamically, as reported by
     * {@code BrokerService.getDynamicConfiguration()}.
     *
     * @return the list of dynamic configuration names
     */
    @GET
    @Path("/configuration")
    @ApiOperation("Get all updatable dynamic configurations's name")
    public List<String> getDynamicConfigurationName() {
        final List<String> dynamicConfigNames = BrokerService.getDynamicConfiguration();
        return dynamicConfigNames;
    }

    /**
     * Returns the runtime configuration reported by the broker service.
     *
     * <p>The super-user check runs before the configuration is read.
     *
     * @return the name/value pairs returned by {@code BrokerService#getRuntimeConfiguration()}
     */
    @Path("/configuration/runtime")
    @GET
    @ApiOperation("Get all runtime configurations. This operation requires Pulsar super-user privileges.")
    @ApiResponses({
            @ApiResponse(code = 403, message = "Don't have admin permission")
    })
    public Map<String, String> getRuntimeConfiguration() {
        this.validateSuperUserAccess();
        final Map<String, String> runtimeConfiguration = pulsar().getBrokerService().getRuntimeConfiguration();
        return runtimeConfiguration;
    }

    /**
     * Validates a dynamic configuration update and persists it to the ZooKeeper node at
     * {@link BrokerService#BROKER_SERVICE_CONFIGURATION_PATH}.
     *
     * <p>If the cache already holds a configuration map, the new entry is merged into a copy of
     * it and written with {@code setData}; otherwise the node is created with a map holding only
     * the new entry.
     *
     * @param str  name of the configuration to update
     * @param str2 new value for the configuration
     * @throws RestException with {@code PRECONDITION_FAILED} when the value fails validation or
     *                       the name is not a dynamic configuration, or wrapping any other
     *                       exception raised while serializing or writing the data
     */
    private synchronized void updateDynamicConfigurationOnZk(String str, String str2) {
        try {
            if (!BrokerService.validateDynamicConfiguration(str, str2)) {
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Invalid dynamic-config value");
            }
            if (!BrokerService.isDynamicConfiguration(str)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Can't update non-dynamic configuration {}/{}", clientAppId(), str, str2);
                }
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
            }
            ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService().getDynamicConfigurationCache();
            Map<String, String> cachedConfig = dynamicConfigurationCache.get(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH).orElse(null);
            if (cachedConfig != null) {
                // Work on a copy: the map returned by the cache is shared, so mutating it in place
                // would expose the new value to other readers before it has been persisted.
                Map<String, String> updatedConfig = new HashMap<>(cachedConfig);
                updatedConfig.put(str, str2);
                byte[] serializedConfig = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(updatedConfig);
                // Drop the cached entry before writing so the next read goes back to ZooKeeper.
                dynamicConfigurationCache.invalidate(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH);
                this.serviceConfigZkVersion = localZk().setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH, serializedConfig, this.serviceConfigZkVersion).getVersion();
            } else {
                // No configuration node is known yet: create it with just this entry.
                Map<String, String> initialConfig = new HashMap<>();
                initialConfig.put(str, str2);
                ZkUtils.createFullPathOptimistic(localZk(), BrokerService.BROKER_SERVICE_CONFIGURATION_PATH, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(initialConfig), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), str, str2);
        } catch (RestException e) {
            // Validation failures already carry the right HTTP status; rethrow untouched.
            throw e;
        } catch (Exception e2) {
            LOG.error("[{}] Failed to update configuration {}/{}, {}", clientAppId(), str, str2, e2.getMessage(), e2);
            throw new RestException(e2);
        }
    }

    /**
     * Returns the internal configuration data exposed by the Pulsar service instance.
     *
     * @return the value of {@code pulsar().getInternalConfigurationData()}
     */
    @GET
    @Path("/internal-configuration")
    @ApiOperation(
            value = "Get the internal configuration data",
            response = InternalConfigurationData.class)
    public InternalConfigurationData getInternalConfigurationData() {
        final InternalConfigurationData internalConfiguration = pulsar().getInternalConfigurationData();
        return internalConfiguration;
    }

    /**
     * Runs an end-to-end healthcheck by publishing a random marker to this broker's healthcheck
     * topic and waiting until a reader receives it back.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>validate super-user access;</li>
     *   <li>force-delete every subscription on the healthcheck topic if the topic is present;</li>
     *   <li>create a producer and a reader (starting at {@code MessageId.latest}) asynchronously;</li>
     *   <li>once both exist, send a random UUID, start the read loop and arm a 10 second timeout;</li>
     *   <li>when the check completes either way, close producer and reader and resume the
     *       suspended response with {@code "ok"} or a {@link RestException}.</li>
     * </ol>
     *
     * @param asyncResponse suspended JAX-RS response that is resumed with the check outcome
     * @throws Exception if the super-user validation or the client lookup fails before the
     *                   asynchronous part starts
     */
    @Path("/health")
    @GET
    @ApiOperation("Run a healthcheck against the broker")
    @ApiResponses({
            @ApiResponse(code = 200, message = "Everything is OK"),
            @ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Cluster doesn't exist"),
            @ApiResponse(code = 500, message = "Internal server error")
    })
    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
        validateSuperUserAccess();
        String topicName = String.format("persistent://%s/healthcheck", NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration()));
        PulsarClient client = pulsar().getClient();
        String marker = UUID.randomUUID().toString();
        try {
            // Remove any subscriptions that exist on the topic before this run.
            pulsar().getBrokerService().getTopic(topicName, true).get().ifPresent(topic ->
                    topic.getSubscriptions().values().forEach(subscription -> subscription.deleteForcefully()));

            CompletableFuture<Producer<String>> producerFuture =
                    client.newProducer(Schema.STRING).topic(topicName).createAsync();
            CompletableFuture<Reader<String>> readerFuture =
                    client.newReader(Schema.STRING).topic(topicName).startMessageId(MessageId.latest).createAsync();
            // Completed normally when the marker is read back, exceptionally on any failure or timeout.
            CompletableFuture<Void> checkOutcome = new CompletableFuture<>();

            CompletableFuture.allOf(producerFuture, readerFuture).whenComplete((ignored, createError) -> {
                if (createError != null) {
                    checkOutcome.completeExceptionally(createError);
                    return;
                }
                producerFuture.thenCompose(producer -> producer.sendAsync(marker)).whenComplete((messageId, sendError) -> {
                    if (sendError != null) {
                        checkOutcome.completeExceptionally(sendError);
                    }
                });
                healthcheckReadLoop(readerFuture, checkOutcome, marker);
                // Block-bodied lambda on purpose: it keeps this a Runnable for schedule(Runnable, ...).
                ScheduledFuture<?> timeoutTask = pulsar().getExecutor().schedule(() -> {
                    checkOutcome.completeExceptionally(new TimeoutException("Timed out reading"));
                }, 10L, TimeUnit.SECONDS);
                // The timeout is no longer needed once the outcome is known.
                checkOutcome.whenComplete((unusedResult, unusedError) -> timeoutTask.cancel(false));
            });

            checkOutcome.whenComplete((unusedResult, failure) -> {
                // Best-effort cleanup: close failures are only logged and never change the outcome.
                producerFuture.thenAccept(producer -> producer.closeAsync().whenComplete((closed, closeError) -> {
                    if (closeError != null) {
                        LOG.warn("Error closing producer for healthcheck", closeError);
                    }
                }));
                readerFuture.thenAccept(reader -> reader.closeAsync().whenComplete((closed, closeError) -> {
                    if (closeError != null) {
                        LOG.warn("Error closing reader for healthcheck", closeError);
                    }
                }));
                if (failure != null) {
                    asyncResponse.resume(new RestException(failure));
                } else {
                    asyncResponse.resume("ok");
                }
            });
        } catch (Exception e) {
            // Synchronous failures while setting up the check are reported on the same response.
            asyncResponse.resume(new RestException(e));
        }
    }

    /**
     * Reads messages one at a time until one whose value equals the expected marker is seen.
     *
     * <p>Each read is asynchronous. A read failure completes {@code completableFuture2}
     * exceptionally, a matching value completes it normally, and any other value (including a
     * {@code null} value) triggers another read.
     *
     * @param completableFuture  future supplying the reader to consume from
     * @param completableFuture2 future to complete once the marker is found or a read fails
     * @param str                marker value to look for; must not be {@code null}
     */
    private void healthcheckReadLoop(CompletableFuture<Reader<String>> completableFuture, CompletableFuture<?> completableFuture2, String str) {
        completableFuture.thenAccept(reader -> reader.readNextAsync().whenComplete((message, readError) -> {
            if (readError != null) {
                completableFuture2.completeExceptionally(readError);
            } else if (str.equals(message.getValue())) {
                // Comparing from the marker side keeps a null message value from throwing inside
                // this callback, where the exception would go unobserved and stall the loop.
                completableFuture2.complete(null);
            } else {
                healthcheckReadLoop(completableFuture, completableFuture2, str);
            }
        }));
    }

    /**
     * Removes a dynamic configuration entry from the ZooKeeper node at
     * {@link BrokerService#BROKER_SERVICE_CONFIGURATION_PATH}.
     *
     * <p>The node is rewritten only when the cached configuration map exists and contains the
     * given name; otherwise nothing is written.
     *
     * @param str name of the configuration to delete
     * @throws RestException with {@code PRECONDITION_FAILED} when the name is not a dynamic
     *                       configuration, or wrapping any other exception raised while
     *                       serializing or writing the data
     */
    private synchronized void deleteDynamicConfigurationOnZk(String str) {
        try {
            if (!BrokerService.isDynamicConfiguration(str)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Can't update non-dynamic configuration {}", clientAppId(), str);
                }
                throw new RestException(Response.Status.PRECONDITION_FAILED, " Can't update non-dynamic configuration");
            }
            ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache = pulsar().getBrokerService().getDynamicConfigurationCache();
            Map<String, String> cachedConfig = dynamicConfigurationCache.get(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH).orElse(null);
            if (cachedConfig != null && cachedConfig.containsKey(str)) {
                // Work on a copy: the map returned by the cache is shared, so removing the key in
                // place would change what other readers see before the removal has been persisted.
                Map<String, String> remainingConfig = new HashMap<>(cachedConfig);
                remainingConfig.remove(str);
                byte[] serializedConfig = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(remainingConfig);
                // Drop the cached entry before writing so the next read goes back to ZooKeeper.
                dynamicConfigurationCache.invalidate(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH);
                this.serviceConfigZkVersion = localZk().setData(BrokerService.BROKER_SERVICE_CONFIGURATION_PATH, serializedConfig, this.serviceConfigZkVersion).getVersion();
            }
            LOG.info("[{}] Deleted Service configuration {}", clientAppId(), str);
        } catch (RestException e) {
            // Validation failures already carry the right HTTP status; rethrow untouched.
            throw e;
        } catch (Exception e2) {
            // The message names the delete operation so it is not mistaken for a failed update.
            LOG.error("[{}] Failed to delete configuration {}, {}", clientAppId(), str, e2.getMessage(), e2);
            throw new RestException(e2);
        }
    }
}
