/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.BrokerComponent;
import org.apache.kafka.clients.admin.ComponentHealthStatus;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.DemotionLimitReachedException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterBrokerHealthRequestData;
import org.apache.kafka.common.message.AlterBrokerReplicaExclusionsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerReplicaExclusionRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.InstallMetadataEncryptorRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterBrokerHealthRequest;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.ActivationRecordsGenerator;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ConfigurationControlManagerTest;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerRequestContextUtil;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.OffsetControlManager;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.controller.QuorumControllerIntegrationTestUtils;
import org.apache.kafka.controller.QuorumControllerTestEnv;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.image.AclsDelta;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasDelta;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.DelegationTokenDelta;
import org.apache.kafka.image.DelegationTokenImage;
import org.apache.kafka.image.FeaturesDelta;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.ProducerIdsDelta;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramDelta;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.MetadataEncryptorFactoryTest;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class QuorumControllerTest {
    private static final Logger log = LoggerFactory.getLogger(QuorumControllerTest.class);
    static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_7_IV0, (String)"test-provided bootstrap");
    private static final int BROKER_ID_TO_EXCLUDE = 0;
    private static final String REASON_TO_EXCLUDE = "reason-0";
    private static final String FIRST_REASON_TO_DEGRADE = "rcca-123";
    private static final String SECOND_REASON_TO_DEGRADE = "rcca-234";
    private RegisterBrokerRecord.DegradedComponent externalConnectivityStartup = new RegisterBrokerRecord.DegradedComponent().setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()).setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason);
    private static final Uuid FOO_ID = Uuid.fromString((String)"igRktLOnR8ektWHr79F8mw");
    private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS = IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(Function.identity(), __ -> 0L));
    private static final BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("confluent.metadata.version").setFeatureLevel(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("").setName("foo").setValue("bar"), 0)), (String)"test bootstrap");

    @Test
    public void testConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            controlEnv.activeController().registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setBrokerId(0).setLogDirs(Collections.singletonList(Uuid.fromString((String)"iiaQjkRPQcuMULNII0MUeA"))).setClusterId(logEnv.clusterId())).get();
            this.testConfigurationOperations(controlEnv.activeController());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private void testConfigurationOperations(QuorumController controller) throws Throwable {
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), true).get());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.emptyMap())), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false).get());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.singletonMap("baz", "123"))), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
    }

    @Test
    public void testDelayedConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            controlEnv.activeController().registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setBrokerId(0).setLogDirs(Collections.singletonList(Uuid.fromString((String)"sTbzRAMnTpahIyIPNjiLhw"))).setClusterId(logEnv.clusterId())).get();
            this.testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, QuorumController controller) throws Throwable {
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
        CompletableFuture future1 = controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false);
        Assertions.assertFalse((boolean)future1.isDone());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.emptyMap())), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), future1.get());
    }

    @Test
    public void testFenceMultipleBrokers() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
        List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 1000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting())).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            TestUtils.waitForCondition(() -> {
                this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            int[] expectedIsr = new int[]{1};
            int[] isrFoo = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).isr;
            Assertions.assertArrayEquals((int[])isrFoo, (int[])expectedIsr, (String)("The ISR for topic foo was " + Arrays.toString(isrFoo) + ". It is expected to be " + Arrays.toString(expectedIsr)));
            int fooLeader = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).leader;
            Assertions.assertEquals((int)expectedIsr[0], (int)fooLeader);
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    @Test
    public void testElrEnabledByDefault() throws Throwable {
        long sessionTimeoutMillis = 500L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromRecords(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("confluent.metadata.version").setFeatureLevel(MetadataVersion.IBP_4_0_IV1A.confluentFeatureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("eligible.leader.replicas.version").setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), 0)), (String)"test-provided bootstrap ELR enabled")).build();){
            controlEnv.activeController(true);
            Assertions.assertTrue((boolean)controlEnv.activeController().configurationControl().clusterConfig().containsKey("min.insync.replicas"));
        }
    }

    @Test
    public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
        short replicationFactor = (short)allBrokers.size();
        long sessionTimeoutMillis = 500L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_4_0_IV1A, (String)"test-provided bootstrap ELR enabled")).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            BrokerRegistrationRequestData.FeatureCollection features = QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_4_0_IV1A, Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()));
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setLogDirs(Collections.singletonList(Uuid.randomUuid())).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            ConfigRecord configRecord = new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("foo").setName("min.insync.replicas").setValue("2");
            RecordTestUtils.replayAll(active.configurationControl(), Collections.singletonList(new ApiMessageAndVersion((ApiMessage)configRecord, 0)));
            TestUtils.waitForCondition(() -> {
                for (Integer brokerId : allBrokers) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 30L), (String)"Fencing of brokers did not process within expected time");
            PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertEquals((int)1, (int)partition.lastKnownElr.length, (String)partition.toString());
            int[] lastKnownElr = partition.lastKnownElr;
            Assertions.assertEquals((int)0, (int)partition.isr.length, (String)partition.toString());
            Assertions.assertEquals((int)-1, (int)partition.leader, (String)partition.toString());
            Assertions.assertEquals((int)2, (int)partition.elr.length, (String)partition.toString());
            int brokerToUncleanShutdown = lastKnownElr[0];
            int brokerToBeTheLeader = lastKnownElr[0] == partition.elr[0] ? partition.elr[1] : partition.elr[0];
            CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerToUncleanShutdown).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setLogDirs(Collections.singletonList(Uuid.randomUuid())).setListeners(listeners));
            brokerEpochs.put(brokerToUncleanShutdown, ((BrokerRegistrationReply)reply.get()).epoch());
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertArrayEquals((int[])new int[]{brokerToBeTheLeader}, (int[])partition.elr, (String)partition.toString());
            Assertions.assertArrayEquals((int[])lastKnownElr, (int[])partition.lastKnownElr, (String)partition.toString());
            active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerToBeTheLeader).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setPreviousBrokerEpoch(((Long)brokerEpochs.get(brokerToBeTheLeader)).longValue()).setLogDirs(Collections.singletonList(Uuid.randomUuid())).setListeners(listeners));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, Collections.singletonList(brokerToBeTheLeader), brokerEpochs);
            TestUtils.waitForCondition(() -> active.clusterControl().isUnfenced(brokerToBeTheLeader), (long)(sessionTimeoutMillis * 3L), (String)"Broker should be unfenced.");
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertArrayEquals((int[])new int[]{brokerToBeTheLeader}, (int[])partition.isr, (String)partition.toString());
            Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
            Assertions.assertEquals((int)0, (int)partition.lastKnownElr.length, (String)partition.toString());
            Assertions.assertEquals((int)brokerToBeTheLeader, (int)partition.leader, (String)partition.toString());
        }
    }

    @Test
    public void testUncleanShutdownElrDisabled() throws Exception {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
        int replicationFactor = allBrokers.size();
        long sessionTimeoutMillis = 500L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15L))).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_4_0_IV0A, (String)"test-provided bootstrap ELR not supported")).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_4_0_IV0A)).setIncarnationId(Uuid.randomUuid()).setLogDirs(Collections.singletonList(Uuid.randomUuid())).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            active.time().sleep(sessionTimeoutMillis);
            for (int i = 0; i < replicationFactor; ++i) {
                PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
                Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
                Assertions.assertEquals((int)0, (int)partition.lastKnownElr.length, (String)partition.toString());
                boolean lastStandingIsr = i == replicationFactor - 1;
                int prevLeader = partition.leader;
                int prevLeaderEpoch = partition.leaderEpoch;
                active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(prevLeader).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_4_0_IV0A)).setIncarnationId(Uuid.randomUuid()).setLogDirs(Collections.singletonList(Uuid.randomUuid())).setListeners(listeners)).get();
                partition = active.replicationControl().getPartition(topicIdFoo, 0);
                int currentLeader = partition.leader;
                int currentLeaderEpoch = partition.leaderEpoch;
                Assertions.assertNotEquals((int)currentLeader, (int)prevLeader);
                Assertions.assertNotEquals((int)currentLeaderEpoch, (int)prevLeaderEpoch);
                if (lastStandingIsr) {
                    Assertions.assertArrayEquals((int[])new int[]{prevLeader}, (int[])partition.isr);
                    Assertions.assertEquals((int)-1, (int)currentLeader);
                    continue;
                }
                List isr = Arrays.stream(partition.isr).boxed().collect(Collectors.toList());
                Assertions.assertFalse((boolean)isr.contains(prevLeader));
            }
        }
    }

    @Test
    public void testMinIsrUpdateWithElr() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
        List<Integer> brokersToFence = Arrays.asList(2, 3);
        short replicationFactor = (short)allBrokers.size();
        long sessionTimeoutMillis = 300L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_4_0_IV1A, (String)"test-provided bootstrap ELR enabled")).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setIncarnationId(Uuid.randomUuid()).setLogDirs(Collections.singletonList(Uuid.randomUuid())).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Arrays.asList(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(replicationFactor), new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(1).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, new HashSet<String>(Arrays.asList("foo", "bar"))).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("bar").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
            ConfigRecord configRecord = new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("").setName("min.insync.replicas").setValue("2");
            RecordTestUtils.replayAll(active.configurationControl(), Collections.singletonList(new ApiMessageAndVersion((ApiMessage)configRecord, 0)));
            TestUtils.waitForCondition(() -> {
                this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 30L), (String)"Fencing of brokers did not process within expected time");
            this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
            Assertions.assertEquals((int)1, (int)partition.elr.length, (String)partition.toString());
            ControllerResult result = active.configurationControl().incrementalAlterConfigs(ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry("min.insync.replicas", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "1"))))), true, KafkaPrincipal.ANONYMOUS);
            Assertions.assertEquals((int)2, (int)result.records().size(), (String)result.records().toString());
            RecordTestUtils.replayAll(active.configurationControl(), Collections.singletonList((ApiMessageAndVersion)result.records().get(0)));
            RecordTestUtils.replayAll(active.replicationControl(), Collections.singletonList((ApiMessageAndVersion)result.records().get(1)));
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
            partition = active.replicationControl().getPartition(topicIdBar, 0);
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
            Assertions.assertEquals((int)1, (int)partition.elr.length, (String)partition.toString());
            result = active.configurationControl().incrementalAlterConfigs(ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry(new ConfigResource(ConfigResource.Type.BROKER, ""), ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry("min.insync.replicas", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "1"))))), true, KafkaPrincipal.ANONYMOUS);
            Assertions.assertEquals((int)2, (int)result.records().size(), (String)result.records().toString());
            RecordTestUtils.replayAll(active.configurationControl(), Collections.singletonList((ApiMessageAndVersion)result.records().get(0)));
            RecordTestUtils.replayAll(active.replicationControl(), Collections.singletonList((ApiMessageAndVersion)result.records().get(1)));
            partition = active.replicationControl().getPartition(topicIdBar, 0);
            Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
        }
    }

    @Test
    public void testBalancePartitionLeaders() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1, 2);
        List<Integer> brokersToFence = Collections.singletonList(3);
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 2000L;
        long leaderImbalanceCheckIntervalNs = 1000000000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            TestUtils.waitForCondition(() -> {
                this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            this.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
            for (Integer brokerId3 : brokersToFence) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId3.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId3, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            HashSet imbalancedPartitions = new HashSet(active.replicationControl().imbalancedPartitions());
            Assertions.assertEquals((int)1, (int)imbalancedPartitions.size());
            TopicIdPartition impalancedTp = (TopicIdPartition)imbalancedPartitions.iterator().next();
            int imbalancedPartitionId = impalancedTp.partitionId();
            PartitionRegistration partitionRegistration = active.replicationControl().getPartition(topicIdFoo, imbalancedPartitionId);
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData().setPartitionIndex(imbalancedPartitionId).setLeaderEpoch(partitionRegistration.leaderEpoch).setPartitionEpoch(partitionRegistration.partitionEpoch).setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicId(impalancedTp.topicId());
            topicData.partitions().add(partitionData);
            AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData().setBrokerId(partitionRegistration.leader).setBrokerEpoch(((Long)brokerEpochs.get(partitionRegistration.leader)).longValue());
            alterPartitionRequest.topics().add(topicData);
            active.alterPartition(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterPartitionRequest.Builder(alterPartitionRequest).build(ApiKeys.ALTER_PARTITION.oldestVersion()).data()).get();
            AtomicLong lastHeartbeatMs = new AtomicLong(QuorumControllerTest.getMonotonicMs(active.time()));
            this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            TestUtils.waitForCondition(() -> {
                long currentMonotonicMs = QuorumControllerTest.getMonotonicMs(active.time());
                if (currentMonotonicMs > lastHeartbeatMs.get() + sessionTimeoutMillis / 2L) {
                    lastHeartbeatMs.set(currentMonotonicMs);
                    this.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
                }
                return !active.replicationControl().arePartitionLeadersImbalanced();
            }, (long)TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10L, TimeUnit.NANOSECONDS), (String)"Leaders were not balanced after unfencing all of the brokers");
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private static long getMonotonicMs(Time time) {
        return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
    }

    @Test
    public void testNoOpRecordWriteAfterTimeout() throws Throwable {
        long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100L);
        long maxReplicationDelayMs = 1000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs))).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            LocalLogManager localLogManager = logEnv.logManagers().stream().filter(logManager -> logManager.nodeId().equals(OptionalInt.of(active.nodeId()))).findAny().get();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().isPresent(), (long)maxReplicationDelayMs, (String)"High watermark was not established");
            long firstHighWatermark = localLogManager.highWatermark().getAsLong();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().getAsLong() > firstHighWatermark, (long)maxReplicationDelayMs, (String)"Active controller didn't write NoOpRecord the first time");
            long secondHighWatermark = localLogManager.highWatermark().getAsLong();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().getAsLong() > secondHighWatermark, (long)maxReplicationDelayMs, (String)"Active controller didn't write NoOpRecord the second time");
        }
    }

    @ParameterizedTest
    @CsvSource(value={"0, 0", "0, 1", "1, 0", "1, 1"})
    public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, short brokerMaxSupportedKraftVersion) throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).setLastKRaftVersion(KRaftVersion.fromFeatureLevel((short)finalizedKraftVersion)).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            BrokerRegistrationRequestData.FeatureCollection brokerFeatures = new BrokerRegistrationRequestData.FeatureCollection();
            brokerFeatures.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Feature().setName("confluent.metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()).setMaxSupportedVersion(MetadataVersion.latestTesting().confluentFeatureLevel()));
            if (brokerMaxSupportedKraftVersion != 0) {
                brokerFeatures.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Feature().setName("kraft.version").setMinSupportedVersion(Feature.KRAFT_VERSION.minimumProduction()).setMaxSupportedVersion(brokerMaxSupportedKraftVersion));
            }
            BrokerRegistrationRequestData request = new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwBA")).setFeatures(brokerFeatures).setLogDirs(Collections.singletonList(Uuid.fromString((String)"vBpaRsZVSaGsQT53wtYGtg"))).setListeners(listeners);
            if (brokerMaxSupportedKraftVersion < finalizedKraftVersion) {
                Throwable exception = Assertions.assertThrows(ExecutionException.class, () -> active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, request).get());
                Assertions.assertEquals(UnsupportedVersionException.class, exception.getCause().getClass());
                Assertions.assertEquals((Object)("Unable to register because the broker does not support finalized version " + finalizedKraftVersion + " of kraft.version. The broker wants a version between 0 and " + brokerMaxSupportedKraftVersion + ", inclusive."), (Object)exception.getCause().getMessage());
            } else {
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, request).get();
                Assertions.assertTrue((reply.epoch() >= 4L ? 1 : 0) != 0, (String)("Unexpected broker epoch " + reply.epoch()));
            }
        }
    }

    @Test
    public void testUnregisterBroker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwBA")).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setLogDirs(Collections.singletonList(Uuid.fromString((String)"vBpaRsZVSaGsQT53wtYGtg"))).setListeners(listeners));
            Assertions.assertEquals((long)6L, (long)((BrokerRegistrationReply)reply.get()).epoch());
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)1)).iterator()));
            Assertions.assertEquals((short)Errors.INVALID_REPLICATION_FACTOR.code(), (short)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get()).topics().find("foo").errorCode());
            Assertions.assertEquals((Object)"Unable to replicate the partition 1 time(s): All brokers are currently fenced.", (Object)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get()).topics().find("foo").errorMessage());
            Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).setCurrentMetadataOffset(100000L)).get());
            Assertions.assertEquals((short)Errors.NONE.code(), (short)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get()).topics().find("foo").errorCode());
            CompletableFuture topicPartitionFuture = active.appendReadEvent("debugGetPartition", OptionalLong.empty(), () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().iterator(0, true);
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)((TopicIdPartition)topicPartitionFuture.get()).partitionId());
            active.unregisterBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, 0).get();
            topicPartitionFuture = active.appendReadEvent("debugGetPartition", OptionalLong.empty(), () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().partitionsWithNoLeader();
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)((TopicIdPartition)topicPartitionFuture.get()).partitionId());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(MetadataVersion minVersion, MetadataVersion maxVersion) {
        RegisterBrokerRecord.BrokerFeatureCollection features = new RegisterBrokerRecord.BrokerFeatureCollection();
        features.add((ImplicitLinkedHashCollection.Element)new RegisterBrokerRecord.BrokerFeature().setName("confluent.metadata.version").setMinSupportedVersion(minVersion.confluentFeatureLevel()).setMaxSupportedVersion(maxVersion.confluentFeatureLevel()));
        features.add((ImplicitLinkedHashCollection.Element)new RegisterBrokerRecord.BrokerFeature().setName("metadata.version").setMinSupportedVersion(minVersion.apacheFeatureLevel()).setMaxSupportedVersion(maxVersion.apacheFeatureLevel()));
        return features;
    }

    @Test
    public void testSnapshotSaveAndLoad() throws Throwable {
        int numBrokers = 4;
        HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> {
            controllerBuilder.setStaticConfig(MetadataEncryptorFactoryTest.TEST_LEGACY_CONFIG);
            controllerBuilder.setMetadataEncryptorFactorySupplier(() -> MetadataEncryptorFactoryTest.TEST_LEGACY_ENCRYPTOR_FACTORY);
        }).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            int i;
            QuorumController active = controlEnv.activeController();
            for (i = 0; i < logEnv.logManagers().size(); ++i) {
                active.registerController(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new ControllerRegistrationRequestData().setControllerId(i).setIncarnationId(new Uuid(3465346L, (long)i)).setZkMigrationReady(false).setListeners(new ControllerRegistrationRequestData.ListenerCollection(Collections.singletonList(new ControllerRegistrationRequestData.Listener().setName("CONTROLLER").setHost("localhost").setPort(8000 + i).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new ControllerRegistrationRequestData.FeatureCollection(Collections.singletonList(new ControllerRegistrationRequestData.Feature().setName("confluent.metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.confluentFeatureLevel())).iterator()))).get();
            }
            for (i = 0; i < 4; ++i) {
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(i).setRack(null).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setIncarnationId(Uuid.fromString((String)("kxAT73dKQsitIedpiPtwB" + i))).setListeners(new BrokerRegistrationRequestData.ListenerCollection(Collections.singletonList(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + i)).iterator()))).get();
                brokerEpochs.put(i, reply.epoch());
            }
            for (i = 0; i < 3; ++i) {
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(((Long)brokerEpochs.get(i)).longValue()).setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
            }
            CreateTopicsResponseData fooData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(Arrays.asList(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(Arrays.asList(0, 1, 2)), new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1).setBrokerIds(Arrays.asList(1, 2, 0))).iterator()))).iterator())), Collections.singleton("foo")).get();
            Uuid fooId = fooData.topics().find("foo").topicId();
            active.allocateProducerIds(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(((Long)brokerEpochs.get(0)).longValue())).get();
            active.alterBrokerReplicaExclusions(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterBrokerReplicaExclusionsRequestData().setBrokersToExclude(Collections.singletonList(new AlterBrokerReplicaExclusionsRequestData.BrokerExclusion().setBrokerId(0).setReason(REASON_TO_EXCLUDE).setExclusionOperationCode(ExclusionOp.OpType.SET.id())))).get();
            controlEnv.close();
            Assertions.assertEquals(new HashSet<ApiMessageAndVersion>(this.generateTestRecords(fooId, brokerEpochs, Collections.singletonMap(0, REASON_TO_EXCLUDE))), new HashSet<ApiMessageAndVersion>(logEnv.allRecords()));
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private List<ApiMessageAndVersion> generateTestRecords(Uuid fooId, Map<Integer, Long> brokerEpochs, Map<Integer, String> exclusions) {
        ArrayList expectedBrokerEncryptor = new ArrayList();
        MetadataEncryptorFactoryTest.TEST_LEGACY_ENCRYPTOR_FACTORY.legacyEncryptorIds().forEach(legacyEncryptorId -> expectedBrokerEncryptor.add(new RegisterBrokerRecord.Encryptor().setEncryptorId(legacyEncryptorId)));
        ArrayList expectedControllerEncryptor = new ArrayList();
        MetadataEncryptorFactoryTest.TEST_LEGACY_ENCRYPTOR_FACTORY.legacyEncryptorIds().forEach(legacyEncryptorId -> expectedControllerEncryptor.add(new RegisterControllerRecord.Encryptor().setEncryptorId(legacyEncryptorId)));
        return Arrays.asList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord().setName("Bootstrap records"), 0), new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("confluent.metadata.version").setFeatureLevel(MetadataVersion.IBP_3_7_IV0.confluentFeatureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new EndTransactionRecord(), 0), new ApiMessageAndVersion((ApiMessage)new RegisterControllerRecord().setControllerId(0).setMetadataEncryptors(expectedControllerEncryptor).setIncarnationId(Uuid.fromString((String)"AAAAAAA04IIAAAAAAAAAAA")).setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(Collections.singletonList(new RegisterControllerRecord.ControllerEndpoint().setName("CONTROLLER").setHost("localhost").setPort(8000).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(Collections.singletonList(new RegisterControllerRecord.ControllerFeature().setName("confluent.metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.confluentFeatureLevel())).iterator())), 0), new ApiMessageAndVersion((ApiMessage)new RegisterControllerRecord().setControllerId(1).setMetadataEncryptors(expectedControllerEncryptor).setIncarnationId(Uuid.fromString((String)"AAAAAAA04IIAAAAAAAAAAQ")).setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(Collections.singletonList(new RegisterControllerRecord.ControllerEndpoint().setName("CONTROLLER").setHost("localhost").setPort(8001).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(Collections.singletonList(new RegisterControllerRecord.ControllerFeature().setName("confluent.metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.confluentFeatureLevel())).iterator())), 0), new ApiMessageAndVersion((ApiMessage)new RegisterControllerRecord().setControllerId(2).setMetadataEncryptors(expectedControllerEncryptor).setIncarnationId(Uuid.fromString((String)"AAAAAAA04IIAAAAAAAAAAg")).setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(Collections.singletonList(new RegisterControllerRecord.ControllerEndpoint().setName("CONTROLLER").setHost("localhost").setPort(8002).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(Collections.singletonList(new RegisterControllerRecord.ControllerFeature().setName("confluent.metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.confluentFeatureLevel())).iterator())), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setMetadataEncryptors(expectedBrokerEncryptor).setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB0")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Collections.singletonList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9092).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new InstallMetadataEncryptorRecord().setKeyId(MetadataEncryptorFactoryTest.FOO_ID), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setMetadataEncryptors(expectedBrokerEncryptor).setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB1")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Collections.singletonList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9093).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setMetadataEncryptors(expectedBrokerEncryptor).setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB2")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Collections.singletonList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9094).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setMetadataEncryptors(expectedBrokerEncryptor).setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB3")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Collections.singletonList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9095).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(fooId), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(0).setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(1).setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new ProducerIdsRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setNextProducerId(1000L), 0), new ApiMessageAndVersion((ApiMessage)new BrokerReplicaExclusionRecord().setBrokerExclusions(exclusions.entrySet().stream().map(e -> new BrokerReplicaExclusionRecord.BrokerReplicaExclusion().setBrokerId(((Integer)e.getKey()).intValue()).setReason((String)e.getValue())).collect(Collectors.toList())), 0));
    }

    private void alterBrokerHealth(QuorumController controller, List<Integer> brokerIds, BrokerComponent component, ComponentHealthStatus healthStatus) throws Throwable {
        this.alterBrokerHealth(controller, brokerIds, -1L, component, healthStatus, FIRST_REASON_TO_DEGRADE, true);
    }

    private void alterBrokerHealth(QuorumController controller, List<Integer> brokerIds, long brokerEpoch, BrokerComponent component, ComponentHealthStatus healthStatus) throws Throwable {
        this.alterBrokerHealth(controller, brokerIds, brokerEpoch, component, healthStatus, FIRST_REASON_TO_DEGRADE, true);
    }

    private void alterBrokerHealth(QuorumController controller, List<Integer> brokerIds, long brokerEpoch, BrokerComponent component, ComponentHealthStatus healthStatus, String reason, Boolean force) throws Throwable {
        controller.alterBrokerHealth(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterBrokerHealthRequestData().setBrokerIds(brokerIds).setBrokerEpoch(brokerEpoch).setComponentCode(component.id()).setStatusCode(healthStatus.id()).setReason(reason).setForce(force.booleanValue())).get();
    }

    @ParameterizedTest
    @EnumSource(value=BrokerComponent.class, names={"UNSPECIFIED", "STORAGE", "NETWORK", "EXTERNAL_CONNECTIVITY_STARTUP"})
    public void testAlterBrokerHealth(BrokerComponent component) throws Throwable {
        int numBrokers = 3;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> {
            controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA);
            controllerBuilder.setStaticConfig(MetadataEncryptorFactoryTest.TEST_LEGACY_CONFIG);
            controllerBuilder.setMetadataEncryptorFactorySupplier(() -> MetadataEncryptorFactoryTest.TEST_LEGACY_ENCRYPTOR_FACTORY);
        }).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_7_IV2, (String)"test")).build();){
            QuorumController active = controlEnv.activeController();
            Map<Integer, Long> brokerEpochs = QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence(active, numBrokers, false);
            this.alterBrokerHealth(active, Arrays.asList(0, 2), component, ComponentHealthStatus.DEGRADED);
            ExecutionException executionException = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> this.alterBrokerHealth(active, Arrays.asList(1, 5), component, ComponentHealthStatus.DEGRADED));
            Assertions.assertTrue((boolean)(executionException.getCause() instanceof InvalidRequestException));
            this.alterBrokerHealth(active, Arrays.asList(1), brokerEpochs.get(1), component, ComponentHealthStatus.DEGRADED);
            Throwable exception = Assertions.assertThrows(ExecutionException.class, () -> this.alterBrokerHealth(active, Collections.singletonList(2), (Long)brokerEpochs.get(2), component, ComponentHealthStatus.DEGRADED, SECOND_REASON_TO_DEGRADE, false));
            Assertions.assertTrue((boolean)(exception.getCause() instanceof DemotionLimitReachedException));
            this.alterBrokerHealth(active, Arrays.asList(0, 1, 2), component, ComponentHealthStatus.DEGRADED);
            this.alterBrokerHealth(active, Collections.singletonList(0), brokerEpochs.get(0), component, ComponentHealthStatus.DEGRADED, SECOND_REASON_TO_DEGRADE, true);
            executionException = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> this.alterBrokerHealth(active, Arrays.asList(0, 5), component, ComponentHealthStatus.HEALTHY));
            Assertions.assertTrue((boolean)(executionException.getCause() instanceof InvalidRequestException));
            this.alterBrokerHealth(active, Arrays.asList(0), brokerEpochs.get(0), component, ComponentHealthStatus.HEALTHY);
            controlEnv.close();
            List<ApiMessageAndVersion> expectedRecords = this.expectedRecordsForBrokerHealth(brokerEpochs, component);
            List<ApiMessageAndVersion> actualRecords = logEnv.allRecords();
            for (int i = 0; i < expectedRecords.size(); ++i) {
                ApiMessageAndVersion expected = expectedRecords.get(i);
                ApiMessageAndVersion actual = actualRecords.get(i);
                if (expected.message() instanceof BrokerRegistrationChangeRecord && actual.message() instanceof BrokerRegistrationChangeRecord) {
                    BrokerRegistrationChangeRecord expectedChangeRecord = (BrokerRegistrationChangeRecord)expected.message();
                    BrokerRegistrationChangeRecord actualChangeRecord = (BrokerRegistrationChangeRecord)actual.message();
                    if (expectedChangeRecord.degradedComponents() != null) {
                        expectedChangeRecord.degradedComponents().sort(Comparator.comparing(BrokerRegistrationChangeRecord.DegradedComponent::reason));
                    }
                    if (actualChangeRecord.degradedComponents() != null) {
                        actualChangeRecord.degradedComponents().sort(Comparator.comparing(BrokerRegistrationChangeRecord.DegradedComponent::reason));
                    }
                }
                Assertions.assertEquals((Object)expected, (Object)actual);
            }
            Assertions.assertEquals(expectedRecords, logEnv.allRecords());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private List<ApiMessageAndVersion> expectedRecordsForBrokerHealth(Map<Integer, Long> brokerEpochs, BrokerComponent component) {
        ArrayList expectedBrokerEncryptor = new ArrayList();
        MetadataEncryptorFactoryTest.TEST_LEGACY_ENCRYPTOR_FACTORY.legacyEncryptorIds().forEach(legacyEncryptorId -> expectedBrokerEncryptor.add(new RegisterBrokerRecord.Encryptor().setEncryptorId(legacyEncryptorId)));
        return Arrays.asList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord().setName("Bootstrap records"), 0), new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("confluent.metadata.version").setFeatureLevel(MetadataVersion.IBP_3_7_IV2.confluentFeatureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new EndTransactionRecord(), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB0")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9092).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting())).setRack(null).setDegradedComponents(Collections.singletonList(this.externalConnectivityStartup)).setMetadataEncryptors(expectedBrokerEncryptor).setLogDirs(Collections.singletonList(Uuid.fromString((String)("TESTBROKER" + Integer.toString(100000).substring(1) + "DIRAAAA")))), 3), new ApiMessageAndVersion((ApiMessage)new InstallMetadataEncryptorRecord().setKeyId(MetadataEncryptorFactoryTest.FOO_ID), 0), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB1")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9093).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting())).setRack(null).setDegradedComponents(Collections.singletonList(this.externalConnectivityStartup)).setMetadataEncryptors(expectedBrokerEncryptor).setLogDirs(Collections.singletonList(Uuid.fromString((String)("TESTBROKER" + Integer.toString(100001).substring(1) + "DIRAAAA")))), 3), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB2")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9094).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting())).setRack(null).setDegradedComponents(Collections.singletonList(this.externalConnectivityStartup)).setMetadataEncryptors(expectedBrokerEncryptor).setLogDirs(Collections.singletonList(Uuid.fromString((String)("TESTBROKER" + Integer.toString(100002).substring(1) + "DIRAAAA")))), 3), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setFenced(BrokerRegistrationFencingChange.NONE.value()).setDegradedComponents(Arrays.asList(new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()).setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason), new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(component.id()).setReason(FIRST_REASON_TO_DEGRADE))), 1), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setFenced(BrokerRegistrationFencingChange.NONE.value()).setDegradedComponents(Arrays.asList(new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()).setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason), new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(component.id()).setReason(FIRST_REASON_TO_DEGRADE))), 1), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setFenced(BrokerRegistrationFencingChange.NONE.value()).setDegradedComponents(Arrays.asList(new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()).setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason), new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(component.id()).setReason(FIRST_REASON_TO_DEGRADE))), 1), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setFenced(BrokerRegistrationFencingChange.NONE.value()).setDegradedComponents(Arrays.asList(new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()).setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason), new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(component.id()).setReason(FIRST_REASON_TO_DEGRADE), new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(component.id()).setReason(SECOND_REASON_TO_DEGRADE))), 1), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setFenced(BrokerRegistrationFencingChange.NONE.value()).setDegradedComponents(Arrays.asList(new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(BrokerComponent.EXTERNAL_CONNECTIVITY_STARTUP.id()).setReason(AlterBrokerHealthRequest.networkHealthStartupAlterBrokerHealthReason), new BrokerRegistrationChangeRecord.DegradedComponent().setComponentCode(component.id()).setReason(SECOND_REASON_TO_DEGRADE))), 1));
    }

    @Test
    public void testTimeouts() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = QuorumControllerIntegrationTestUtils.pause(controller);
            long now = controller.time().nanoseconds();
            ControllerRequestContext context0 = new ControllerRequestContext(new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
            CompletableFuture createFuture = controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo")).iterator())), Collections.emptySet());
            CompletableFuture deleteFuture = controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID));
            CompletableFuture findTopicIdsFuture = controller.findTopicIds(context0, Collections.singletonList("foo"));
            CompletableFuture findTopicNamesFuture = controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
            CompletableFuture createPartitionsFuture = controller.createPartitions(context0, Collections.singletonList(new CreatePartitionsRequestData.CreatePartitionsTopic()), false);
            CompletableFuture electLeadersFuture = controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).setTopicPartitions(null));
            CompletableFuture alterReassignmentsFuture = controller.alterPartitionReassignments(context0, new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).setTopics(Collections.singletonList(new AlterPartitionReassignmentsRequestData.ReassignableTopic())));
            CompletableFuture listReassignmentsFuture = controller.listPartitionReassignments(context0, new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
            while (controller.time().nanoseconds() == now) {
                Thread.sleep(0L, 10);
            }
            countDownLatch.countDown();
            QuorumControllerTest.assertYieldsTimeout(createFuture);
            QuorumControllerTest.assertYieldsTimeout(deleteFuture);
            QuorumControllerTest.assertYieldsTimeout(findTopicIdsFuture);
            QuorumControllerTest.assertYieldsTimeout(findTopicNamesFuture);
            QuorumControllerTest.assertYieldsTimeout(createPartitionsFuture);
            QuorumControllerTest.assertYieldsTimeout(electLeadersFuture);
            QuorumControllerTest.assertYieldsTimeout(alterReassignmentsFuture);
            QuorumControllerTest.assertYieldsTimeout(listReassignmentsFuture);
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private static void assertYieldsTimeout(Future<?> future) {
        Assertions.assertEquals(TimeoutException.class, ((ExecutionException)Assertions.assertThrows(ExecutionException.class, future::get)).getCause().getClass());
    }

    @Test
    public void testEarlyControllerResults() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = QuorumControllerIntegrationTestUtils.pause(controller);
            CompletableFuture createFuture = controller.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTimeoutMs(120000), Collections.emptySet());
            CompletableFuture deleteFuture = controller.deleteTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList());
            CompletableFuture findTopicIdsFuture = controller.findTopicIds(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList());
            CompletableFuture findTopicNamesFuture = controller.findTopicNames(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList());
            CompletableFuture createPartitionsFuture = controller.createPartitions(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList(), false);
            CompletableFuture electLeadersFuture = controller.electLeaders(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
            CompletableFuture alterReassignmentsFuture = controller.alterPartitionReassignments(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterPartitionReassignmentsRequestData());
            createFuture.get();
            deleteFuture.get();
            findTopicIdsFuture.get();
            findTopicNamesFuture.get();
            createPartitionsFuture.get();
            electLeadersFuture.get();
            alterReassignmentsFuture.get();
            countDownLatch.countDown();
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private void sendBrokerHeartbeatToUnfenceBrokers(QuorumController controller, List<Integer> brokers, Map<Integer, Long> brokerEpochs) throws Exception {
        if (brokers.isEmpty()) {
            return;
        }
        for (Integer brokerId : brokers) {
            BrokerHeartbeatReply reply = (BrokerHeartbeatReply)controller.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(brokerEpochs.get(brokerId).longValue()).setBrokerId(brokerId.intValue()).setCurrentMetadataOffset(100000L)).get();
            Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), (Object)reply);
        }
    }

    private BrokerHeartbeatReply sendBrokerHeartbeatWantShutdown(QuorumController controller, int brokerId, Map<Integer, Long> brokerEpochs, long currentMetadataOffset) throws Exception {
        return (BrokerHeartbeatReply)controller.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setWantShutDown(true).setBrokerEpoch(brokerEpochs.get(brokerId).longValue()).setBrokerId(brokerId).setCurrentMetadataOffset(currentMetadataOffset)).get();
    }

    @Test
    public void testConfigResourceExistenceChecker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController active = controlEnv.activeController();
            QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence(active, 5, true);
            active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setReplicationFactor((short)3).setNumPartitions(1)).iterator())), Collections.singleton("foo")).get();
            QuorumController.ConfigResourceExistenceChecker checker = new QuorumController.ConfigResourceExistenceChecker(active);
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, ""));
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "3"));
            Assertions.assertThrows(BrokerIdNotRegisteredException.class, () -> checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "10")));
            checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "foo"));
            Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "bar")));
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    @Test
    public void testFatalMetadataReplayErrorOnActive() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController active = controlEnv.activeController();
            CompletableFuture future = active.appendWriteEvent("errorEvent", OptionalLong.empty(), () -> ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new ConfigRecord().setName(null).setResourceName(null).setResourceType((byte)-1).setValue(null), 0)), null));
            Assertions.assertThrows(ExecutionException.class, future::get);
            Assertions.assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId()).firstException().getCause().getClass());
            controlEnv.ignoreFatalFaults();
        }
    }

    @Test
    public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
        InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionRecord(), 0)));
        LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3).setSnapshotReader((RawSnapshotReader)FileRawSnapshotReader.open((Path)invalidSnapshot.tempDir.toPath(), (OffsetAndEpoch)new OffsetAndEpoch(0L, 0)));
        try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), (String)"At least one controller failed to detect the fatal fault");
            controlEnv.ignoreFatalFaults();
        }
    }

    @Test
    public void testFatalMetadataErrorDuringLogLoading() throws Exception {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();){
            logEnv.appendInitialRecords(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionRecord(), 0)));
            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
                TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), (String)"At least one controller failed to detect the fatal fault");
                controlEnv.ignoreFatalFaults();
            }
        }
    }

    private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
        for (int i = 0; i < authorizers.size(); ++i) {
            Assertions.assertFalse((boolean)authorizers.get(i).initialLoadFuture().isDone(), (String)("authorizer " + i + " should not have completed loading."));
        }
    }

    @Test
    public void testConfigRecordsWithAesMetadataEncryptor() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> {
            controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA);
            controllerBuilder.setStaticConfig(MetadataEncryptorFactoryTest.TEST_LEGACY_CONFIG);
            controllerBuilder.setMetadataEncryptorFactorySupplier(() -> MetadataEncryptorFactoryTest.TEST_LEGACY_ENCRYPTOR_FACTORY);
        }).build();){
            QuorumController active = controlEnv.activeController();
            QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence(active, 3, true);
            active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("mytopic").setReplicationFactor((short)3).setNumPartitions(1)).iterator())), Collections.singleton("mytopic")).get();
            HashMap<String, Map.Entry<AlterConfigOp.OpType, String>> configChanges = new HashMap<String, Map.Entry<AlterConfigOp.OpType, String>>();
            configChanges.put("def", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "new_def"));
            configChanges.put("xyz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "new_secret_value"));
            Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.MYTOPIC, ApiError.NONE), active.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.MYTOPIC, configChanges), false).get());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    @Test
    public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setBootstrapMetadata(COMPLEX_BOOTSTRAP).build();){
            QuorumController active = controlEnv.activeController();
            ControllerRequestContext ctx = new ControllerRequestContext(new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE));
            TestUtils.waitForCondition(() -> {
                FinalizedControllerFeatures features = (FinalizedControllerFeatures)active.finalizedFeatures(ctx).get();
                Optional metadataVersionOpt = features.get("confluent.metadata.version");
                return Optional.of(MetadataVersion.MINIMUM_VERSION.confluentFeatureLevel()).equals(metadataVersionOpt);
            }, (String)"Failed to see expected metadata.version from bootstrap metadata");
            TestUtils.waitForCondition(() -> {
                ConfigResource defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
                Map configs = Collections.singletonMap(defaultBrokerResource, Collections.emptyList());
                Map results = (Map)active.describeConfigs(ctx, configs).get();
                ResultOrError resultOrError = (ResultOrError)results.get(defaultBrokerResource);
                return resultOrError.isResult() && Collections.singletonMap("foo", "bar").equals(resultOrError.result());
            }, (String)"Failed to see expected config change from bootstrap metadata");
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private static ApiMessageAndVersion rec(int i) {
        return new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(i), 0);
    }

    @Test
    public void testAppendRecords() {
        TestAppender appender = new TestAppender();
        Assertions.assertEquals((long)5L, (long)QuorumController.appendRecords((Logger)log, (ControllerResult)ControllerResult.of(Arrays.asList(QuorumControllerTest.rec(0), QuorumControllerTest.rec(1), QuorumControllerTest.rec(2), QuorumControllerTest.rec(3), QuorumControllerTest.rec(4)), null), (int)2, (Function)appender));
    }

    @Test
    public void testAppendRecordsAtomically() {
        TestAppender appender = new TestAppender();
        Assertions.assertEquals((Object)"Attempted to atomically commit 5 records, but maxRecordsPerBatch is 2", (Object)((IllegalStateException)Assertions.assertThrows(IllegalStateException.class, () -> QuorumController.appendRecords((Logger)log, (ControllerResult)ControllerResult.atomicOf(Arrays.asList(QuorumControllerTest.rec(0), QuorumControllerTest.rec(1), QuorumControllerTest.rec(2), QuorumControllerTest.rec(3), QuorumControllerTest.rec(4)), null), (int)2, (Function)appender))).getMessage());
    }

    FeatureControlManager getActivationRecords(MetadataVersion metadataVersion) {
        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
        FeatureControlManager featureControlManager = new FeatureControlManager.Builder().setSnapshotRegistry(snapshotRegistry).setMetadataVersion(metadataVersion).build();
        ControllerResult result = ActivationRecordsGenerator.generate(msg -> {}, (boolean)true, (long)-1L, (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)metadataVersion, (String)"test"), (MetadataVersion)metadataVersion, (int)3);
        RecordTestUtils.replayAll(featureControlManager, result.records());
        return featureControlManager;
    }

    @Test
    public void testActivationRecords33() {
        FeatureControlManager featureControl = this.getActivationRecords(MetadataVersion.IBP_3_3_IV3);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_3_IV3, (Object)featureControl.metadataVersion());
    }

    @Test
    public void testActivationRecords34() {
        FeatureControlManager featureControl = this.getActivationRecords(MetadataVersion.IBP_3_4_IV0);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_4_IV0, (Object)featureControl.metadataVersion());
    }

    @Test
    public void testActivationRecordsNonEmptyLog() {
        FeatureControlManager featureControl = this.getActivationRecords(MetadataVersion.IBP_3_4_IV0);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_4_IV0, (Object)featureControl.metadataVersion());
    }

    @Test
    public void testActivationRecordsPartialBootstrap() {
        ControllerResult result = ActivationRecordsGenerator.generate(logMsg -> {}, (boolean)true, (long)0L, (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_6_IV1, (String)"test"), (MetadataVersion)MetadataVersion.IBP_3_6_IV1, (int)3);
        Assertions.assertFalse((boolean)result.isAtomic());
        Assertions.assertTrue((boolean)RecordTestUtils.recordAtIndexAs(AbortTransactionRecord.class, result.records(), 0).isPresent());
        Assertions.assertTrue((boolean)RecordTestUtils.recordAtIndexAs(BeginTransactionRecord.class, result.records(), 1).isPresent());
        Assertions.assertTrue((boolean)RecordTestUtils.recordAtIndexAs(EndTransactionRecord.class, result.records(), result.records().size() - 1).isPresent());
    }

    private static void testToImages(List<ApiMessageAndVersion> fromRecords) {
        List<RecordTestUtils.ImageDeltaPair> testMatrix = Arrays.asList(new RecordTestUtils.ImageDeltaPair<AclsImage, AclsDelta>(() -> AclsImage.EMPTY, AclsDelta::new), new RecordTestUtils.ImageDeltaPair<ClientQuotasImage, ClientQuotasDelta>(() -> ClientQuotasImage.EMPTY, ClientQuotasDelta::new), new RecordTestUtils.ImageDeltaPair<ClusterImage, ClusterDelta>(() -> ClusterImage.EMPTY, ClusterDelta::new), new RecordTestUtils.ImageDeltaPair<ConfigurationsImage, ConfigurationsDelta>(() -> ConfigurationsImage.EMPTY, ConfigurationsDelta::new), new RecordTestUtils.ImageDeltaPair<DelegationTokenImage, DelegationTokenDelta>(() -> DelegationTokenImage.EMPTY, DelegationTokenDelta::new), new RecordTestUtils.ImageDeltaPair<FeaturesImage, FeaturesDelta>(() -> FeaturesImage.EMPTY, FeaturesDelta::new), new RecordTestUtils.ImageDeltaPair<ProducerIdsImage, ProducerIdsDelta>(() -> ProducerIdsImage.EMPTY, ProducerIdsDelta::new), new RecordTestUtils.ImageDeltaPair<ScramImage, ScramDelta>(() -> ScramImage.EMPTY, ScramDelta::new), new RecordTestUtils.ImageDeltaPair<TopicsImage, TopicsDelta>(() -> TopicsImage.EMPTY, image -> new TopicsDelta(image, __ -> null)));
        for (RecordTestUtils.ImageDeltaPair pair : testMatrix) {
            new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper(pair.imageSupplier(), pair.deltaCreator()).test(fromRecords);
        }
    }

    @Test
    public void testActivationRecordsPartialTransaction() {
        OffsetControlManager offsetControlManager = new OffsetControlManager.Builder().build();
        offsetControlManager.replay(new BeginTransactionRecord(), 10L);
        offsetControlManager.handleCommitBatch(Batch.data((long)20L, (int)1, (long)1L, (int)0, Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord(), 0))));
        ControllerResult result = ActivationRecordsGenerator.generate(logMsg -> {}, (boolean)false, (long)offsetControlManager.transactionStartOffset(), (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_6_IV1, (String)"test"), (MetadataVersion)MetadataVersion.IBP_3_6_IV1, (int)3);
        Assertions.assertTrue((boolean)result.isAtomic());
        offsetControlManager.replay(RecordTestUtils.recordAtIndexAs(AbortTransactionRecord.class, result.records(), 0).get(), 21L);
        Assertions.assertEquals((long)-1L, (long)offsetControlManager.transactionStartOffset());
    }

    @Test
    public void testActivationRecordsPartialTransactionNoSupport() {
        OffsetControlManager offsetControlManager = new OffsetControlManager.Builder().build();
        offsetControlManager.replay(new BeginTransactionRecord(), 10L);
        offsetControlManager.handleCommitBatch(Batch.data((long)20L, (int)1, (long)1L, (int)0, Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord(), 0))));
        Assertions.assertThrows(RuntimeException.class, () -> ActivationRecordsGenerator.generate(msg -> {}, (boolean)false, (long)offsetControlManager.transactionStartOffset(), (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_6_IV0, (String)"test"), (MetadataVersion)MetadataVersion.IBP_3_6_IV0, (int)3));
    }

    static class InitialSnapshot
    implements AutoCloseable {
        File tempDir = TestUtils.tempDirectory();
        BatchFileWriter writer;

        public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
            Path path = Snapshots.snapshotPath((Path)this.tempDir.toPath(), (OffsetAndEpoch)new OffsetAndEpoch(0L, 0));
            this.writer = BatchFileWriter.open((Path)path);
            this.writer.append(records);
            this.writer.close();
            this.writer = null;
        }

        @Override
        public void close() throws Exception {
            Utils.closeQuietly((AutoCloseable)this.writer, (String)"BatchFileWriter");
            Utils.delete((File)this.tempDir);
        }
    }

    static class TestAppender
    implements Function<List<ApiMessageAndVersion>, Long> {
        private long offset = 0L;

        TestAppender() {
        }

        @Override
        public Long apply(List<ApiMessageAndVersion> apiMessageAndVersions) {
            for (ApiMessageAndVersion apiMessageAndVersion : apiMessageAndVersions) {
                BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord)apiMessageAndVersion.message();
                Assertions.assertEquals((int)((int)this.offset), (int)record.brokerId());
                ++this.offset;
            }
            return this.offset;
        }
    }
}

