/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.AclControlManager;
import org.apache.kafka.controller.ClientQuotaControlManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ConfigurationValidator;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.ControllerPurgatory;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.ControllerResultAndOffset;
import org.apache.kafka.controller.DeferredEvent;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.LogReplayTracker;
import org.apache.kafka.controller.ProducerIdControlManager;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.SnapshotGenerator;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

public final class QuorumController
implements Controller {
    private static final int MAX_RECORDS_PER_BATCH = 10000;
    public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
    private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX = "The active controller appears to be node ";
    private static final String GENERATE_SNAPSHOT = "generateSnapshot";
    private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
    private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
    private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
    private final FaultHandler fatalFaultHandler;
    private final LogContext logContext;
    private final Logger log;
    private final int nodeId;
    private final String clusterId;
    private final KafkaEventQueue queue;
    private final Time time;
    private final ControllerMetrics controllerMetrics;
    private final SnapshotRegistry snapshotRegistry;
    private final ControllerPurgatory purgatory;
    private final Consumer<ConfigResource> resourceExists;
    private final ConfigurationControlManager configurationControl;
    private final ClientQuotaControlManager clientQuotaControlManager;
    private final ClusterControlManager clusterControl;
    private final FeatureControlManager featureControl;
    private final ProducerIdControlManager producerIdControlManager;
    private final ReplicationControlManager replicationControl;
    private final Optional<ClusterMetadataAuthorizer> authorizer;
    private final AclControlManager aclControlManager;
    private final LogReplayTracker logReplayTracker;
    private final SnapshotGeneratorManager snapshotGeneratorManager = new SnapshotGeneratorManager();
    private final RaftClient<ApiMessageAndVersion> raftClient;
    private QuorumMetaLogListener metaLogListener;
    private volatile int curClaimEpoch;
    private long lastCommittedOffset = -1L;
    private int lastCommittedEpoch = -1;
    private long lastCommittedTimestamp = -1L;
    private boolean needToCompleteAuthorizerLoad;
    private long writeOffset;
    private final long snapshotMaxNewRecordBytes;
    private long committedBytesSinceLastSnapshot = 0L;
    private final OptionalLong leaderImbalanceCheckIntervalNs;
    private final OptionalLong maxIdleIntervalNs;
    private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
    private boolean noOpRecordScheduled = false;
    private final BootstrapMetadata bootstrapMetadata;
    private final int maxRecordsPerBatch;

    private NotControllerException newNotControllerException() {
        OptionalInt latestController = this.raftClient.leaderAndEpoch().leaderId();
        if (latestController.isPresent()) {
            return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX + latestController.getAsInt());
        }
        return new NotControllerException("No controller appears to be active.");
    }

    public static int exceptionToApparentController(NotControllerException e) {
        if (e.getMessage().startsWith(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX)) {
            return Integer.parseInt(e.getMessage().substring(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX.length()));
        }
        return -1;
    }

    private void handleEventEnd(String name, long startProcessingTimeNs) {
        long endProcessingTime = this.time.nanoseconds();
        long deltaNs = endProcessingTime - startProcessingTimeNs;
        this.log.debug("Processed {} in {} us", (Object)name, (Object)TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS));
        this.controllerMetrics.updateEventQueueProcessingTime(TimeUnit.NANOSECONDS.toMillis(deltaNs));
    }

    private Throwable handleEventException(String name, OptionalLong startProcessingTimeNs, Throwable exception) {
        if (!startProcessingTimeNs.isPresent()) {
            this.log.error("{}: unable to start processing because of {}.", (Object)name, (Object)exception.getClass().getSimpleName());
            if (exception instanceof ApiException) {
                return exception;
            }
            return new UnknownServerException(exception);
        }
        long endProcessingTime = this.time.nanoseconds();
        long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
        long deltaUs = TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS);
        if (exception instanceof ApiException) {
            this.log.info("{}: failed with {} in {} us", new Object[]{name, exception.getClass().getSimpleName(), deltaUs});
            return exception;
        }
        if (this.isActiveController()) {
            this.log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  Renouncing leadership and reverting to the last committed offset {}.", new Object[]{name, exception.getClass().getSimpleName(), this.curClaimEpoch, deltaUs, this.lastCommittedOffset, exception});
            this.renounce();
        } else {
            this.log.warn("{}: failed with unknown server exception {} in {} us.  The controller is already in standby mode.", new Object[]{name, exception.getClass().getSimpleName(), deltaUs, exception});
        }
        return new UnknownServerException(exception);
    }

    private void appendControlEvent(String name, Runnable handler) {
        ControlEvent event = new ControlEvent(name, handler);
        this.queue.append((EventQueue.Event)event);
    }

    ReplicationControlManager replicationControl() {
        return this.replicationControl;
    }

    ClusterControlManager clusterControl() {
        return this.clusterControl;
    }

    FeatureControlManager featureControl() {
        return this.featureControl;
    }

    ConfigurationControlManager configurationControl() {
        return this.configurationControl;
    }

    <T> CompletableFuture<T> appendReadEvent(String name, OptionalLong deadlineNs, Supplier<T> handler) {
        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
        if (deadlineNs.isPresent()) {
            this.queue.appendWithDeadline(deadlineNs.getAsLong(), event);
        } else {
            this.queue.append(event);
        }
        return event.future();
    }

    static long appendRecords(Logger log, ControllerResult<?> result, int maxRecordsPerBatch, Function<List<ApiMessageAndVersion>, Long> appender) {
        try {
            List<ApiMessageAndVersion> records = result.records();
            if (result.isAtomic()) {
                if (records.size() > maxRecordsPerBatch) {
                    throw new IllegalStateException("Attempted to atomically commit " + records.size() + " records, but maxRecordsPerBatch is " + maxRecordsPerBatch);
                }
                long offset = appender.apply(records);
                if (log.isTraceEnabled()) {
                    log.trace("Atomically appended {} record(s) ending with offset {}.", (Object)records.size(), (Object)offset);
                }
                return offset;
            }
            int startIndex = 0;
            int numBatches = 0;
            while (true) {
                ++numBatches;
                int endIndex = startIndex + maxRecordsPerBatch;
                if (endIndex > records.size()) {
                    long offset = appender.apply(records.subList(startIndex, records.size()));
                    if (log.isTraceEnabled()) {
                        log.trace("Appended {} record(s) in {} batch(es), ending with offset {}.", new Object[]{records.size(), numBatches, offset});
                    }
                    return offset;
                }
                appender.apply(records.subList(startIndex, endIndex));
                startIndex += maxRecordsPerBatch;
            }
        }
        catch (ApiException e) {
            throw new RuntimeException(e);
        }
    }

    <T> CompletableFuture<T> appendWriteEvent(String name, OptionalLong deadlineNs, ControllerWriteOperation<T> op) {
        ControllerWriteEvent<T> event = new ControllerWriteEvent<T>(name, op);
        if (deadlineNs.isPresent()) {
            this.queue.appendWithDeadline(deadlineNs.getAsLong(), event);
        } else {
            this.queue.append(event);
        }
        return event.future();
    }

    private void maybeCompleteAuthorizerInitialLoad() {
        if (!this.needToCompleteAuthorizerLoad) {
            return;
        }
        OptionalLong highWatermark = this.raftClient.highWatermark();
        if (highWatermark.isPresent()) {
            if (this.lastCommittedOffset + 1L >= highWatermark.getAsLong()) {
                this.log.info("maybeCompleteAuthorizerInitialLoad: completing authorizer initial load at last committed offset {}.", (Object)this.lastCommittedOffset);
                this.authorizer.get().completeInitialLoad();
                this.needToCompleteAuthorizerLoad = false;
            } else {
                this.log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed because lastCommittedOffset  = {}, but highWatermark = {}.", (Object)this.lastCommittedOffset, (Object)highWatermark.getAsLong());
            }
        } else {
            this.log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not set.");
        }
    }

    private boolean isActiveController() {
        return this.curClaimEpoch != -1;
    }

    private void updateWriteOffset(long offset) {
        this.writeOffset = offset;
        if (this.isActiveController()) {
            this.controllerMetrics.setLastAppliedRecordOffset(this.writeOffset);
            this.controllerMetrics.setLastAppliedRecordTimestamp(this.time.milliseconds());
        } else {
            this.controllerMetrics.setLastAppliedRecordOffset(this.lastCommittedOffset);
            this.controllerMetrics.setLastAppliedRecordTimestamp(this.lastCommittedTimestamp);
        }
    }

    private void claim(int epoch) {
        try {
            if (this.curClaimEpoch != -1) {
                throw new RuntimeException("Cannot claim leadership because we are already the active controller.");
            }
            this.curClaimEpoch = epoch;
            this.controllerMetrics.setActive(true);
            this.updateWriteOffset(this.lastCommittedOffset);
            this.clusterControl.activate();
            this.snapshotRegistry.getOrCreateSnapshot(this.lastCommittedOffset);
            this.queue.prepend(new ControllerWriteEvent<Void>("completeActivation[" + epoch + "]", new CompleteActivationEvent()));
        }
        catch (Throwable e) {
            this.fatalFaultHandler.handleFault("exception while claiming leadership", e);
        }
    }

    private void updateLastCommittedState(long offset, int epoch, long timestamp, long bytesSinceLastSnapshot) {
        this.lastCommittedOffset = offset;
        this.lastCommittedEpoch = epoch;
        this.lastCommittedTimestamp = timestamp;
        this.committedBytesSinceLastSnapshot = bytesSinceLastSnapshot;
        this.controllerMetrics.setLastCommittedRecordOffset(offset);
        if (!this.isActiveController()) {
            this.controllerMetrics.setLastAppliedRecordOffset(offset);
            this.controllerMetrics.setLastAppliedRecordTimestamp(timestamp);
        }
    }

    private void renounce() {
        try {
            if (this.curClaimEpoch == -1) {
                throw new RuntimeException("Cannot renounce leadership because we are not the current leader.");
            }
            this.raftClient.resign(this.curClaimEpoch);
            this.curClaimEpoch = -1;
            this.controllerMetrics.setActive(false);
            this.purgatory.failAll((Exception)this.newNotControllerException());
            if (!this.snapshotRegistry.hasSnapshot(this.lastCommittedOffset)) {
                throw new RuntimeException("Unable to find last committed offset " + this.lastCommittedEpoch + " in snapshot registry.");
            }
            this.snapshotRegistry.revertToSnapshot(this.lastCommittedOffset);
            this.authorizer.ifPresent(a -> a.loadSnapshot(this.aclControlManager.idToAcl()));
            this.updateWriteOffset(-1L);
            this.clusterControl.deactivate();
            this.cancelMaybeFenceReplicas();
            this.cancelMaybeBalancePartitionLeaders();
            this.cancelNextWriteNoOpRecord();
        }
        catch (Throwable e) {
            this.fatalFaultHandler.handleFault("exception while renouncing leadership", e);
        }
    }

    private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs, ControllerWriteOperation<T> op) {
        ControllerWriteEvent<T> event = new ControllerWriteEvent<T>(name, op, true);
        this.queue.scheduleDeferred(name, (Function)new EventQueue.EarliestDeadlineFunction(deadlineNs), event);
        ((ControllerWriteEvent)event).future.exceptionally(e -> {
            if (e instanceof UnknownServerException && e.getCause() != null && e.getCause() instanceof RejectedExecutionException) {
                this.log.error("Cancelling deferred write event {} because the event queue is now closed.", (Object)name);
                return null;
            }
            if (e instanceof NotControllerException) {
                this.log.debug("Cancelling deferred write event {} because this controller is no longer active.", (Object)name);
                return null;
            }
            this.log.error("Unexpected exception while executing deferred write event {}. Rescheduling for a minute from now.", (Object)name, e);
            this.scheduleDeferredWriteEvent(name, deadlineNs + TimeUnit.NANOSECONDS.convert(1L, TimeUnit.MINUTES), op);
            return null;
        });
    }

    private void rescheduleMaybeFenceStaleBrokers() {
        long nextCheckTimeNs = this.clusterControl.heartbeatManager().nextCheckTimeNs();
        if (nextCheckTimeNs == Long.MAX_VALUE) {
            this.cancelMaybeFenceReplicas();
            return;
        }
        this.scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
            ControllerResult<Void> result = this.replicationControl.maybeFenceOneStaleBroker();
            this.rescheduleMaybeFenceStaleBrokers();
            return result;
        });
    }

    private void cancelMaybeFenceReplicas() {
        this.queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
    }

    private void maybeScheduleNextBalancePartitionLeaders() {
        if (this.imbalancedScheduled != ImbalanceSchedule.SCHEDULED && this.leaderImbalanceCheckIntervalNs.isPresent() && this.replicationControl.arePartitionLeadersImbalanced()) {
            this.log.debug("Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", new Object[]{MAYBE_BALANCE_PARTITION_LEADERS, this.imbalancedScheduled, this.leaderImbalanceCheckIntervalNs, this.replicationControl.arePartitionLeadersImbalanced()});
            ControllerWriteEvent event = new ControllerWriteEvent(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
                ControllerResult<Boolean> result = this.replicationControl.maybeBalancePartitionLeaders();
                this.imbalancedScheduled = result.response() != false ? ImbalanceSchedule.IMMEDIATELY : ImbalanceSchedule.DEFERRED;
                return result;
            }, true);
            long delayNs = this.time.nanoseconds();
            delayNs = this.imbalancedScheduled == ImbalanceSchedule.DEFERRED ? (delayNs += this.leaderImbalanceCheckIntervalNs.getAsLong()) : (delayNs += TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MILLISECONDS));
            this.queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, (Function)new EventQueue.EarliestDeadlineFunction(delayNs), event);
            this.imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
        }
    }

    private void cancelMaybeBalancePartitionLeaders() {
        this.imbalancedScheduled = ImbalanceSchedule.DEFERRED;
        this.queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
    }

    private void maybeScheduleNextWriteNoOpRecord() {
        if (!this.noOpRecordScheduled && this.maxIdleIntervalNs.isPresent() && this.featureControl.metadataVersion().isNoOpRecordSupported()) {
            this.log.debug("Scheduling write event for {} because maxIdleIntervalNs ({}) and metadataVersion ({})", new Object[]{WRITE_NO_OP_RECORD, this.maxIdleIntervalNs.getAsLong(), this.featureControl.metadataVersion()});
            ControllerWriteEvent event = new ControllerWriteEvent(WRITE_NO_OP_RECORD, () -> {
                this.noOpRecordScheduled = false;
                this.maybeScheduleNextWriteNoOpRecord();
                return ControllerResult.of(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new NoOpRecord(), 0)), null);
            }, true);
            long delayNs = this.time.nanoseconds() + this.maxIdleIntervalNs.getAsLong();
            this.queue.scheduleDeferred(WRITE_NO_OP_RECORD, (Function)new EventQueue.EarliestDeadlineFunction(delayNs), event);
            this.noOpRecordScheduled = true;
        }
    }

    private void cancelNextWriteNoOpRecord() {
        this.noOpRecordScheduled = false;
        this.queue.cancelDeferred(WRITE_NO_OP_RECORD);
    }

    private void handleFeatureControlChange() {
        if (this.isActiveController()) {
            if (this.featureControl.metadataVersion().isNoOpRecordSupported()) {
                this.maybeScheduleNextWriteNoOpRecord();
            } else {
                this.cancelNextWriteNoOpRecord();
            }
        }
    }

    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
        this.logReplayTracker.replay(message);
        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
        switch (type) {
            case REGISTER_BROKER_RECORD: {
                this.clusterControl.replay((RegisterBrokerRecord)message, batchLastOffset);
                break;
            }
            case UNREGISTER_BROKER_RECORD: {
                this.clusterControl.replay((UnregisterBrokerRecord)message);
                break;
            }
            case TOPIC_RECORD: {
                this.replicationControl.replay((TopicRecord)message);
                break;
            }
            case PARTITION_RECORD: {
                this.replicationControl.replay((PartitionRecord)message);
                break;
            }
            case CONFIG_RECORD: {
                this.configurationControl.replay((ConfigRecord)message);
                break;
            }
            case PARTITION_CHANGE_RECORD: {
                this.replicationControl.replay((PartitionChangeRecord)message);
                break;
            }
            case FENCE_BROKER_RECORD: {
                this.clusterControl.replay((FenceBrokerRecord)message);
                break;
            }
            case UNFENCE_BROKER_RECORD: {
                this.clusterControl.replay((UnfenceBrokerRecord)message);
                break;
            }
            case REMOVE_TOPIC_RECORD: {
                this.replicationControl.replay((RemoveTopicRecord)message);
                break;
            }
            case FEATURE_LEVEL_RECORD: {
                this.featureControl.replay((FeatureLevelRecord)message);
                this.handleFeatureControlChange();
                break;
            }
            case CLIENT_QUOTA_RECORD: {
                this.clientQuotaControlManager.replay((ClientQuotaRecord)message);
                break;
            }
            case PRODUCER_IDS_RECORD: {
                this.producerIdControlManager.replay((ProducerIdsRecord)message);
                break;
            }
            case BROKER_REGISTRATION_CHANGE_RECORD: {
                this.clusterControl.replay((BrokerRegistrationChangeRecord)message);
                break;
            }
            case ACCESS_CONTROL_ENTRY_RECORD: {
                this.aclControlManager.replay((AccessControlEntryRecord)message, snapshotId);
                break;
            }
            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
                this.aclControlManager.replay((RemoveAccessControlEntryRecord)message, snapshotId);
                break;
            }
            case NO_OP_RECORD: {
                break;
            }
            default: {
                throw new RuntimeException("Unhandled record type " + (Object)((Object)type));
            }
        }
    }

    private void maybeGenerateSnapshot() {
        if (this.committedBytesSinceLastSnapshot >= this.snapshotMaxNewRecordBytes && this.snapshotGeneratorManager.generator == null) {
            if (!this.isActiveController()) {
                this.snapshotRegistry.getOrCreateSnapshot(this.lastCommittedOffset);
            }
            this.log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot.", new Object[]{this.lastCommittedEpoch, this.lastCommittedOffset, this.committedBytesSinceLastSnapshot});
            this.snapshotGeneratorManager.createSnapshotGenerator(this.lastCommittedOffset, this.lastCommittedEpoch, this.lastCommittedTimestamp);
            this.committedBytesSinceLastSnapshot = 0L;
        }
    }

    private void resetToEmptyState() {
        this.snapshotGeneratorManager.cancel();
        this.snapshotRegistry.reset();
        this.updateLastCommittedState(-1L, -1, -1L, 0L);
    }

    private QuorumController(FaultHandler fatalFaultHandler, LogContext logContext, int nodeId, String clusterId, KafkaEventQueue queue, Time time, KafkaConfigSchema configSchema, RaftClient<ApiMessageAndVersion> raftClient, QuorumFeatures quorumFeatures, short defaultReplicationFactor, int defaultNumPartitions, ReplicaPlacer replicaPlacer, long snapshotMaxNewRecordBytes, OptionalLong leaderImbalanceCheckIntervalNs, OptionalLong maxIdleIntervalNs, long sessionTimeoutNs, ControllerMetrics controllerMetrics, Optional<CreateTopicPolicy> createTopicPolicy, Optional<AlterConfigPolicy> alterConfigPolicy, ConfigurationValidator configurationValidator, Optional<ClusterMetadataAuthorizer> authorizer, Map<String, Object> staticConfig, BootstrapMetadata bootstrapMetadata, int maxRecordsPerBatch) {
        this.fatalFaultHandler = fatalFaultHandler;
        this.logContext = logContext;
        this.log = logContext.logger(QuorumController.class);
        this.nodeId = nodeId;
        this.clusterId = clusterId;
        this.queue = queue;
        this.time = time;
        this.controllerMetrics = controllerMetrics;
        this.snapshotRegistry = new SnapshotRegistry(logContext);
        this.purgatory = new ControllerPurgatory();
        this.resourceExists = new ConfigResourceExistenceChecker();
        this.configurationControl = new ConfigurationControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).setKafkaConfigSchema(configSchema).setExistenceChecker(this.resourceExists).setAlterConfigPolicy(alterConfigPolicy).setValidator(configurationValidator).setStaticConfig(staticConfig).setNodeId(nodeId).build();
        this.clientQuotaControlManager = new ClientQuotaControlManager(this.snapshotRegistry);
        this.featureControl = new FeatureControlManager.Builder().setLogContext(logContext).setQuorumFeatures(quorumFeatures).setSnapshotRegistry(this.snapshotRegistry).setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).build();
        this.clusterControl = new ClusterControlManager.Builder().setLogContext(logContext).setClusterId(clusterId).setTime(time).setSnapshotRegistry(this.snapshotRegistry).setSessionTimeoutNs(sessionTimeoutNs).setReplicaPlacer(replicaPlacer).setControllerMetrics(controllerMetrics).setFeatureControlManager(this.featureControl).build();
        this.producerIdControlManager = new ProducerIdControlManager(this.clusterControl, this.snapshotRegistry);
        this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
        this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
        this.maxIdleIntervalNs = maxIdleIntervalNs;
        this.replicationControl = new ReplicationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setLogContext(logContext).setDefaultReplicationFactor(defaultReplicationFactor).setDefaultNumPartitions(defaultNumPartitions).setMaxElectionsPerImbalance(1000).setConfigurationControl(this.configurationControl).setClusterControl(this.clusterControl).setControllerMetrics(controllerMetrics).setCreateTopicPolicy(createTopicPolicy).setFeatureControl(this.featureControl).build();
        this.authorizer = authorizer;
        authorizer.ifPresent(a -> a.setAclMutator(this));
        this.aclControlManager = new AclControlManager(this.snapshotRegistry, authorizer);
        this.logReplayTracker = new LogReplayTracker.Builder().setLogContext(logContext).build();
        this.raftClient = raftClient;
        this.bootstrapMetadata = bootstrapMetadata;
        this.maxRecordsPerBatch = maxRecordsPerBatch;
        this.metaLogListener = new QuorumMetaLogListener();
        this.curClaimEpoch = -1;
        this.needToCompleteAuthorizerLoad = authorizer.isPresent();
        this.updateWriteOffset(-1L);
        this.resetToEmptyState();
        this.log.info("Creating new QuorumController with clusterId {}, authorizer {}.", (Object)clusterId, authorizer);
        this.raftClient.register((RaftClient.Listener)this.metaLogListener);
    }

    @Override
    public CompletableFuture<AlterPartitionResponseData> alterPartition(ControllerRequestContext context, AlterPartitionRequestData request) {
        if (request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new AlterPartitionResponseData());
        }
        return this.appendWriteEvent("alterPartition", context.deadlineNs(), () -> this.replicationControl.alterPartition(context, request));
    }

    @Override
    public CompletableFuture<CreateTopicsResponseData> createTopics(ControllerRequestContext context, CreateTopicsRequestData request, Set<String> describable) {
        if (request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new CreateTopicsResponseData());
        }
        return this.appendWriteEvent("createTopics", context.deadlineNs(), () -> this.replicationControl.createTopics(request, describable));
    }

    @Override
    public CompletableFuture<Void> unregisterBroker(ControllerRequestContext context, int brokerId) {
        return this.appendWriteEvent("unregisterBroker", context.deadlineNs(), () -> this.replicationControl.unregisterBroker(brokerId));
    }

    @Override
    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(ControllerRequestContext context, Collection<String> names) {
        if (names.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendReadEvent("findTopicIds", context.deadlineNs(), () -> this.replicationControl.findTopicIds(this.lastCommittedOffset, names));
    }

    @Override
    public CompletableFuture<Map<String, Uuid>> findAllTopicIds(ControllerRequestContext context) {
        return this.appendReadEvent("findAllTopicIds", context.deadlineNs(), () -> this.replicationControl.findAllTopicIds(this.lastCommittedOffset));
    }

    @Override
    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(ControllerRequestContext context, Collection<Uuid> ids) {
        if (ids.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendReadEvent("findTopicNames", context.deadlineNs(), () -> this.replicationControl.findTopicNames(this.lastCommittedOffset, ids));
    }

    @Override
    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) {
        if (ids.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("deleteTopics", context.deadlineNs(), () -> this.replicationControl.deleteTopics(ids));
    }

    @Override
    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(ControllerRequestContext context, Map<ConfigResource, Collection<String>> resources) {
        return this.appendReadEvent("describeConfigs", context.deadlineNs(), () -> this.configurationControl.describeConfigs(this.lastCommittedOffset, resources));
    }

    @Override
    public CompletableFuture<ElectLeadersResponseData> electLeaders(ControllerRequestContext context, ElectLeadersRequestData request) {
        if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
        }
        return this.appendWriteEvent("electLeaders", context.deadlineNs(), () -> this.replicationControl.electLeaders(request));
    }

    @Override
    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(ControllerRequestContext context) {
        return this.appendReadEvent("getFinalizedFeatures", context.deadlineNs(), () -> this.featureControl.finalizedFeatures(this.lastCommittedOffset));
    }

    @Override
    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(ControllerRequestContext context, Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges, boolean validateOnly) {
        if (configChanges.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("incrementalAlterConfigs", context.deadlineNs(), () -> {
            ControllerResult<Map<ConfigResource, ApiError>> result = this.configurationControl.incrementalAlterConfigs(configChanges, false);
            if (validateOnly) {
                return result.withoutRecords();
            }
            return result;
        });
    }

    @Override
    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(ControllerRequestContext context, AlterPartitionReassignmentsRequestData request) {
        if (request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
        }
        return this.appendWriteEvent("alterPartitionReassignments", context.deadlineNs(), () -> this.replicationControl.alterPartitionReassignments(request));
    }

    @Override
    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(ControllerRequestContext context, ListPartitionReassignmentsRequestData request) {
        if (request.topics() != null && request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new ListPartitionReassignmentsResponseData().setErrorMessage(null));
        }
        return this.appendReadEvent("listPartitionReassignments", context.deadlineNs(), () -> this.replicationControl.listPartitionReassignments(request.topics()));
    }

    @Override
    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(ControllerRequestContext context, Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
        if (newConfigs.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("legacyAlterConfigs", context.deadlineNs(), () -> {
            ControllerResult<Map<ConfigResource, ApiError>> result = this.configurationControl.legacyAlterConfigs(newConfigs, false);
            if (validateOnly) {
                return result.withoutRecords();
            }
            return result;
        });
    }

    @Override
    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(ControllerRequestContext context, final BrokerHeartbeatRequestData request) {
        return this.appendWriteEvent("processBrokerHeartbeat", context.deadlineNs(), new ControllerWriteOperation<BrokerHeartbeatReply>(){
            private final int brokerId;
            private boolean inControlledShutdown;
            {
                this.brokerId = request.brokerId();
                this.inControlledShutdown = false;
            }

            @Override
            public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
                OptionalLong offsetForRegisterBrokerRecord = QuorumController.this.clusterControl.registerBrokerRecordOffset(this.brokerId);
                if (!offsetForRegisterBrokerRecord.isPresent()) {
                    throw new StaleBrokerEpochException(String.format("Receive a heartbeat from broker %d before registration", this.brokerId));
                }
                ControllerResult<BrokerHeartbeatReply> result = QuorumController.this.replicationControl.processBrokerHeartbeat(request, offsetForRegisterBrokerRecord.getAsLong());
                this.inControlledShutdown = result.response().inControlledShutdown();
                QuorumController.this.rescheduleMaybeFenceStaleBrokers();
                return result;
            }

            @Override
            public void processBatchEndOffset(long offset) {
                if (this.inControlledShutdown) {
                    QuorumController.this.clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(this.brokerId, offset);
                }
            }
        });
    }

    @Override
    public CompletableFuture<BrokerRegistrationReply> registerBroker(ControllerRequestContext context, BrokerRegistrationRequestData request) {
        return this.appendWriteEvent("registerBroker", context.deadlineNs(), () -> {
            ControllerResult<BrokerRegistrationReply> result = this.clusterControl.registerBroker(request, this.writeOffset + 1L, this.featureControl.finalizedFeatures(Long.MAX_VALUE));
            this.rescheduleMaybeFenceStaleBrokers();
            return result;
        });
    }

    @Override
    public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(ControllerRequestContext context, Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
        if (quotaAlterations.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("alterClientQuotas", context.deadlineNs(), () -> {
            ControllerResult<Map<ClientQuotaEntity, ApiError>> result = this.clientQuotaControlManager.alterClientQuotas(quotaAlterations);
            if (validateOnly) {
                return result.withoutRecords();
            }
            return result;
        });
    }

    @Override
    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(ControllerRequestContext context, AllocateProducerIdsRequestData request) {
        return this.appendWriteEvent("allocateProducerIds", context.deadlineNs(), () -> this.producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())).thenApply(result -> new AllocateProducerIdsResponseData().setProducerIdStart(result.firstProducerId()).setProducerIdLen(result.size()));
    }

    @Override
    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(ControllerRequestContext context, UpdateFeaturesRequestData request) {
        return this.appendWriteEvent("updateFeatures", context.deadlineNs(), () -> {
            HashMap<String, Short> updates = new HashMap<String, Short>();
            HashMap<String, FeatureUpdate.UpgradeType> upgradeTypes = new HashMap<String, FeatureUpdate.UpgradeType>();
            request.featureUpdates().forEach(featureUpdate -> {
                String featureName = featureUpdate.feature();
                upgradeTypes.put(featureName, FeatureUpdate.UpgradeType.fromCode((int)featureUpdate.upgradeType()));
                updates.put(featureName, featureUpdate.maxVersionLevel());
            });
            return this.featureControl.updateFeatures(updates, upgradeTypes, this.clusterControl.brokerSupportedVersions(), request.validateOnly());
        }).thenApply(result -> {
            UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();
            responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
            result.forEach((featureName, error) -> responseData.results().add((ImplicitLinkedHashCollection.Element)new UpdateFeaturesResponseData.UpdatableFeatureResult().setFeature(featureName).setErrorCode(error.error().code()).setErrorMessage(error.message())));
            return responseData;
        });
    }

    @Override
    public CompletableFuture<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(ControllerRequestContext context, List<CreatePartitionsRequestData.CreatePartitionsTopic> topics, boolean validateOnly) {
        if (topics.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        return this.appendWriteEvent("createPartitions", context.deadlineNs(), () -> {
            ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> result = this.replicationControl.createPartitions(topics);
            if (validateOnly) {
                this.log.debug("Validate-only CreatePartitions result(s): {}", result.response());
                return result.withoutRecords();
            }
            this.log.debug("CreatePartitions result(s): {}", result.response());
            return result;
        });
    }

    @Override
    public CompletableFuture<Long> beginWritingSnapshot() {
        CompletableFuture<Long> future = new CompletableFuture<Long>();
        this.appendControlEvent("beginWritingSnapshot", () -> {
            if (this.snapshotGeneratorManager.generator == null) {
                this.snapshotGeneratorManager.createSnapshotGenerator(this.lastCommittedOffset, this.lastCommittedEpoch, this.lastCommittedTimestamp);
            }
            future.complete(this.snapshotGeneratorManager.generator.lastContainedLogOffset());
        });
        return future;
    }

    @Override
    public CompletableFuture<List<AclCreateResult>> createAcls(ControllerRequestContext context, List<AclBinding> aclBindings) {
        return this.appendWriteEvent("createAcls", context.deadlineNs(), () -> this.aclControlManager.createAcls(aclBindings));
    }

    @Override
    public CompletableFuture<List<AclDeleteResult>> deleteAcls(ControllerRequestContext context, List<AclBindingFilter> filters) {
        return this.appendWriteEvent("deleteAcls", context.deadlineNs(), () -> this.aclControlManager.deleteAcls(filters));
    }

    @Override
    public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.appendControlEvent("waitForReadyBrokers", () -> this.clusterControl.addReadyBrokersFuture(future, minBrokers));
        return future;
    }

    @Override
    public void beginShutdown() {
        this.queue.beginShutdown("QuorumController#beginShutdown");
    }

    public int nodeId() {
        return this.nodeId;
    }

    public String clusterId() {
        return this.clusterId;
    }

    @Override
    public int curClaimEpoch() {
        return this.curClaimEpoch;
    }

    MetadataVersion metadataVersion() {
        return this.featureControl.metadataVersion();
    }

    @Override
    public void close() throws InterruptedException {
        this.queue.close();
        this.controllerMetrics.close();
    }

    CountDownLatch pause() {
        CountDownLatch latch = new CountDownLatch(1);
        this.appendControlEvent("pause", () -> {
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                this.log.info("Interrupted while waiting for unpause.", (Throwable)e);
            }
        });
        return latch;
    }

    Time time() {
        return this.time;
    }

    private static enum ImbalanceSchedule {
        SCHEDULED,
        DEFERRED,
        IMMEDIATELY;

    }

    class CompleteActivationEvent
    implements ControllerWriteOperation<Void> {
        CompleteActivationEvent() {
        }

        @Override
        public ControllerResult<Void> generateRecordsAndResult() throws Exception {
            ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
            if (QuorumController.this.logReplayTracker.empty()) {
                QuorumController.this.log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) at metadata.version {} from {}.", new Object[]{QuorumController.this.bootstrapMetadata.records().size(), QuorumController.this.bootstrapMetadata.metadataVersion(), QuorumController.this.bootstrapMetadata.source()});
                records.addAll(QuorumController.this.bootstrapMetadata.records());
            } else if (QuorumController.this.featureControl.metadataVersion().equals((Object)MetadataVersion.MINIMUM_KRAFT_VERSION)) {
                QuorumController.this.log.info("No metadata.version feature level record was found in the log. Treating the log as version {}.", (Object)MetadataVersion.MINIMUM_KRAFT_VERSION);
            }
            return ControllerResult.atomicOf(records, null);
        }

        @Override
        public void processBatchEndOffset(long offset) {
            QuorumController.this.maybeScheduleNextBalancePartitionLeaders();
            QuorumController.this.maybeScheduleNextWriteNoOpRecord();
        }
    }

    class QuorumMetaLogListener
    implements RaftClient.Listener<ApiMessageAndVersion> {
        QuorumMetaLogListener() {
        }

        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            this.appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + "]", () -> {
                try {
                    QuorumController.this.maybeCompleteAuthorizerInitialLoad();
                    long processedRecordsSize = 0L;
                    boolean isActive = QuorumController.this.isActiveController();
                    while (reader.hasNext()) {
                        Batch batch = (Batch)reader.next();
                        long offset = batch.lastOffset();
                        int epoch = batch.epoch();
                        List messages = batch.records();
                        if (isActive) {
                            QuorumController.this.log.debug("Completing purgatory items up to offset {} and epoch {}.", (Object)offset, (Object)epoch);
                            QuorumController.this.purgatory.completeUpTo(offset);
                            QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(QuorumController.this.snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset));
                        } else {
                            if (QuorumController.this.log.isDebugEnabled()) {
                                if (QuorumController.this.log.isTraceEnabled()) {
                                    QuorumController.this.log.trace("Replaying commits from the active node up to offset {} and epoch {}: {}.", new Object[]{offset, epoch, messages.stream().map(ApiMessageAndVersion::toString).collect(Collectors.joining(", "))});
                                } else {
                                    QuorumController.this.log.debug("Replaying commits from the active node up to offset {} and epoch {}.", (Object)offset, (Object)epoch);
                                }
                            }
                            int i = 1;
                            for (ApiMessageAndVersion message : messages) {
                                try {
                                    QuorumController.this.replay(message.message(), Optional.empty(), offset);
                                }
                                catch (Throwable e) {
                                    String failureMessage = String.format("Unable to apply %s record on standby controller, which was %d of %d record(s) in the batch with baseOffset %d.", message.message().getClass().getSimpleName(), i, messages.size(), batch.baseOffset());
                                    throw QuorumController.this.fatalFaultHandler.handleFault(failureMessage, e);
                                }
                                ++i;
                            }
                        }
                        QuorumController.this.updateLastCommittedState(offset, epoch, batch.appendTimestamp(), QuorumController.this.committedBytesSinceLastSnapshot + (long)batch.sizeInBytes());
                    }
                    QuorumController.this.maybeGenerateSnapshot();
                }
                finally {
                    reader.close();
                }
            });
        }

        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
            this.appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                try {
                    if (QuorumController.this.isActiveController()) {
                        throw QuorumController.this.fatalFaultHandler.handleFault(String.format("Asked to load snapshot (%s) when it is the active controller (%d)", reader.snapshotId(), QuorumController.this.curClaimEpoch));
                    }
                    QuorumController.this.log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})", new Object[]{reader.snapshotId(), QuorumController.this.lastCommittedOffset, QuorumController.this.lastCommittedEpoch});
                    QuorumController.this.resetToEmptyState();
                    while (reader.hasNext()) {
                        Batch batch = (Batch)reader.next();
                        long offset = batch.lastOffset();
                        List messages = batch.records();
                        if (QuorumController.this.log.isDebugEnabled()) {
                            if (QuorumController.this.log.isTraceEnabled()) {
                                QuorumController.this.log.trace("Replaying snapshot ({}) batch with last offset of {}: {}", new Object[]{reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).collect(Collectors.joining(", "))});
                            } else {
                                QuorumController.this.log.debug("Replaying snapshot ({}) batch with last offset of {}", (Object)reader.snapshotId(), (Object)offset);
                            }
                        }
                        int i = 1;
                        for (ApiMessageAndVersion message : messages) {
                            try {
                                QuorumController.this.replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
                            }
                            catch (Throwable e) {
                                String failureMessage = String.format("Unable to apply %s record from snapshot %s on standby controller, which was %d of %d record(s) in the batch with baseOffset %d.", message.message().getClass().getSimpleName(), reader.snapshotId(), i, messages.size(), batch.baseOffset());
                                throw QuorumController.this.fatalFaultHandler.handleFault(failureMessage, e);
                            }
                            ++i;
                        }
                    }
                    QuorumController.this.updateLastCommittedState(reader.lastContainedLogOffset(), reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp(), 0L);
                    QuorumController.this.snapshotRegistry.getOrCreateSnapshot(QuorumController.this.lastCommittedOffset);
                    QuorumController.this.authorizer.ifPresent(a -> a.loadSnapshot(QuorumController.this.aclControlManager.idToAcl()));
                }
                finally {
                    reader.close();
                }
            });
        }

        public void handleLeaderChange(LeaderAndEpoch newLeader) {
            this.appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
                String newLeaderName;
                String string = newLeaderName = newLeader.leaderId().isPresent() ? String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
                if (QuorumController.this.isActiveController()) {
                    if (newLeader.isLeader(QuorumController.this.nodeId)) {
                        QuorumController.this.log.warn("We were the leader in epoch {}, and are still the leader in the new epoch {}.", (Object)QuorumController.this.curClaimEpoch, (Object)newLeader.epoch());
                        QuorumController.this.curClaimEpoch = newLeader.epoch();
                    } else {
                        QuorumController.this.log.warn("Renouncing the leadership due to a metadata log event. We were the leader at epoch {}, but in the new epoch {}, the leader is {}. Reverting to last committed offset {}.", new Object[]{QuorumController.this.curClaimEpoch, newLeader.epoch(), newLeaderName, QuorumController.this.lastCommittedOffset});
                        QuorumController.this.renounce();
                    }
                } else if (newLeader.isLeader(QuorumController.this.nodeId)) {
                    QuorumController.this.log.info("Becoming the active controller at epoch {}, committed offset {}, committed epoch {}", new Object[]{newLeader.epoch(), QuorumController.this.lastCommittedOffset, QuorumController.this.lastCommittedEpoch});
                    QuorumController.this.claim(newLeader.epoch());
                } else {
                    QuorumController.this.log.info("In the new epoch {}, the leader is {}.", (Object)newLeader.epoch(), (Object)newLeaderName);
                }
            });
        }

        public void beginShutdown() {
            QuorumController.this.queue.beginShutdown("MetaLogManager.Listener");
        }

        private void appendRaftEvent(String name, Runnable runnable) {
            QuorumController.this.appendControlEvent(name, () -> {
                if (this != QuorumController.this.metaLogListener) {
                    QuorumController.this.log.debug("Ignoring {} raft event from an old registration", (Object)name);
                } else {
                    try {
                        runnable.run();
                    }
                    finally {
                        QuorumController.this.maybeCompleteAuthorizerInitialLoad();
                    }
                }
            });
        }
    }

    class ControllerWriteEvent<T>
    implements EventQueue.Event,
    DeferredEvent {
        private final String name;
        private final CompletableFuture<T> future;
        private final ControllerWriteOperation<T> op;
        private final long eventCreatedTimeNs;
        private final boolean deferred;
        private OptionalLong startProcessingTimeNs;
        private ControllerResultAndOffset<T> resultAndOffset;

        ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
            this(name, op, false);
        }

        ControllerWriteEvent(String name, ControllerWriteOperation<T> op, boolean deferred) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = OptionalLong.empty();
            this.name = name;
            this.future = new CompletableFuture();
            this.op = op;
            this.deferred = deferred;
            this.resultAndOffset = null;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        public void run() throws Exception {
            long now = QuorumController.this.time.nanoseconds();
            if (!this.deferred) {
                QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            }
            final int controllerEpoch = QuorumController.this.curClaimEpoch;
            if (!QuorumController.this.isActiveController()) {
                throw QuorumController.this.newNotControllerException();
            }
            this.startProcessingTimeNs = OptionalLong.of(now);
            ControllerResult<T> result = this.op.generateRecordsAndResult();
            if (result.records().isEmpty()) {
                this.op.processBatchEndOffset(QuorumController.this.writeOffset);
                OptionalLong maybeOffset = QuorumController.this.purgatory.highestPendingOffset();
                if (!maybeOffset.isPresent()) {
                    this.resultAndOffset = ControllerResultAndOffset.of(-1L, result);
                    QuorumController.this.log.debug("Completing read-only operation {} immediately because the purgatory is empty.", (Object)this);
                    this.complete(null);
                } else {
                    this.resultAndOffset = ControllerResultAndOffset.of(maybeOffset.getAsLong(), result);
                    QuorumController.this.log.debug("Read-only operation {} will be completed when the log reaches offset {}", (Object)this, (Object)this.resultAndOffset.offset());
                }
            } else {
                long offset = QuorumController.appendRecords(QuorumController.this.log, result, QuorumController.this.maxRecordsPerBatch, new Function<List<ApiMessageAndVersion>, Long>(){
                    private long prevEndOffset;
                    {
                        this.prevEndOffset = QuorumController.this.writeOffset;
                    }

                    @Override
                    public Long apply(List<ApiMessageAndVersion> records) {
                        int i = 1;
                        for (ApiMessageAndVersion message : records) {
                            try {
                                QuorumController.this.replay(message.message(), Optional.empty(), this.prevEndOffset + (long)records.size());
                            }
                            catch (Throwable e) {
                                String failureMessage = String.format("Unable to apply %s record, which was %d of %d record(s) in the batch following last write offset %d.", message.message().getClass().getSimpleName(), i, records.size(), this.prevEndOffset);
                                throw QuorumController.this.fatalFaultHandler.handleFault(failureMessage, e);
                            }
                            ++i;
                        }
                        this.prevEndOffset = QuorumController.this.raftClient.scheduleAtomicAppend(controllerEpoch, records);
                        QuorumController.this.snapshotRegistry.getOrCreateSnapshot(this.prevEndOffset);
                        return this.prevEndOffset;
                    }
                });
                this.op.processBatchEndOffset(offset);
                QuorumController.this.updateWriteOffset(offset);
                this.resultAndOffset = ControllerResultAndOffset.of(offset, result);
                QuorumController.this.log.debug("Read-write operation {} will be completed when the log reaches offset {}.", (Object)this, (Object)this.resultAndOffset.offset());
            }
            QuorumController.this.maybeScheduleNextBalancePartitionLeaders();
            if (!this.future.isDone()) {
                QuorumController.this.purgatory.add(this.resultAndOffset.offset(), this);
            }
        }

        public void handleException(Throwable exception) {
            this.complete(exception);
        }

        @Override
        public void complete(Throwable exception) {
            if (exception == null) {
                QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.getAsLong());
                this.future.complete(this.resultAndOffset.response());
            } else {
                this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception));
            }
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    static interface ControllerWriteOperation<T> {
        public ControllerResult<T> generateRecordsAndResult() throws Exception;

        default public void processBatchEndOffset(long offset) {
        }
    }

    class ControllerReadEvent<T>
    implements EventQueue.Event {
        private final String name;
        private final CompletableFuture<T> future;
        private final Supplier<T> handler;
        private final long eventCreatedTimeNs;
        private OptionalLong startProcessingTimeNs;

        ControllerReadEvent(String name, Supplier<T> handler) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = OptionalLong.empty();
            this.name = name;
            this.future = new CompletableFuture();
            this.handler = handler;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        public void run() throws Exception {
            long now = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            this.startProcessingTimeNs = OptionalLong.of(now);
            T value = this.handler.get();
            QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.getAsLong());
            this.future.complete(value);
        }

        public void handleException(Throwable exception) {
            this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception));
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    class SnapshotGeneratorManager
    implements Runnable {
        private SnapshotGenerator generator = null;

        SnapshotGeneratorManager() {
        }

        void createSnapshotGenerator(long committedOffset, int committedEpoch, long committedTimestamp) {
            if (this.generator != null) {
                throw new RuntimeException("Snapshot generator already exists.");
            }
            if (!QuorumController.this.snapshotRegistry.hasSnapshot(committedOffset)) {
                throw new RuntimeException(String.format("Cannot generate a snapshot at committed offset %d because it does not exists in the snapshot registry.", committedOffset));
            }
            Optional writer = QuorumController.this.raftClient.createSnapshot(committedOffset, committedEpoch, committedTimestamp);
            if (writer.isPresent()) {
                this.generator = new SnapshotGenerator(QuorumController.this.logContext, (SnapshotWriter<ApiMessageAndVersion>)((SnapshotWriter)writer.get()), 10, Arrays.asList(new SnapshotGenerator.Section("features", QuorumController.this.featureControl.iterator(committedOffset)), new SnapshotGenerator.Section("cluster", QuorumController.this.clusterControl.iterator(committedOffset)), new SnapshotGenerator.Section("replication", QuorumController.this.replicationControl.iterator(committedOffset)), new SnapshotGenerator.Section("configuration", QuorumController.this.configurationControl.iterator(committedOffset)), new SnapshotGenerator.Section("clientQuotas", QuorumController.this.clientQuotaControlManager.iterator(committedOffset)), new SnapshotGenerator.Section("producerIds", QuorumController.this.producerIdControlManager.iterator(committedOffset)), new SnapshotGenerator.Section("acls", QuorumController.this.aclControlManager.iterator(committedOffset))));
                this.reschedule(0L);
            } else {
                QuorumController.this.log.info("Skipping generation of snapshot for committed offset {} and epoch {} since it already exists", (Object)committedOffset, (Object)committedEpoch);
            }
        }

        void cancel() {
            if (this.generator == null) {
                return;
            }
            QuorumController.this.log.error("Cancelling snapshot {}", (Object)this.generator.lastContainedLogOffset());
            this.generator.writer().close();
            this.generator = null;
            QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(QuorumController.this.lastCommittedOffset);
            QuorumController.this.queue.cancelDeferred(QuorumController.GENERATE_SNAPSHOT);
        }

        void reschedule(long delayNs) {
            ControlEvent event = new ControlEvent(QuorumController.GENERATE_SNAPSHOT, this);
            QuorumController.this.queue.scheduleDeferred(event.name, (Function)new EventQueue.EarliestDeadlineFunction(QuorumController.this.time.nanoseconds() + delayNs), (EventQueue.Event)event);
        }

        @Override
        public void run() {
            OptionalLong nextDelay;
            if (this.generator == null) {
                QuorumController.this.log.debug("No snapshot is in progress.");
                return;
            }
            try {
                nextDelay = this.generator.generateBatches();
            }
            catch (Exception e) {
                QuorumController.this.log.error("Error while generating snapshot {}", (Object)this.generator.lastContainedLogOffset(), (Object)e);
                this.generator.writer().close();
                this.generator = null;
                return;
            }
            if (!nextDelay.isPresent()) {
                QuorumController.this.log.info("Finished generating snapshot {}.", (Object)this.generator.lastContainedLogOffset());
                this.generator.writer().close();
                this.generator = null;
                QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(QuorumController.this.lastCommittedOffset);
                return;
            }
            this.reschedule(nextDelay.getAsLong());
        }

        OptionalLong snapshotLastOffsetFromLog() {
            if (this.generator == null) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(this.generator.lastContainedLogOffset());
        }
    }

    class ControlEvent
    implements EventQueue.Event {
        private final String name;
        private final Runnable handler;
        private final long eventCreatedTimeNs;
        private OptionalLong startProcessingTimeNs;

        ControlEvent(String name, Runnable handler) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = OptionalLong.empty();
            this.name = name;
            this.handler = handler;
        }

        public void run() throws Exception {
            long now = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            this.startProcessingTimeNs = OptionalLong.of(now);
            QuorumController.this.log.debug("Executing {}.", (Object)this);
            this.handler.run();
            QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.getAsLong());
        }

        public void handleException(Throwable exception) {
            QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception);
        }

        public String toString() {
            return this.name;
        }
    }

    class ConfigResourceExistenceChecker
    implements Consumer<ConfigResource> {
        ConfigResourceExistenceChecker() {
        }

        @Override
        public void accept(ConfigResource configResource) {
            switch (configResource.type()) {
                case BROKER_LOGGER: {
                    break;
                }
                case BROKER: {
                    int brokerId;
                    if (configResource.name().isEmpty()) break;
                    try {
                        brokerId = Integer.parseInt(configResource.name());
                    }
                    catch (NumberFormatException e) {
                        throw new InvalidRequestException("Invalid broker name " + configResource.name());
                    }
                    if (QuorumController.this.clusterControl.brokerRegistrations().containsKey(brokerId)) break;
                    throw new BrokerIdNotRegisteredException("No broker with id " + brokerId + " found.");
                }
                case TOPIC: {
                    if (QuorumController.this.replicationControl.getTopicId(configResource.name()) != null) break;
                    throw new UnknownTopicOrPartitionException("The topic '" + configResource.name() + "' does not exist.");
                }
            }
        }
    }

    public static class Builder {
        private final int nodeId;
        private final String clusterId;
        private FaultHandler fatalFaultHandler = null;
        private Time time = Time.SYSTEM;
        private String threadNamePrefix = null;
        private LogContext logContext = null;
        private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
        private RaftClient<ApiMessageAndVersion> raftClient = null;
        private QuorumFeatures quorumFeatures = null;
        private short defaultReplicationFactor = (short)3;
        private int defaultNumPartitions = 1;
        private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
        private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
        private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
        private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
        private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
        private ControllerMetrics controllerMetrics = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
        private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
        private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
        private Map<String, Object> staticConfig = Collections.emptyMap();
        private BootstrapMetadata bootstrapMetadata = null;
        private int maxRecordsPerBatch = 10000;

        public Builder(int nodeId, String clusterId) {
            this.nodeId = nodeId;
            this.clusterId = clusterId;
        }

        public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
            this.fatalFaultHandler = fatalFaultHandler;
            return this;
        }

        public int nodeId() {
            return this.nodeId;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
            return this;
        }

        public Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        public Builder setConfigSchema(KafkaConfigSchema configSchema) {
            this.configSchema = configSchema;
            return this;
        }

        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) {
            this.raftClient = logManager;
            return this;
        }

        public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
            this.quorumFeatures = quorumFeatures;
            return this;
        }

        public Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
            this.defaultReplicationFactor = defaultReplicationFactor;
            return this;
        }

        public Builder setDefaultNumPartitions(int defaultNumPartitions) {
            this.defaultNumPartitions = defaultNumPartitions;
            return this;
        }

        public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
            this.replicaPlacer = replicaPlacer;
            return this;
        }

        public Builder setSnapshotMaxNewRecordBytes(long value) {
            this.snapshotMaxNewRecordBytes = value;
            return this;
        }

        public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong value) {
            this.leaderImbalanceCheckIntervalNs = value;
            return this;
        }

        public Builder setMaxIdleIntervalNs(OptionalLong value) {
            this.maxIdleIntervalNs = value;
            return this;
        }

        public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
            this.sessionTimeoutNs = sessionTimeoutNs;
            return this;
        }

        public Builder setMetrics(ControllerMetrics controllerMetrics) {
            this.controllerMetrics = controllerMetrics;
            return this;
        }

        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
            this.bootstrapMetadata = bootstrapMetadata;
            return this;
        }

        public Builder setMaxRecordsPerBatch(int maxRecordsPerBatch) {
            this.maxRecordsPerBatch = maxRecordsPerBatch;
            return this;
        }

        public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
            this.createTopicPolicy = createTopicPolicy;
            return this;
        }

        public Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolicy) {
            this.alterConfigPolicy = alterConfigPolicy;
            return this;
        }

        public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
            this.configurationValidator = configurationValidator;
            return this;
        }

        public Builder setAuthorizer(ClusterMetadataAuthorizer authorizer) {
            this.authorizer = Optional.of(authorizer);
            return this;
        }

        public Builder setStaticConfig(Map<String, Object> staticConfig) {
            this.staticConfig = staticConfig;
            return this;
        }

        public QuorumController build() throws Exception {
            if (this.raftClient == null) {
                throw new IllegalStateException("You must set a raft client.");
            }
            if (this.bootstrapMetadata == null) {
                throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
            }
            if (this.quorumFeatures == null) {
                throw new IllegalStateException("You must specify the quorum features");
            }
            if (this.fatalFaultHandler == null) {
                throw new IllegalStateException("You must specify a fatal fault handler.");
            }
            if (this.threadNamePrefix == null) {
                this.threadNamePrefix = String.format("Node%d_", this.nodeId);
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(String.format("[Controller %d] ", this.nodeId));
            }
            if (this.controllerMetrics == null) {
                this.controllerMetrics = (ControllerMetrics)Class.forName("org.apache.kafka.controller.MockControllerMetrics").getConstructor(new Class[0]).newInstance(new Object[0]);
            }
            KafkaEventQueue queue = null;
            try {
                queue = new KafkaEventQueue(this.time, this.logContext, this.threadNamePrefix + "QuorumController");
                return new QuorumController(this.fatalFaultHandler, this.logContext, this.nodeId, this.clusterId, queue, this.time, this.configSchema, this.raftClient, this.quorumFeatures, this.defaultReplicationFactor, this.defaultNumPartitions, this.replicaPlacer, this.snapshotMaxNewRecordBytes, this.leaderImbalanceCheckIntervalNs, this.maxIdleIntervalNs, this.sessionTimeoutNs, this.controllerMetrics, this.createTopicPolicy, this.alterConfigPolicy, this.configurationValidator, this.authorizer, this.staticConfig, this.bootstrapMetadata, this.maxRecordsPerBatch);
            }
            catch (Exception e) {
                Utils.closeQuietly(queue, (String)"event queue");
                throw e;
            }
        }
    }
}

