/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.ActivationRecordsGenerator;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ConfigurationControlManagerTest;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerRequestContextUtil;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.OffsetControlManager;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.controller.QuorumControllerIntegrationTestUtils;
import org.apache.kafka.controller.QuorumControllerTestEnv;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.image.AclsDelta;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasDelta;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.DelegationTokenDelta;
import org.apache.kafka.image.DelegationTokenImage;
import org.apache.kafka.image.FeaturesDelta;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.ProducerIdsDelta;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramDelta;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class QuorumControllerTest {
    private static final Logger log = LoggerFactory.getLogger(QuorumControllerTest.class);
    static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_7_IV0, (String)"test-provided bootstrap");
    private static final BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(List.of(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("").setName("foo").setValue("bar"), 0)), (String)"test bootstrap");

    @Test
    public void testConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            controlEnv.activeController().registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setBrokerId(0).setLogDirs(List.of(Uuid.fromString((String)"iiaQjkRPQcuMULNII0MUeA"))).setClusterId(logEnv.clusterId())).get();
            this.testConfigurationOperations(controlEnv.activeController());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private void testConfigurationOperations(QuorumController controller) throws Throwable {
        Assertions.assertEquals(Map.of(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Map.of(ConfigurationControlManagerTest.BROKER0, Map.of("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), true).get());
        Assertions.assertEquals(Map.of(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Map.of())), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Map.of(ConfigurationControlManagerTest.BROKER0, List.of())).get());
        Assertions.assertEquals(Map.of(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Map.of(ConfigurationControlManagerTest.BROKER0, Map.of("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false).get());
        Assertions.assertEquals(Map.of(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Map.of("baz", "123"))), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Map.of(ConfigurationControlManagerTest.BROKER0, List.of())).get());
    }

    @Test
    public void testDelayedConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            controlEnv.activeController().registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setBrokerId(0).setLogDirs(List.of(Uuid.fromString((String)"sTbzRAMnTpahIyIPNjiLhw"))).setClusterId(logEnv.clusterId())).get();
            this.testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, QuorumController controller) throws Throwable {
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
        CompletableFuture future1 = controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Map.of(ConfigurationControlManagerTest.BROKER0, Map.of("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false);
        Assertions.assertFalse((boolean)future1.isDone());
        Assertions.assertEquals(Map.of(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Map.of())), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Map.of(ConfigurationControlManagerTest.BROKER0, List.of())).get());
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
        Assertions.assertEquals(Map.of(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), future1.get());
    }

    @Test
    public void testFenceMultipleBrokers() throws Throwable {
        List<Integer> allBrokers = List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(4), Integer.valueOf(5));
        List<Integer> brokersToKeepUnfenced = List.of(Integer.valueOf(1));
        List<Integer> brokersToFence = List.of(Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(4), Integer.valueOf(5));
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 1000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting())).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            TestUtils.waitForCondition(() -> {
                QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            int[] expectedIsr = new int[]{1};
            int[] isrFoo = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).isr;
            Assertions.assertArrayEquals((int[])isrFoo, (int[])expectedIsr, (String)("The ISR for topic foo was " + Arrays.toString(isrFoo) + ". It is expected to be " + Arrays.toString(expectedIsr)));
            int fooLeader = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).leader;
            Assertions.assertEquals((int)expectedIsr[0], (int)fooLeader);
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    @Test
    public void testElrEnabledByDefault() throws Throwable {
        long sessionTimeoutMillis = 500L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromRecords(List.of(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("eligible.leader.replicas.version").setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), 0)), (String)"test-provided bootstrap ELR enabled")).build();){
            controlEnv.activeController(true);
            Assertions.assertTrue((boolean)controlEnv.activeController().configurationControl().clusterConfig().containsKey("min.insync.replicas"));
        }
    }

    @Test
    public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
        List<Integer> allBrokers = List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3));
        Map brokerLogDirs = allBrokers.stream().collect(Collectors.toMap(Function.identity(), brokerId -> Uuid.randomUuid()));
        short replicationFactor = (short)allBrokers.size();
        long sessionTimeoutMillis = 500L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_4_0_IV1, (String)"test-provided bootstrap ELR enabled")).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            BrokerRegistrationRequestData.FeatureCollection features = QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_4_0_IV1, Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()));
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setLogDirs(List.of(brokerLogDirs.get(brokerId2))).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            ConfigRecord configRecord = new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("foo").setName("min.insync.replicas").setValue("2");
            RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion((ApiMessage)configRecord, 0)));
            TestUtils.waitForCondition(() -> {
                for (Integer brokerId : allBrokers) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 30L), (String)"Fencing of brokers did not process within expected time");
            PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertEquals((int)1, (int)partition.lastKnownElr.length, (String)partition.toString());
            int[] lastKnownElr = partition.lastKnownElr;
            Assertions.assertEquals((int)0, (int)partition.isr.length, (String)partition.toString());
            Assertions.assertEquals((int)-1, (int)partition.leader, (String)partition.toString());
            Assertions.assertEquals((int)2, (int)partition.elr.length, (String)partition.toString());
            int brokerToUncleanShutdown = lastKnownElr[0];
            int brokerToBeTheLeader = lastKnownElr[0] == partition.elr[0] ? partition.elr[1] : partition.elr[0];
            CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerToUncleanShutdown).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setLogDirs(List.of(brokerLogDirs.get(brokerToUncleanShutdown))).setListeners(listeners));
            brokerEpochs.put(brokerToUncleanShutdown, ((BrokerRegistrationReply)reply.get()).epoch());
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertArrayEquals((int[])new int[]{brokerToBeTheLeader}, (int[])partition.elr, (String)partition.toString());
            Assertions.assertArrayEquals((int[])lastKnownElr, (int[])partition.lastKnownElr, (String)partition.toString());
            CompletableFuture replyLeader = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerToBeTheLeader).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setPreviousBrokerEpoch(((Long)brokerEpochs.get(brokerToBeTheLeader)).longValue()).setLogDirs(List.of(brokerLogDirs.get(brokerToBeTheLeader))).setListeners(listeners));
            brokerEpochs.put(brokerToBeTheLeader, ((BrokerRegistrationReply)replyLeader.get()).epoch());
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            int[] expectedIsr = new int[]{brokerToBeTheLeader};
            Assertions.assertArrayEquals((int[])expectedIsr, (int[])partition.elr, (String)("The ELR for topic partition foo-0 was " + Arrays.toString(partition.elr) + ". It is expected to be " + Arrays.toString(expectedIsr)));
            Assertions.assertArrayEquals((int[])lastKnownElr, (int[])partition.lastKnownElr, (String)("The last known ELR for topic partition foo-0 was " + Arrays.toString(partition.lastKnownElr) + ". It is expected to be " + Arrays.toString(lastKnownElr)));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, List.of(Integer.valueOf(brokerToBeTheLeader)), brokerEpochs);
            TestUtils.waitForCondition(() -> active.clusterControl().isUnfenced(brokerToBeTheLeader), (long)(sessionTimeoutMillis * 3L), (String)"Broker should be unfenced.");
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertArrayEquals((int[])new int[]{brokerToBeTheLeader}, (int[])partition.isr, (String)partition.toString());
            Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
            Assertions.assertEquals((int)0, (int)partition.lastKnownElr.length, (String)partition.toString());
            Assertions.assertEquals((int)brokerToBeTheLeader, (int)partition.leader, (String)partition.toString());
        }
    }

    @Test
    public void testUncleanShutdownElrDisabled() throws Exception {
        List<Integer> allBrokers = List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3));
        int replicationFactor = allBrokers.size();
        long sessionTimeoutMillis = 500L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15L))).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_4_0_IV0, (String)"test-provided bootstrap ELR not supported")).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            BrokerRegistrationRequestData.FeatureCollection features = QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_4_0_IV0, Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_0.featureLevel()));
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setLogDirs(List.of(Uuid.randomUuid())).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            active.time().sleep(sessionTimeoutMillis);
            for (int i = 0; i < replicationFactor; ++i) {
                PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
                Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
                Assertions.assertEquals((int)0, (int)partition.lastKnownElr.length, (String)partition.toString());
                boolean lastStandingIsr = i == replicationFactor - 1;
                int prevLeader = partition.leader;
                int prevLeaderEpoch = partition.leaderEpoch;
                active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(prevLeader).setClusterId(active.clusterId()).setFeatures(features).setIncarnationId(Uuid.randomUuid()).setLogDirs(List.of(Uuid.randomUuid())).setListeners(listeners)).get();
                partition = active.replicationControl().getPartition(topicIdFoo, 0);
                int currentLeader = partition.leader;
                int currentLeaderEpoch = partition.leaderEpoch;
                Assertions.assertNotEquals((int)currentLeader, (int)prevLeader);
                Assertions.assertNotEquals((int)currentLeaderEpoch, (int)prevLeaderEpoch);
                if (lastStandingIsr) {
                    Assertions.assertArrayEquals((int[])new int[]{prevLeader}, (int[])partition.isr);
                    Assertions.assertEquals((int)-1, (int)currentLeader);
                    continue;
                }
                List<Integer> isr = Arrays.stream(partition.isr).boxed().toList();
                Assertions.assertFalse((boolean)isr.contains(prevLeader));
            }
        }
    }

    @Flaky(value="KAFKA-18981")
    @Test
    public void testMinIsrUpdateWithElr() throws Throwable {
        List<Integer> allBrokers = List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3));
        List<Integer> brokersToKeepUnfenced = List.of(Integer.valueOf(1));
        List<Integer> brokersToFence = List.of(Integer.valueOf(2), Integer.valueOf(3));
        short replicationFactor = (short)allBrokers.size();
        long sessionTimeoutMillis = 300L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_4_0_IV1, (String)"test-provided bootstrap ELR enabled")).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.BROKER_REGISTRATION), new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setIncarnationId(Uuid.randomUuid()).setLogDirs(List.of(Uuid.randomUuid())).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(List.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(replicationFactor), new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(1).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, new HashSet<String>(List.of("foo", "bar"))).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("bar").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
            ConfigRecord configRecord = new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("").setName("min.insync.replicas").setValue("2");
            RecordTestUtils.replayAll(active.configurationControl(), List.of(new ApiMessageAndVersion((ApiMessage)configRecord, 0)));
            TestUtils.waitForCondition(() -> {
                QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 30L), (String)"Fencing of brokers did not process within expected time");
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
            Assertions.assertEquals((int)1, (int)partition.elr.length, (String)partition.toString());
            ControllerResult result = active.configurationControl().incrementalAlterConfigs(ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry("min.insync.replicas", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "1"))))), true);
            Assertions.assertEquals((int)2, (int)result.records().size(), (String)result.records().toString());
            RecordTestUtils.replayAll(active.configurationControl(), List.of((ApiMessageAndVersion)result.records().get(0)));
            RecordTestUtils.replayAll(active.replicationControl(), List.of((ApiMessageAndVersion)result.records().get(1)));
            partition = active.replicationControl().getPartition(topicIdFoo, 0);
            Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
            partition = active.replicationControl().getPartition(topicIdBar, 0);
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
            Assertions.assertEquals((int)1, (int)partition.elr.length, (String)partition.toString());
            result = active.configurationControl().incrementalAlterConfigs(ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry(new ConfigResource(ConfigResource.Type.BROKER, ""), ConfigurationControlManagerTest.toMap(ConfigurationControlManagerTest.entry("min.insync.replicas", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "1"))))), true);
            Assertions.assertEquals((int)2, (int)result.records().size(), (String)result.records().toString());
            RecordTestUtils.replayAll(active.configurationControl(), List.of((ApiMessageAndVersion)result.records().get(0)));
            RecordTestUtils.replayAll(active.replicationControl(), List.of((ApiMessageAndVersion)result.records().get(1)));
            partition = active.replicationControl().getPartition(topicIdBar, 0);
            Assertions.assertEquals((int)0, (int)partition.elr.length, (String)partition.toString());
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition.isr, (String)partition.toString());
        }
    }

    @Test
    public void testBalancePartitionLeaders() throws Throwable {
        List<Integer> allBrokers = List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3));
        List<Integer> brokersToKeepUnfenced = List.of(Integer.valueOf(1), Integer.valueOf(2));
        List<Integer> brokersToFence = List.of(Integer.valueOf(3));
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 2000L;
        long leaderImbalanceCheckIntervalNs = 1000000000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            TestUtils.waitForCondition(() -> {
                QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
            for (Integer brokerId3 : brokersToFence) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId3.intValue()).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId3, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            HashSet imbalancedPartitions = new HashSet(active.replicationControl().imbalancedPartitions());
            Assertions.assertEquals((int)1, (int)imbalancedPartitions.size());
            TopicIdPartition impalancedTp = (TopicIdPartition)imbalancedPartitions.iterator().next();
            int imbalancedPartitionId = impalancedTp.partitionId();
            PartitionRegistration partitionRegistration = active.replicationControl().getPartition(topicIdFoo, imbalancedPartitionId);
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData().setPartitionIndex(imbalancedPartitionId).setLeaderEpoch(partitionRegistration.leaderEpoch).setPartitionEpoch(partitionRegistration.partitionEpoch).setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))));
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicId(impalancedTp.topicId());
            topicData.partitions().add(partitionData);
            AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData().setBrokerId(partitionRegistration.leader).setBrokerEpoch(((Long)brokerEpochs.get(partitionRegistration.leader)).longValue());
            alterPartitionRequest.topics().add(topicData);
            active.alterPartition(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterPartitionRequest.Builder(alterPartitionRequest).build(ApiKeys.ALTER_PARTITION.oldestVersion()).data()).get();
            AtomicLong lastHeartbeatMs = new AtomicLong(QuorumControllerTest.getMonotonicMs(active.time()));
            QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
            TestUtils.waitForCondition(() -> {
                long currentMonotonicMs = QuorumControllerTest.getMonotonicMs(active.time());
                if (currentMonotonicMs > lastHeartbeatMs.get() + sessionTimeoutMillis / 2L) {
                    lastHeartbeatMs.set(currentMonotonicMs);
                    QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
                }
                return !active.replicationControl().arePartitionLeadersImbalanced();
            }, (long)TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10L, TimeUnit.NANOSECONDS), (String)"Leaders were not balanced after unfencing all of the brokers");
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private static long getMonotonicMs(Time time) {
        return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
    }

    @Test
    public void testNoOpRecordWriteAfterTimeout() throws Throwable {
        long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100L);
        long maxReplicationDelayMs = 1000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs))).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            LocalLogManager localLogManager = logEnv.logManagers().stream().filter(logManager -> logManager.nodeId().equals(OptionalInt.of(active.nodeId()))).findAny().get();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().isPresent(), (long)maxReplicationDelayMs, (String)"High watermark was not established");
            long firstHighWatermark = localLogManager.highWatermark().getAsLong();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().getAsLong() > firstHighWatermark, (long)maxReplicationDelayMs, (String)"Active controller didn't write NoOpRecord the first time");
            long secondHighWatermark = localLogManager.highWatermark().getAsLong();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().getAsLong() > secondHighWatermark, (long)maxReplicationDelayMs, (String)"Active controller didn't write NoOpRecord the second time");
        }
    }

    @ParameterizedTest
    @CsvSource(value={"0, 0", "0, 1", "1, 0", "1, 1"})
    public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, short brokerMaxSupportedKraftVersion) throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).setLastKRaftVersion(KRaftVersion.fromFeatureLevel((short)finalizedKraftVersion)).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            BrokerRegistrationRequestData.FeatureCollection brokerFeatures = new BrokerRegistrationRequestData.FeatureCollection();
            brokerFeatures.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Feature().setName("metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()));
            if (brokerMaxSupportedKraftVersion != 0) {
                brokerFeatures.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Feature().setName("kraft.version").setMinSupportedVersion(Feature.KRAFT_VERSION.minimumProduction()).setMaxSupportedVersion(brokerMaxSupportedKraftVersion));
            }
            BrokerRegistrationRequestData request = new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwBA")).setFeatures(brokerFeatures).setLogDirs(List.of(Uuid.fromString((String)"vBpaRsZVSaGsQT53wtYGtg"))).setListeners(listeners);
            if (brokerMaxSupportedKraftVersion < finalizedKraftVersion) {
                Throwable exception = Assertions.assertThrows(ExecutionException.class, () -> active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, request).get());
                Assertions.assertEquals(UnsupportedVersionException.class, exception.getCause().getClass());
                Assertions.assertEquals((Object)("Unable to register because the broker does not support finalized version " + finalizedKraftVersion + " of kraft.version. The broker wants a version between 0 and " + brokerMaxSupportedKraftVersion + ", inclusive."), (Object)exception.getCause().getMessage());
            } else {
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, request).get();
                Assertions.assertTrue((reply.epoch() >= 4L ? 1 : 0) != 0, (String)("Unexpected broker epoch " + reply.epoch()));
            }
        }
    }

    @Test
    public void testUnregisterBroker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwBA")).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting(), Map.of("eligible.leader.replicas.version", EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).setLogDirs(List.of(Uuid.fromString((String)"vBpaRsZVSaGsQT53wtYGtg"))).setListeners(listeners));
            Assertions.assertEquals((long)6L, (long)((BrokerRegistrationReply)reply.get()).epoch());
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)1)).iterator()));
            Assertions.assertEquals((short)Errors.INVALID_REPLICATION_FACTOR.code(), (short)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get()).topics().find("foo").errorCode());
            Assertions.assertEquals((Object)"Unable to replicate the partition 1 time(s): All brokers are currently fenced.", (Object)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get()).topics().find("foo").errorMessage());
            Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).setCurrentMetadataOffset(100000L)).get());
            Assertions.assertEquals((short)Errors.NONE.code(), (short)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo")).get()).topics().find("foo").errorCode());
            CompletableFuture topicPartitionFuture = active.appendReadEvent("debugGetPartition", OptionalLong.empty(), () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().iterator(0, true);
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)((TopicIdPartition)topicPartitionFuture.get()).partitionId());
            active.unregisterBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, 0).get();
            topicPartitionFuture = active.appendReadEvent("debugGetPartition", OptionalLong.empty(), () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().partitionsWithNoLeader();
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)((TopicIdPartition)topicPartitionFuture.get()).partitionId());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(MetadataVersion minVersion, MetadataVersion maxVersion) {
        RegisterBrokerRecord.BrokerFeatureCollection features = new RegisterBrokerRecord.BrokerFeatureCollection();
        features.add((ImplicitLinkedHashCollection.Element)new RegisterBrokerRecord.BrokerFeature().setName("metadata.version").setMinSupportedVersion(minVersion.featureLevel()).setMaxSupportedVersion(maxVersion.featureLevel()));
        return features;
    }

    @Test
    public void testSnapshotSaveAndLoad() throws Throwable {
        int numBrokers = 4;
        HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            int i;
            QuorumController active = controlEnv.activeController();
            for (i = 0; i < logEnv.logManagers().size(); ++i) {
                active.registerController(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new ControllerRegistrationRequestData().setControllerId(i).setIncarnationId(new Uuid(3465346L, (long)i)).setZkMigrationReady(false).setListeners(new ControllerRegistrationRequestData.ListenerCollection(List.of(new ControllerRegistrationRequestData.Listener().setName("CONTROLLER").setHost("localhost").setPort(8000 + i).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new ControllerRegistrationRequestData.FeatureCollection(List.of(new ControllerRegistrationRequestData.Feature().setName("metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator()))).get();
            }
            for (i = 0; i < 4; ++i) {
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(i).setRack(null).setClusterId(active.clusterId()).setFeatures(QuorumControllerIntegrationTestUtils.brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setIncarnationId(Uuid.fromString((String)("kxAT73dKQsitIedpiPtwB" + i))).setListeners(new BrokerRegistrationRequestData.ListenerCollection(List.of(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + i)).iterator()))).get();
                brokerEpochs.put(i, reply.epoch());
            }
            for (i = 0; i < 3; ++i) {
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(((Long)brokerEpochs.get(i)).longValue()).setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
            }
            CreateTopicsResponseData fooData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(List.of(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))), new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1).setBrokerIds(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(0)))).iterator()))).iterator())), Set.of("foo")).get();
            Uuid fooId = fooData.topics().find("foo").topicId();
            active.allocateProducerIds(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(((Long)brokerEpochs.get(0)).longValue())).get();
            controlEnv.close();
            Assertions.assertEquals(this.generateTestRecords(fooId, brokerEpochs), logEnv.allRecords());
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private List<ApiMessageAndVersion> generateTestRecords(Uuid fooId, Map<Integer, Long> brokerEpochs) {
        return List.of(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord().setName("Bootstrap records"), 0), new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), 0), new ApiMessageAndVersion((ApiMessage)new EndTransactionRecord(), 0), new ApiMessageAndVersion((ApiMessage)new RegisterControllerRecord().setControllerId(0).setIncarnationId(Uuid.fromString((String)"AAAAAAA04IIAAAAAAAAAAA")).setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(List.of(new RegisterControllerRecord.ControllerEndpoint().setName("CONTROLLER").setHost("localhost").setPort(8000).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(List.of(new RegisterControllerRecord.ControllerFeature().setName("metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())), 0), new ApiMessageAndVersion((ApiMessage)new RegisterControllerRecord().setControllerId(1).setIncarnationId(Uuid.fromString((String)"AAAAAAA04IIAAAAAAAAAAQ")).setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(List.of(new RegisterControllerRecord.ControllerEndpoint().setName("CONTROLLER").setHost("localhost").setPort(8001).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(List.of(new RegisterControllerRecord.ControllerFeature().setName("metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())), 0), new ApiMessageAndVersion((ApiMessage)new RegisterControllerRecord().setControllerId(2).setIncarnationId(Uuid.fromString((String)"AAAAAAA04IIAAAAAAAAAAg")).setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(List.of(new RegisterControllerRecord.ControllerEndpoint().setName("CONTROLLER").setHost("localhost").setPort(8002).setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(List.of(new RegisterControllerRecord.ControllerFeature().setName("metadata.version").setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).setMaxSupportedVersion(MetadataVersion.IBP_3_7_IV0.featureLevel())).iterator())), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB0")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(List.of(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9092).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB1")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(List.of(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9093).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB2")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(List.of(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9094).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3).longValue()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB3")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(List.of(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9095).setSecurityProtocol((short)0)).iterator())).setFeatures(this.registrationFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.IBP_3_7_IV0)).setRack(null).setFenced(true), 2), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2).longValue()).setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(fooId), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(0).setTopicId(fooId).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))).setIsr(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))).setRemovingReplicas(List.of()).setAddingReplicas(List.of()).setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(1).setTopicId(fooId).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(0))).setIsr(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(0))).setRemovingReplicas(List.of()).setAddingReplicas(List.of()).setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new ProducerIdsRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue()).setNextProducerId(1000L), 0));
    }

    @Test
    public void testTimeouts() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = QuorumControllerIntegrationTestUtils.pause(controller);
            long now = controller.time().nanoseconds();
            ControllerRequestContext context0 = new ControllerRequestContext(new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
            CompletableFuture createFuture = controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo")).iterator())), Set.of());
            CompletableFuture deleteFuture = controller.deleteTopics(context0, List.of(Uuid.ZERO_UUID));
            CompletableFuture findTopicIdsFuture = controller.findTopicIds(context0, List.of("foo"));
            CompletableFuture findTopicNamesFuture = controller.findTopicNames(context0, List.of(Uuid.ZERO_UUID));
            CompletableFuture createPartitionsFuture = controller.createPartitions(context0, List.of(new CreatePartitionsRequestData.CreatePartitionsTopic()), false);
            CompletableFuture electLeadersFuture = controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).setTopicPartitions(null));
            CompletableFuture alterReassignmentsFuture = controller.alterPartitionReassignments(context0, new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic())));
            CompletableFuture listReassignmentsFuture = controller.listPartitionReassignments(context0, new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
            while (controller.time().nanoseconds() == now) {
                Thread.sleep(0L, 10);
            }
            countDownLatch.countDown();
            QuorumControllerTest.assertYieldsTimeout(createFuture);
            QuorumControllerTest.assertYieldsTimeout(deleteFuture);
            QuorumControllerTest.assertYieldsTimeout(findTopicIdsFuture);
            QuorumControllerTest.assertYieldsTimeout(findTopicNamesFuture);
            QuorumControllerTest.assertYieldsTimeout(createPartitionsFuture);
            QuorumControllerTest.assertYieldsTimeout(electLeadersFuture);
            QuorumControllerTest.assertYieldsTimeout(alterReassignmentsFuture);
            QuorumControllerTest.assertYieldsTimeout(listReassignmentsFuture);
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private static void assertYieldsTimeout(Future<?> future) {
        Assertions.assertEquals(TimeoutException.class, ((ExecutionException)Assertions.assertThrows(ExecutionException.class, future::get)).getCause().getClass());
    }

    @Test
    public void testEarlyControllerResults() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = QuorumControllerIntegrationTestUtils.pause(controller);
            CompletableFuture createFuture = controller.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTimeoutMs(120000), Set.of());
            CompletableFuture deleteFuture = controller.deleteTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, List.of());
            CompletableFuture findTopicIdsFuture = controller.findTopicIds(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, List.of());
            CompletableFuture findTopicNamesFuture = controller.findTopicNames(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, List.of());
            CompletableFuture createPartitionsFuture = controller.createPartitions(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, List.of(), false);
            CompletableFuture electLeadersFuture = controller.electLeaders(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
            CompletableFuture alterReassignmentsFuture = controller.alterPartitionReassignments(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterPartitionReassignmentsRequestData());
            createFuture.get();
            deleteFuture.get();
            findTopicIdsFuture.get();
            findTopicNamesFuture.get();
            createPartitionsFuture.get();
            electLeadersFuture.get();
            alterReassignmentsFuture.get();
            countDownLatch.countDown();
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    @Test
    public void testConfigResourceExistenceChecker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController active = controlEnv.activeController();
            QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence(active, 5);
            active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Set.of(new CreateTopicsRequestData.CreatableTopic().setName("foo").setReplicationFactor((short)3).setNumPartitions(1)).iterator())), Set.of("foo")).get();
            QuorumController.ConfigResourceExistenceChecker checker = new QuorumController.ConfigResourceExistenceChecker(active);
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, ""));
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "3"));
            Assertions.assertThrows(BrokerIdNotRegisteredException.class, () -> checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "10")));
            checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "foo"));
            Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "bar")));
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    @Test
    public void testFatalMetadataReplayErrorOnActive() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            QuorumController active = controlEnv.activeController();
            CompletableFuture future = active.appendWriteEvent("errorEvent", OptionalLong.empty(), () -> ControllerResult.of(List.of(new ApiMessageAndVersion((ApiMessage)new ConfigRecord().setName(null).setResourceName(null).setResourceType((byte)-1).setValue(null), 0)), null));
            Assertions.assertThrows(ExecutionException.class, future::get);
            Assertions.assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId()).firstException().getCause().getClass());
            controlEnv.ignoreFatalFaults();
        }
    }

    @Test
    public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
        InitialSnapshot invalidSnapshot = new InitialSnapshot(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionRecord(), 0)));
        LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3).setSnapshotReader((RawSnapshotReader)FileRawSnapshotReader.open((Path)invalidSnapshot.tempDir.toPath(), (OffsetAndEpoch)new OffsetAndEpoch(0L, 0)));
        try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
            TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), (String)"At least one controller failed to detect the fatal fault");
            controlEnv.ignoreFatalFaults();
        }
    }

    @Test
    public void testFatalMetadataErrorDuringLogLoading() throws Exception {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();){
            logEnv.appendInitialRecords(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionRecord(), 0)));
            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build();){
                TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), (String)"At least one controller failed to detect the fatal fault");
                controlEnv.ignoreFatalFaults();
            }
        }
    }

    @Test
    public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setBootstrapMetadata(COMPLEX_BOOTSTRAP).build();){
            QuorumController active = controlEnv.activeController();
            ControllerRequestContext ctx = new ControllerRequestContext(new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE));
            TestUtils.waitForCondition(() -> {
                FinalizedControllerFeatures features = (FinalizedControllerFeatures)active.finalizedFeatures(ctx).get();
                Optional metadataVersionOpt = features.get("metadata.version");
                return Optional.of(MetadataVersion.MINIMUM_VERSION.featureLevel()).equals(metadataVersionOpt);
            }, (String)"Failed to see expected metadata.version from bootstrap metadata");
            TestUtils.waitForCondition(() -> {
                ConfigResource defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
                Map configs = Map.of(defaultBrokerResource, List.of());
                Map results = (Map)active.describeConfigs(ctx, configs).get();
                ResultOrError resultOrError = (ResultOrError)results.get(defaultBrokerResource);
                return resultOrError.isResult() && Map.of("foo", "bar").equals(resultOrError.result());
            }, (String)"Failed to see expected config change from bootstrap metadata");
            QuorumControllerTest.testToImages(logEnv.allRecords());
        }
    }

    private static ApiMessageAndVersion rec(int i) {
        return new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(i), 0);
    }

    @Test
    public void testAppendRecords() {
        TestAppender appender = new TestAppender();
        Assertions.assertEquals((long)5L, (long)QuorumController.appendRecords((Logger)log, (ControllerResult)ControllerResult.of(List.of(QuorumControllerTest.rec(0), QuorumControllerTest.rec(1), QuorumControllerTest.rec(2), QuorumControllerTest.rec(3), QuorumControllerTest.rec(4)), null), (int)2, (Function)appender));
    }

    @Test
    public void testAppendRecordsAtomically() {
        TestAppender appender = new TestAppender();
        Assertions.assertEquals((Object)"Attempted to atomically commit 5 records, but maxRecordsPerBatch is 2", (Object)((IllegalStateException)Assertions.assertThrows(IllegalStateException.class, () -> QuorumController.appendRecords((Logger)log, (ControllerResult)ControllerResult.atomicOf(List.of(QuorumControllerTest.rec(0), QuorumControllerTest.rec(1), QuorumControllerTest.rec(2), QuorumControllerTest.rec(3), QuorumControllerTest.rec(4)), null), (int)2, (Function)appender))).getMessage());
    }

    FeatureControlManager getActivationRecords(MetadataVersion metadataVersion) {
        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
        FeatureControlManager featureControlManager = new FeatureControlManager.Builder().setSnapshotRegistry(snapshotRegistry).build();
        featureControlManager.replay(new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(metadataVersion.featureLevel()));
        ControllerResult result = ActivationRecordsGenerator.generate(msg -> {}, (long)-1L, (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)metadataVersion, (String)"test"), Optional.empty(), (int)3);
        RecordTestUtils.replayAll(featureControlManager, result.records());
        return featureControlManager;
    }

    @Test
    public void testActivationRecords33() {
        FeatureControlManager featureControl = this.getActivationRecords(MetadataVersion.IBP_3_3_IV3);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_3_IV3, (Object)featureControl.metadataVersionOrThrow());
    }

    @Test
    public void testActivationRecords34() {
        FeatureControlManager featureControl = this.getActivationRecords(MetadataVersion.IBP_3_4_IV0);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_4_IV0, (Object)featureControl.metadataVersionOrThrow());
    }

    @Test
    public void testActivationRecordsNonEmptyLog() {
        FeatureControlManager featureControl = this.getActivationRecords(MetadataVersion.IBP_3_9_IV0);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_9_IV0, (Object)featureControl.metadataVersionOrThrow());
    }

    @Test
    public void testActivationRecordsPartialBootstrap() {
        ControllerResult result = ActivationRecordsGenerator.generate(logMsg -> {}, (long)0L, (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_6_IV1, (String)"test"), Optional.empty(), (int)3);
        Assertions.assertFalse((boolean)result.isAtomic());
        Assertions.assertTrue((boolean)RecordTestUtils.recordAtIndexAs(AbortTransactionRecord.class, result.records(), 0).isPresent());
        Assertions.assertTrue((boolean)RecordTestUtils.recordAtIndexAs(BeginTransactionRecord.class, result.records(), 1).isPresent());
        Assertions.assertTrue((boolean)RecordTestUtils.recordAtIndexAs(EndTransactionRecord.class, result.records(), result.records().size() - 1).isPresent());
    }

    private static void testToImages(List<ApiMessageAndVersion> fromRecords) {
        List<RecordTestUtils.ImageDeltaPair<TopicsImage, TopicsDelta>> testMatrix = List.of(new RecordTestUtils.ImageDeltaPair<AclsImage, AclsDelta>(() -> AclsImage.EMPTY, AclsDelta::new), new RecordTestUtils.ImageDeltaPair<ClientQuotasImage, ClientQuotasDelta>(() -> ClientQuotasImage.EMPTY, ClientQuotasDelta::new), new RecordTestUtils.ImageDeltaPair<ClusterImage, ClusterDelta>(() -> ClusterImage.EMPTY, ClusterDelta::new), new RecordTestUtils.ImageDeltaPair<ConfigurationsImage, ConfigurationsDelta>(() -> ConfigurationsImage.EMPTY, ConfigurationsDelta::new), new RecordTestUtils.ImageDeltaPair<DelegationTokenImage, DelegationTokenDelta>(() -> DelegationTokenImage.EMPTY, DelegationTokenDelta::new), new RecordTestUtils.ImageDeltaPair<FeaturesImage, FeaturesDelta>(() -> FeaturesImage.EMPTY, FeaturesDelta::new), new RecordTestUtils.ImageDeltaPair<ProducerIdsImage, ProducerIdsDelta>(() -> ProducerIdsImage.EMPTY, ProducerIdsDelta::new), new RecordTestUtils.ImageDeltaPair<ScramImage, ScramDelta>(() -> ScramImage.EMPTY, ScramDelta::new), new RecordTestUtils.ImageDeltaPair<TopicsImage, TopicsDelta>(() -> TopicsImage.EMPTY, TopicsDelta::new));
        for (RecordTestUtils.ImageDeltaPair<TopicsImage, TopicsDelta> pair : testMatrix) {
            new RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper<TopicsDelta, TopicsImage>(pair.imageSupplier(), pair.deltaCreator()).test(fromRecords);
        }
    }

    @Test
    public void testActivationRecordsPartialTransaction() {
        OffsetControlManager offsetControlManager = new OffsetControlManager.Builder().build();
        offsetControlManager.replay(new BeginTransactionRecord(), 10L);
        offsetControlManager.handleCommitBatch(Batch.data((long)20L, (int)1, (long)1L, (int)0, List.of(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord(), 0))));
        ControllerResult result = ActivationRecordsGenerator.generate(logMsg -> {}, (long)offsetControlManager.transactionStartOffset(), (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_6_IV1, (String)"test"), Optional.of(MetadataVersion.IBP_3_6_IV1), (int)3);
        Assertions.assertTrue((boolean)result.isAtomic());
        offsetControlManager.replay(RecordTestUtils.recordAtIndexAs(AbortTransactionRecord.class, result.records(), 0).get(), 21L);
        Assertions.assertEquals((long)-1L, (long)offsetControlManager.transactionStartOffset());
    }

    @Test
    public void testActivationRecordsPartialTransactionNoSupport() {
        OffsetControlManager offsetControlManager = new OffsetControlManager.Builder().build();
        offsetControlManager.replay(new BeginTransactionRecord(), 10L);
        offsetControlManager.handleCommitBatch(Batch.data((long)20L, (int)1, (long)1L, (int)0, List.of(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord(), 0))));
        Assertions.assertThrows(RuntimeException.class, () -> ActivationRecordsGenerator.generate(msg -> {}, (long)offsetControlManager.transactionStartOffset(), (BootstrapMetadata)BootstrapMetadata.fromVersion((MetadataVersion)MetadataVersion.IBP_3_6_IV0, (String)"test"), Optional.of(MetadataVersion.IBP_3_6_IV0), (int)3));
    }

    static class InitialSnapshot
    implements AutoCloseable {
        File tempDir = TestUtils.tempDirectory();
        BatchFileWriter writer;

        public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
            Path path = Snapshots.snapshotPath((Path)this.tempDir.toPath(), (OffsetAndEpoch)new OffsetAndEpoch(0L, 0));
            this.writer = BatchFileWriter.open((Path)path);
            this.writer.append(records);
            this.writer.close();
            this.writer = null;
        }

        @Override
        public void close() throws Exception {
            Utils.closeQuietly((AutoCloseable)this.writer, (String)"BatchFileWriter");
            Utils.delete((File)this.tempDir);
        }
    }

    static class TestAppender
    implements Function<List<ApiMessageAndVersion>, Long> {
        private long offset = 0L;

        TestAppender() {
        }

        @Override
        public Long apply(List<ApiMessageAndVersion> apiMessageAndVersions) {
            for (ApiMessageAndVersion apiMessageAndVersion : apiMessageAndVersions) {
                BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord)apiMessageAndVersion.message();
                Assertions.assertEquals((int)((int)this.offset), (int)record.brokerId());
                ++this.offset;
            }
            return this.offset;
        }
    }
}

