/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ConfigurationControlManagerTest;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerRequestContextUtil;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.MockControllerMetrics;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.controller.QuorumControllerTestEnv;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.TopicIdPartition;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class QuorumControllerTest {
    /** Logger for this test class. */
    private static final Logger log = LoggerFactory.getLogger(QuorumControllerTest.class);

    /** Bootstrap metadata derived from metadata version IBP_3_3_IV3 only. */
    static final BootstrapMetadata SIMPLE_BOOTSTRAP =
        BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");

    /** Fixed id parsed from a constant string. */
    private static final Uuid FOO_ID = Uuid.fromString("igRktLOnR8ektWHr79F8mw");

    /** Maps each of the ids 0, 1, 2 and 3 to the value zero. */
    private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
        IntStream.rangeClosed(0, 3)
            .boxed()
            .collect(Collectors.toMap(Function.identity(), brokerId -> 0L));

    /**
     * Unmodifiable list of three records: a registration for broker 123 (epoch 42), the matching
     * unfence record, and a topic record for "bar".
     */
    private static final List<ApiMessageAndVersion> PRE_PRODUCTION_RECORDS =
        Collections.unmodifiableList(Arrays.asList(
            new ApiMessageAndVersion(
                (ApiMessage)new RegisterBrokerRecord()
                    .setBrokerEpoch(42L)
                    .setBrokerId(123)
                    .setIncarnationId(Uuid.fromString("v78Gbc6sQXK0y5qqRxiryw"))
                    .setRack(null),
                0),
            new ApiMessageAndVersion(
                (ApiMessage)new UnfenceBrokerRecord()
                    .setEpoch(42L)
                    .setId(123),
                0),
            new ApiMessageAndVersion(
                (ApiMessage)new TopicRecord()
                    .setName("bar")
                    .setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
                0)));

    /**
     * Bootstrap metadata built from explicit records: a "metadata.version" feature level set to
     * IBP_3_3_IV1 and a BROKER-type config record (empty resource name) with foo=bar.
     */
    private static final BootstrapMetadata COMPLEX_BOOTSTRAP =
        BootstrapMetadata.fromRecords(
            Arrays.asList(
                new ApiMessageAndVersion(
                    (ApiMessage)new FeatureLevelRecord()
                        .setName("metadata.version")
                        .setFeatureLevel(MetadataVersion.IBP_3_3_IV1.featureLevel()),
                    0),
                new ApiMessageAndVersion(
                    (ApiMessage)new ConfigRecord()
                        .setResourceType(ConfigResource.Type.BROKER.id())
                        .setResourceName("")
                        .setName("foo")
                        .setValue("bar"),
                    0)),
            "test bootstrap");

    /**
     * Verifies that building a controller test environment and closing it right away closes the
     * metrics object that was handed to the controller builder.
     *
     * @throws Throwable if building or closing either environment fails
     */
    @Test
    public void testCreateAndClose() throws Throwable {
        MockControllerMetrics metrics = new MockControllerMetrics();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setMetrics((ControllerMetrics)metrics)).build()) {
            // Intentionally empty: the test only cares about what happens when the
            // environments are closed, which try-with-resources does in reverse order.
        }
        Assertions.assertTrue((boolean)metrics.isClosed(), (String)"metrics were not closed");
    }

    /**
     * Registers broker 0 with a single-node controller environment and then runs the shared
     * configuration checks against the active controller.
     *
     * @throws Throwable if environment setup, registration, or any check fails
     */
    @Test
    public void testConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv)
                 .setControllerBuilderInitializer(builder -> builder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA))
                 .build()) {
            QuorumController registrar = controlEnv.activeController();
            BrokerRegistrationRequestData registration = new BrokerRegistrationRequestData()
                .setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0))
                .setBrokerId(0)
                .setClusterId(logEnv.clusterId());
            // Block until the registration has completed before exercising configs.
            registrar.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, registration).get();
            this.testConfigurationOperations(controlEnv.activeController());
        }
    }

    /**
     * Runs alter/describe round trips for the BROKER0 resource on the given controller.
     *
     * <p>The same {@code baz=123} SET is issued twice. After the call whose trailing boolean
     * argument is {@code true}, describe is expected to report no configs; after the call with
     * {@code false}, describe is expected to report {@code baz=123}.
     *
     * @param controller the controller that receives the requests
     * @throws Throwable if any controller future fails or an assertion does not hold
     */
    private void testConfigurationOperations(QuorumController controller) throws Throwable {
        ConfigResource broker0 = ConfigurationControlManagerTest.BROKER0;
        Map<ConfigResource, ApiError> success = Collections.singletonMap(broker0, ApiError.NONE);

        // First pass: trailing flag is true, so nothing should be visible afterwards.
        Assertions.assertEquals(
            success,
            controller.incrementalAlterConfigs(
                ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
                Collections.singletonMap(broker0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))),
                true).get());
        Assertions.assertEquals(
            Collections.singletonMap(broker0, new ResultOrError(Collections.emptyMap())),
            controller.describeConfigs(
                ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
                Collections.singletonMap(broker0, Collections.emptyList())).get());

        // Second pass: trailing flag is false, so the value must now be reported.
        Assertions.assertEquals(
            success,
            controller.incrementalAlterConfigs(
                ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
                Collections.singletonMap(broker0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))),
                false).get());
        Assertions.assertEquals(
            Collections.singletonMap(broker0, new ResultOrError(Collections.singletonMap("baz", "123"))),
            controller.describeConfigs(
                ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
                Collections.singletonMap(broker0, Collections.emptyList())).get());
    }

    /**
     * Registers broker 0 with a single-node controller environment and then runs the delayed
     * configuration checks, which also need access to the log environment.
     *
     * @throws Throwable if environment setup, registration, or any check fails
     */
    @Test
    public void testDelayedConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv)
                 .setControllerBuilderInitializer(builder -> builder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA))
                 .build()) {
            QuorumController registrar = controlEnv.activeController();
            BrokerRegistrationRequestData registration = new BrokerRegistrationRequestData()
                .setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0))
                .setBrokerId(0)
                .setClusterId(logEnv.clusterId());
            // Block until the registration has completed before the log is throttled.
            registrar.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, registration).get();
            this.testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
        }
    }

    /**
     * Checks that a config alteration stays pending while the log managers' maximum read offset
     * is held back, and completes once the offset is raised.
     *
     * @param logEnv the log environment whose managers have their max read offset adjusted
     * @param controller the controller that receives the alter/describe requests
     * @throws Throwable if any controller future fails or an assertion does not hold
     */
    private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, QuorumController controller) throws Throwable {
        // Hold the readable portion of the log at offset 1 so the alteration cannot complete.
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
        CompletableFuture<Map<ConfigResource, ApiError>> future1 = controller.incrementalAlterConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false);
        Assertions.assertFalse(future1.isDone());
        // While the alteration is pending, describe must still report no configs.
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.emptyMap())), controller.describeConfigs(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
        // Raising the limit lets the pending alteration finish successfully.
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(3L));
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), future1.get());
    }

    /**
     * Registers brokers 1-5, creates topic "foo" with five partitions and replication factor
     * five, then keeps heartbeating only broker 1 until brokers 2-5 are reported as not unfenced.
     * Afterwards partition 0 of "foo" must have ISR [1], leader 1, and the partition leaders must
     * be reported as imbalanced.
     *
     * @throws Throwable if environment setup fails, a controller future fails, or the fencing
     *         condition is not met within three session timeouts
     */
    @Test
    public void testFenceMultipleBrokers() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
        List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 1000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            // Register every broker and remember the epoch returned for it.
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            // Right after registration no broker is expected to be unfenced yet.
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            // sendBrokerheartbeat is defined outside this view; presumably it heartbeats each
            // listed broker with its recorded epoch - TODO confirm against the helper.
            this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            // Keep heartbeating only broker 1 and poll until none of brokers 2-5 is unfenced.
            TestUtils.waitForCondition(() -> {
                this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            // One more heartbeat for broker 1 immediately before its state is asserted.
            this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            // Partition 0 of "foo" must now have broker 1 as its only ISR member and leader.
            int[] expectedIsr = new int[]{1};
            int[] isrFoo = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).isr;
            Assertions.assertTrue((boolean)Arrays.equals(isrFoo, expectedIsr), (String)("The ISR for topic foo was " + Arrays.toString(isrFoo) + ". It is expected to be " + Arrays.toString(expectedIsr)));
            int fooLeader = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).leader;
            Assertions.assertEquals((int)expectedIsr[0], (int)fooLeader);
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
        }
    }

    /**
     * Registers brokers 1-3, creates topic "foo" with three partitions and replication factor
     * three, lets broker 3 become fenced, then re-registers it and expands the ISR of the single
     * imbalanced partition back to [1, 2, 3]. Finally waits until the controller no longer reports
     * imbalanced partition leaders.
     *
     * @throws Throwable if environment setup fails, a controller future fails, or one of the
     *         waited-for conditions is not met in time
     */
    @Test
    public void testBalancePartitionLeaders() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1, 2);
        List<Integer> brokersToFence = Arrays.asList(3);
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 1000L;
        long leaderImbalanceCheckIntervalNs = 1000000000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build();){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            // Register every broker and remember the epoch returned for it.
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            // Right after registration no broker is expected to be unfenced yet.
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            // sendBrokerheartbeat is defined outside this view; presumably it heartbeats each
            // listed broker with its recorded epoch - TODO confirm against the helper.
            this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            // Keep heartbeating brokers 1 and 2 and poll until broker 3 is no longer unfenced.
            TestUtils.waitForCondition(() -> {
                this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.clusterControl().isUnfenced(brokerId.intValue())) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            // One more heartbeat for the surviving brokers immediately before asserting state.
            this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.clusterControl().isUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
            // Re-register broker 3 under a fresh incarnation id and store its new epoch.
            for (Integer brokerId3 : brokersToFence) {
                CompletableFuture reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId3.intValue()).setClusterId(active.clusterId()).setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId3, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
            // Exactly one partition is expected to be imbalanced at this point.
            Set imbalancedPartitions = active.replicationControl().imbalancedPartitions();
            Assertions.assertEquals((int)1, (int)imbalancedPartitions.size());
            int imbalancedPartitionId = ((TopicIdPartition)imbalancedPartitions.iterator().next()).partitionId();
            PartitionRegistration partitionRegistration = active.replicationControl().getPartition(topicIdFoo, imbalancedPartitionId);
            // Build an AlterPartition request, sent as the partition's current leader, that
            // sets the new ISR to all three brokers.
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData().setPartitionIndex(imbalancedPartitionId).setLeaderEpoch(partitionRegistration.leaderEpoch).setPartitionEpoch(partitionRegistration.partitionEpoch).setNewIsr(Arrays.asList(1, 2, 3));
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicName("foo");
            topicData.partitions().add(partitionData);
            AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData().setBrokerId(partitionRegistration.leader).setBrokerEpoch(((Long)brokerEpochs.get(partitionRegistration.leader)).longValue());
            alterPartitionRequest.topics().add(topicData);
            active.alterPartition(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, alterPartitionRequest).get();
            // Poll until leaders are balanced, heartbeating all brokers at most once per half
            // session timeout; the overall wait is ten imbalance-check intervals in milliseconds.
            AtomicLong lastHeartbeat = new AtomicLong(active.time().milliseconds());
            TestUtils.waitForCondition(() -> {
                if (active.time().milliseconds() > lastHeartbeat.get() + sessionTimeoutMillis / 2L) {
                    lastHeartbeat.set(active.time().milliseconds());
                    this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
                }
                return !active.replicationControl().arePartitionLeadersImbalanced();
            }, (long)TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10L, TimeUnit.NANOSECONDS), (String)"Leaders where not balanced after unfencing all of the brokers");
        }
    }

    /**
     * With a three-node log and a maximum idle interval of 1000 ns configured on the controller
     * builder, waits for the active controller's high watermark to be established and then to
     * advance twice without the test issuing any requests.
     *
     * @throws Throwable if environment setup fails or the high watermark does not advance within
     *         the allowed replication delay
     */
    @Test
    public void testNoOpRecordWriteAfterTimeout() throws Throwable {
        long maxIdleIntervalNs = 1000L;
        long maxReplicationDelayMs = 60000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> {
                 controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA);
                 controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
             }).build()) {
            QuorumController active = controlEnv.activeController();
            // Pick the log manager that belongs to the active controller's node; fail with a
            // descriptive error instead of a bare NoSuchElementException if there is none.
            LocalLogManager localLogManager = logEnv.logManagers().stream()
                .filter(logManager -> logManager.nodeId().equals(OptionalInt.of(active.nodeId())))
                .findAny()
                .orElseThrow(() -> new AssertionError("No log manager found for active controller node " + active.nodeId()));
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().isPresent(), (long)maxReplicationDelayMs, (String)"High watermark was not established");
            long firstHighWatermark = localLogManager.highWatermark().getAsLong();
            // Each advance below must come from the controller itself, since the test is idle.
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().getAsLong() > firstHighWatermark, (long)maxReplicationDelayMs, (String)"Active controller didn't write NoOpRecord the first time");
            long secondHighWatermark = localLogManager.highWatermark().getAsLong();
            TestUtils.waitForCondition(() -> localLogManager.highWatermark().getAsLong() > secondHighWatermark, (long)maxReplicationDelayMs, (String)"Active controller didn't write NoOpRecord the second time");
        }
    }

    /**
     * Registers broker 0, checks that topic creation is rejected while the broker has not yet
     * heartbeated and succeeds afterwards, then unregisters the broker and checks that partition
     * 0 shows up among the partitions without a leader.
     *
     * @throws Throwable if environment setup fails, a controller future fails, or an assertion
     *         does not hold
     */
    @Test
    public void testUnregisterBroker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).build()) {
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).setListeners(listeners));
            Assertions.assertEquals((long)2L, (long)reply.get().epoch());
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)1)).iterator()));
            // Before any heartbeat the only broker is still fenced, so creation must fail.
            // The request is deliberately sent twice: once for the code, once for the message.
            Assertions.assertEquals((short)Errors.INVALID_REPLICATION_FACTOR.code(), (short)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get()).topics().find("foo").errorCode());
            Assertions.assertEquals((Object)"Unable to replicate the partition 1 time(s): All brokers are currently fenced.", (Object)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get()).topics().find("foo").errorMessage());
            // A heartbeat that does not ask to be fenced; afterwards creation must succeed.
            Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(2L).setBrokerId(0).setCurrentMetadataOffset(100000L)).get());
            Assertions.assertEquals((short)Errors.NONE.code(), (short)((CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get()).topics().find("foo").errorCode());
            // Read the first partition listed for broker 0 through a controller read event.
            CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent("debugGetPartition", OptionalLong.empty(), () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().iterator(0, true);
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)topicPartitionFuture.get().partitionId());
            active.unregisterBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, 0).get();
            // After unregistering, the partition must be listed among those with no leader.
            topicPartitionFuture = active.appendReadEvent("debugGetPartition", OptionalLong.empty(), () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().partitionsWithNoLeader();
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)topicPartitionFuture.get().partitionId());
        }
    }

    /**
     * Builds the feature collection for a broker registration request spanning
     * {@code MetadataVersion.MINIMUM_KRAFT_VERSION} through {@code MetadataVersion.latest()}.
     *
     * @return a feature collection holding a single "metadata.version" entry
     */
    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
        MetadataVersion lowest = MetadataVersion.MINIMUM_KRAFT_VERSION;
        MetadataVersion highest = MetadataVersion.latest();
        return this.brokerFeatures(lowest, highest);
    }

    /**
     * Builds the feature collection for a broker registration request with one
     * "metadata.version" entry covering the given range.
     *
     * @param minVersion supplies the minimum supported feature level
     * @param maxVersion supplies the maximum supported feature level
     * @return a new collection containing exactly that one feature
     */
    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(MetadataVersion minVersion, MetadataVersion maxVersion) {
        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
        // Added with its own static type, the same way registrationFeatures adds its element;
        // no widening cast to the generic collection element interface is needed.
        features.add(new BrokerRegistrationRequestData.Feature().setName("metadata.version").setMinSupportedVersion(minVersion.featureLevel()).setMaxSupportedVersion(maxVersion.featureLevel()));
        return features;
    }

    /**
     * Builds the feature collection for a {@code RegisterBrokerRecord} with one
     * "metadata.version" entry covering the given range.
     *
     * @param minVersion supplies the minimum supported feature level
     * @param maxVersion supplies the maximum supported feature level
     * @return a new collection containing exactly that one feature
     */
    private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(MetadataVersion minVersion, MetadataVersion maxVersion) {
        RegisterBrokerRecord.BrokerFeatureCollection result = new RegisterBrokerRecord.BrokerFeatureCollection();
        RegisterBrokerRecord.BrokerFeature metadataVersionRange = new RegisterBrokerRecord.BrokerFeature()
            .setName("metadata.version")
            .setMinSupportedVersion(minVersion.featureLevel())
            .setMaxSupportedVersion(maxVersion.featureLevel());
        result.add(metadataVersionRange);
        return result;
    }

    /**
     * Registers four brokers, heartbeats all but the last one, creates topic "foo" with two
     * manually assigned partitions, allocates producer ids for broker 0, then closes the
     * controller environment and compares every record in the log with the list produced by
     * {@code generateTestRecords}.
     *
     * @throws Throwable if environment setup fails, a controller future fails, or the recorded
     *         log contents differ from the expected records
     */
    @Test
    public void testSnapshotSaveAndLoad() throws Throwable {
        final int numBrokers = 4;
        Map<Integer, Long> brokerEpochs = new HashMap<>();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).setBootstrapMetadata(SIMPLE_BOOTSTRAP).build()) {
            QuorumController active = controlEnv.activeController();
            // Register every broker with a deterministic incarnation id and port, and remember
            // the epoch the controller assigns to it.
            for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
                BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection(Arrays.asList(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + brokerId)).iterator());
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData().setBrokerId(brokerId).setRack(null).setClusterId(active.clusterId()).setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId)).setListeners(listeners)).get();
                brokerEpochs.put(brokerId, reply.epoch());
            }
            // Heartbeat every broker except the last one, which never sends a heartbeat.
            for (int brokerId = 0; brokerId < numBrokers - 1; brokerId++) {
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(brokerEpochs.get(brokerId).longValue()).setBrokerId(brokerId).setCurrentMetadataOffset(100000L)).get());
            }
            // Topic "foo": partition 0 on brokers [0, 1, 2], partition 1 on brokers [1, 2, 0].
            CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments = new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(Arrays.asList(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(Arrays.asList(0, 1, 2)), new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1).setBrokerIds(Arrays.asList(1, 2, 0))).iterator());
            CreateTopicsRequestData createFoo = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(assignments)).iterator()));
            CreateTopicsResponseData fooData = (CreateTopicsResponseData)active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createFoo, Collections.singleton("foo")).get();
            Uuid fooId = fooData.topics().find("foo").topicId();
            active.allocateProducerIds(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0).longValue())).get();
            // Close the controllers before the log is inspected.
            // NOTE(review): try-with-resources closes controlEnv a second time on exit, so
            // this relies on close() tolerating repeated calls - confirm.
            controlEnv.close();
            Assertions.assertEquals(this.generateTestRecords(fooId, brokerEpochs), logEnv.allRecords());
        }
    }

    /**
     * Builds the ordered list of metadata records used as the fixture log contents:
     * one metadata.version feature level record, four fenced broker registrations
     * (ids 0-3), unfence changes for brokers 0-2, topic "foo" with two partitions,
     * and a producer-id allocation for broker 0.
     *
     * @param fooId        topic id to stamp on the topic and partition records
     * @param brokerEpochs epoch to use for each broker id; ids 0-3 must be present
     * @return the records as a fixed-size list
     */
    private List<ApiMessageAndVersion> generateTestRecords(Uuid fooId, Map<Integer, Long> brokerEpochs) {
        List<ApiMessageAndVersion> records = new ArrayList<>();

        records.add(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord()
            .setName("metadata.version")
            .setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel()), 0));

        // Brokers 0-3 register fenced, each on its own port and incarnation id.
        for (int brokerId = 0; brokerId < 4; ++brokerId) {
            records.add(new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord()
                .setBrokerId(brokerId)
                .setBrokerEpoch(brokerEpochs.get(brokerId))
                .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
                .setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(
                    new RegisterBrokerRecord.BrokerEndpoint()
                        .setName("PLAINTEXT")
                        .setHost("localhost")
                        .setPort(9092 + brokerId)
                        .setSecurityProtocol((short)0)).iterator()))
                .setFeatures(this.registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3))
                .setRack(null)
                .setFenced(true), 1));
        }

        // Only brokers 0-2 get an unfence record; broker 3 stays fenced.
        for (int brokerId = 0; brokerId < 3; ++brokerId) {
            records.add(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord()
                .setBrokerId(brokerId)
                .setBrokerEpoch(brokerEpochs.get(brokerId))
                .setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), 0));
        }

        records.add(new ApiMessageAndVersion((ApiMessage)new TopicRecord()
            .setName("foo")
            .setTopicId(fooId), 0));
        records.add(new ApiMessageAndVersion((ApiMessage)new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(fooId)
            .setReplicas(Arrays.asList(0, 1, 2))
            .setIsr(Arrays.asList(0, 1, 2))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(0)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0), 0));
        records.add(new ApiMessageAndVersion((ApiMessage)new PartitionRecord()
            .setPartitionId(1)
            .setTopicId(fooId)
            .setReplicas(Arrays.asList(1, 2, 0))
            .setIsr(Arrays.asList(1, 2, 0))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(1)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0), 0));

        records.add(new ApiMessageAndVersion((ApiMessage)new ProducerIdsRecord()
            .setBrokerId(0)
            .setBrokerEpoch(brokerEpochs.get(0))
            .setNextProducerId(1000L), 0));

        // Hand back the same fixed-size list flavour that a direct Arrays.asList(...) would give.
        return Arrays.asList(records.toArray(new ApiMessageAndVersion[0]));
    }

    /**
     * Submits a batch of controller requests while the controller is paused, using a
     * request context built from the current clock reading, then resumes the controller
     * once the clock has advanced and expects every request to fail with a
     * {@link TimeoutException}.
     */
    @Test
    public void testTimeouts() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).build()) {
            QuorumController controller = controlEnv.activeController();
            CountDownLatch resumeLatch = controller.pause();
            long startNs = controller.time().nanoseconds();
            // NOTE(review): the OptionalLong is presumably the request deadline in nanoseconds - confirm.
            ControllerRequestContext expiringContext = new ControllerRequestContext(new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(startNs));

            CompletableFuture<?> createTopics = controller.createTopics(expiringContext, new CreateTopicsRequestData().setTimeoutMs(0).setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo")).iterator())), Collections.emptySet());
            CompletableFuture<?> deleteTopics = controller.deleteTopics(expiringContext, Collections.singletonList(Uuid.ZERO_UUID));
            CompletableFuture<?> findTopicIds = controller.findTopicIds(expiringContext, Collections.singletonList("foo"));
            CompletableFuture<?> findTopicNames = controller.findTopicNames(expiringContext, Collections.singletonList(Uuid.ZERO_UUID));
            CompletableFuture<?> createPartitions = controller.createPartitions(expiringContext, Collections.singletonList(new CreatePartitionsRequestData.CreatePartitionsTopic()), false);
            CompletableFuture<?> electLeaders = controller.electLeaders(expiringContext, new ElectLeadersRequestData().setTimeoutMs(0).setTopicPartitions(null));
            CompletableFuture<?> alterReassignments = controller.alterPartitionReassignments(expiringContext, new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).setTopics(Collections.singletonList(new AlterPartitionReassignmentsRequestData.ReassignableTopic())));
            CompletableFuture<?> listReassignments = controller.listPartitionReassignments(expiringContext, new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));

            // Spin until the controller clock has moved past the captured reading.
            while (controller.time().nanoseconds() == startNs) {
                Thread.sleep(0L, 10);
            }
            resumeLatch.countDown();

            List<CompletableFuture<?>> pending = Arrays.asList(
                createTopics,
                deleteTopics,
                findTopicIds,
                findTopicNames,
                createPartitions,
                electLeaders,
                alterReassignments,
                listReassignments);
            for (CompletableFuture<?> future : pending) {
                QuorumControllerTest.assertYieldsTimeout(future);
            }
        }
    }

    /**
     * Asserts that waiting on {@code future} fails with an {@link ExecutionException}
     * whose cause is exactly a {@link TimeoutException}.
     *
     * @param future the future expected to complete exceptionally
     */
    private static void assertYieldsTimeout(Future<?> future) {
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, future::get);
        Class<?> causeType = failure.getCause().getClass();
        Assertions.assertEquals(TimeoutException.class, causeType);
    }

    /**
     * Verifies that requests with nothing to do complete even while the controller
     * is paused: every future below must finish before the pause latch is released.
     */
    @Test
    public void testEarlyControllerResults() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).build()) {
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = controller.pause();
            try {
                CompletableFuture<?> createFuture = controller.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTimeoutMs(120000), Collections.emptySet());
                CompletableFuture<?> deleteFuture = controller.deleteTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList());
                CompletableFuture<?> findTopicIdsFuture = controller.findTopicIds(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList());
                CompletableFuture<?> findTopicNamesFuture = controller.findTopicNames(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList());
                CompletableFuture<?> createPartitionsFuture = controller.createPartitions(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, Collections.emptyList(), false);
                CompletableFuture<?> electLeadersFuture = controller.electLeaders(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
                CompletableFuture<?> alterReassignmentsFuture = controller.alterPartitionReassignments(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, new AlterPartitionReassignmentsRequestData());
                // All of these are awaited while the controller is still paused.
                createFuture.get();
                deleteFuture.get();
                findTopicIdsFuture.get();
                findTopicNamesFuture.get();
                createPartitionsFuture.get();
                electLeadersFuture.get();
                alterReassignmentsFuture.get();
            } finally {
                // Always release the pause, even when a get() above throws, so the
                // environment is not closed with the controller still paused
                // (which presumably could stall shutdown and mask the real failure).
                countDownLatch.countDown();
            }
        }
    }

    /**
     * (Disabled.) Creates a three-partition topic on three brokers, arranges for the
     * log manager to resign after a non-atomic commit, and then sends an AlterPartition
     * request shrinking every ISR to {0, 1}. The request is expected to fail, the same
     * controller is expected to be active again with a higher claim epoch, and the ISR
     * index is expected to show broker 2 removed from some, but not all, partitions.
     */
    @Disabled
    @Test
    public void testMissingInMemorySnapshot() throws Exception {
        int numBrokers = 3;
        int numPartitions = 3;
        String topicName = "topic-name";
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).build()) {
            QuorumController controller = controlEnv.activeController();
            Map<Integer, Long> brokerEpochs = this.registerBrokers(controller, numBrokers);

            // Every partition is assigned to brokers 0, 1 and 2.
            List<CreateTopicsRequestData.CreatableReplicaAssignment> assignments = IntStream.range(0, numPartitions)
                .mapToObj(partitionIndex -> new CreateTopicsRequestData.CreatableReplicaAssignment()
                    .setPartitionIndex(partitionIndex)
                    .setBrokerIds(Arrays.asList(0, 1, 2)))
                .collect(Collectors.toList());
            CreateTopicsRequestData.CreatableTopic creatableTopic = new CreateTopicsRequestData.CreatableTopic()
                .setName(topicName)
                .setNumPartitions(-1)
                .setReplicationFactor((short)-1)
                .setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(assignments.iterator()));
            CreateTopicsRequestData createRequest = new CreateTopicsRequestData()
                .setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(creatableTopic).iterator()));
            // NOTE(review): the third argument names "foo" rather than topicName; kept exactly as before.
            CreateTopicsResponseData createResponse = controller.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createRequest, Collections.singleton("foo")).get();
            Uuid topicId = createResponse.topics().find(topicName).topicId();

            // One ISR shrink (dropping broker 2) per partition, using the current epochs.
            List<AlterPartitionRequestData.PartitionData> isrShrinks = IntStream.range(0, numPartitions)
                .mapToObj(partitionIndex -> {
                    PartitionRegistration current = controller.replicationControl().getPartition(topicId, partitionIndex);
                    return new AlterPartitionRequestData.PartitionData()
                        .setPartitionIndex(partitionIndex)
                        .setLeaderEpoch(current.leaderEpoch)
                        .setPartitionEpoch(current.partitionEpoch)
                        .setNewIsr(Arrays.asList(0, 1));
                })
                .collect(Collectors.toList());
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicName(topicName);
            topicData.partitions().addAll(isrShrinks);

            int leaderId = 0;
            AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData()
                .setBrokerId(leaderId)
                .setBrokerEpoch(brokerEpochs.get(leaderId).longValue());
            alterPartitionRequest.topics().add(topicData);

            logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
            int oldClaimEpoch = controller.curClaimEpoch();
            Assertions.assertThrows(ExecutionException.class,
                () -> controller.alterPartition(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, alterPartitionRequest).get());

            Assertions.assertSame(controller, controlEnv.activeController());
            Assertions.assertTrue(oldClaimEpoch < controller.curClaimEpoch(),
                String.format("oldClaimEpoch = %s, newClaimEpoch = %s", oldClaimEpoch, controller.curClaimEpoch()));

            int partitionsWithReplica2 = Utils.toList((Iterator)controller.replicationControl().brokersToIsrs().partitionsWithBrokerInIsr(2)).size();
            int partitionsWithReplica0 = Utils.toList((Iterator)controller.replicationControl().brokersToIsrs().partitionsWithBrokerInIsr(0)).size();
            Assertions.assertEquals(numPartitions, partitionsWithReplica0);
            Assertions.assertNotEquals(0, partitionsWithReplica2);
            Assertions.assertTrue(partitionsWithReplica0 > partitionsWithReplica2,
                String.format("partitionsWithReplica0 = %s, partitionsWithReplica2 = %s", partitionsWithReplica0, partitionsWithReplica2));
        }
    }

    /**
     * Registers brokers {@code 0..numBrokers-1} with the controller and, right after
     * each registration, sends one heartbeat for that broker with {@code wantFence=false}.
     *
     * @param controller the controller to register against
     * @param numBrokers how many brokers to register
     * @return the epoch returned by the controller for each broker id
     * @throws Exception if a registration or heartbeat future fails
     */
    private Map<Integer, Long> registerBrokers(QuorumController controller, int numBrokers) throws Exception {
        Map<Integer, Long> epochsById = new HashMap<>();
        for (int id = 0; id < numBrokers; ++id) {
            BrokerRegistrationRequestData.Listener listener = new BrokerRegistrationRequestData.Listener()
                .setName("PLAINTEXT")
                .setHost("localhost")
                .setPort(9092 + id);
            BrokerRegistrationRequestData registration = new BrokerRegistrationRequestData()
                .setBrokerId(id)
                .setRack(null)
                .setClusterId(controller.clusterId())
                .setFeatures(this.brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0))
                .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + id))
                .setListeners(new BrokerRegistrationRequestData.ListenerCollection(Arrays.asList(listener).iterator()));
            BrokerRegistrationReply reply = controller.registerBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, registration).get();
            long epoch = reply.epoch();
            epochsById.put(id, epoch);

            BrokerHeartbeatRequestData heartbeat = new BrokerHeartbeatRequestData()
                .setWantFence(false)
                .setBrokerEpoch(epoch)
                .setBrokerId(id)
                .setCurrentMetadataOffset(100000L);
            controller.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, heartbeat).get();
        }
        return epochsById;
    }

    /**
     * Sends one heartbeat ({@code wantFence=false}) for each listed broker and asserts
     * that every reply equals {@code BrokerHeartbeatReply(true, false, false, false)}.
     * An empty broker list is a no-op.
     *
     * @param controller   the controller to heartbeat against
     * @param brokers      ids of the brokers to heartbeat for
     * @param brokerEpochs epoch to present for each broker id
     * @throws Exception if a heartbeat future fails
     */
    private void sendBrokerheartbeat(QuorumController controller, List<Integer> brokers, Map<Integer, Long> brokerEpochs) throws Exception {
        for (Integer brokerId : brokers) {
            BrokerHeartbeatRequestData heartbeat = new BrokerHeartbeatRequestData()
                .setWantFence(false)
                .setBrokerEpoch(brokerEpochs.get(brokerId).longValue())
                .setBrokerId(brokerId.intValue())
                .setCurrentMetadataOffset(100000L);
            BrokerHeartbeatReply actual = controller.processBrokerHeartbeat(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, heartbeat).get();
            BrokerHeartbeatReply expected = new BrokerHeartbeatReply(true, false, false, false);
            Assertions.assertEquals((Object)expected, (Object)actual);
        }
    }

    /**
     * With five registered brokers and topic "foo" created, checks that
     * {@code ConfigResourceExistenceChecker} accepts the default broker resource (""),
     * broker "3" and topic "foo", and rejects broker "10" and topic "bar" with the
     * matching exception types.
     */
    @Test
    public void testConfigResourceExistenceChecker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).build()) {
            QuorumController active = controlEnv.activeController();
            this.registerBrokers(active, 5);

            CreateTopicsRequestData.CreatableTopic foo = new CreateTopicsRequestData.CreatableTopic()
                .setName("foo")
                .setReplicationFactor((short)3)
                .setNumPartitions(1);
            CreateTopicsRequestData createFoo = new CreateTopicsRequestData()
                .setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(foo).iterator()));
            active.createTopics(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, createFoo, Collections.singleton("foo")).get();

            QuorumController.ConfigResourceExistenceChecker checker = new QuorumController.ConfigResourceExistenceChecker(active);

            // Broker resources: cluster default, a registered id, then an unregistered id.
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, ""));
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "3"));
            Assertions.assertThrows(BrokerIdNotRegisteredException.class,
                () -> checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "10")));

            // Topic resources: the topic created above, then one that was never created.
            checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "foo"));
            Assertions.assertThrows(UnknownTopicOrPartitionException.class,
                () -> checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "bar")));
        }
    }

    /**
     * Gives each of three controllers its own {@link StandardAuthorizer} and checks
     * that no authorizer's initial-load future completes while the log managers' max
     * read offset is held at 2, and that all of them complete once every log manager
     * is allowed to read to {@code Long.MAX_VALUE}.
     */
    @Test
    public void testQuorumControllerCompletesAuthorizerInitialLoad() throws Throwable {
        int numControllers = 3;
        List<StandardAuthorizer> authorizers = new ArrayList<>(numControllers);
        for (int node = 0; node < numControllers; ++node) {
            StandardAuthorizer authorizer = new StandardAuthorizer();
            authorizer.configure(Collections.emptyMap());
            authorizers.add(authorizer);
        }
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(numControllers).setSharedLogDataInitializer(sharedLogData -> sharedLogData.setInitialMaxReadOffset(2L)).build()) {
            logEnv.appendInitialRecords(this.generateTestRecords(FOO_ID, ALL_ZERO_BROKER_EPOCHS));
            logEnv.logManagers().forEach(manager -> manager.setMaxReadOffset(2L));
            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setAuthorizer((ClusterMetadataAuthorizer)authorizers.get(controllerBuilder.nodeId()))).build()) {
                QuorumControllerTest.assertInitialLoadFuturesNotComplete(authorizers);

                // Unblock only the first log manager, then write through the active controller.
                logEnv.logManagers().get(0).setMaxReadOffset(Long.MAX_VALUE);
                QuorumController active = controlEnv.activeController();
                active.unregisterBroker(ControllerRequestContextUtil.ANONYMOUS_CONTEXT, 3).get();

                // Every authorizer except the first must still be waiting.
                List<StandardAuthorizer> stillBlocked = authorizers.stream().skip(1L).collect(Collectors.toList());
                QuorumControllerTest.assertInitialLoadFuturesNotComplete(stillBlocked);

                logEnv.logManagers().forEach(manager -> manager.setMaxReadOffset(Long.MAX_VALUE));
                TestUtils.waitForCondition(
                    () -> authorizers.stream().allMatch(a -> a.initialLoadFuture().isDone()),
                    "Failed to complete initial authorizer load for all controllers.");
            }
        }
    }

    /**
     * Appends a write event on the active controller whose only record is a
     * {@code ConfigRecord} with null fields, and expects the event's future to fail
     * and the active node's fatal fault handler to have captured an exception caused
     * by a {@link NullPointerException}.
     */
    @Test
    public void testFatalMetadataReplayErrorOnActive() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
            QuorumController active = controlEnv.activeController();
            CompletableFuture<Void> future = active.appendWriteEvent("errorEvent", OptionalLong.empty(), () -> {
                ConfigRecord badRecord = new ConfigRecord()
                    .setName(null)
                    .setResourceName(null)
                    .setResourceType((byte)-1)
                    .setValue(null);
                return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)badRecord, 0)), null);
            });
            Assertions.assertThrows(ExecutionException.class, future::get);

            Throwable recorded = controlEnv.fatalFaultHandler(active.nodeId()).firstException();
            Assertions.assertEquals(NullPointerException.class, recorded.getCause().getClass());
            // The fault was expected, so tell the environment to ignore it.
            controlEnv.ignoreFatalFaults();
        }
    }

    /**
     * Starts three controllers from a snapshot that contains only an empty
     * {@code PartitionRecord} and waits until every controller's fatal fault handler
     * has recorded an exception.
     *
     * The snapshot is opened as a try-with-resources resource, so its temporary
     * directory is deleted when the test finishes, after the environments that read
     * from it have been closed.
     */
    @Test
    public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
        List<ApiMessageAndVersion> invalidRecords = Collections.unmodifiableList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionRecord(), 0)));
        // Resources close in reverse order: controllers, then the log environment, then the snapshot directory.
        try (InitialSnapshot invalidSnapshot = new InitialSnapshot(invalidRecords);
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).setSnapshotReader((RawSnapshotReader)FileRawSnapshotReader.open((Path)invalidSnapshot.tempDir.toPath(), (OffsetAndEpoch)new OffsetAndEpoch(0L, 0))).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
            TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), (String)"At least one controller failed to detect the fatal fault");
            // The faults were expected, so tell the environment to ignore them.
            controlEnv.ignoreFatalFaults();
        }
    }

    /**
     * Seeds the log with a single empty {@code PartitionRecord} before any controller
     * starts, then waits until every controller's fatal fault handler has recorded an
     * exception.
     */
    @Test
    public void testFatalMetadataErrorDuringLogLoading() throws Exception {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
            ApiMessageAndVersion emptyPartitionRecord = new ApiMessageAndVersion((ApiMessage)new PartitionRecord(), 0);
            logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(emptyPartitionRecord)));

            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
                TestUtils.waitForCondition(
                    () -> controlEnv.controllers().stream().allMatch(
                        node -> controlEnv.fatalFaultHandler(node.nodeId()).firstException() != null),
                    "At least one controller failed to detect the fatal fault");
                // The faults were expected, so tell the environment to ignore them.
                controlEnv.ignoreFatalFaults();
            }
        }
    }

    /**
     * Asserts that none of the given authorizers has finished its initial load.
     * The failure message reports the position of the offending authorizer in the list.
     *
     * @param authorizers the authorizers to check, in list order
     */
    private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
        int position = 0;
        for (StandardAuthorizer authorizer : authorizers) {
            boolean loaded = authorizer.initialLoadFuture().isDone();
            Assertions.assertFalse(loaded, "authorizer " + position + " should not have completed loading.");
            position++;
        }
    }

    /**
     * Starts the controllers from a snapshot of {@code PRE_PRODUCTION_RECORDS} with
     * {@code COMPLEX_BOOTSTRAP} as bootstrap metadata, waits for the active controller
     * to report metadata version IBP_3_0_IV1, and checks that the default broker
     * resource has no configs.
     */
    @Test
    public void testUpgradeFromPreProductionVersion() throws Exception {
        try (InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS);
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).setSnapshotReader((RawSnapshotReader)FileRawSnapshotReader.open((Path)initialSnapshot.tempDir.toPath(), (OffsetAndEpoch)new OffsetAndEpoch(0L, 0))).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).setBootstrapMetadata(COMPLEX_BOOTSTRAP).build()) {
            QuorumController active = controlEnv.activeController();
            TestUtils.waitForCondition(
                () -> active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1),
                "Failed to get a metadata version of " + MetadataVersion.IBP_3_0_IV1);

            ConfigResource defaultBroker = new ConfigResource(ConfigResource.Type.BROKER, "");
            Object defaultBrokerConfigs = active.configurationControl().getConfigs(defaultBroker);
            Assertions.assertEquals(Collections.emptyMap(), defaultBrokerConfigs);
        }
    }

    /**
     * Starts the controllers on an empty log with {@code COMPLEX_BOOTSTRAP} as
     * bootstrap metadata, then waits until the active controller reports the
     * IBP_3_3_IV1 feature level for "metadata.version" and the default broker
     * resource describes as exactly {@code foo=bar}.
     */
    @Test
    public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build();
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setConfigSchema(ConfigurationControlManagerTest.SCHEMA)).setBootstrapMetadata(COMPLEX_BOOTSTRAP).build()) {
            QuorumController active = controlEnv.activeController();
            ControllerRequestContext ctx = new ControllerRequestContext(new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE));

            TestUtils.waitForCondition(() -> {
                FinalizedControllerFeatures finalized = (FinalizedControllerFeatures)active.finalizedFeatures(ctx).get();
                Optional<?> observedLevel = finalized.get("metadata.version");
                Optional<?> expectedLevel = Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel());
                return expectedLevel.equals(observedLevel);
            }, "Failed to see expected metadata version from bootstrap metadata");

            TestUtils.waitForCondition(() -> {
                ConfigResource defaultBrokerResource = new ConfigResource(ConfigResource.Type.BROKER, "");
                Map configs = Collections.singletonMap(defaultBrokerResource, Collections.emptyList());
                Map results = (Map)active.describeConfigs(ctx, configs).get();
                ResultOrError resultOrError = (ResultOrError)results.get(defaultBrokerResource);
                if (!resultOrError.isResult()) {
                    return false;
                }
                return Collections.singletonMap("foo", "bar").equals(resultOrError.result());
            }, "Failed to see expected config change from bootstrap metadata");
        }
    }

    /**
     * Wraps a {@code BrokerRegistrationChangeRecord} for broker {@code i} as a
     * version-0 {@code ApiMessageAndVersion}.
     *
     * @param i the broker id to set on the record
     * @return the wrapped record
     */
    private static ApiMessageAndVersion rec(int i) {
        BrokerRegistrationChangeRecord change = new BrokerRegistrationChangeRecord().setBrokerId(i);
        return new ApiMessageAndVersion((ApiMessage)change, 0);
    }

    /**
     * Appends five records through {@code QuorumController.appendRecords} with a
     * batch limit of 2 and expects the call to return 5. The {@code TestAppender}
     * itself asserts that the records reach it in order.
     */
    @Test
    public void testAppendRecords() {
        TestAppender appender = new TestAppender();
        List<ApiMessageAndVersion> fiveRecords = Arrays.asList(
            QuorumControllerTest.rec(0),
            QuorumControllerTest.rec(1),
            QuorumControllerTest.rec(2),
            QuorumControllerTest.rec(3),
            QuorumControllerTest.rec(4));
        long returned = (long)QuorumController.appendRecords((Logger)log, (ControllerResult)ControllerResult.of(fiveRecords, null), 2, (Function)appender);
        Assertions.assertEquals(5L, returned);
    }

    /**
     * Asks {@code QuorumController.appendRecords} to commit five records atomically
     * with a batch limit of 2 and expects an {@link IllegalStateException} carrying
     * the exact message checked below.
     */
    @Test
    public void testAppendRecordsAtomically() {
        TestAppender appender = new TestAppender();
        List<ApiMessageAndVersion> fiveRecords = Arrays.asList(
            QuorumControllerTest.rec(0),
            QuorumControllerTest.rec(1),
            QuorumControllerTest.rec(2),
            QuorumControllerTest.rec(3),
            QuorumControllerTest.rec(4));
        IllegalStateException thrown = Assertions.assertThrows(IllegalStateException.class,
            () -> QuorumController.appendRecords((Logger)log, (ControllerResult)ControllerResult.atomicOf(fiveRecords, null), 2, (Function)appender));
        Assertions.assertEquals((Object)"Attempted to atomically commit 5 records, but maxRecordsPerBatch is 2", (Object)thrown.getMessage());
    }

    /**
     * Appender stub that keeps a running count of the records it has been handed.
     * Each record must be a {@code BrokerRegistrationChangeRecord} whose broker id
     * equals the count at the moment it is seen; {@code apply} returns the count
     * after the batch.
     */
    static class TestAppender implements Function<List<ApiMessageAndVersion>, Long> {
        /** Number of records seen so far across all calls. */
        private long offset = 0L;

        @Override
        public Long apply(List<ApiMessageAndVersion> apiMessageAndVersions) {
            for (ApiMessageAndVersion entry : apiMessageAndVersions) {
                BrokerRegistrationChangeRecord change = (BrokerRegistrationChangeRecord)entry.message();
                int expectedBrokerId = (int)offset;
                Assertions.assertEquals(expectedBrokerId, (int)change.brokerId());
                // Advance only after the check has passed.
                offset += 1;
            }
            return offset;
        }
    }

    /**
     * Writes the given records to a snapshot file (offset 0, epoch 0) inside a fresh
     * temporary directory. Closing the object deletes that directory.
     */
    static class InitialSnapshot implements AutoCloseable {
        /** Directory holding the snapshot file; removed by {@link #close()}. */
        File tempDir;
        /** Remains null once construction has finished; still handed to closeQuietly in {@link #close()}. */
        BatchFileWriter writer = null;

        /**
         * @param records the records to write into the snapshot file
         * @throws Exception if the snapshot file cannot be opened, written or closed
         */
        public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
            this.tempDir = TestUtils.tempDirectory();
            Path snapshotFile = Snapshots.snapshotPath(this.tempDir.toPath(), new OffsetAndEpoch(0L, 0));
            BatchFileWriter out = BatchFileWriter.open(snapshotFile);
            out.append(records);
            out.close();
        }

        @Override
        public void close() throws Exception {
            Utils.closeQuietly((AutoCloseable)this.writer, "BatchFileWriter");
            Utils.delete(this.tempDir);
        }
    }
}

