/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import io.confluent.kafka.multitenant.TenantUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ResourceNotFoundException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.BrokerControlStates;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.controller.PartitionChangeBuilder;
import org.apache.kafka.controller.PartitionReassignmentReplicas;
import org.apache.kafka.controller.PartitionReassignmentRevert;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.TenantControlManager;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.ConfluentPartitionsPerTopicListener;
import org.apache.kafka.metadata.DegradedBrokerHealthState;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.TopicPlacement;
import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

public class ReplicationControlManager {
    // Registry passed to every timeline collection this manager creates.
    private final SnapshotRegistry snapshotRegistry;
    private final Logger log;
    // Used by topic creation when the request passes -1 for the replication factor
    // (and no topic placement applies) or for the partition count, respectively.
    private final short defaultReplicationFactor;
    private final int defaultNumPartitions;
    private final int defaultMinIsrCount;
    private final boolean eligibleLeaderReplicasEnabled;
    private final int maxElectionsPerImbalance;
    private final int maxPartitionChangesPerSlice;
    private final ConfigurationControlManager configurationControl;
    private final ClusterControlManager clusterControl;
    private final Optional<CreateTopicPolicy> createTopicPolicy;
    private final boolean applyCreateTopicsPolicyToCreatePartitions;
    private final FeatureControlManager featureControl;
    // Topic name -> topic ID; populated by replay(TopicRecord), cleared by replay(RemoveTopicRecord).
    private final TimelineHashMap<String, Uuid> topicsByName;
    // Normalized name (Topic.unifyCollisionChars) -> names of existing topics that normalize to it.
    private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;
    // Topic ID -> per-topic control state, including its partition registrations.
    private final TimelineHashMap<Uuid, TopicControlInfo> topics;
    private final MirrorTopicControlManager mirrorTopicControl;
    private final TenantControlManager tenantControl;
    // Kept in sync with each partition's ISR and leader whenever a partition record is replayed.
    private final BrokersToIsrs brokersToIsrs;
    // Topic ID -> ids of that topic's partitions that currently have a reassignment in progress.
    // A topic with no such partitions has no entry.
    private final TimelineHashMap<Uuid, int[]> reassigningTopics;
    // NOTE(review): presumably the partitions whose leader is not the preferred replica, split by
    // internal vs. external topics -- the helpers that maintain these sets are outside this view.
    private final TimelineHashSet<TopicIdPartition> imbalancedInternalPartitions;
    private final TimelineHashSet<TopicIdPartition> imbalancedExternalPartitions;
    private final Optional<TopicPlacement> defaultTopicPlacement;
    private final Function<String, String> nameToTenantCallback;

    /**
     * Creates a new {@code KRaftCellDescriber} for the given cell.
     *
     * @param cellId the cell id handed to the describer's constructor
     * @return a freshly constructed describer (a new instance on every call)
     */
    KRaftCellDescriber getKRaftCellDescriber(int cellId) {
        return new KRaftCellDescriber(cellId);
    }

    /**
     * Builds the config map used while creating a topic.
     *
     * <p>Every entry of {@code collection} is copied, except entries named
     * {@code confluent.placement.constraints}, which are dropped. That key is then
     * set from {@code topicPlacementOpt} (serialized with {@code toJson()}) when a
     * placement is present, so a request-supplied value never survives.
     *
     * @param collection        the configs carried by the CreateTopics request
     * @param topicPlacementOpt the topic placement resolved for the topic, if any
     * @return an unmodifiable map from config name to config value
     */
    static Map<String, String> translateCreationConfigs(CreateTopicsRequestData.CreateableTopicConfigCollection collection, Optional<TopicPlacement> topicPlacementOpt) {
        // Typed map (not a raw HashMap) so the Map<String, String> return is compiler-checked.
        Map<String, String> result = new HashMap<>();
        collection.forEach(config -> {
            if (!config.name().equals("confluent.placement.constraints")) {
                result.put(config.name(), config.value());
            }
        });
        topicPlacementOpt.ifPresent(tp -> result.put("confluent.placement.constraints", tp.toJson()));
        return Collections.unmodifiableMap(result);
    }

    /**
     * Stores the supplied collaborators and settings and creates the empty
     * timeline collections that hold this manager's replayed state.
     *
     * <p>All timeline collections are bound to {@code snapshotRegistry}.
     */
    private ReplicationControlManager(SnapshotRegistry snapshotRegistry, LogContext logContext, short defaultReplicationFactor, int defaultNumPartitions, int defaultMinIsrCount, int maxElectionsPerImbalance, boolean eligibleLeaderReplicasEnabled, int maxPartitionChangesPerSlice, ConfigurationControlManager configurationControl, ClusterControlManager clusterControl, Optional<CreateTopicPolicy> createTopicPolicy, Function<String, String> nameToTenantCallback, FeatureControlManager featureControl, boolean applyCreateTopicsPolicyToCreatePartitions, MirrorTopicControlManager mirrorTopicControl, TenantControlManager tenantControl, Optional<TopicPlacement> defaultTopicPlacement) {
        // Infrastructure.
        this.snapshotRegistry = snapshotRegistry;
        this.log = logContext.logger(ReplicationControlManager.class);

        // Defaults, limits and feature switches.
        this.defaultReplicationFactor = defaultReplicationFactor;
        this.defaultNumPartitions = defaultNumPartitions;
        this.defaultMinIsrCount = defaultMinIsrCount;
        this.maxElectionsPerImbalance = maxElectionsPerImbalance;
        this.maxPartitionChangesPerSlice = maxPartitionChangesPerSlice;
        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
        this.applyCreateTopicsPolicyToCreatePartitions = applyCreateTopicsPolicyToCreatePartitions;
        this.defaultTopicPlacement = defaultTopicPlacement;

        // Collaborating managers, policies and callbacks.
        this.configurationControl = configurationControl;
        this.clusterControl = clusterControl;
        this.featureControl = featureControl;
        this.mirrorTopicControl = mirrorTopicControl;
        this.tenantControl = tenantControl;
        this.createTopicPolicy = createTopicPolicy;
        this.nameToTenantCallback = nameToTenantCallback;

        // Timeline-backed state, created in the same order as before.
        this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
        this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
        this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
        this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
        this.imbalancedInternalPartitions = new TimelineHashSet<>(snapshotRegistry, 0);
        this.imbalancedExternalPartitions = new TimelineHashSet<>(snapshotRegistry, 0);
    }

    /**
     * Returns the create-topic policy this manager was constructed with.
     *
     * @return the stored {@code Optional}, returned as-is
     */
    Optional<CreateTopicPolicy> createTopicPolicy() {
        return this.createTopicPolicy;
    }

    /**
     * Applies a {@link TopicRecord}: registers the topic under its name and ID,
     * tracks its name in the collision-character index when applicable, and
     * notifies the partitions-per-topic listener if the topic ID was new.
     *
     * @param record the topic record to replay
     * @throws RuntimeException if a topic with the same name is already registered
     */
    public void replay(TopicRecord record) {
        String topicName = record.name();
        Uuid topicId = record.topicId();

        // Note: the name -> ID mapping is written before the duplicate check.
        Uuid previousId = topicsByName.put(topicName, topicId);
        if (previousId != null) {
            String message = previousId.equals(topicId)
                ? "Found duplicate TopicRecord for " + topicName + " with topic ID " + topicId
                : "Found duplicate TopicRecord for " + topicName + " with a different ID than before. Previous ID was " + previousId + " and new ID is " + topicId;
            throw new RuntimeException(message);
        }

        if (Topic.hasCollisionChars(topicName)) {
            String normalizedName = Topic.unifyCollisionChars(topicName);
            TimelineHashSet<String> collidingNames = topicsWithCollisionChars.get(normalizedName);
            if (collidingNames == null) {
                collidingNames = new TimelineHashSet<>(snapshotRegistry, 1);
                topicsWithCollisionChars.put(normalizedName, collidingNames);
            }
            collidingNames.add(topicName);
        }

        TopicControlInfo previousInfo = topics.put(topicId, new TopicControlInfo(topicName, snapshotRegistry, topicId));
        if (previousInfo == null) {
            updateConfluentPartitionsPerTopicListener(topicName, 0, 1, Collections.emptyMap(), Collections.emptyMap(), false);
        }
        log.info("Replayed TopicRecord for topic {} with topic ID {}.", topicName, topicId);
    }

    /**
     * Applies a {@link PartitionRecord}. A partition that is not yet known is
     * added; a known partition is replaced only when the new registration
     * differs from the stored one. In both cases the ISR index, the
     * reassigning-topics index and the partitions-per-topic listener are
     * updated. The imbalanced-partition tracking is refreshed on every call.
     *
     * @param record the partition record to replay
     * @throws RuntimeException if the record's topic ID is unknown
     */
    public void replay(PartitionRecord record) {
        Uuid topicId = record.topicId();
        int partitionId = record.partitionId();

        TopicControlInfo topicInfo = topics.get(topicId);
        if (topicInfo == null) {
            throw new RuntimeException("Tried to create partition " + topicId + ":" + partitionId + ", but no topic with that ID was found.");
        }

        PartitionRegistration incoming = new PartitionRegistration(record);
        PartitionRegistration existing = topicInfo.parts.get(partitionId);
        String description = topicInfo.name + "-" + partitionId + " with topic ID " + topicId;

        if (existing == null) {
            log.info("Replayed PartitionRecord for new partition {} and {}.", description, incoming);
            topicInfo.parts.put(partitionId, incoming);
            // No previous ISR and no previous leader (-1) for a brand-new partition.
            brokersToIsrs.update(topicId, partitionId, null, incoming.isr, -1, incoming.leader);
            updateReassigningTopicsIfNeeded(topicId, partitionId, false,
                PartitionReassignmentReplicas.isReassignmentInProgress(incoming));
            updateConfluentPartitionsPerTopicListener(topicInfo.name, 1, 0,
                ReplicationControlManager.partitionIdToReplicas(partitionId, incoming.replicas),
                Collections.emptyMap(), false);
        } else if (!incoming.equals(existing)) {
            log.info("Replayed PartitionRecord for existing partition {} and {}.", description, incoming);
            incoming.maybeLogPartitionChange(log, description, existing);
            topicInfo.parts.put(partitionId, incoming);
            brokersToIsrs.update(topicId, partitionId, existing.isr, incoming.isr, existing.leader, incoming.leader);
            updateReassigningTopicsIfNeeded(topicId, partitionId,
                PartitionReassignmentReplicas.isReassignmentInProgress(existing),
                PartitionReassignmentReplicas.isReassignmentInProgress(incoming));
            updateConfluentPartitionsPerTopicListener(topicInfo.name, 0, 0,
                ReplicationControlManager.partitionIdToReplicas(partitionId, incoming.addingReplicas),
                ReplicationControlManager.partitionIdToReplicas(partitionId, incoming.removingReplicas), false);
        }

        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partitionId);
        if (incoming.hasPreferredLeader()) {
            removeFromImbalancedPartitions(topicIdPartition);
        } else {
            addToImbalancedPartitions(topicIdPartition, topicInfo.name());
        }
    }

    /**
     * Keeps {@code reassigningTopics} in step with a partition's reassignment
     * state. Nothing happens unless the state actually flipped.
     *
     * @param topicId        the topic the partition belongs to
     * @param partitionId    the partition whose state may have changed
     * @param wasReassigning whether a reassignment was in progress before the change
     * @param isReassigning  whether a reassignment is in progress after the change
     */
    private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId, boolean wasReassigning, boolean isReassigning) {
        if (wasReassigning == isReassigning) {
            return;
        }
        int[] currentParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
        if (isReassigning) {
            // Reassignment started: record the partition under its topic.
            reassigningTopics.put(topicId, Replicas.copyWith(currentParts, partitionId));
            return;
        }
        // Reassignment finished: drop the partition, and the topic entry once it is empty.
        int[] remainingParts = Replicas.copyWithout(currentParts, partitionId);
        if (remainingParts.length == 0) {
            reassigningTopics.remove(topicId);
        } else {
            reassigningTopics.put(topicId, remainingParts);
        }
    }

    /**
     * Applies a {@link PartitionChangeRecord} to an existing partition: merges
     * the change into the stored registration, then updates the
     * reassigning-topics index, the ISR index, the imbalanced-partition
     * tracking and the partitions-per-topic listener.
     *
     * @param record the partition change record to replay
     * @throws RuntimeException if the topic ID or the partition is unknown
     */
    public void replay(PartitionChangeRecord record) {
        TopicControlInfo topicInfo = topics.get(record.topicId());
        if (topicInfo == null) {
            // Message says "change" (not "create"): this handler modifies an existing partition.
            throw new RuntimeException("Tried to change partition " + record.topicId() + ":" + record.partitionId() + ", but no topic with that ID was found.");
        }
        PartitionRegistration prevPartitionInfo = topicInfo.parts.get(record.partitionId());
        if (prevPartitionInfo == null) {
            throw new RuntimeException("Tried to change partition " + record.topicId() + ":" + record.partitionId() + ", but no partition with that id was found.");
        }
        PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
        updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
            PartitionReassignmentReplicas.isReassignmentInProgress(prevPartitionInfo),
            PartitionReassignmentReplicas.isReassignmentInProgress(newPartitionInfo));
        topicInfo.parts.put(record.partitionId(), newPartitionInfo);
        brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader, newPartitionInfo.leader);
        String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " + record.topicId();
        newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);

        TopicIdPartition topicIdPartition = new TopicIdPartition(record.topicId(), record.partitionId());
        if (newPartitionInfo.hasPreferredLeader()) {
            removeFromImbalancedPartitions(topicIdPartition);
        } else {
            addToImbalancedPartitions(topicIdPartition, topicInfo.name());
        }
        updateConfluentPartitionsPerTopicListener(topicInfo.name, 0, 0,
            ReplicationControlManager.partitionIdToReplicas(record.partitionId(), newPartitionInfo.addingReplicas),
            ReplicationControlManager.partitionIdToReplicas(record.partitionId(), newPartitionInfo.removingReplicas), false);

        // Assignment changes are logged at INFO; every other change only at DEBUG.
        if (record.removingReplicas() != null || record.addingReplicas() != null) {
            log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
        } else if (log.isDebugEnabled()) {
            log.debug("Replayed partition change {} for topic {}", record, topicInfo.name);
        }
    }

    /**
     * Applies a {@link RemoveTopicRecord}: forgets the topic in every index this
     * manager keeps (by ID, by name, collision names, reassigning topics, ISR
     * index, imbalanced partitions), asks the configuration and mirror-topic
     * managers to delete their state for it, and reports the removed partitions
     * and replicas to the partitions-per-topic listener.
     *
     * @param record the remove-topic record to replay
     * @throws UnknownTopicIdException if no topic with the record's ID exists
     */
    public void replay(RemoveTopicRecord record) {
        TopicControlInfo topic = topics.remove(record.topicId());
        if (topic == null) {
            throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() + " to remove.");
        }
        topicsByName.remove(topic.name);

        if (Topic.hasCollisionChars(topic.name)) {
            String normalizedName = Topic.unifyCollisionChars(topic.name);
            TimelineHashSet<String> colliding = topicsWithCollisionChars.get(normalizedName);
            if (colliding != null) {
                colliding.remove(topic.name);
                if (colliding.isEmpty()) {
                    topicsWithCollisionChars.remove(normalizedName);
                }
            }
        }

        reassigningTopics.remove(record.topicId());
        configurationControl.deleteTopicConfigs(topic.name);
        mirrorTopicControl.deleteMirrorTopic(topic.topicId(), topic.name());

        // Single pass: unregister each partition and remember its replicas for the listener.
        Map<Integer, List<Integer>> replicasRemoved = new HashMap<>();
        for (Map.Entry<Integer, PartitionRegistration> entry : topic.parts.entrySet()) {
            Integer partitionId = entry.getKey();
            PartitionRegistration partition = entry.getValue();
            for (int brokerId : partition.isr) {
                brokersToIsrs.removeTopicEntryForBroker(topic.id, brokerId);
            }
            removeFromImbalancedPartitions(new TopicIdPartition(record.topicId(), partitionId));
            replicasRemoved.put(partitionId, Arrays.stream(partition.replicas).boxed().collect(Collectors.toList()));
        }
        // Also drop the topic's entry kept under broker id -1.
        brokersToIsrs.removeTopicEntryForBroker(topic.id, -1);

        log.info("Replayed RemoveTopicRecord for topic {} with ID {}.", topic.name, record.topicId());
        updateConfluentPartitionsPerTopicListener(topic.name, -topic.parts.size(), -1, Collections.emptyMap(), replicasRemoved, false);
    }

    /**
     * Handles a CreateTopics request.
     *
     * <p>Topics are first screened (name validation, already-existing names,
     * tenant cell computation, config-change computation); each failure is
     * recorded per topic. Every remaining topic then has its config changes
     * applied and is passed to {@code createTopic}. The response lists all
     * requested topics in request order, each with its error or its success
     * result.
     *
     * @param context     the request context (principal and quota handling)
     * @param request     the CreateTopics request
     * @param describable names of topics whose configs may be returned to the caller
     * @return the response with the generated records, or with no records when
     *         the request is validate-only
     */
    ControllerResult<CreateTopicsResponseData> createTopics(ControllerRequestContext context, CreateTopicsRequestData request, Set<String> describable) {
        clearConfluentPartitionsPerTopicListenerPendingState();
        HashMap<String, ApiError> topicErrors = new HashMap<>();
        List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(10000);

        // Screening: each step records per-topic failures in topicErrors.
        ReplicationControlManager.validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);
        for (CreateTopicsRequestData.CreatableTopic candidate : request.topics()) {
            if (topicsByName.containsKey(candidate.name())) {
                topicErrors.put(candidate.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic '" + candidate.name() + "' already exists."));
            }
        }
        int cellId = computeTenantCellId(request, topicErrors, context.principal(), records);
        HashMap<String, TopicPlacement> topicPlacements = new HashMap<>();
        Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges = ReplicationControlManager.computeConfigChanges(topicErrors, topicPlacements, request.topics(), featureControl.isTopicPlacementSupported(), defaultTopicPlacement);

        Map<String, CreateTopicsResponseData.CreatableTopicResult> successes = new HashMap<>();
        Set<Integer> excludedBrokerIds = clusterControl.activeBrokerReplicaExclusions().keySet();
        for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
            if (topicErrors.containsKey(topic.name())) {
                continue;
            }
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic.name());
            Map<String, Map.Entry<AlterConfigOp.OpType, String>> keyToOps = configChanges.get(configResource);
            List<ApiMessageAndVersion> configRecords = Collections.emptyList();
            if (keyToOps != null) {
                ControllerResult<ApiError> configResult = configurationControl.incrementalAlterConfig(configResource, keyToOps, true, context.principal());
                if (configResult.response().isFailure()) {
                    topicErrors.put(topic.name(), configResult.response());
                    continue;
                }
                configRecords = configResult.records();
            }
            ApiError outcome;
            try {
                outcome = createTopic(context, topic, records, successes, configRecords, describable.contains(topic.name()), excludedBrokerIds, context.principal(), cellId, topicPlacements);
            } catch (ApiException e) {
                outcome = ApiError.fromThrowable(e);
            }
            if (outcome.isFailure()) {
                topicErrors.put(topic.name(), outcome);
            }
        }

        // Build the response and a one-line summary for the log, in request order.
        CreateTopicsResponseData data = new CreateTopicsResponseData();
        StringBuilder resultsBuilder = new StringBuilder();
        String separator = "";
        for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
            ApiError topicError = topicErrors.get(topic.name());
            String outcomeText;
            if (topicError != null) {
                data.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
                    .setName(topic.name())
                    .setErrorCode(topicError.error().code())
                    .setErrorMessage(topicError.message()));
                outcomeText = topicError.error() + " (" + topicError.message() + ")";
            } else {
                data.topics().add(successes.get(topic.name()));
                outcomeText = "SUCCESS";
            }
            resultsBuilder.append(separator).append(topic).append(": ").append(outcomeText);
            separator = ", ";
        }

        if (request.validateOnly()) {
            log.info("Validate-only CreateTopics result(s): {}", resultsBuilder.toString());
            return ControllerResult.atomicOf(Collections.emptyList(), data);
        }
        log.info("CreateTopics result(s): {}", resultsBuilder);
        return ControllerResult.atomicOf(records, data);
    }

    /**
     * Reports whether the given broker is considered demoted, i.e. whether it
     * is a key of {@code clusterControl.activeBrokerComponentDegradations()}.
     *
     * @param brokerId the broker id to look up
     * @return {@code true} if the broker has an active component degradation entry
     */
    public boolean isDemoted(Integer brokerId) {
        return this.clusterControl.activeBrokerComponentDegradations().containsKey(brokerId);
    }

    private ApiError createTopic(ControllerRequestContext context, CreateTopicsRequestData.CreatableTopic topic, List<ApiMessageAndVersion> records, Map<String, CreateTopicsResponseData.CreatableTopicResult> successes, List<ApiMessageAndVersion> configRecords, boolean authorizedToReturnConfigs, Set<Integer> excludedBrokerIds, KafkaPrincipal principal, int cellId, Map<String, TopicPlacement> topicPlacements) {
        boolean isMirrorTopic = topic.mirrorTopic() != null;
        Optional<TopicPlacement> topicPlacementOpt = Optional.ofNullable(topicPlacements.get(topic.name()));
        this.log.debug("Topic placement for topic {} is {}.", (Object)topic.name(), topicPlacementOpt);
        Map<String, String> creationConfigs = ReplicationControlManager.translateCreationConfigs(topic.configs(), topicPlacementOpt);
        HashMap<Integer, PartitionRegistration> newParts = new HashMap<Integer, PartitionRegistration>();
        if (!topic.assignments().isEmpty()) {
            if (topic.replicationFactor() != -1) {
                return new ApiError(Errors.INVALID_REQUEST, "A manual partition assignment was specified, but replication factor was not set to -1.");
            }
            if (topic.numPartitions() != -1) {
                return new ApiError(Errors.INVALID_REQUEST, "A manual partition assignment was specified, but numPartitions was not set to -1.");
            }
            if (topicPlacementOpt.isPresent()) {
                return new ApiError(Errors.INVALID_REQUEST, String.format("Both assignments and %s are set. Both cannot be used at the same time.", "confluent.placement.constraints"));
            }
            OptionalInt replicationFactor = OptionalInt.empty();
            for (CreateTopicsRequestData.CreatableReplicaAssignment assignment : topic.assignments()) {
                if (newParts.containsKey(assignment.partitionIndex())) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "Found multiple manual partition assignments for partition " + assignment.partitionIndex());
                }
                this.validateManualPartitionAssignment(new PartitionAssignment(assignment.brokerIds(), Collections.emptyList()), replicationFactor, excludedBrokerIds, cellId, Optional.empty());
                replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                List<Integer> isr = assignment.brokerIds().stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
                if (isr.isEmpty()) {
                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "All brokers specified in the manual partition assignment for partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
                }
                Integer leader = this.leaderConsideringDemotions(isr, topic.name(), assignment.partitionIndex());
                newParts.put(assignment.partitionIndex(), ReplicationControlManager.buildPartitionRegistration(assignment.brokerIds(), isr, leader));
            }
            for (int i = 0; i < newParts.size(); ++i) {
                if (newParts.containsKey(i)) continue;
                return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, "partitions should be a consecutive 0-based integer sequence");
            }
        } else {
            if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Replication factor must be larger than 0, or -1 to use the default value or the topic placement.");
            }
            if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
                return new ApiError(Errors.INVALID_PARTITIONS, "Number of partitions was set to an invalid non-positive value.");
            }
            if (topic.replicationFactor() != -1 && topicPlacementOpt.isPresent()) {
                return new ApiError(Errors.INVALID_REQUEST, String.format("Both replicationFactor and %s are set. Both cannot be used at the same time.", "confluent.placement.constraints"));
            }
            int numPartitions = topic.numPartitions() == -1 ? this.defaultNumPartitions : topic.numPartitions();
            short replicationFactor = topic.replicationFactor() == -1 && !topicPlacementOpt.isPresent() ? this.defaultReplicationFactor : topic.replicationFactor();
            try {
                TopicAssignment topicAssignment = this.clusterControl.replicaPlacer().place(new PlacementSpec(0, numPartitions, replicationFactor, topic.name(), principal, this.clusterControl.activeBrokerReplicaExclusions().keySet(), this.tenantControl.calculatePartitionPlacementStrategy(Optional.ofNullable(principal)), topicPlacementOpt), this.getKRaftCellDescriber(cellId));
                for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); ++partitionId) {
                    PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
                    List<Integer> replicas = partitionAssignment.replicas();
                    List<Integer> syncReplicas = partitionAssignment.syncReplicas();
                    List<Integer> observers = partitionAssignment.observers();
                    List<Integer> isr = syncReplicas.stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
                    if (isr.isEmpty()) {
                        return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Unable to replicate the partition " + replicationFactor + " time(s): All brokers are currently fenced or in controlled shutdown.");
                    }
                    Integer leader = this.leaderConsideringDemotions(isr, topic.name(), partitionId);
                    if (isMirrorTopic) {
                        newParts.put(partitionId, ReplicationControlManager.buildPartitionRegistration(replicas, observers, isr, leader, PartitionRegistration.LinkState.ACTIVE));
                        continue;
                    }
                    newParts.put(partitionId, ReplicationControlManager.buildPartitionRegistration(replicas, observers, isr, leader, PartitionRegistration.LinkState.NOT_MIRROR));
                }
            }
            catch (InvalidReplicationFactorException e) {
                return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Unable to replicate the partition " + replicationFactor + " time(s): " + e.getMessage());
            }
        }
        HashMap<Integer, List<Integer>> partitionIdToReplicasAdded = new HashMap<Integer, List<Integer>>();
        newParts.forEach((key, value) -> partitionIdToReplicasAdded.put((Integer)key, Replicas.toList(value.replicas)));
        ApiError error = this.maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(topic.name(), null, null, partitionIdToReplicasAdded, creationConfigs));
        if (error.isFailure()) {
            return error;
        }
        int numPartitions = newParts.size();
        try {
            context.applyPartitionChangeQuota(numPartitions);
        }
        catch (ThrottlingQuotaExceededException e) {
            this.log.debug("Topic creation of {} partitions not allowed because quota is violated. Delay time: {}", (Object)numPartitions, (Object)e.throttleTimeMs());
            return ApiError.fromThrowable((Throwable)e);
        }
        Uuid topicId = Uuid.randomUuid();
        ArrayList mirrorTopicRecord = new ArrayList(1);
        ApiError clusterLinkError = this.mirrorTopicControl.maybeAddMirrorTopicRecord(topic, topicId, mirrorTopicRecord::add);
        if (clusterLinkError != ApiError.NONE) {
            return clusterLinkError;
        }
        CreateTopicsResponseData.CreatableTopicResult result = new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name()).setTopicId(topicId).setErrorCode(Errors.NONE.code()).setErrorMessage(null);
        if (authorizedToReturnConfigs) {
            Map<String, ConfigEntry> effectiveConfig = this.configurationControl.computeEffectiveTopicConfigs(creationConfigs);
            ArrayList<String> configNames = new ArrayList<String>(effectiveConfig.keySet());
            configNames.sort(String::compareTo);
            for (String configName : configNames) {
                ConfigEntry entry = effectiveConfig.get(configName);
                result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().setConfigName(entry.name()).setValue(entry.isSensitive() ? null : entry.value()).setReadOnly(entry.isReadOnly()).setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).setIsSensitive(entry.isSensitive()));
            }
            result.setNumPartitions(numPartitions);
            result.setReplicationFactor((short)((PartitionRegistration)newParts.values().iterator().next()).replicas.length);
            result.setTopicConfigErrorCode(Errors.NONE.code());
        } else {
            result.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code());
        }
        successes.put(topic.name(), result);
        records.add(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName(topic.name()).setTopicId(topicId), 0));
        records.addAll(mirrorTopicRecord);
        records.addAll(configRecords);
        for (Map.Entry partEntry : newParts.entrySet()) {
            int partitionIndex = (Integer)partEntry.getKey();
            PartitionRegistration info = (PartitionRegistration)partEntry.getValue();
            records.add(info.toRecord(topicId, partitionIndex, this.featureControl.metadataVersion().partitionRecordVersion()));
        }
        this.updateConfluentPartitionsPerTopicListener(topic.name(), newParts.size(), 1, partitionIdToReplicasAdded, Collections.emptyMap(), true);
        return ApiError.NONE;
    }

    /**
     * Chooses the leader for a partition from its ISR, preferring brokers that are not demoted.
     *
     * <p>The first ISR entry is the default leader. The ISR is scanned in order and the first
     * entry for which {@code isDemoted} is false wins. If every entry is demoted, the first ISR
     * entry is kept and a warning is logged.
     *
     * @param isr       candidate broker ids in preference order; must contain at least one entry
     * @param topicName topic name, used only in log messages
     * @param partition partition index, used only in log messages
     * @return the broker id selected as leader
     */
    private Integer leaderConsideringDemotions(List<Integer> isr, String topicName, Integer partition) {
        Integer preferredLeader = isr.get(0);
        for (Integer candidate : isr) {
            if (this.isDemoted(candidate)) {
                continue;
            }
            if (preferredLeader.equals(candidate)) {
                // The preferred leader itself is eligible; nothing to report.
                return preferredLeader;
            }
            this.log.debug("Preferred leader of topic {} partition {} is demoted. Selecting broker {} as the current leader.", topicName, partition, candidate);
            return candidate;
        }
        this.log.warn("All brokers specified in the manual partition assignment for topic {} partition {} are demoted. If this partition experiences unavailability, consider moving leadership to another broker. Broker {} is currently the leader.", topicName, partition, preferredLeader);
        return preferredLeader;
    }

    /**
     * Builds the registration for a newly created partition that has no observers or link state.
     *
     * <p>The leader recovery state is {@code RECOVERED} and both the leader epoch and the
     * partition epoch start at 0.
     *
     * @param replicas replica broker ids
     * @param isr      in-sync replica broker ids
     * @param leader   broker id of the initial leader
     * @return the built registration
     */
    private static PartitionRegistration buildPartitionRegistration(List<Integer> replicas, List<Integer> isr, Integer leader) {
        int[] replicaIds = Replicas.toArray(replicas);
        int[] isrIds = Replicas.toArray(isr);
        return new PartitionRegistration.Builder()
            .setReplicas(replicaIds)
            .setIsr(isrIds)
            .setLeader(leader)
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .build();
    }

    /**
     * Builds the registration for a newly created partition, including observers and link state.
     *
     * <p>The leader recovery state is {@code RECOVERED} and both the leader epoch and the
     * partition epoch start at 0.
     *
     * @param replicas  replica broker ids
     * @param observers observer broker ids
     * @param isr       in-sync replica broker ids
     * @param leader    broker id of the initial leader
     * @param linkState link state to record on the partition
     * @return the built registration
     */
    private static PartitionRegistration buildPartitionRegistration(List<Integer> replicas, List<Integer> observers, List<Integer> isr, Integer leader, PartitionRegistration.LinkState linkState) {
        int[] replicaIds = Replicas.toArray(replicas);
        int[] observerIds = Replicas.toArray(observers);
        int[] isrIds = Replicas.toArray(isr);
        return new PartitionRegistration.Builder()
            .setReplicas(replicaIds)
            .setObservers(observerIds)
            .setIsr(isrIds)
            .setLeader(leader)
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .setLinkState(linkState)
            .build();
    }

    /**
     * Runs the configured create-topic policy, if there is one, against the supplied metadata.
     *
     * <p>The supplier is only invoked when a policy is present.
     *
     * @param supplier produces the request metadata to validate
     * @return {@code ApiError.NONE} when no policy is configured or validation passes; otherwise
     *         a {@code POLICY_VIOLATION} error carrying the violation message
     */
    private ApiError maybeCheckCreateTopicPolicy(Supplier<CreateTopicPolicy.RequestMetadata> supplier) {
        if (!this.createTopicPolicy.isPresent()) {
            return ApiError.NONE;
        }
        try {
            this.createTopicPolicy.get().validate(supplier.get());
            return ApiError.NONE;
        }
        catch (PolicyViolationException violation) {
            return new ApiError(Errors.POLICY_VIOLATION, violation.getMessage());
        }
    }

    /**
     * Validates the names of topics being created and records an error for each bad name.
     *
     * <p>Topics that already have an entry in {@code topicErrors} are skipped. A name rejected by
     * {@code Topic.validate} gets an {@code INVALID_TOPIC_EXCEPTION} error. Independently of that
     * result, a name containing collision characters is looked up, in normalized form, in
     * {@code topicsWithCollisionChars}; a hit stores a collision error, replacing any error
     * recorded by the validation step.
     *
     * @param topicErrors              per-topic errors; updated in place
     * @param topics                   topics requested for creation
     * @param topicsWithCollisionChars existing topic names keyed by their normalized form
     */
    static void validateNewTopicNames(Map<String, ApiError> topicErrors, CreateTopicsRequestData.CreatableTopicCollection topics, Map<String, ? extends Set<String>> topicsWithCollisionChars) {
        for (CreateTopicsRequestData.CreatableTopic topic : topics) {
            String name = topic.name();
            if (topicErrors.containsKey(name)) {
                continue;
            }
            try {
                Topic.validate(name);
            }
            catch (InvalidTopicException invalid) {
                topicErrors.put(name, new ApiError(Errors.INVALID_TOPIC_EXCEPTION, invalid.getMessage()));
            }
            if (!Topic.hasCollisionChars(name)) {
                continue;
            }
            Set<String> colliding = topicsWithCollisionChars.get(Topic.unifyCollisionChars(name));
            if (colliding != null) {
                topicErrors.put(name, new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic '" + name + "' collides with existing topic: " + colliding.iterator().next()));
            }
        }
    }

    /**
     * Computes the config changes implied by a batch of topic creations.
     *
     * <p>Topics already present in {@code topicErrors} are skipped. For the rest, every config
     * with a non-null value becomes a {@code SET} operation, and topic placement handling may add
     * one more entry or record an error. A topic with any null-valued config is rejected with
     * {@code INVALID_CONFIG}, unless placement handling already recorded an error for it.
     *
     * @param topicErrors               per-topic errors; updated in place
     * @param topicPlacements           per-topic placements; may be updated by placement handling
     * @param topics                    topics requested for creation
     * @param isTopicPlacementSupported forwarded to placement handling
     * @param defaultTopicPlacement     forwarded to placement handling
     * @return config operations keyed by topic resource; topics with no operations are omitted
     */
    static Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> computeConfigChanges(Map<String, ApiError> topicErrors, Map<String, TopicPlacement> topicPlacements, CreateTopicsRequestData.CreatableTopicCollection topics, boolean isTopicPlacementSupported, Optional<TopicPlacement> defaultTopicPlacement) {
        Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> changesByResource = new HashMap<>();
        for (CreateTopicsRequestData.CreatableTopic topic : topics) {
            if (topicErrors.containsKey(topic.name())) {
                continue;
            }
            Map<String, Map.Entry<AlterConfigOp.OpType, String>> topicConfigs = new HashMap<>();
            List<String> nullValuedNames = new ArrayList<>();
            for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
                if (config.value() != null) {
                    topicConfigs.put(config.name(), new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, config.value()));
                } else {
                    nullValuedNames.add(config.name());
                }
            }
            ReplicationControlManager.computeTopicPlacementConfigChanges(topicErrors, topicPlacements, topic, isTopicPlacementSupported, defaultTopicPlacement, topicConfigs);
            if (topicErrors.containsKey(topic.name())) {
                // Placement handling rejected the topic; its error takes precedence.
                continue;
            }
            if (!nullValuedNames.isEmpty()) {
                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_CONFIG, "Null value not supported for topic configs: " + String.join(",", nullValuedNames)));
            } else if (!topicConfigs.isEmpty()) {
                changesByResource.put(new ConfigResource(ConfigResource.Type.TOPIC, topic.name()), topicConfigs);
            }
        }
        return changesByResource;
    }

    /**
     * Resolves the topic placement for a topic being created and records it.
     *
     * <p>The placement string is taken from the request configs when present. Otherwise the
     * default placement applies only if the requested replication factor is -1. A resolved
     * placement is stored in {@code topicPlacements} and written to {@code topicConfigs} under
     * {@code confluent.placement.constraints} as a {@code SET} of its JSON form.
     *
     * <p>Failures are reported through {@code topicErrors}: {@code INVALID_REQUEST} if reading the
     * placement from the request throws {@code InvalidRequestException}, and
     * {@code INVALID_CONFIG} if parsing the placement throws {@code IllegalArgumentException}.
     *
     * @param topicErrors               per-topic errors; updated in place
     * @param topicPlacements           per-topic placements; updated in place
     * @param topic                     the topic being created
     * @param isTopicPlacementSupported forwarded to {@code getTopicPlacementFromRequest}
     * @param defaultTopicPlacement     placement used when the request has none and the
     *                                  replication factor is -1
     * @param topicConfigs              config operations for this topic; updated in place
     */
    static void computeTopicPlacementConfigChanges(Map<String, ApiError> topicErrors, Map<String, TopicPlacement> topicPlacements, CreateTopicsRequestData.CreatableTopic topic, boolean isTopicPlacementSupported, Optional<TopicPlacement> defaultTopicPlacement, Map<String, Map.Entry<AlterConfigOp.OpType, String>> topicConfigs) {
        // Typed as Optional<TopicPlacement>: both sources below produce that type, and
        // toJson() is invoked on the contained value.
        Optional<TopicPlacement> topicPlacementOpt;
        Optional<String> topicPlacementStr;
        try {
            topicPlacementStr = ReplicationControlManager.getTopicPlacementFromRequest(isTopicPlacementSupported, topic.configs());
        }
        catch (InvalidRequestException e) {
            topicErrors.put(topic.name(), new ApiError(Errors.INVALID_REQUEST, e.getMessage()));
            return;
        }
        if (topicPlacementStr.isPresent()) {
            try {
                topicPlacementOpt = TopicPlacement.parse(topicPlacementStr.get());
            }
            catch (IllegalArgumentException e) {
                topicErrors.put(topic.name(), new ApiError(Errors.INVALID_CONFIG, "Invalid topic placement."));
                return;
            }
        } else {
            topicPlacementOpt = topic.replicationFactor() == -1 ? defaultTopicPlacement : Optional.empty();
        }
        topicPlacementOpt.ifPresent(topicPlacement -> {
            topicPlacements.put(topic.name(), topicPlacement);
            topicConfigs.put("confluent.placement.constraints", new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, topicPlacement.toJson()));
        });
    }

    /**
     * Looks up topic ids by name as of the given offset.
     *
     * @param offset offset passed to the {@code topicsByName} lookup
     * @param names  topic names to resolve; may contain {@code null}
     * @return one entry per distinct name: the id on success, {@code INVALID_REQUEST} for a
     *         {@code null} name (stored under the {@code null} key), or
     *         {@code UNKNOWN_TOPIC_OR_PARTITION} when the name has no id
     */
    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
        Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
        for (String name : names) {
            if (name == null) {
                results.put(null, new ResultOrError<>(Errors.INVALID_REQUEST, "Invalid null topic name."));
                continue;
            }
            Uuid id = (Uuid)this.topicsByName.get(name, offset);
            if (id == null) {
                results.put(name, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)));
                continue;
            }
            results.put(name, new ResultOrError<Uuid>(id));
        }
        return results;
    }

    /**
     * Copies every name-to-id mapping in {@code topicsByName} as of the given offset.
     *
     * @param offset offset passed to the {@code topicsByName} size and entry-set lookups
     * @return a new map from topic name to topic id
     */
    Map<String, Uuid> findAllTopicIds(long offset) {
        Map<String, Uuid> idsByName = new HashMap<>(this.topicsByName.size(offset));
        for (Map.Entry<String, Uuid> mapping : this.topicsByName.entrySet(offset)) {
            idsByName.put(mapping.getKey(), mapping.getValue());
        }
        return idsByName;
    }

    /**
     * Looks up topic names by id as of the given offset.
     *
     * @param offset offset passed to the {@code topics} lookup
     * @param ids    topic ids to resolve; may contain {@code null}
     * @return one entry per distinct id: the name on success, {@code INVALID_REQUEST} for a
     *         {@code null} or zero id, or {@code UNKNOWN_TOPIC_ID} when the id is not found
     */
    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
        Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
        for (Uuid id : ids) {
            if (id == null || id.equals(Uuid.ZERO_UUID)) {
                results.put(id, new ResultOrError<>(new ApiError(Errors.INVALID_REQUEST, "Attempt to find topic with invalid topicId " + id)));
                continue;
            }
            TopicControlInfo topic = (TopicControlInfo)this.topics.get(id, offset);
            if (topic == null) {
                results.put(id, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_ID)));
                continue;
            }
            results.put(id, new ResultOrError<String>(topic.name));
        }
        return results;
    }

    /**
     * Deletes a batch of topics, collecting a per-topic outcome.
     *
     * <p>Each id is handed to {@code deleteTopic}. A failure on one id is recorded as that id's
     * error and does not stop the loop; exceptions that are not {@code ApiException} are also
     * logged at error level.
     *
     * @param context request context, forwarded to {@code deleteTopic}
     * @param ids     ids of the topics to delete
     * @return the generated records together with a map from topic id to outcome
     */
    ControllerResult<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) {
        Map<Uuid, ApiError> outcomes = new HashMap<>(ids.size());
        BoundedList<ApiMessageAndVersion> records = BoundedList.newArrayBacked(10000, ids.size());
        for (Uuid id : ids) {
            ApiError outcome;
            try {
                this.deleteTopic(context, id, records);
                outcome = ApiError.NONE;
            }
            catch (ApiException e) {
                outcome = ApiError.fromThrowable(e);
            }
            catch (Exception e) {
                this.log.error("Unexpected deleteTopics error for {}", id, e);
                outcome = ApiError.fromThrowable(e);
            }
            outcomes.put(id, outcome);
        }
        return ControllerResult.atomicOf(records, outcomes);
    }

    /**
     * Appends the record that removes a single topic.
     *
     * <p>The topic's partition count is charged against the partition-change quota before the
     * record is added.
     *
     * @param context request context used to apply the quota
     * @param id      id of the topic to delete
     * @param records output list that receives the {@code RemoveTopicRecord}
     * @throws UnknownTopicIdException          if no topic with this id exists
     * @throws ThrottlingQuotaExceededException if the quota rejects the change; rethrown after
     *                                          a debug log
     */
    void deleteTopic(ControllerRequestContext context, Uuid id, List<ApiMessageAndVersion> records) {
        TopicControlInfo topic = this.topics.get(id);
        if (topic == null) {
            throw new UnknownTopicIdException(Errors.UNKNOWN_TOPIC_ID.message());
        }
        int partitionCount = topic.parts.size();
        try {
            context.applyPartitionChangeQuota(partitionCount);
        }
        catch (ThrottlingQuotaExceededException quotaExceeded) {
            this.log.debug("Topic deletion of {} partitions not allowed because quota is violated. Delay time: {}", partitionCount, quotaExceeded.throttleTimeMs());
            throw quotaExceeded;
        }
        RemoveTopicRecord removal = new RemoveTopicRecord().setTopicId(id);
        records.add(new ApiMessageAndVersion(removal, 0));
    }

    /**
     * Returns the registration of one partition.
     *
     * @param topicId     id of the topic
     * @param partitionId index of the partition
     * @return the registration, or {@code null} if the topic is unknown or has no entry for
     *         this partition
     */
    PartitionRegistration getPartition(Uuid topicId, int partitionId) {
        TopicControlInfo topic = this.topics.get(topicId);
        return topic == null ? null : topic.parts.get(partitionId);
    }

    /**
     * Returns the control info stored for a topic id.
     *
     * @param topicId id of the topic
     * @return the value held in {@code topics} for this id
     */
    TopicControlInfo getTopic(Uuid topicId) {
        TopicControlInfo topic = this.topics.get(topicId);
        return topic;
    }

    /**
     * Returns the id stored for a topic name.
     *
     * @param name topic name
     * @return the value held in {@code topicsByName} for this name
     */
    Uuid getTopicId(String name) {
        Uuid topicId = this.topicsByName.get(name);
        return topicId;
    }

    /**
     * Returns the names of all topics.
     *
     * @return the key set of {@code topicsByName}
     */
    Set<String> getAllTopicNames() {
        Set<String> topicNames = this.topicsByName.keySet();
        return topicNames;
    }

    /**
     * Exposes the broker-to-ISR index held by this manager.
     *
     * @return the {@code brokersToIsrs} field itself, not a copy
     */
    BrokersToIsrs brokersToIsrs() {
        BrokersToIsrs index = this.brokersToIsrs;
        return index;
    }

    /**
     * Returns the union of the imbalanced internal and imbalanced external partitions.
     *
     * @return a new set holding every element of both collections
     */
    Set<TopicIdPartition> imbalancedPartitions() {
        Set<TopicIdPartition> combined = new HashSet<>(this.imbalancedInternalPartitions);
        combined.addAll(this.imbalancedExternalPartitions);
        return combined;
    }

    /**
     * Handles an AlterPartition request from a broker.
     *
     * <p>The broker epoch is checked first. Each topic is then resolved by id (request version
     * above 1) or by name (older versions); partitions of an unresolved topic all receive
     * {@code UNKNOWN_TOPIC_ID} or {@code UNKNOWN_TOPIC_OR_PARTITION} respectively. Every other
     * partition is validated and, if valid, run through a {@code PartitionChangeBuilder}; any
     * resulting record is appended to the output and merged into the partition state used for
     * the response.
     *
     * @param context request context; supplies the request API version
     * @param request the AlterPartition request
     * @return the generated records and the response, with one entry per requested partition
     */
    ControllerResult<AlterPartitionResponseData> alterPartition(ControllerRequestContext context, AlterPartitionRequestData request) {
        short requestVersion = context.requestHeader().requestApiVersion();
        this.clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
        AlterPartitionResponseData response = new AlterPartitionResponseData();
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
            AlterPartitionResponseData.TopicData responseTopicData = new AlterPartitionResponseData.TopicData().setTopicName(topicData.topicName()).setTopicId(topicData.topicId());
            response.topics().add(responseTopicData);
            // Versions above 1 identify the topic by id; older versions identify it by name.
            Uuid topicId = requestVersion > 1 ? topicData.topicId() : (Uuid)this.topicsByName.get((Object)topicData.topicName());
            if (topicId == null || topicId.equals((Object)Uuid.ZERO_UUID) || !this.topics.containsKey((Object)topicId)) {
                Errors error = requestVersion > 1 ? Errors.UNKNOWN_TOPIC_ID : Errors.UNKNOWN_TOPIC_OR_PARTITION;
                for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                    responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionData.partitionIndex()).setErrorCode(error.code()));
                }
                this.log.info("Rejecting AlterPartition request for unknown topic ID {} or name {}.", (Object)topicData.topicId(), (Object)topicData.topicName());
                continue;
            }
            TopicControlInfo topic = (TopicControlInfo)this.topics.get((Object)topicId);
            Map<Integer, Set<DegradedBrokerHealthState>> activeDegradations = this.clusterControl.activeBrokerComponentDegradations();
            for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
                if (requestVersion < 3) {
                    // Requests below version 3 carry only newIsr; convert it so the rest of the
                    // method can read newIsrWithEpochs uniformly.
                    partitionData.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs((List)partitionData.newIsr()));
                }
                int partitionId = partitionData.partitionIndex();
                PartitionRegistration partition = (PartitionRegistration)topic.parts.get((Object)partitionId);
                Errors validationError = this.validateAlterPartitionData(request.brokerId(), topic, partitionId, partition, requestVersion, partitionData);
                if (validationError != Errors.NONE) {
                    responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId).setErrorCode(validationError.code()));
                    continue;
                }
                PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topic.id, partitionId, this.clusterControl::isActive, this.featureControl.metadataVersion(), activeDegradations, this.getTopicEffectiveMinIsr(topic.name));
                builder.setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
                if (this.configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                    builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                }
                builder.setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs());
                builder.setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
                this.buildClusterLinkState(topicId, partitionId, partitionData.clusterLinkState(), builder);
                Optional<ApiMessageAndVersion> record = builder.build();
                if (record.isPresent()) {
                    records.add(record.get());
                    // NOTE(review): this runs without an isMirrorTopic check, unlike
                    // buildClusterLinkState — confirm failMirrorTopic tolerates non-mirror topics.
                    if (partitionData.clusterLinkState().linkFailed()) {
                        this.mirrorTopicControl.failMirrorTopic(topicId, partitionData.mirrorTopicError(), records::add);
                    }
                    PartitionChangeRecord change = (PartitionChangeRecord)record.get().message();
                    // Report the post-change state in the response.
                    partition = partition.merge(change);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Node {} has altered ISR for {}-{} to {}.", new Object[]{request.brokerId(), topic.name, partitionId, change.isr()});
                    }
                    // -2 is presumably the "leader unchanged" sentinel — TODO confirm against
                    // PartitionChangeRecord / PartitionChangeBuilder.
                    if (change.leader() != request.brokerId() && change.leader() != -2) {
                        Errors error = requestVersion > 1 ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
                        this.log.info("AlterPartition request from node {} for {}-{} completed the ongoing partition reassignment and triggered a leadership change. Returning {}.", new Object[]{request.brokerId(), topic.name, partitionId, error});
                        responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId).setErrorCode(error.code()));
                        continue;
                    }
                    if (PartitionChangeBuilder.completedReassignment(change)) {
                        this.log.info("AlterPartition request from node {} for {}-{} completed the ongoing partition reassignment.", new Object[]{request.brokerId(), topic.name, partitionId});
                    }
                }
                responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId).setErrorCode(Errors.NONE.code()).setLeaderId(partition.leader).setIsr(Replicas.toList(partition.isr)).setLeaderRecoveryState(partition.leaderRecoveryState.value()).setLeaderEpoch(partition.leaderEpoch).setPartitionEpoch(partition.partitionEpoch));
            }
        }
        return ControllerResult.of(records, response);
    }

    /**
     * Copies cluster link state from an AlterPartition request onto the change builder.
     *
     * <p>The state is applied only when the topic is a mirror topic. For any other topic it is
     * ignored, and an error is logged if the request carried a linked leader epoch other than -1
     * or a link-failed flag.
     *
     * @param topicId          id of the topic being altered
     * @param partitionId      partition index, used only in the log message
     * @param clusterLinkState link state sent in the request
     * @param builder          builder that receives the link state
     */
    private void buildClusterLinkState(Uuid topicId, int partitionId, AlterPartitionRequestData.ClusterLinkState clusterLinkState, PartitionChangeBuilder builder) {
        if (!this.mirrorTopicControl.isMirrorTopic(topicId)) {
            boolean carriesLinkData = clusterLinkState.linkedLeaderEpoch() != -1 || clusterLinkState.linkFailed();
            if (carriesLinkData) {
                this.log.error("Ignoring cluster link data for non-mirrored topic {} partition {}.", topicId, partitionId);
            }
            return;
        }
        builder.setLinkedLeaderEpoch(clusterLinkState.linkedLeaderEpoch());
        builder.setLinkFailed(clusterLinkState.linkFailed());
    }

    /**
     * Replaces every partition registration of a topic with the result of its {@code unlink()}.
     *
     * <p>NOTE(review): the topic lookup result is not null-checked, so an unknown id fails with a
     * {@code NullPointerException} — confirm callers only pass known topics.
     *
     * @param topicId id of the topic to unlink
     */
    void unlinkMirrorTopic(Uuid topicId) {
        TopicControlInfo topic = this.topics.get(topicId);
        for (Integer partitionId : topic.parts.keySet()) {
            topic.parts.computeIfPresent(partitionId, (ignoredId, registration) -> registration.unlink());
        }
    }

    /**
     * Validates one partition entry of an AlterPartition request against the stored registration.
     *
     * <p>Checks run in the order written and the first failure decides the returned error, so the
     * ordering is part of the contract.
     *
     * @param brokerId          id of the broker that sent the request
     * @param topic             the topic the partition belongs to; its name is used in log messages
     * @param partitionId       index of the partition
     * @param partition         stored registration, or {@code null} if the partition is unknown
     * @param requestApiVersion API version of the request; selects the ineligible-replica error code
     * @param partitionData     the partition entry from the request
     * @return {@code Errors.NONE} if every check passes, otherwise the error for the first failed check
     */
    private Errors validateAlterPartitionData(int brokerId, TopicControlInfo topic, int partitionId, PartitionRegistration partition, short requestApiVersion, AlterPartitionRequestData.PartitionData partitionData) {
        if (partition == null) {
            this.log.info("Rejecting AlterPartition request for unknown partition {}-{}.", (Object)topic.name, (Object)partitionId);
            return Errors.UNKNOWN_TOPIC_OR_PARTITION;
        }
        // A request epoch ahead of the stored one is answered with NOT_CONTROLLER.
        // NOTE(review): the message's "current ... epoch" placeholder receives the stored value and
        // "local value" receives the request's value — confirm the wording is intended.
        if (partitionData.leaderEpoch() > partition.leaderEpoch) {
            this.log.debug("Rejecting AlterPartition request from node {} for {}-{} because the current leader epoch is {}, which is greater than the local value {}.", new Object[]{brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch()});
            return Errors.NOT_CONTROLLER;
        }
        if (partitionData.partitionEpoch() > partition.partitionEpoch) {
            this.log.debug("Rejecting AlterPartition request from node {} for {}-{} because the current partition epoch is {}, which is greater than the local value {}.", new Object[]{brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch()});
            return Errors.NOT_CONTROLLER;
        }
        // A request leader epoch behind the stored one is fenced.
        if (partitionData.leaderEpoch() < partition.leaderEpoch) {
            this.log.debug("Rejecting AlterPartition request from node {} for {}-{} because the current leader epoch is {}, not {}.", new Object[]{brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch()});
            return Errors.FENCED_LEADER_EPOCH;
        }
        // Only the stored leader may alter the partition.
        if (brokerId != partition.leader) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the current leader is {}.", new Object[]{brokerId, topic.name, partitionId, partition.leader});
            return Errors.INVALID_REQUEST;
        }
        // A request partition epoch behind the stored one is a stale update.
        if (partitionData.partitionEpoch() < partition.partitionEpoch) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the current partition epoch is {}, not {}.", new Object[]{brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch()});
            return Errors.INVALID_UPDATE_VERSION;
        }
        // The proposed ISR must pass Replicas.validateIsr against the replica set and contain the leader.
        int[] newIsr = partitionData.newIsrWithEpochs().stream().mapToInt(brokerState -> brokerState.brokerId()).toArray();
        if (!Replicas.validateIsr(partition.replicas, newIsr)) {
            this.log.error("Rejecting AlterPartition request from node {} for {}-{} because it specified an invalid ISR {}.", new Object[]{brokerId, topic.name, partitionId, partitionData.newIsrWithEpochs()});
            return Errors.INVALID_REQUEST;
        }
        if (!Replicas.contains(newIsr, partition.leader)) {
            this.log.error("Rejecting AlterPartition request from node {} for {}-{} because it specified an invalid ISR {} that doesn't include itself.", new Object[]{brokerId, topic.name, partitionId, partitionData.newIsrWithEpochs()});
            return Errors.INVALID_REQUEST;
        }
        // The request's isUnclean flag forces the recovery state under validation to RECOVERING.
        LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.of(partitionData.leaderRecoveryState());
        if (partitionData.isUnclean()) {
            leaderRecoveryState = LeaderRecoveryState.RECOVERING;
        }
        // While RECOVERING, the ISR may hold at most one replica.
        if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && newIsr.length > 1) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the ISR {} had more than one replica while the leader was still recovering from an unclean leader election {}.", new Object[]{brokerId, topic.name, partitionId, partitionData.newIsrWithEpochs(), leaderRecoveryState});
            return Errors.INVALID_REQUEST;
        }
        if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED && leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because the leader recovery state cannot change from RECOVERED to RECOVERING.", new Object[]{brokerId, topic.name, partitionId});
            return Errors.INVALID_REQUEST;
        }
        // Every proposed ISR member must be eligible; the error code depends on the request version.
        List<IneligibleReplica> ineligibleReplicas = this.ineligibleReplicasForIsr(partitionData.newIsrWithEpochs());
        if (!ineligibleReplicas.isEmpty()) {
            this.log.info("Rejecting AlterPartition request from node {} for {}-{} because it specified ineligible replicas {} in the new ISR {}.", new Object[]{brokerId, topic.name, partitionId, ineligibleReplicas, partitionData.newIsrWithEpochs()});
            if (requestApiVersion > 1) {
                return Errors.INELIGIBLE_REPLICA;
            }
            return Errors.OPERATION_NOT_ATTEMPTED;
        }
        return Errors.NONE;
    }

    /**
     * Finds the brokers in a proposed ISR that are not eligible to be in it.
     *
     * <p>A broker is ineligible when it is not registered, is in controlled shutdown, is fenced,
     * or when the request supplies a broker epoch other than -1 that differs from the registered
     * epoch. Only the first matching reason is reported for each broker.
     *
     * @param brokerStates broker ids and epochs from the request's proposed ISR
     * @return one entry per ineligible broker with the reason; empty if all are eligible
     */
    private List<IneligibleReplica> ineligibleReplicasForIsr(List<AlterPartitionRequestData.BrokerState> brokerStates) {
        List<IneligibleReplica> rejected = new ArrayList<>(0);
        for (AlterPartitionRequestData.BrokerState brokerState : brokerStates) {
            int brokerId = brokerState.brokerId();
            BrokerRegistration registration = this.clusterControl.registration(brokerId);
            if (registration == null) {
                rejected.add(new IneligibleReplica(brokerId, "not registered"));
            } else if (registration.inControlledShutdown()) {
                rejected.add(new IneligibleReplica(brokerId, "shutting down"));
            } else if (registration.fenced()) {
                rejected.add(new IneligibleReplica(brokerId, "fenced"));
            } else if (brokerState.brokerEpoch() != -1L && registration.epoch() != brokerState.brokerEpoch()) {
                rejected.add(new IneligibleReplica(brokerId, "broker epoch mismatch: requested=" + brokerState.brokerEpoch() + " VS expected=" + registration.epoch()));
            }
        }
        return rejected;
    }

    /**
     * Generates the records for fencing a broker.
     *
     * <p>Leader and ISR updates for the partitions that have this broker in their ISR are
     * generated first. The fencing record follows: a {@code BrokerRegistrationChangeRecord} when
     * the metadata version supports it, otherwise a {@code FenceBrokerRecord}.
     *
     * @param brokerId id of the broker being fenced
     * @param records  output list that receives the generated records
     * @throws RuntimeException if the broker has no registration
     */
    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
        BrokerRegistration registration = this.clusterControl.brokerRegistrations().get(brokerId);
        if (registration == null) {
            throw new RuntimeException("Can't find broker registration for broker " + brokerId);
        }
        this.generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, -1, records, this.brokersToIsrs.partitionsWithBrokerInIsr(brokerId), Integer.MAX_VALUE);
        ApiMessage fencingRecord;
        if (this.featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            fencingRecord = new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(registration.epoch()).setFenced(BrokerRegistrationFencingChange.FENCE.value());
        } else {
            fencingRecord = new FenceBrokerRecord().setId(brokerId).setEpoch(registration.epoch());
        }
        records.add(new ApiMessageAndVersion(fencingRecord, 0));
    }

    /**
     * Generates the records for unregistering a broker.
     *
     * <p>Leader and ISR updates for the partitions that have this broker in their ISR are
     * generated first, followed by an {@code UnregisterBrokerRecord}.
     *
     * @param brokerId    id of the broker being unregistered
     * @param brokerEpoch epoch written into the unregister record
     * @param records     output list that receives the generated records
     */
    void handleBrokerUnregistered(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        this.generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, -1, records, this.brokersToIsrs.partitionsWithBrokerInIsr(brokerId), Integer.MAX_VALUE);
        ApiMessage unregisterRecord = new UnregisterBrokerRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
        records.add(new ApiMessageAndVersion(unregisterRecord, 0));
    }

    /**
     * Generates the records for unfencing a broker.
     *
     * <p>The unfencing record is added first: a {@code BrokerRegistrationChangeRecord} when the
     * metadata version supports it, otherwise an {@code UnfenceBrokerRecord}. Leader and ISR
     * updates are then generated over the partitions that currently have no leader.
     *
     * @param brokerId    id of the broker being unfenced
     * @param brokerEpoch epoch written into the unfencing record
     * @param records     output list that receives the generated records
     */
    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        ApiMessage unfencingRecord;
        if (this.featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
            unfencingRecord = new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
        } else {
            unfencingRecord = new UnfenceBrokerRecord().setId(brokerId).setEpoch(brokerEpoch);
        }
        records.add(new ApiMessageAndVersion(unfencingRecord, 0));
        this.generateLeaderAndIsrUpdates("handleBrokerUnfenced", -1, brokerId, records, this.brokersToIsrs.partitionsWithNoLeader(), Integer.MAX_VALUE);
    }

    /**
     * Records that a broker has entered controlled shutdown.
     *
     * <p>Nothing is added when the metadata version does not support the controlled-shutdown
     * state, or when the broker is already marked as being in controlled shutdown.
     *
     * @param brokerId    id of the broker
     * @param brokerEpoch epoch written into the change record
     * @param records     output list that receives the change record
     */
    void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
        if (!this.featureControl.metadataVersion().isInControlledShutdownStateSupported()) {
            return;
        }
        if (this.clusterControl.inControlledShutdown(brokerId)) {
            return;
        }
        ApiMessage shutdownRecord = new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
        records.add(new ApiMessageAndVersion(shutdownRecord, 1));
    }

    /**
     * Generates partition changes for one broker that is shutting down.
     *
     * <p>Only the first broker reported by
     * {@code shuttingDownBrokersWithPendingPartitionChanges()} is processed per call, and
     * {@code maxPartitionChangesPerSlice} is passed as the limit to the update generator.
     *
     * @return the generated records (possibly none) with a {@code null} response
     */
    ControllerResult<Void> partitionChangesBecauseOfControlledShutdown() {
        List<ApiMessageAndVersion> records = new ArrayList<>();
        this.clusterControl.heartbeatManager().shuttingDownBrokersWithPendingPartitionChanges().findFirst().ifPresent(state -> {
            int brokerId = state.id();
            this.generateLeaderAndIsrUpdates("partitionChangeInControlledShutdownForBroker[" + brokerId + "]", brokerId, -1, records, this.brokersToIsrs.partitionsWithBrokerInIsr(brokerId), this.maxPartitionChangesPerSlice);
        });
        return ControllerResult.of(records, null);
    }

    /**
     * Handles an ElectLeaders request.
     *
     * <p>When the request names topic partitions, an election is attempted for each and every
     * outcome is reported. When the partition list is {@code null}, an election is attempted for
     * every partition of every topic, and {@code ELECTION_NOT_NEEDED} outcomes are left out of
     * the response.
     *
     * @param request the ElectLeaders request
     * @return the generated records and the per-partition results
     * @throws InvalidRequestException if the request's election type is not recognised
     */
    ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
        ElectionType electionType = ReplicationControlManager.electionType(request.electionType());
        BoundedList<ApiMessageAndVersion> records = BoundedList.newArrayBacked(10000);
        ElectLeadersResponseData response = new ElectLeadersResponseData();
        if (request.topicPartitions() != null) {
            for (ElectLeadersRequestData.TopicPartitions requested : request.topicPartitions()) {
                ElectLeadersResponseData.ReplicaElectionResult topicResults = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(requested.topic());
                response.replicaElectionResults().add(topicResults);
                for (int partitionId : requested.partitions()) {
                    ApiError error = this.electLeader(requested.topic(), partitionId, electionType, records);
                    topicResults.partitionResult().add(new ElectLeadersResponseData.PartitionResult().setPartitionId(partitionId).setErrorCode(error.error().code()).setErrorMessage(error.message()));
                }
            }
        } else {
            for (Map.Entry<String, Uuid> topicEntry : this.topicsByName.entrySet()) {
                String topicName = topicEntry.getKey();
                ElectLeadersResponseData.ReplicaElectionResult topicResults = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topicName);
                response.replicaElectionResults().add(topicResults);
                TopicControlInfo topic = this.topics.get(topicEntry.getValue());
                if (topic == null) {
                    continue;
                }
                for (int partitionId : topic.parts.keySet()) {
                    ApiError error = this.electLeader(topicName, partitionId, electionType, records);
                    if (error.error() == Errors.ELECTION_NOT_NEEDED) {
                        // Partitions that needed no election are not reported in this mode.
                        continue;
                    }
                    topicResults.partitionResult().add(new ElectLeadersResponseData.PartitionResult().setPartitionId(partitionId).setErrorCode(error.error().code()).setErrorMessage(error.message()));
                }
            }
        }
        return ControllerResult.of(records, response);
    }

    /**
     * Converts the wire-format election type byte into an {@link ElectionType}.
     *
     * @param electionType the raw election type from the request
     * @return the matching election type
     * @throws InvalidRequestException if {@code ElectionType.valueOf} rejects the value
     */
    private static ElectionType electionType(byte electionType) {
        ElectionType parsed;
        try {
            parsed = ElectionType.valueOf((byte)electionType);
        }
        catch (IllegalArgumentException unknownType) {
            throw new InvalidRequestException("Unknown election type " + electionType);
        }
        return parsed;
    }

    /**
     * Attempts a leader election for a single partition and appends the resulting
     * change record, if any, to {@code records}.
     *
     * @param topic the topic name
     * @param partitionId the partition index within the topic
     * @param electionType the kind of election requested
     * @param records the list that receives the generated record
     * @return {@code ApiError.NONE} when a record was generated;
     *         {@code UNKNOWN_TOPIC_OR_PARTITION} when the topic name, topic id or partition
     *         cannot be found; {@code ELECTION_NOT_NEEDED} when a preferred election is
     *         requested and the partition already has its preferred leader, or an unclean
     *         election is requested and the partition has a leader;
     *         {@code PREFERRED_LEADER_NOT_AVAILABLE} or {@code ELIGIBLE_LEADERS_NOT_AVAILABLE}
     *         when the builder produced no change
     */
    ApiError electLeader(String topic, int partitionId, ElectionType electionType, List<ApiMessageAndVersion> records) {
        Uuid topicId = (Uuid)this.topicsByName.get((Object)topic);
        if (topicId == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as " + topic);
        }
        TopicControlInfo topicInfo = (TopicControlInfo)this.topics.get((Object)topicId);
        if (topicInfo == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic id as " + topicId);
        }
        PartitionRegistration partition = (PartitionRegistration)topicInfo.parts.get((Object)partitionId);
        if (partition == null) {
            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such partition as " + topic + "-" + partitionId);
        }
        if (electionType == ElectionType.PREFERRED && partition.hasPreferredLeader()) {
            return new ApiError(Errors.ELECTION_NOT_NEEDED);
        }
        if (electionType == ElectionType.UNCLEAN && partition.hasLeader()) {
            return new ApiError(Errors.ELECTION_NOT_NEEDED);
        }
        // Anything other than UNCLEAN is run as a preferred election.
        PartitionChangeBuilder.Election election = electionType == ElectionType.UNCLEAN ? PartitionChangeBuilder.Election.UNCLEAN : PartitionChangeBuilder.Election.PREFERRED;
        PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, partitionId, this.clusterControl::isActive, this.featureControl.metadataVersion(), this.clusterControl.activeBrokerComponentDegradations(), this.getTopicEffectiveMinIsr(topic));
        builder.setElection(election).setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
        Optional<ApiMessageAndVersion> change = builder.build();
        if (change.isPresent()) {
            records.add(change.get());
            return ApiError.NONE;
        }
        return new ApiError(electionType == ElectionType.PREFERRED ? Errors.PREFERRED_LEADER_NOT_AVAILABLE : Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
    }

    /**
     * Processes a broker heartbeat.
     *
     * The broker epoch is checked first. The heartbeat manager then computes the broker's
     * current and next state; if they differ, the handler matching the next state is
     * invoked to generate records. Finally the heartbeat manager is touched with the
     * broker's next fenced status and reported metadata offset.
     *
     * @param request the heartbeat request
     * @param registerBrokerRecordOffset the offset compared against the broker's reported
     *        metadata offset to decide whether it is caught up
     * @return the generated records and the heartbeat reply
     */
    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
        int brokerId = request.brokerId();
        long brokerEpoch = request.brokerEpoch();
        this.clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
        BrokerHeartbeatManager manager = this.clusterControl.heartbeatManager();
        BrokerControlStates transition = manager.calculateNextBrokerState(brokerId, request, registerBrokerRecordOffset, () -> this.brokersToIsrs.hasLeaderships(brokerId));
        if (request.wantShutDown()) {
            manager.markBrokerInControlledShutdown(brokerId);
        }
        ArrayList<ApiMessageAndVersion> generated = new ArrayList<ApiMessageAndVersion>();
        // Records are only generated when the broker actually changes state.
        if (transition.current() != transition.next()) {
            switch (transition.next()) {
                case FENCED:
                    this.handleBrokerFenced(brokerId, generated);
                    break;
                case UNFENCED:
                    this.handleBrokerUnfenced(brokerId, brokerEpoch, generated);
                    break;
                case CONTROLLED_SHUTDOWN:
                    this.handleBrokerInControlledShutdown(brokerId, brokerEpoch, generated);
                    break;
                case SHUTDOWN_NOW:
                    // A broker told to shut down now is handled the same way as a fenced one.
                    this.handleBrokerFenced(brokerId, generated);
                    break;
            }
        }
        manager.touch(brokerId, transition.next().fenced(), request.currentMetadataOffset());
        boolean caughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
        return ControllerResult.of(generated, new BrokerHeartbeatReply(caughtUp, transition.next().fenced(), transition.next().inControlledShutdown(), transition.next().shouldShutDown()));
    }

    /**
     * Handles a heartbeat request that timed out before it could be processed.
     *
     * After the broker epoch check, the heartbeat manager is touched with the broker's
     * currently registered fenced status, and an error is logged.
     *
     * @param request the heartbeat request that expired
     */
    void processExpiredBrokerHeartbeat(BrokerHeartbeatRequestData request) {
        int brokerId = request.brokerId();
        this.clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
        BrokerHeartbeatManager manager = this.clusterControl.heartbeatManager();
        BrokerRegistration registration = this.clusterControl.brokerRegistrations().get(brokerId);
        // The fenced status is taken from the existing registration, not recomputed.
        manager.touch(brokerId, registration.fenced(), request.currentMetadataOffset());
        this.log.error("processExpiredBrokerHeartbeat: controller event queue overloaded. Timed out heartbeat from broker {}.", (Object)brokerId);
    }

    /**
     * Generates the records needed to unregister a broker.
     *
     * @param brokerId the id of the broker to unregister
     * @return the generated records with a null response
     * @throws BrokerIdNotRegisteredException if no registration exists for {@code brokerId}
     */
    public ControllerResult<Void> unregisterBroker(int brokerId) {
        BrokerRegistration existing = this.clusterControl.brokerRegistrations().get(brokerId);
        if (existing != null) {
            BoundedList generated = BoundedList.newArrayBacked((int)10000);
            this.handleBrokerUnregistered(brokerId, existing.epoch(), (List<ApiMessageAndVersion>)generated);
            return ControllerResult.of((List<ApiMessageAndVersion>)generated, null);
        }
        throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + " is not currently registered");
    }

    /**
     * Fences at most one broker that the heartbeat manager reports as stale.
     *
     * @return the records generated by fencing the broker, or no records when
     *         {@code findOneStaleBroker()} returns nothing; the response is always null
     */
    ControllerResult<Void> maybeFenceOneStaleBroker() {
        ArrayList<ApiMessageAndVersion> generated = new ArrayList<ApiMessageAndVersion>();
        BrokerHeartbeatManager manager = this.clusterControl.heartbeatManager();
        Optional<Integer> stale = manager.findOneStaleBroker();
        if (stale.isPresent()) {
            Integer staleBrokerId = stale.get();
            this.log.info("Fencing broker {} because its session has timed out.", (Object)staleBrokerId);
            this.handleBrokerFenced((int)staleBrokerId, (List<ApiMessageAndVersion>)generated);
            manager.fence((int)staleBrokerId);
        }
        return ControllerResult.of(generated, null);
    }

    /**
     * Reports whether any partition is currently tracked as imbalanced.
     *
     * @return true if either the internal or the external imbalanced-partition set is
     *         non-empty
     */
    boolean arePartitionLeadersImbalanced() {
        boolean nothingImbalanced = this.imbalancedInternalPartitions.isEmpty() && this.imbalancedExternalPartitions.isEmpty();
        return !nothingImbalanced;
    }

    /**
     * Attempts preferred-leader elections for partitions tracked as imbalanced.
     *
     * The internal imbalanced set is processed whenever it is non-empty; otherwise the
     * external set is processed. Iteration stops once {@code maxElectionsPerImbalance}
     * records have been generated.
     *
     * @return the generated records, and true when the caller should reschedule
     *         immediately (the internal set was processed, or the record cap was hit)
     */
    ControllerResult<Boolean> maybeBalancePartitionLeaders() {
        ArrayList<ApiMessageAndVersion> elections = new ArrayList<ApiMessageAndVersion>();
        Map<Integer, Set<DegradedBrokerHealthState>> activeDegradations = this.clusterControl.activeBrokerComponentDegradations();
        boolean internalPending = !this.imbalancedInternalPartitions.isEmpty();
        TimelineHashSet<TopicIdPartition> candidates = internalPending ? this.imbalancedInternalPartitions : this.imbalancedExternalPartitions;
        // Processing the internal set always asks for an immediate reschedule.
        boolean rescheduleImmediately = internalPending;
        for (TopicIdPartition candidate : candidates) {
            if (elections.size() >= this.maxElectionsPerImbalance) {
                rescheduleImmediately = true;
                break;
            }
            TopicControlInfo topicInfo = (TopicControlInfo)this.topics.get((Object)candidate.topicId());
            if (topicInfo == null) {
                this.log.error("Skipping unknown imbalanced topic {}", (Object)candidate);
                continue;
            }
            PartitionRegistration registration = (PartitionRegistration)topicInfo.parts.get((Object)candidate.partitionId());
            if (registration == null) {
                this.log.error("Skipping unknown imbalanced partition {}", (Object)candidate);
                continue;
            }
            PartitionChangeBuilder builder = new PartitionChangeBuilder(registration, candidate.topicId(), candidate.partitionId(), this.clusterControl::isActive, this.featureControl.metadataVersion(), activeDegradations, this.getTopicEffectiveMinIsr(topicInfo.name));
            builder.setElection(PartitionChangeBuilder.Election.PREFERRED).setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
            Optional<ApiMessageAndVersion> change = builder.build();
            if (change.isPresent()) {
                elections.add(change.get());
            }
        }
        return ControllerResult.of(elections, rescheduleImmediately);
    }

    /**
     * Attempts a preferred election on every partition led by a broker that has an
     * active component degradation.
     *
     * @return the generated records with a null response
     */
    ControllerResult<Void> tryUnelectDemotedLeaders() {
        Map<Integer, Set<DegradedBrokerHealthState>> degradations = this.clusterControl.activeBrokerComponentDegradations();
        ArrayList<ApiMessageAndVersion> generated = new ArrayList<ApiMessageAndVersion>();
        degradations.keySet().forEach(degradedBrokerId ->
            this.tryOnlineElection(this.brokersToIsrs.partitionsLedByBroker(degradedBrokerId), degradations, generated));
        return ControllerResult.of(generated, null);
    }

    /**
     * Attempts a preferred election on every partition that has one of the given
     * brokers in its ISR.
     *
     * @param brokerIds the brokers whose ISR memberships should be re-examined
     * @return the generated records with a null response
     */
    ControllerResult<Void> tryReelectPromotedLeaders(List<Integer> brokerIds) {
        Map<Integer, Set<DegradedBrokerHealthState>> degradations = this.clusterControl.activeBrokerComponentDegradations();
        ArrayList<ApiMessageAndVersion> generated = new ArrayList<ApiMessageAndVersion>();
        brokerIds.forEach(promotedBrokerId ->
            this.tryOnlineElection(this.brokersToIsrs.partitionsWithBrokerInIsr(promotedBrokerId), degradations, generated));
        return ControllerResult.of(generated, null);
    }

    /**
     * Attempts a preferred-leader election on each partition yielded by the iterator,
     * appending any resulting change record to {@code records}.
     *
     * Partitions that cannot be resolved are skipped. The builder is given the topic's
     * effective min ISR, as every other election path in this class does, instead of a
     * fixed value.
     *
     * @param partitions the partitions to examine
     * @param activeDegradations the active broker component degradations handed to the builder
     * @param records the list that receives the generated records
     */
    private void tryOnlineElection(Iterator<TopicIdPartition> partitions, Map<Integer, Set<DegradedBrokerHealthState>> activeDegradations, List<ApiMessageAndVersion> records) {
        while (partitions.hasNext()) {
            TopicIdPartition topicPartition = partitions.next();
            PartitionRegistration partition = this.getPartition(topicPartition.topicId(), topicPartition.partitionId());
            if (partition == null) continue;
            // The topic is looked up only to resolve its name for the min ISR lookup.
            TopicControlInfo topic = (TopicControlInfo)this.topics.get((Object)topicPartition.topicId());
            if (topic == null) continue;
            PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicPartition.topicId(), topicPartition.partitionId(), this.clusterControl::isActive, this.featureControl.metadataVersion(), activeDegradations, this.getTopicEffectiveMinIsr(topic.name));
            builder.setElection(PartitionChangeBuilder.Election.PREFERRED).setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
            builder.build().ifPresent(records::add);
        }
    }

    /**
     * Handles a CreatePartitions request covering several topics.
     *
     * Each topic is processed independently: a failure for one topic is turned into an
     * error result for that topic and does not stop the others. Exceptions that are not
     * {@link ApiException}s are additionally logged as unexpected.
     *
     * @param context the request context
     * @param topics the topics whose partition counts should be increased
     * @return the generated records, as an atomic batch, plus one result per topic
     */
    ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(ControllerRequestContext context, List<CreatePartitionsRequestData.CreatePartitionsTopic> topics) {
        this.clearConfluentPartitionsPerTopicListenerPendingState();
        BoundedList records = BoundedList.newArrayBacked((int)10000);
        BoundedList results = BoundedList.newArrayBacked((int)10000);
        for (CreatePartitionsRequestData.CreatePartitionsTopic topic : topics) {
            ApiError outcome = ApiError.NONE;
            try {
                this.createPartitions(context, topic, (List<ApiMessageAndVersion>)records);
            }
            catch (Exception failure) {
                // ApiExceptions are expected request-level failures and are not logged here.
                if (!(failure instanceof ApiException)) {
                    this.log.error("Unexpected createPartitions error for {}", (Object)topic, (Object)failure);
                }
                outcome = ApiError.fromThrowable((Throwable)failure);
            }
            results.add(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName(topic.name()).setErrorCode(outcome.error().code()).setErrorMessage(outcome.message()));
        }
        return ControllerResult.atomicOf((List<ApiMessageAndVersion>)records, results);
    }

    /**
     * Extracts the {@code confluent.placement.constraints} value from a create-topic
     * config collection.
     *
     * A missing entry, a null value and an empty value are all treated as "no placement".
     *
     * @param isTopicPlacementSupported whether topic placement may be used
     * @param collection the configs supplied with the create-topic request
     * @return the placement string, or empty when none was supplied
     * @throws InvalidRequestException if a non-empty placement was supplied while topic
     *         placement is not supported
     */
    static Optional<String> getTopicPlacementFromRequest(boolean isTopicPlacementSupported, CreateTopicsRequestData.CreateableTopicConfigCollection collection) {
        CreateTopicsRequestData.CreateableTopicConfig topicPlacement = collection.find("confluent.placement.constraints");
        if (topicPlacement == null) {
            return Optional.empty();
        }
        String topicPlacementStr = topicPlacement.value();
        // A null value is handled here so it is not dereferenced below.
        if (topicPlacementStr == null || topicPlacementStr.isEmpty()) {
            return Optional.empty();
        }
        if (!isTopicPlacementSupported) {
            throw new InvalidRequestException("Topic placement is not supported.");
        }
        return Optional.of(topicPlacementStr);
    }

    /**
     * Generates the records that add partitions to a single existing topic.
     *
     * The request is validated (topic exists, the count is an increase, manual assignments
     * match the number of new partitions and do not conflict with a topic placement), the
     * partition change quota is charged once for the number of added partitions, and one
     * {@code PartitionRecord} is appended per new partition. New partitions use the
     * replication factor of the first existing partition.
     *
     * @param context the request context, used for quota accounting and the principal
     * @param topic the topic and target partition count, with optional manual assignments
     * @param records the list that receives the generated records
     * @throws UnknownTopicOrPartitionException if the topic name or id is unknown
     * @throws InvalidRequestException if assignments are given for a mirror topic, or a
     *         manual assignment is given while a topic placement exists
     * @throws InvalidPartitionsException if the requested count is not an increase
     * @throws InvalidReplicaAssignmentException if the manual assignments are inconsistent
     *         or none of the assigned brokers is active
     * @throws ThrottlingQuotaExceededException if the partition change quota is exceeded
     * @throws InvalidReplicationFactorException if a new partition would have an empty ISR
     * @throws UnknownServerException if the topic has no partitions or an out-of-range
     *         replication factor
     */
    void createPartitions(ControllerRequestContext context, CreatePartitionsRequestData.CreatePartitionsTopic topic, List<ApiMessageAndVersion> records) {
        ApiError error;
        List<Object> isrs;
        List<Object> partitionAssignments;
        Uuid topicId = (Uuid)this.topicsByName.get((Object)topic.name());
        if (topicId == null) {
            throw new UnknownTopicOrPartitionException();
        }
        if (this.mirrorTopicControl.isMirrorTopic(topicId) && topic.assignments() != null) {
            throw new InvalidRequestException("Partition assignments specified for mirror topic " + topic.name() + " with id " + topicId);
        }
        TopicControlInfo topicInfo = (TopicControlInfo)this.topics.get((Object)topicId);
        if (topicInfo == null) {
            throw new UnknownTopicOrPartitionException();
        }
        if (topic.count() == topicInfo.parts.size()) {
            throw new InvalidPartitionsException("Topic already has " + topicInfo.parts.size() + " partition(s).");
        }
        if (topic.count() < topicInfo.parts.size()) {
            throw new InvalidPartitionsException("The topic " + topic.name() + " currently has " + topicInfo.parts.size() + " partition(s); " + topic.count() + " would not be an increase.");
        }
        int additional = topic.count() - topicInfo.parts.size();
        Optional<TopicPlacement> topicPlacement = this.existingTopicPlacementConfig(topic.name());
        if (topic.assignments() != null) {
            if (topic.assignments().size() != additional) {
                throw new InvalidReplicaAssignmentException("Attempted to add " + additional + " additional partition(s), but only " + topic.assignments().size() + " assignment(s) were specified.");
            }
            if (topicPlacement.isPresent()) {
                throw new InvalidRequestException("A manual partition assignment was specified, but a topic placement " + topicPlacement.get().toJson() + " exists for the topic.");
            }
        }
        // Charge the partition change quota exactly once for the partitions being added.
        try {
            context.applyPartitionChangeQuota(additional);
        }
        catch (ThrottlingQuotaExceededException e) {
            this.log.debug("Partition creation of {} partitions not allowed because quota is violated. Delay time: {}", (Object)additional, (Object)e.throttleTimeMs());
            throw e;
        }
        Iterator iterator = topicInfo.parts.values().iterator();
        if (!iterator.hasNext()) {
            throw new UnknownServerException("Invalid state: topic " + topic.name() + " appears to have no partitions.");
        }
        // The first existing partition determines the replication factor of the new ones.
        PartitionRegistration partitionInfo = (PartitionRegistration)iterator.next();
        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
            throw new UnknownServerException("Invalid replication factor " + partitionInfo.replicas.length + ": expected a number equal to less than " + Short.MAX_VALUE);
        }
        Set<Integer> excludedBrokerIds = this.clusterControl.activeBrokerReplicaExclusions().keySet();
        short replicationFactor = (short)partitionInfo.replicas.length;
        int startPartitionId = topicInfo.parts.size();
        Optional<MirrorTopic> mirrorTopicState = this.mirrorTopicControl.mirrorTopic(topicId);
        PartitionRegistration.LinkState linkState = this.getLinkState(mirrorTopicState);
        Optional tenantIdOpt = TenantUtils.extractTenantId(Collections.singletonList(topic.name()));
        int cellId = -1;
        if (this.tenantControl.isTenantCellPlacementEnabled(Optional.ofNullable(context.principal()))) {
            cellId = tenantIdOpt.map(this.tenantControl::getTenantCellId).orElse(-1);
        }
        if (topic.assignments() != null) {
            // Manual assignment: validate each one and derive its ISR from the active brokers.
            partitionAssignments = new ArrayList();
            isrs = new ArrayList();
            for (int i = 0; i < topic.assignments().size(); ++i) {
                CreatePartitionsRequestData.CreatePartitionsAssignment assignment = (CreatePartitionsRequestData.CreatePartitionsAssignment)topic.assignments().get(i);
                this.validateManualPartitionAssignment(new PartitionAssignment(assignment.brokerIds(), Collections.emptyList()), OptionalInt.of(replicationFactor), excludedBrokerIds, cellId, Optional.empty());
                partitionAssignments.add(new PartitionAssignment(assignment.brokerIds()));
                List isr = assignment.brokerIds().stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
                if (isr.isEmpty()) {
                    throw new InvalidReplicaAssignmentException("All brokers specified in the manual partition assignment for partition " + (startPartitionId + i) + " are fenced or in controlled shutdown.");
                }
                isrs.add(isr);
            }
        } else {
            // Automatic assignment: delegate to the replica placer.
            partitionAssignments = this.clusterControl.replicaPlacer().place(new PlacementSpec(startPartitionId, additional, replicationFactor, topic.name(), context.principal(), this.clusterControl.activeBrokerReplicaExclusions().keySet(), this.tenantControl.calculatePartitionPlacementStrategy(Optional.ofNullable(context.principal())), topicPlacement), this.getKRaftCellDescriber(cellId)).assignments();
            isrs = partitionAssignments.stream().map(PartitionAssignment::syncReplicas).collect(Collectors.toList());
        }
        HashMap<Integer, List<Integer>> partitionIdToReplicasAdded = new HashMap<Integer, List<Integer>>();
        for (int i = 0; i < partitionAssignments.size(); ++i) {
            partitionIdToReplicasAdded.put(i + startPartitionId, ((PartitionAssignment)partitionAssignments.get(i)).replicas());
        }
        if (this.applyCreateTopicsPolicyToCreatePartitions && (error = this.maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(topic.name(), null, null, partitionIdToReplicasAdded, this.configurationControl.getTopicConfigs(topicInfo.name)))).isFailure()) {
            throw error.exception();
        }
        int partitionId = startPartitionId;
        for (int i = 0; i < partitionAssignments.size(); ++i) {
            PartitionAssignment partitionAssignment = (PartitionAssignment)partitionAssignments.get(i);
            List<Integer> replicas = partitionAssignment.replicas();
            List<Integer> observers = partitionAssignment.observers();
            // Only brokers that are active right now may be part of the initial ISR.
            List<Integer> isr = ((List)isrs.get(i)).stream().filter(this.clusterControl::isActive).collect(Collectors.toList());
            if (isr.isEmpty()) {
                throw new InvalidReplicationFactorException("Unable to replicate the partition " + replicationFactor + " time(s): All brokers are currently fenced or in controlled shutdown.");
            }
            Integer leaderId = this.leaderConsideringDemotions(isr, topic.name(), partitionId);
            records.add(new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(partitionId).setTopicId(topicId).setReplicas(replicas).setObservers(observers).setIsr(isr).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(leaderId).setLeaderEpoch(0).setLinkState(linkState.levelCode).setPartitionEpoch(0), this.featureControl.metadataVersion().partitionRecordVersion()));
            ++partitionId;
        }
        if (this.applyCreateTopicsPolicyToCreatePartitions) {
            this.updateConfluentPartitionsPerTopicListener(topic.name(), additional, 0, partitionIdToReplicasAdded, Collections.emptyMap(), true);
        }
    }

    /**
     * Maps a topic's mirror state to the link state stored on its partitions.
     *
     * @param mirrorTopic the mirror topic information, if the topic is a mirror
     * @return {@code NOT_MIRROR} when there is no mirror topic or its state is
     *         {@code STOPPED}; {@code FAILED} when its state is {@code FAILED};
     *         {@code ACTIVE} otherwise
     */
    private PartitionRegistration.LinkState getLinkState(Optional<MirrorTopic> mirrorTopic) {
        if (!mirrorTopic.isPresent()) {
            return PartitionRegistration.LinkState.NOT_MIRROR;
        }
        MirrorTopic.State state = mirrorTopic.get().mirrorState();
        PartitionRegistration.LinkState linkState;
        if (state.equals((Object)MirrorTopic.State.STOPPED)) {
            linkState = PartitionRegistration.LinkState.NOT_MIRROR;
        } else if (state.equals((Object)MirrorTopic.State.FAILED)) {
            linkState = PartitionRegistration.LinkState.FAILED;
        } else {
            linkState = PartitionRegistration.LinkState.ACTIVE;
        }
        return linkState;
    }

    /**
     * Looks up the topic placement configured for an existing topic.
     *
     * @param topic the topic name
     * @return the configured placement, or empty if none exists
     * @throws InvalidRequestException if a placement exists but the feature control
     *         reports that topic placement is not supported
     */
    private Optional<TopicPlacement> existingTopicPlacementConfig(String topic) {
        Optional<TopicPlacement> configured = this.configurationControl.topicPlacementConfig(topic);
        if (!configured.isPresent() || this.featureControl.isTopicPlacementSupported()) {
            return configured;
        }
        // A placement config exists although the feature is reported as unsupported.
        this.log.error("Found topic placement config {} for topic {} even though its not supported at {}", new Object[]{configured.get(), topic, this.featureControl.metadataVersion()});
        throw new InvalidRequestException("Topic placement configuration exists but is not supported in the cluster. Please delete the confluent.placement.constraints configuration for " + topic + ", or upgrade the cluster metadata to at least " + MetadataVersion.IBP_3_5_IV0.name());
    }

    /**
     * Validates a manually specified replica assignment for one partition.
     *
     * Brokers are checked in ascending id order; for each broker the checks are applied
     * in this order: registered, not a duplicate, not excluded from placement.
     *
     * @param assignment the assignment to validate
     * @param replicationFactor if present, the replica count the assignment must have
     * @param excludedBrokerIds brokers that may not receive replicas
     * @param cellId the cell id handed to the cell assignment validation
     * @param topicPlacementOpt the topic placement to validate against, if any
     * @throws InvalidReplicaAssignmentException if any check fails, or if the assignment
     *         has observers while no topic placement is given
     */
    void validateManualPartitionAssignment(PartitionAssignment assignment, OptionalInt replicationFactor, Set<Integer> excludedBrokerIds, int cellId, Optional<TopicPlacement> topicPlacementOpt) {
        if (assignment.replicas().isEmpty()) {
            throw new InvalidReplicaAssignmentException("The manual partition assignment includes an empty replica list.");
        }
        ArrayList<Integer> sortedBrokerIds = new ArrayList<Integer>(assignment.replicas());
        // Sorting places duplicate ids next to each other.
        Collections.sort(sortedBrokerIds);
        for (int i = 0; i < sortedBrokerIds.size(); ++i) {
            Integer brokerId = sortedBrokerIds.get(i);
            if (!this.clusterControl.brokerRegistrations().containsKey(brokerId)) {
                throw new InvalidReplicaAssignmentException("The manual partition assignment includes broker " + brokerId + ", but no such broker is registered.");
            }
            if (i > 0 && brokerId.equals(sortedBrokerIds.get(i - 1))) {
                throw new InvalidReplicaAssignmentException("The manual partition assignment includes the broker " + sortedBrokerIds.get(i - 1) + " more than once.");
            }
            if (excludedBrokerIds.contains(brokerId)) {
                throw new InvalidReplicaAssignmentException("The manual partition assignment includes the broker " + brokerId + " which is excluded from replica placement.");
            }
        }
        int replicaCount = sortedBrokerIds.size();
        if (replicationFactor.isPresent() && replicaCount != replicationFactor.getAsInt()) {
            throw new InvalidReplicaAssignmentException("The manual partition assignment includes a partition with " + replicaCount + " replica(s), but this is not consistent with previous partitions, which have " + replicationFactor.getAsInt() + " replica(s).");
        }
        this.validateCellAssignment(sortedBrokerIds, cellId);
        if (topicPlacementOpt.isPresent()) {
            this.validateTopicPlacementAssignment(assignment, topicPlacementOpt.get());
            return;
        }
        if (!assignment.observers().isEmpty()) {
            throw new InvalidReplicaAssignmentException("Assignment contains observers but there is no topic placement configured for the topic.");
        }
    }

    /**
     * Validates an assignment against a topic placement.
     *
     * The observers must form a suffix of the replica list, and
     * {@code TopicPlacement.validateAssignment} must report no error for the sync
     * replicas and observers.
     *
     * @param assignment the assignment to validate
     * @param topicPlacement the placement to validate against
     * @throws InvalidReplicaAssignmentException if either check fails
     */
    private void validateTopicPlacementAssignment(PartitionAssignment assignment, TopicPlacement topicPlacement) {
        this.validateObserversIsReplicasSuffix(assignment);
        List<TopicPlacement.Replica> syncReplicas = this.brokerIdsToReplicas(assignment.syncReplicas());
        List<TopicPlacement.Replica> observers = this.brokerIdsToReplicas(assignment.observers());
        Optional<String> violation = TopicPlacement.validateAssignment((TopicPlacement)topicPlacement, syncReplicas, observers);
        if (violation.isPresent()) {
            throw new InvalidReplicaAssignmentException(violation.get());
        }
    }

    /**
     * Checks that the observers are exactly the trailing elements of the replica list
     * and that there are fewer observers than replicas.
     *
     * @param assignment the assignment to check
     * @throws InvalidReplicaAssignmentException if there are at least as many observers as
     *         replicas, or the replica list does not end with the observers
     */
    private void validateObserversIsReplicasSuffix(PartitionAssignment assignment) {
        List<Integer> replicas = assignment.replicas();
        List<Integer> observers = assignment.observers();
        int replicaCount = replicas.size();
        int observerCount = observers.size();
        if (observerCount >= replicaCount) {
            throw new InvalidReplicaAssignmentException("Observers must be a subset of replicas.");
        }
        List<Integer> replicaTail = replicas.subList(replicaCount - observerCount, replicaCount);
        if (!observers.equals(replicaTail)) {
            throw new InvalidReplicaAssignmentException("The assignment contains observers but the replicas suffix doesn't match the observers.");
        }
    }

    /**
     * Converts broker ids into {@code TopicPlacement.Replica} objects that carry each
     * broker's rack attributes.
     *
     * @param brokerIds the broker ids to convert
     * @return one replica per broker id, in the same order
     * @throws InvalidReplicaAssignmentException if a broker has no registration
     */
    private List<TopicPlacement.Replica> brokerIdsToReplicas(List<Integer> brokerIds) {
        List<TopicPlacement.Replica> converted = new ArrayList<TopicPlacement.Replica>();
        for (Integer brokerId : brokerIds) {
            BrokerRegistration registration = this.clusterControl.registration((int)brokerId);
            if (registration == null) {
                throw new InvalidReplicaAssignmentException("Broker " + brokerId + " is not a member of the cluster.");
            }
            converted.add(TopicPlacement.Replica.of((int)brokerId, this.rackToAttributes(registration.rack())));
        }
        return converted;
    }

    /**
     * Wraps an optional rack name into an attribute map.
     *
     * @param rack the broker's rack, if it has one
     * @return an always-present optional holding {@code {"rack": <rack>}} when a rack is
     *         given, or an empty map otherwise
     */
    private Optional<Map<String, String>> rackToAttributes(Optional<String> rack) {
        Map<String, String> attributes;
        if (rack.isPresent()) {
            attributes = Collections.singletonMap("rack", rack.get());
        } else {
            attributes = Collections.emptyMap();
        }
        return Optional.of(attributes);
    }

    /**
     * Generates partition change records for the partitions yielded by the iterator,
     * removing {@code brokerToRemove} from each partition's target ISR.
     *
     * Iteration stops when the iterator is exhausted or when
     * {@code partitionSlicingBatchSize} records have been added by this call.
     *
     * @param context a label used in log messages
     * @param brokerToRemove a broker that is never an acceptable leader and is dropped
     *        from the target ISR
     * @param brokerToAdd a broker that is an acceptable leader even when
     *        {@code clusterControl.isActive} reports it as inactive
     * @param records the list that receives the generated records
     * @param iterator the partitions to examine
     * @param partitionSlicingBatchSize the maximum number of records this call may add
     * @throws RuntimeException if a yielded partition is missing from the topics or
     *         partitions map
     */
    void generateLeaderAndIsrUpdates(String context, int brokerToRemove, int brokerToAdd, List<ApiMessageAndVersion> records, Iterator<TopicIdPartition> iterator, int partitionSlicingBatchSize) {
        int firstNewIndex = records.size();
        IntPredicate isAcceptableLeader = r -> r != brokerToRemove && (r == brokerToAdd || this.clusterControl.isActive(r));
        Map<Integer, Set<DegradedBrokerHealthState>> activeDegradations = this.clusterControl.activeBrokerComponentDegradations();
        while (iterator.hasNext() && records.size() - firstNewIndex < partitionSlicingBatchSize) {
            TopicIdPartition topicIdPart = iterator.next();
            TopicControlInfo topicInfo = (TopicControlInfo)this.topics.get((Object)topicIdPart.topicId());
            if (topicInfo == null) {
                throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in isrMembers, but not in the topics map.");
            }
            PartitionRegistration registration = (PartitionRegistration)topicInfo.parts.get((Object)topicIdPart.partitionId());
            if (registration == null) {
                throw new RuntimeException("Partition " + topicIdPart + " existed in isrMembers, but not in the partitions map.");
            }
            PartitionChangeBuilder builder = new PartitionChangeBuilder(registration, topicIdPart.topicId(), topicIdPart.partitionId(), isAcceptableLeader, this.featureControl.metadataVersion(), activeDegradations, this.getTopicEffectiveMinIsr(topicInfo.name));
            builder.setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
            if (this.configurationControl.uncleanLeaderElectionEnabledForTopic(topicInfo.name)) {
                builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
            }
            builder.setTargetIsr(Replicas.toList(Replicas.copyWithout(registration.isr, brokerToRemove)));
            builder.build().ifPresent(records::add);
        }
        int changed = records.size() - firstNewIndex;
        if (changed == 0) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            // At debug level every changed partition is listed by name.
            String changedPartitions = records.subList(firstNewIndex, records.size()).stream().map(added -> {
                PartitionChangeRecord change = (PartitionChangeRecord)added.message();
                return ((TopicControlInfo)this.topics.get((Object)change.topicId())).name + "-" + change.partitionId();
            }).collect(Collectors.joining(", "));
            this.log.debug("{}: changing partition(s): {}", (Object)context, (Object)changedPartitions);
        } else if (this.log.isInfoEnabled()) {
            this.log.info("{}: changing {} partition(s)", (Object)context, (Object)changed);
        }
    }

    /**
     * Checks whether removing the broker from its ISRs would still produce a partition
     * change, by generating at most one record into a scratch list.
     *
     * @param brokerId the broker to check
     * @return true if at least one partition change record was generated
     */
    boolean brokerRequiresPartitionChangesForControlledShutdown(int brokerId) {
        ArrayList<ApiMessageAndVersion> probe = new ArrayList<ApiMessageAndVersion>(1);
        String label = "checkLastBatchForBroker[" + brokerId + "]";
        this.generateLeaderAndIsrUpdates(label, brokerId, -1, probe, this.brokersToIsrs().partitionsWithBrokerInIsr(brokerId), 1);
        return !probe.isEmpty();
    }

    /**
     * Handles an AlterPartitionReassignments request.
     *
     * Each partition is processed independently: any throwable raised for a partition is
     * logged and turned into that partition's error, and processing continues.
     *
     * @param request the reassignment request
     * @param principal the principal forwarded to each per-partition alteration
     * @return the generated records, as an atomic batch, plus per-partition results
     */
    ControllerResult<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(AlterPartitionReassignmentsRequestData request, KafkaPrincipal principal) {
        BoundedList records = BoundedList.newArrayBacked((int)10000);
        AlterPartitionReassignmentsResponseData result = new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
        int succeeded = 0;
        int attempted = 0;
        for (AlterPartitionReassignmentsRequestData.ReassignableTopic topic : request.topics()) {
            AlterPartitionReassignmentsResponseData.ReassignableTopicResponse topicResponse = new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topic.name());
            for (AlterPartitionReassignmentsRequestData.ReassignablePartition partition : topic.partitions()) {
                ++attempted;
                ApiError outcome = ApiError.NONE;
                try {
                    this.alterPartitionReassignment(topic.name(), partition, (List<ApiMessageAndVersion>)records, principal);
                    ++succeeded;
                }
                catch (Throwable failure) {
                    this.log.info("Unable to alter partition reassignment for " + topic.name() + ":" + partition.partitionIndex() + " because of an " + failure.getClass().getSimpleName() + " error: " + failure.getMessage());
                    outcome = ApiError.fromThrowable((Throwable)failure);
                }
                AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse partitionResponse = new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(partition.partitionIndex()).setErrorCode(outcome.error().code()).setErrorMessage(outcome.message());
                topicResponse.partitions().add(partitionResponse);
            }
            result.responses().add(topicResponse);
        }
        this.log.info("Successfully altered {} out of {} partition reassignment(s).", (Object)succeeded, (Object)attempted);
        return ControllerResult.atomicOf((List<ApiMessageAndVersion>)records, result);
    }

    /**
     * Alters or cancels the reassignment of a single partition.
     *
     * <p>A {@code null} replica list in {@code target} requests cancellation of the ongoing
     * reassignment; otherwise the partition is moved towards the requested replicas.
     * Any record produced by the chosen operation is appended to {@code records}.
     *
     * @param topicName the name of the topic that owns the partition
     * @param target    the requested reassignment for the partition
     * @param records   the list that receives the generated record, if there is one
     * @param principal the principal forwarded to the change path
     * @throws UnknownTopicOrPartitionException if the topic name, the topic ID or the partition is not known
     */
    void alterPartitionReassignment(String topicName, AlterPartitionReassignmentsRequestData.ReassignablePartition target, List<ApiMessageAndVersion> records, KafkaPrincipal principal) {
        Uuid topicId = (Uuid) this.topicsByName.get(topicName);
        if (topicId == null) {
            throw new UnknownTopicOrPartitionException("Unable to find a topic named " + topicName + ".");
        }
        TopicControlInfo topicInfo = (TopicControlInfo) this.topics.get(topicId);
        if (topicInfo == null) {
            throw new UnknownTopicOrPartitionException("Unable to find a topic with ID " + topicId + ".");
        }
        TopicIdPartition tp = new TopicIdPartition(topicId, target.partitionIndex());
        PartitionRegistration part = (PartitionRegistration) topicInfo.parts.get(target.partitionIndex());
        if (part == null) {
            throw new UnknownTopicOrPartitionException("Unable to find partition " + topicName + ":" + target.partitionIndex() + ".");
        }

        Optional<ApiMessageAndVersion> record;
        if (target.replicas() == null) {
            record = this.cancelPartitionReassignment(topicName, tp, part);
        } else {
            record = this.changePartitionReassignment(tp, part, target, topicName, principal);
        }
        record.ifPresent(records::add);
    }

    /**
     * Builds the record that cancels the ongoing reassignment of a partition.
     *
     * @param topicName the name of the topic that owns the partition
     * @param tp        the topic ID and partition index
     * @param part      the current registration of the partition
     * @return the record produced by the partition change builder, if it produced one
     * @throws NoReassignmentInProgressException if the partition is not being reassigned
     * @throws InvalidReplicaAssignmentException if the revert is unclean and unclean leader
     *         election is not enabled for the topic
     */
    Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName, TopicIdPartition tp, PartitionRegistration part) {
        if (!PartitionReassignmentReplicas.isReassignmentInProgress(part)) {
            throw new NoReassignmentInProgressException(Errors.NO_REASSIGNMENT_IN_PROGRESS.message());
        }

        PartitionReassignmentRevert revert = new PartitionReassignmentRevert(part);
        if (revert.unclean() && !this.configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            throw new InvalidReplicaAssignmentException("Unable to revert partition assignment for " + topicName + ":" + tp.partitionId() + " because it would require an unclean leader election.");
        }

        PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(
            part,
            tp.topicId(),
            tp.partitionId(),
            this.clusterControl::isActive,
            this.featureControl.metadataVersion(),
            this.clusterControl.activeBrokerComponentDegradations(),
            this.getTopicEffectiveMinIsr(topicName));
        changeBuilder.setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
        // NOTE(review): unlike changePartitionReassignment, this path does not call
        // setEligibleLeaderReplicasEnabled on the builder - confirm whether that is intended.
        if (this.configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
            changeBuilder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
        }
        // Restore the reverted replica sets and clear every adding/removing list.
        changeBuilder
            .setTargetIsr(revert.isr())
            .setTargetReplicas(revert.replicas())
            .setTargetObservers(revert.observers())
            .setTargetRemoving(Collections.emptyList())
            .setTargetAdding(Collections.emptyList())
            .setTargetRemovingObservers(Collections.emptyList())
            .setTargetAddingObservers(Collections.emptyList());
        return changeBuilder.build();
    }

    /**
     * Builds the record that moves a partition towards a manually requested assignment.
     *
     * <p>The requested assignment is validated first. Excluded brokers that already host
     * the partition are removed from the exclusion set used for that validation. When
     * tenant cell placement is enabled for the principal, the tenant's cell ID is passed
     * to the validation as well; otherwise {@code -1} is passed.
     *
     * @param tp        the topic ID and partition index
     * @param part      the current registration of the partition
     * @param target    the requested replicas and observers
     * @param topicName the name of the topic that owns the partition
     * @param principal the requesting principal; may be {@code null}
     * @return the record produced by the partition change builder, if it produced one
     */
    Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp, PartitionRegistration part, AlterPartitionReassignmentsRequestData.ReassignablePartition target, String topicName, KafkaPrincipal principal) {
        PartitionAssignment current = new PartitionAssignment(Replicas.toList(part.replicas), Replicas.toList(part.observers));

        HashSet<Integer> exclusionsToCheck = new HashSet<Integer>(this.clusterControl.activeBrokerReplicaExclusions().keySet());
        exclusionsToCheck.removeAll(current.replicas());

        Optional<String> tenantId = TenantUtils.extractTenantId(Collections.singletonList(topicName));
        int cellId = this.tenantControl.isTenantCellPlacementEnabled(Optional.ofNullable(principal))
            ? tenantId.map(this.tenantControl::getTenantCellId).orElse(-1)
            : -1;

        PartitionAssignment requested = new PartitionAssignment(target.replicas(), target.observers());
        this.validateManualPartitionAssignment(requested, OptionalInt.empty(), exclusionsToCheck, cellId, this.existingTopicPlacementConfig(topicName));

        PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(current, requested);
        PartitionChangeBuilder changeBuilder = new PartitionChangeBuilder(
            part,
            tp.topicId(),
            tp.partitionId(),
            this.clusterControl::isActive,
            this.featureControl.metadataVersion(),
            this.clusterControl.activeBrokerComponentDegradations(),
            this.getTopicEffectiveMinIsr(topicName));
        changeBuilder.setZkMigrationEnabled(this.clusterControl.zkRegistrationAllowed());
        changeBuilder.setEligibleLeaderReplicasEnabled(this.eligibleLeaderReplicasEnabled);

        // Only set targets that differ from the current state or are non-empty.
        boolean replicasChanged = !reassignment.replicas().equals(current.replicas());
        if (replicasChanged) {
            changeBuilder.setTargetReplicas(reassignment.replicas());
        }
        boolean observersChanged = !reassignment.observers().equals(current.observers());
        if (observersChanged) {
            changeBuilder.setTargetObservers(reassignment.observers());
        }
        if (!reassignment.removing().isEmpty()) {
            changeBuilder.setTargetRemoving(reassignment.removing());
        }
        if (!reassignment.adding().isEmpty()) {
            changeBuilder.setTargetAdding(reassignment.adding());
        }
        if (!reassignment.removingObservers().isEmpty()) {
            changeBuilder.setTargetRemovingObservers(reassignment.removingObservers());
        }
        if (!reassignment.addingObservers().isEmpty()) {
            changeBuilder.setTargetAddingObservers(reassignment.addingObservers());
        }
        return changeBuilder.build();
    }

    /**
     * Lists ongoing partition reassignments as of the given epoch.
     *
     * @param topicList the topics and partition indexes to inspect, or {@code null} to walk
     *                  every entry of the reassigning-topics map
     * @param epoch     the epoch used for the timeline lookups
     * @return a response holding only topics with at least one partition under reassignment
     */
    ListPartitionReassignmentsResponseData listPartitionReassignments(List<ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics> topicList, long epoch) {
        ListPartitionReassignmentsResponseData response = new ListPartitionReassignmentsResponseData().setErrorMessage(null);

        if (topicList == null) {
            // No filter was supplied: report everything tracked in reassigningTopics.
            for (Map.Entry<?, ?> reassigning : this.reassigningTopics.entrySet(epoch)) {
                Uuid topicId = (Uuid) reassigning.getKey();
                int[] partitionIds = (int[]) reassigning.getValue();
                this.listReassigningTopic(response, topicId, Replicas.toList(partitionIds));
            }
            return response;
        }

        for (ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics requested : topicList) {
            Uuid topicId = (Uuid) this.topicsByName.get(requested.name(), epoch);
            if (topicId != null) {
                this.listReassigningTopic(response, topicId, requested.partitionIndexes());
            }
            // Topic names that do not resolve to an ID are skipped.
        }
        return response;
    }

    /**
     * Adds one topic's ongoing reassignments to the response.
     *
     * <p>Nothing is added when the topic ID is unknown or when none of the given
     * partitions is being reassigned.
     *
     * @param response     the response to append to
     * @param topicId      the ID of the topic to inspect
     * @param partitionIds the partition indexes to inspect
     */
    private void listReassigningTopic(ListPartitionReassignmentsResponseData response, Uuid topicId, List<Integer> partitionIds) {
        TopicControlInfo topicInfo = (TopicControlInfo) this.topics.get(topicId);
        if (topicInfo == null) {
            return;
        }
        ListPartitionReassignmentsResponseData.OngoingTopicReassignment topicEntry =
            new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topicInfo.name);
        for (int partitionId : partitionIds) {
            this.getOngoingPartitionReassignment(topicInfo, partitionId)
                .ifPresent(ongoing -> topicEntry.partitions().add(ongoing));
        }
        boolean anyOngoing = !topicEntry.partitions().isEmpty();
        if (anyOngoing) {
            response.topics().add(topicEntry);
        }
    }

    /**
     * Describes the ongoing reassignment of one partition.
     *
     * @param topicInfo   the topic that owns the partition
     * @param partitionId the index of the partition
     * @return the reassignment description, or empty when the partition is unknown or is
     *         not being reassigned
     */
    private Optional<ListPartitionReassignmentsResponseData.OngoingPartitionReassignment> getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
        PartitionRegistration registration = (PartitionRegistration) topicInfo.parts.get(partitionId);
        if (registration != null && PartitionReassignmentReplicas.isReassignmentInProgress(registration)) {
            ListPartitionReassignmentsResponseData.OngoingPartitionReassignment ongoing =
                new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
                    .setAddingReplicas(Replicas.toList(registration.addingReplicas))
                    .setRemovingReplicas(Replicas.toList(registration.removingReplicas))
                    .setPartitionIndex(partitionId)
                    .setReplicas(Replicas.toList(registration.replicas))
                    .setObservers(Replicas.toList(registration.observers));
            return Optional.of(ongoing);
        }
        return Optional.empty();
    }

    /**
     * Calls {@code clearPending()} on the configured create-topic policy when that policy
     * is a {@link ConfluentPartitionsPerTopicListener}; otherwise does nothing.
     */
    private void clearConfluentPartitionsPerTopicListenerPendingState() {
        this.createTopicPolicy
            .filter(policy -> policy instanceof ConfluentPartitionsPerTopicListener)
            .map(policy -> (ConfluentPartitionsPerTopicListener) policy)
            .ifPresent(listener -> listener.clearPending());
    }

    /**
     * Forwards an incremental update to the configured create-topic policy when that
     * policy is a {@link ConfluentPartitionsPerTopicListener}; otherwise does nothing.
     *
     * <p>All arguments are passed unchanged to {@code partialUpdate}.
     *
     * @param topic                       the topic the update refers to
     * @param numPartitionsChange         the partition-count change to report
     * @param numTopicsChange             the topic-count change to report
     * @param partitionIdToReplicasAdded  replicas added, keyed by partition ID
     * @param partitionIdToReplicasDeleted replicas deleted, keyed by partition ID
     * @param pending                     the pending flag handed to the listener
     */
    private void updateConfluentPartitionsPerTopicListener(String topic, int numPartitionsChange, int numTopicsChange, Map<Integer, List<Integer>> partitionIdToReplicasAdded, Map<Integer, List<Integer>> partitionIdToReplicasDeleted, boolean pending) {
        Optional<ConfluentPartitionsPerTopicListener> maybeListener = this.createTopicPolicy
            .filter(ConfluentPartitionsPerTopicListener.class::isInstance)
            .map(ConfluentPartitionsPerTopicListener.class::cast);
        if (maybeListener.isPresent()) {
            maybeListener.get().partialUpdate(topic, numPartitionsChange, numTopicsChange, partitionIdToReplicasAdded, partitionIdToReplicasDeleted, pending);
        }
    }

    /**
     * Pushes a full update to the configured create-topic policy when that policy is a
     * {@link ConfluentPartitionsPerTopicListener}; otherwise does nothing.
     *
     * <p>The listener receives an iterator of (topic name, partition count) pairs over
     * all known topics together with the per-broker replica counts computed by
     * {@link #brokerReplicaCounts}.
     */
    void resetConfluentPartitionsPerTopicListener() {
        Optional<ConfluentPartitionsPerTopicListener> maybeListener = this.createTopicPolicy
            .filter(ConfluentPartitionsPerTopicListener.class::isInstance)
            .map(ConfluentPartitionsPerTopicListener.class::cast);
        if (!maybeListener.isPresent()) {
            return;
        }
        ConfluentPartitionsPerTopicListener listener = maybeListener.get();
        final Collection<TopicControlInfo> allTopics = this.topics.values();

        // Adapts the topic collection to (name, partition count) pairs, computed lazily per element.
        Iterator<Map.Entry<String, Integer>> partitionCounts = new Iterator<Map.Entry<String, Integer>>() {
            private final Iterator<TopicControlInfo> delegate = allTopics.iterator();

            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public Map.Entry<String, Integer> next() {
                TopicControlInfo info = delegate.next();
                return new AbstractMap.SimpleImmutableEntry<String, Integer>(info.name, info.parts.size());
            }
        };
        listener.fullUpdate(partitionCounts, ReplicationControlManager.brokerReplicaCounts(allTopics));
    }

    /**
     * Verifies that a manual assignment only uses brokers of the given cell.
     *
     * @param assignment the broker IDs of the manual assignment
     * @param cellId     the cell to validate against; {@code -1} skips the check
     * @throws InvalidReplicaAssignmentException if any assigned broker is not in the cell
     */
    private void validateCellAssignment(List<Integer> assignment, int cellId) {
        if (cellId == -1) {
            return;
        }
        Set<Integer> allowedBrokers = this.clusterControl.cellBrokers(cellId);
        Set<Integer> outsideCell = new HashSet<Integer>(assignment);
        outsideCell.removeAll(allowedBrokers);
        if (outsideCell.isEmpty()) {
            return;
        }
        // Both lists are sorted before being joined into the message.
        String offending = outsideCell.stream()
            .sorted()
            .map(String::valueOf)
            .collect(Collectors.joining(", "));
        String permitted = allowedBrokers.stream()
            .sorted()
            .map(String::valueOf)
            .collect(Collectors.joining(", "));
        throw new InvalidReplicaAssignmentException("The manual partition assignment includes brokers " + offending + ", but they must be from " + permitted + ".");
    }

    /**
     * Resolves the cell ID to use for the topics of a create-topics request.
     *
     * <p>Returns {@code -1} when tenant cell placement is not enabled for the principal,
     * when no tenant ID can be extracted from the topic names, or when creating the
     * tenant-to-cell assignment fails with a {@link ResourceNotFoundException}. In the
     * last case the error is recorded for every topic of the request that does not
     * already have one.
     *
     * @param request     the create-topics request
     * @param topicErrors the per-topic errors collected so far; may be extended
     * @param principal   the requesting principal; may be {@code null}
     * @param records     the list whose {@code add} is handed to the tenant control manager
     * @return the tenant's cell ID, or {@code -1}
     */
    private int computeTenantCellId(CreateTopicsRequestData request, Map<String, ApiError> topicErrors, KafkaPrincipal principal, List<ApiMessageAndVersion> records) {
        if (!this.tenantControl.isTenantCellPlacementEnabled(Optional.ofNullable(principal))) {
            return -1;
        }
        List<String> topicNames = request.topics().stream()
            .map(CreateTopicsRequestData.CreatableTopic::name)
            .collect(Collectors.toList());
        Optional<String> tenantId = TenantUtils.extractTenantId(topicNames);
        if (!tenantId.isPresent()) {
            return -1;
        }

        HashSet<Integer> usableBrokerIds = new HashSet<Integer>();
        Iterator<UsableBroker> brokers = this.clusterControl.usableBrokers();
        while (brokers.hasNext()) {
            usableBrokerIds.add(brokers.next().id());
        }

        int cellId = -1;
        try {
            cellId = this.tenantControl.createTenantToCellAssignmentIfNotExists(tenantId.get(), usableBrokerIds, records::add);
        } catch (ResourceNotFoundException e) {
            // Report the failure on each topic that has no earlier error recorded.
            for (CreateTopicsRequestData.CreatableTopic topic : request.topics()) {
                if (!topicErrors.containsKey(topic.name())) {
                    topicErrors.put(topic.name(), ApiError.fromThrowable(e));
                }
            }
        }
        return cellId;
    }

    /**
     * Counts how many partition replicas each broker hosts across the given topics.
     *
     * @param topicControls the topics whose partition registrations are scanned
     * @return a map from broker ID to its number of occurrences in the replica arrays
     */
    private static Map<Integer, Integer> brokerReplicaCounts(Collection<TopicControlInfo> topicControls) {
        Map<Integer, Integer> countsByBroker = new HashMap<Integer, Integer>();
        for (TopicControlInfo topic : topicControls) {
            for (PartitionRegistration partition : topic.parts.values()) {
                for (int brokerId : partition.replicas) {
                    countsByBroker.merge(brokerId, 1, Integer::sum);
                }
            }
        }
        return countsByBroker;
    }

    /**
     * Wraps one partition's replica array in a single-entry map.
     *
     * @param partitionId the partition ID used as the key
     * @param arr         the replica IDs; may be {@code null}
     * @return a mutable map from {@code partitionId} to the boxed replica list, or an empty
     *         mutable map when {@code arr} is {@code null}
     */
    private static Map<Integer, List<Integer>> partitionIdToReplicas(int partitionId, int[] arr) {
        Map<Integer, List<Integer>> replicasByPartition = new HashMap<Integer, List<Integer>>();
        if (arr == null) {
            return replicasByPartition;
        }
        List<Integer> replicaIds = new ArrayList<Integer>(arr.length);
        for (int replicaId : arr) {
            replicaIds.add(replicaId);
        }
        replicasByPartition.put(partitionId, replicaIds);
        return replicasByPartition;
    }

    /**
     * Computes the effective minimum ISR size for a topic: the smaller of the topic's
     * {@code min.insync.replicas} setting and the replica count of its partition 0.
     *
     * <p>The configured defaults are used, with a warning, when the topic has no
     * {@code min.insync.replicas} value or when the replica count of partition 0 cannot
     * be looked up.
     *
     * @param topicName the name of the topic
     * @return the effective minimum ISR size
     * @throws NumberFormatException if the configured value is not a parsable integer
     */
    int getTopicEffectiveMinIsr(String topicName) {
        String configuredMinIsr = this.configurationControl.getTopicConfig(topicName, "min.insync.replicas");
        int minIsr;
        if (configuredMinIsr == null) {
            this.log.warn("Can't find the min isr config for topic: " + topicName + " using default value " + this.defaultMinIsrCount);
            minIsr = this.defaultMinIsrCount;
        } else {
            minIsr = Integer.parseInt(configuredMinIsr);
        }

        int replicaCount = this.defaultReplicationFactor;
        try {
            // Any failure in this lookup chain (unknown topic, missing partition 0) falls back to the default.
            Uuid topicId = (Uuid) this.topicsByName.get(topicName);
            TopicControlInfo topicInfo = (TopicControlInfo) this.topics.get(topicId);
            PartitionRegistration firstPartition = (PartitionRegistration) topicInfo.parts.get(0);
            replicaCount = firstPartition.replicas.length;
        } catch (Exception e) {
            this.log.warn("Can't find the replication factor for topic: " + topicName + " using default value " + replicaCount + ". Error=" + e);
        }
        return Math.min(minIsr, replicaCount);
    }

    /**
     * Records a partition as imbalanced, in the internal set when
     * {@code Topic.isPreferredLeaderElectionWarmup} accepts the topic name and in the
     * external set otherwise.
     *
     * @param topicIdPartition the partition to record
     * @param topicName        the name of the topic that owns the partition
     */
    private void addToImbalancedPartitions(TopicIdPartition topicIdPartition, String topicName) {
        if (Topic.isPreferredLeaderElectionWarmup(topicName)) {
            this.imbalancedInternalPartitions.add(topicIdPartition);
            return;
        }
        this.imbalancedExternalPartitions.add(topicIdPartition);
    }

    /**
     * Removes a partition from both the internal and the external imbalanced sets.
     *
     * @param topicIdPartition the partition to remove
     */
    private void removeFromImbalancedPartitions(TopicIdPartition topicIdPartition) {
        // The caller does not say which set holds the partition, so both are cleared.
        this.imbalancedInternalPartitions.remove(topicIdPartition);
        this.imbalancedExternalPartitions.remove(topicIdPartition);
    }

    /**
     * Pairs a replica ID with the reason it was considered ineligible.
     */
    private static final class IneligibleReplica {
        private final int replicaId;
        private final String reason;

        private IneligibleReplica(int replicaId, String reason) {
            this.reason = reason;
            this.replicaId = replicaId;
        }

        /**
         * Renders the replica as {@code "<replicaId> (<reason>)"}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(replicaId).append(" (").append(reason).append(")");
            return text.toString();
        }
    }

    /**
     * Per-topic state: the topic name, its ID and a timeline map from partition index to
     * partition registration.
     */
    static class TopicControlInfo {
        private final String name;
        private final Uuid id;
        private final TimelineHashMap<Integer, PartitionRegistration> parts;

        /**
         * Creates the state for a topic with an initially empty partition map.
         *
         * @param name             the topic name
         * @param snapshotRegistry the registry backing the partition map
         * @param id               the topic ID
         */
        TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
            this.name = name;
            this.id = id;
            this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
        }

        /** @return the topic name */
        public String name() {
            return name;
        }

        /** @return the topic ID */
        public Uuid topicId() {
            return id;
        }

        /**
         * @param epoch the epoch at which to read the partition map
         * @return the size of the partition map at that epoch
         */
        public int numPartitions(long epoch) {
            return parts.size(epoch);
        }
    }

    /**
     * A {@link ClusterDescriber} backed by the enclosing manager that restricts usable
     * brokers to one cell. A cell ID of {@code -1} applies no restriction.
     */
    class KRaftCellDescriber
    implements ClusterDescriber {
        private final int cellId;

        KRaftCellDescriber(int cellId) {
            this.cellId = cellId;
        }

        /**
         * @return all usable brokers when the cell ID is {@code -1}, otherwise only the
         *         usable brokers whose cell equals this describer's cell ID
         */
        @Override
        public Iterator<UsableBroker> usableBrokers() {
            Iterator<UsableBroker> allUsable = ReplicationControlManager.this.clusterControl.usableBrokers();
            if (this.cellId == -1) {
                return allUsable;
            }
            List<UsableBroker> inCell = new ArrayList<UsableBroker>();
            while (allUsable.hasNext()) {
                UsableBroker broker = allUsable.next();
                if (broker.cell() == this.cellId) {
                    inCell.add(broker);
                }
            }
            return inCell.iterator();
        }

        /**
         * @return an iterator over an unmodifiable view of the known topic names
         */
        @Override
        public Iterator<String> topicNames() {
            return Collections.unmodifiableCollection(ReplicationControlManager.this.topicsByName.keySet()).iterator();
        }

        /**
         * @param topicName the name of the topic
         * @return the replica lists of the topic's partitions, ordered by partition index,
         *         or an empty list when the topic is unknown
         */
        @Override
        public List<List<Integer>> replicasForTopicName(String topicName) {
            Uuid topicId = (Uuid) ReplicationControlManager.this.topicsByName.get(topicName);
            if (topicId == null) {
                return Collections.emptyList();
            }
            TopicControlInfo topicInfo = (TopicControlInfo) ReplicationControlManager.this.topics.get(topicId);
            if (topicInfo == null) {
                return Collections.emptyList();
            }

            List<Map.Entry<Integer, List<Integer>>> byPartition = new ArrayList<Map.Entry<Integer, List<Integer>>>();
            for (Map.Entry<Integer, PartitionRegistration> entry : topicInfo.parts.entrySet()) {
                List<Integer> replicas = Replicas.toList(entry.getValue().replicas);
                byPartition.add(new AbstractMap.SimpleImmutableEntry<Integer, List<Integer>>(entry.getKey(), replicas));
            }
            // The partition map gives no ordering, so sort explicitly by partition index.
            byPartition.sort(Map.Entry.comparingByKey());

            List<List<Integer>> ordered = new ArrayList<List<Integer>>();
            for (Map.Entry<Integer, List<Integer>> entry : byPartition) {
                ordered.add(entry.getValue());
            }
            return ordered;
        }

        /**
         * @param tenant ignored
         * @return this describer's cell ID, whatever the tenant
         */
        @Override
        public int getTenantCellId(String tenant) {
            return this.cellId;
        }
    }

    /**
     * Fluent builder for {@link ReplicationControlManager}.
     *
     * <p>The configuration control, cluster control, tenant control and feature control
     * managers are mandatory. The log context, snapshot registry and mirror topic control
     * manager are filled in by {@link #build()} when they were not set.
     */
    static class Builder {
        private SnapshotRegistry snapshotRegistry = null;
        private LogContext logContext = null;
        private short defaultReplicationFactor = (short)3;
        private int defaultNumPartitions = 1;
        private int defaultMinIsrCount = 2;
        private int maxElectionsPerImbalance = 1000;
        private ConfigurationControlManager configurationControl = null;
        private ClusterControlManager clusterControl = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private FeatureControlManager featureControl = null;
        private Function<String, String> nameToTenantCallback = null;
        private boolean applyCreateTopicsPolicyToCreatePartitions = false;
        private MirrorTopicControlManager mirrorControl = null;
        private TenantControlManager tenantControl = null;
        private Optional<TopicPlacement> defaultTopicPlacement = Optional.empty();
        private boolean eligibleLeaderReplicasEnabled = false;
        private int maxPartitionChangesPerSlice = Integer.MAX_VALUE;

        Builder() {
        }

        /** Sets the snapshot registry; when unset, the configuration control's registry is used. */
        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Sets the log context; when unset, a new {@code LogContext} is created. */
        Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /** Sets the default replication factor (initially 3). */
        Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
            this.defaultReplicationFactor = defaultReplicationFactor;
            return this;
        }

        /** Sets the default number of partitions (initially 1). */
        Builder setDefaultNumPartitions(int defaultNumPartitions) {
            this.defaultNumPartitions = defaultNumPartitions;
            return this;
        }

        /** Sets the default minimum ISR count (initially 2). */
        Builder setDefaultMinIsrCount(int defaultMinIsrCount) {
            this.defaultMinIsrCount = defaultMinIsrCount;
            return this;
        }

        /** Sets the eligible-leader-replicas flag (initially {@code false}). */
        Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
            this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
            return this;
        }

        /** Sets the maximum number of elections per imbalance (initially 1000). */
        Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
            this.maxElectionsPerImbalance = maxElectionsPerImbalance;
            return this;
        }

        /** Sets the maximum number of partition changes per slice (initially {@code Integer.MAX_VALUE}). */
        Builder setMaxPartitionChangesPerSlice(int maxPartitionChangesPerSlice) {
            this.maxPartitionChangesPerSlice = maxPartitionChangesPerSlice;
            return this;
        }

        /** Sets the mandatory configuration control manager. */
        Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
            this.configurationControl = configurationControl;
            return this;
        }

        /** Sets the mandatory cluster control manager. */
        Builder setClusterControl(ClusterControlManager clusterControl) {
            this.clusterControl = clusterControl;
            return this;
        }

        /** Sets the optional create-topic policy (initially empty). */
        Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
            this.createTopicPolicy = createTopicPolicy;
            return this;
        }

        /** Sets the flag that is passed to the manager as {@code applyCreateTopicsPolicyToCreatePartitions}. */
        Builder setApplyCreateTopicsPolicyToCreatePartitions(boolean applyCreateTopicsPolicyToCreatePartitions) {
            this.applyCreateTopicsPolicyToCreatePartitions = applyCreateTopicsPolicyToCreatePartitions;
            return this;
        }

        /** Sets the mirror topic control manager; when unset, a default one is created by {@link #build()}. */
        Builder setMirrorTopicControl(MirrorTopicControlManager mirrorControl) {
            this.mirrorControl = mirrorControl;
            return this;
        }

        /** Sets the mandatory tenant control manager. */
        Builder setTenantControl(TenantControlManager tenantControl) {
            this.tenantControl = tenantControl;
            return this;
        }

        /** Sets the name-to-tenant callback (initially {@code null}). */
        public Builder setNameToTenantCallback(Function<String, String> callback) {
            this.nameToTenantCallback = callback;
            return this;
        }

        /** Sets the mandatory feature control manager. */
        public Builder setFeatureControl(FeatureControlManager featureControl) {
            this.featureControl = featureControl;
            return this;
        }

        /** Sets the optional default topic placement (initially empty). */
        public Builder setDefaultTopicPlacement(Optional<TopicPlacement> defaultTopicPlacement) {
            this.defaultTopicPlacement = defaultTopicPlacement;
            return this;
        }

        /**
         * Validates the mandatory collaborators, fills in defaults and creates the manager.
         *
         * @return the configured {@link ReplicationControlManager}
         * @throws IllegalStateException if the configuration control, cluster control,
         *         tenant control or feature control manager has not been set
         */
        ReplicationControlManager build() {
            // Validate every mandatory collaborator before creating any default object.
            if (this.configurationControl == null) {
                throw new IllegalStateException("Configuration control must be set before building");
            }
            if (this.clusterControl == null) {
                throw new IllegalStateException("Cluster control must be set before building");
            }
            if (this.tenantControl == null) {
                throw new IllegalStateException("Tenant control must be set before building");
            }
            if (this.featureControl == null) {
                throw new IllegalStateException("FeatureControlManager must not be null");
            }
            if (this.logContext == null) {
                this.logContext = new LogContext();
            }
            // Resolve the registry before the default mirror control is created, so that
            // the default MirrorTopicControlManager is never handed a null registry.
            if (this.snapshotRegistry == null) {
                this.snapshotRegistry = this.configurationControl.snapshotRegistry();
            }
            if (this.mirrorControl == null) {
                this.mirrorControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, Time.SYSTEM, __ -> Optional.empty(), __ -> Optional.empty(), __ -> Optional.empty());
            }
            return new ReplicationControlManager(this.snapshotRegistry, this.logContext, this.defaultReplicationFactor, this.defaultNumPartitions, this.defaultMinIsrCount, this.maxElectionsPerImbalance, this.eligibleLeaderReplicasEnabled, this.maxPartitionChangesPerSlice, this.configurationControl, this.clusterControl, this.createTopicPolicy, this.nameToTenantCallback, this.featureControl, this.applyCreateTopicsPolicyToCreatePartitions, this.mirrorControl, this.tenantControl, this.defaultTopicPlacement);
        }
    }
}

