/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerRequestContextUtil;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.OffsetControlManager;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.metadata.AssignmentsHelper;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.PartitionAssignmentTest;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.server.util.MockRandom;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class ReplicationControlManagerTest {
    private static final Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
    private static final int BROKER_SESSION_TIMEOUT_MS = 1000;
    private static final ListPartitionReassignmentsResponseData NONE_REASSIGNING = new ListPartitionReassignmentsResponseData().setErrorMessage(null);

    static CreateTopicsResponseData withoutConfigs(CreateTopicsResponseData data) {
        data.topics().forEach(t -> t.configs().clear());
        return data;
    }

    @Test
    public void testExcessiveNumberOfTopicsCannotBeCreated() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(5000).setReplicationFactor((short)1));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(5000).setReplicationFactor((short)1));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("baz").setNumPartitions(1).setReplicationFactor((short)1));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        PolicyViolationException error = (PolicyViolationException)Assertions.assertThrows(PolicyViolationException.class, () -> replicationControl.createTopics(requestContext, request, Set.of("foo", "bar", "baz")));
        Assertions.assertEquals((Object)error.getMessage(), (Object)"Excessively large number of partitions per request.");
    }

    @Test
    public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)1));
        CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments = new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
        assignments.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1));
        assignments.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("baz").setAssignments(assignments));
        PolicyViolationException error = (PolicyViolationException)Assertions.assertThrows(PolicyViolationException.class, () -> ReplicationControlManager.validateTotalNumberOfPartitions((CreateTopicsRequestData)request, (int)9999));
        Assertions.assertEquals((Object)error.getMessage(), (Object)"Excessively large number of partitions per request.");
    }

    @Test
    public void testCreateTopics() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced."));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0);
        ctx.inControlledShutdownBrokers(0);
        ControllerResult result2 = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
        expectedResponse2.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced or in controlled shutdown."));
        Assertions.assertEquals((Object)expectedResponse2, (Object)result2.response());
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ControllerResult result3 = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
        expectedResponse3.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setNumPartitions(1).setReplicationFactor((short)3).setErrorMessage(null).setErrorCode((short)0).setTopicId(((CreateTopicsResponseData)result3.response()).topics().find("foo").topicId()));
        Assertions.assertEquals((Object)expectedResponse3, (Object)ReplicationControlManagerTest.withoutConfigs((CreateTopicsResponseData)result3.response()));
        ctx.replay(result3.records());
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 0}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00000DIRAAAA")}).setIsr(new int[]{1, 2, 0}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replicationControl.getPartition(((TopicRecord)((ApiMessageAndVersion)result3.records().get(0)).message()).topicId(), 0));
        ControllerResult result4 = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData();
        expectedResponse4.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).setErrorMessage("Topic 'foo' already exists."));
        Assertions.assertEquals((Object)expectedResponse4, (Object)result4.response());
    }

    @Test
    public void testCreateTopicsWithMutationQuotaExceeded() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()).setErrorMessage("Quota exceeded in test"));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
    }

    @Test
    public void testCreateTopicsISRInvariants() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1);
        ctx.inControlledShutdownBrokers(1);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setNumPartitions(1).setReplicationFactor((short)3).setErrorMessage(null).setErrorCode((short)0).setTopicId(((CreateTopicsResponseData)result.response()).topics().find("foo").topicId()));
        for (CreateTopicsResponseData.CreatableTopicResult topic : ((CreateTopicsResponseData)result.response()).topics()) {
            topic.configs().clear();
        }
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
        ctx.replay(result.records());
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 0, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00000DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA")}).setIsr(new int[]{0}).setLeader(Integer.valueOf(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replicationControl.getPartition(((TopicRecord)((ApiMessageAndVersion)result.records().get(0)).message()).topicId(), 0));
    }

    @Test
    public void testCreateTopicsWithConfigs() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsRequestData.CreatableTopicConfigCollection validConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
        validConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopicConfig().setName("foo").setValue("notNull"));
        CreateTopicsRequestData request1 = new CreateTopicsRequestData();
        request1.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1).setConfigs(validConfigs));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result1 = replicationControl.createTopics(requestContext, request1, Set.of("foo"));
        Assertions.assertEquals((short)0, (short)((CreateTopicsResponseData)result1.response()).topics().find("foo").errorCode());
        List records1 = result1.records();
        Assertions.assertEquals((int)3, (int)records1.size());
        ApiMessageAndVersion record0 = (ApiMessageAndVersion)records1.get(0);
        Assertions.assertEquals(TopicRecord.class, (Object)record0.message().getClass());
        ApiMessageAndVersion record1 = (ApiMessageAndVersion)records1.get(1);
        Assertions.assertEquals(ConfigRecord.class, (Object)record1.message().getClass());
        ApiMessageAndVersion lastRecord = (ApiMessageAndVersion)records1.get(2);
        Assertions.assertEquals(PartitionRecord.class, (Object)lastRecord.message().getClass());
        ctx.replay(result1.records());
        Assertions.assertEquals((Object)"notNull", ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get("foo"));
        CreateTopicsRequestData.CreatableTopicConfigCollection invalidConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
        invalidConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopicConfig().setName("foo").setValue(null));
        CreateTopicsRequestData request2 = new CreateTopicsRequestData();
        request2.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(-1).setReplicationFactor((short)-1).setConfigs(invalidConfigs));
        ControllerResult result2 = replicationControl.createTopics(requestContext, request2, Set.of("bar"));
        Assertions.assertEquals((short)Errors.INVALID_CONFIG.code(), (short)((CreateTopicsResponseData)result2.response()).topics().find("bar").errorCode());
        Assertions.assertEquals((Object)"Null value not supported for topic configs: foo", (Object)((CreateTopicsResponseData)result2.response()).topics().find("bar").errorMessage());
        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
        request3.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("baz").setNumPartitions(-1).setReplicationFactor((short)-2).setConfigs(validConfigs));
        ControllerResult result3 = replicationControl.createTopics(requestContext, request3, Set.of("baz"));
        Assertions.assertEquals((short)Errors.INVALID_REPLICATION_FACTOR.code(), (short)((CreateTopicsResponseData)result3.response()).topics().find("baz").errorCode());
        Assertions.assertEquals(List.of(), (Object)result3.records());
        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
        String batchedTopic1 = "batched-topic-1";
        request4.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(batchedTopic1).setNumPartitions(-1).setReplicationFactor((short)-1).setConfigs(validConfigs));
        String batchedTopic2 = "batched-topic2";
        request4.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(batchedTopic2).setNumPartitions(-1).setReplicationFactor((short)-2).setConfigs(validConfigs));
        HashSet<String> request4Topics = new HashSet<String>();
        request4Topics.add(batchedTopic1);
        request4Topics.add(batchedTopic2);
        ControllerResult result4 = replicationControl.createTopics(requestContext, request4, request4Topics);
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((CreateTopicsResponseData)result4.response()).topics().find(batchedTopic1).errorCode());
        Assertions.assertEquals((short)Errors.INVALID_REPLICATION_FACTOR.code(), (short)((CreateTopicsResponseData)result4.response()).topics().find(batchedTopic2).errorCode());
        Assertions.assertEquals((int)3, (int)result4.records().size());
        Assertions.assertEquals(TopicRecord.class, (Object)((ApiMessageAndVersion)result4.records().get(0)).message().getClass());
        TopicRecord batchedTopic1Record = (TopicRecord)((ApiMessageAndVersion)result4.records().get(0)).message();
        Assertions.assertEquals((Object)batchedTopic1, (Object)batchedTopic1Record.name());
        Assertions.assertEquals((Object)new ConfigRecord().setResourceName(batchedTopic1).setResourceType(ConfigResource.Type.TOPIC.id()).setName("foo").setValue("notNull"), (Object)((ApiMessageAndVersion)result4.records().get(1)).message());
        Assertions.assertEquals(PartitionRecord.class, (Object)((ApiMessageAndVersion)result4.records().get(2)).message().getClass());
        Assertions.assertEquals((Object)batchedTopic1Record.topicId(), (Object)((PartitionRecord)((ApiMessageAndVersion)result4.records().get(2)).message()).topicId());
    }

    @ParameterizedTest(name="testCreateTopicsWithValidateOnlyFlag with mutationQuotaExceeded: {0}")
    @ValueSource(booleans={true, false})
    public void testCreateTopicsWithValidateOnlyFlag(boolean mutationQuotaExceeded) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsRequestData request = new CreateTopicsRequestData().setValidateOnly(true);
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)3));
        ControllerRequestContext requestContext = mutationQuotaExceeded ? ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS) : ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = ctx.replicationControl.createTopics(requestContext, request, Set.of("foo"));
        Assertions.assertEquals((int)0, (int)result.records().size());
        CreateTopicsResponseData.CreatableTopicResult topicResult = ((CreateTopicsResponseData)result.response()).topics().find("foo");
        if (mutationQuotaExceeded) {
            Assertions.assertEquals((short)Errors.THROTTLING_QUOTA_EXCEEDED.code(), (short)topicResult.errorCode());
        } else {
            Assertions.assertEquals((short)Errors.NONE.code(), (short)topicResult.errorCode());
        }
    }

    @Test
    public void testInvalidCreateTopicsWithValidateOnlyFlag() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsRequestData request = new CreateTopicsRequestData().setValidateOnly(true);
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)4));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = ctx.replicationControl.createTopics(requestContext, request, Set.of("foo"));
        Assertions.assertEquals((int)0, (int)result.records().size());
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("Unable to replicate the partition 4 time(s): The target replication factor of 4 cannot be reached because only 3 broker(s) are registered."));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
    }

    @Test
    public void testCreateTopicsWithPolicy() {
        MockCreateTopicPolicy createTopicPolicy = new MockCreateTopicPolicy(List.of(new CreateTopicPolicy.RequestMetadata("foo", Integer.valueOf(2), Short.valueOf((short)2), null, Map.of()), new CreateTopicPolicy.RequestMetadata("bar", Integer.valueOf(3), Short.valueOf((short)2), null, Map.of()), new CreateTopicPolicy.RequestMetadata("baz", null, null, Map.of(0, List.of(Integer.valueOf(2), Integer.valueOf(1), Integer.valueOf(0))), Map.of("segment.bytes", "12300000")), new CreateTopicPolicy.RequestMetadata("quux", null, null, Map.of(0, List.of(Integer.valueOf(2), Integer.valueOf(1), Integer.valueOf(0))), Map.of())));
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setCreateTopicPolicy(createTopicPolicy).build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ctx.createTestTopic("foo", 2, (short)2, Errors.NONE.code());
        ctx.createTestTopic("bar", 3, (short)3, Errors.POLICY_VIOLATION.code());
        ctx.createTestTopic("baz", new int[][]{{2, 1, 0}}, Map.of("segment.bytes", "12300000"), Errors.NONE.code());
        ctx.createTestTopic("quux", new int[][]{{1, 2, 0}}, Errors.POLICY_VIOLATION.code());
    }

    @Test
    public void testCreateTopicWithCollisionChars() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult initialTopic = ctx.createTestTopic("foo.bar", 2, (short)2, Errors.NONE.code());
        Assertions.assertEquals((int)2, (int)ctx.replicationControl.getTopic(initialTopic.topicId()).numPartitions(Long.MAX_VALUE));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ctx.deleteTopic(requestContext, initialTopic.topicId());
        CreateTopicsResponseData.CreatableTopicResult recreatedTopic = ctx.createTestTopic("foo.bar", 4, (short)2, Errors.NONE.code());
        Assertions.assertNotEquals((Object)initialTopic.topicId(), (Object)recreatedTopic.topicId());
        Assertions.assertEquals((int)4, (int)ctx.replicationControl.getTopic(recreatedTopic.topicId()).numPartitions(Long.MAX_VALUE));
    }

    @Test
    public void testValidateNewTopicNames() {
        HashMap topicErrors = new HashMap();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(""));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("woo"));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("."));
        ReplicationControlManager.validateNewTopicNames(topicErrors, (CreateTopicsRequestData.CreatableTopicCollection)topics, Map.of());
        HashMap<String, ApiError> expectedTopicErrors = new HashMap<String, ApiError>();
        expectedTopicErrors.put("", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name is invalid: the empty string is not allowed"));
        expectedTopicErrors.put(".", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name is invalid: '.' is not allowed"));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    @Test
    public void testTopicNameCollision() {
        HashMap topicErrors = new HashMap();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo.bar"));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("woo.bar_foo"));
        HashMap<String, TreeSet<String>> collisionMap = new HashMap<String, TreeSet<String>>();
        collisionMap.put("foo_bar", new TreeSet<String>(List.of("foo_bar")));
        collisionMap.put("woo_bar_foo", new TreeSet<String>(List.of("woo.bar.foo", "woo_bar.foo")));
        ReplicationControlManager.validateNewTopicNames(topicErrors, (CreateTopicsRequestData.CreatableTopicCollection)topics, collisionMap);
        HashMap<String, ApiError> expectedTopicErrors = new HashMap<String, ApiError>();
        expectedTopicErrors.put("foo.bar", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic 'foo.bar' collides with existing topic: foo_bar"));
        expectedTopicErrors.put("woo.bar_foo", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    @Test
    public void testRemoveLeaderships() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult result = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}, {1, 2, 3}, {2, 3, 0}, {0, 2, 1}});
        HashSet<TopicIdPartition> expectedPartitions = new HashSet<TopicIdPartition>();
        expectedPartitions.add(new TopicIdPartition(result.topicId(), 0));
        expectedPartitions.add(new TopicIdPartition(result.topicId(), 3));
        Assertions.assertEquals(expectedPartitions, RecordTestUtils.iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        replicationControl.handleBrokerFenced(0, records);
        ctx.replay(records);
        Assertions.assertEquals(Set.of(), RecordTestUtils.iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
    }

    @Test
    public void testShrinkAndExpandIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);
        AlterPartitionRequestData.PartitionData shrinkIsrRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = this.sendAlterPartition(replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), shrinkIsrRequest);
        AlterPartitionResponseData.PartitionData shrinkIsrResponse = this.assertAlterPartitionResponse(shrinkIsrResult, topicIdPartition, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
        AlterPartitionRequestData.PartitionData expandIsrRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> expandIsrResult = this.sendAlterPartition(replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), expandIsrRequest);
        AlterPartitionResponseData.PartitionData expandIsrResponse = this.assertAlterPartitionResponse(expandIsrResult, topicIdPartition, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
    }

    @Test
    public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);
        ctx.alterTopicConfig("foo", "min.insync.replicas", "2");
        AlterPartitionRequestData.PartitionData shrinkIsrRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = this.sendAlterPartition(replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), shrinkIsrRequest);
        AlterPartitionResponseData.PartitionData shrinkIsrResponse = this.assertAlterPartitionResponse(shrinkIsrResult, topicIdPartition, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
        PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{1, 2}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        AlterPartitionRequestData.PartitionData expandIsrRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> expandIsrResult = this.sendAlterPartition(replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), expandIsrRequest);
        AlterPartitionResponseData.PartitionData expandIsrResponse = this.assertAlterPartitionResponse(expandIsrResult, topicIdPartition, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, expandIsrResponse);
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
    }

    @Test
    public void testEligibleLeaderReplicas_ShrinkToEmptyIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");
        ctx.fenceBrokers(Set.of(Integer.valueOf(1), Integer.valueOf(2)));
        PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{1, 2}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        ctx.handleBrokersShutdown(true, 0);
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{0, 1, 2}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[]{0}, (int[])partition.lastKnownElr, (String)partition.toString());
        Assertions.assertEquals((int)0, (int)partition.isr.length);
    }

    @Test
    public void testEligibleLeaderReplicas_BrokerFence() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2, 3}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");
        ctx.fenceBrokers(Set.of(Integer.valueOf(2), Integer.valueOf(3)));
        PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{3}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        ctx.fenceBrokers(Set.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)));
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{1, 3}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        ctx.unfenceBrokers(0, 1, 2, 3);
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{1, 3}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
    }

    @Test
    public void testEligibleLeaderReplicas_DeleteTopic() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);
        ctx.alterTopicConfig("foo", "min.insync.replicas", "2");
        AlterPartitionRequestData.PartitionData shrinkIsrRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkIsrResult = this.sendAlterPartition(replicationControl, 0, brokerEpoch, topicIdPartition.topicId(), shrinkIsrRequest);
        AlterPartitionResponseData.PartitionData shrinkIsrResponse = this.assertAlterPartitionResponse(shrinkIsrResult, topicIdPartition, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
        PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{1, 2}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        Assertions.assertTrue((boolean)replicationControl.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());
        ControllerRequestContext deleteTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ctx.deleteTopic(deleteTopicsRequestContext, createTopicResult.topicId());
        Assertions.assertFalse((boolean)replicationControl.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());
        Assertions.assertFalse((boolean)replicationControl.brokersToIsrs().partitionsWithBrokerInIsr(0).hasNext());
    }

    @Test
    public void testEligibleLeaderReplicas_EffectiveMinIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "5");
        Assertions.assertEquals((int)3, (int)replicationControl.getTopicEffectiveMinIsr("foo"));
    }

    @Test
    public void testEligibleLeaderReplicas_CleanElection() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2, 3}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");
        ctx.fenceBrokers(Set.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)));
        PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{2, 3}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        ctx.unfenceBrokers(2);
        ctx.fenceBrokers(Set.of(Integer.valueOf(0), Integer.valueOf(1)));
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{0, 3}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[]{2}, (int[])partition.isr, (String)partition.toString());
        Assertions.assertEquals((int)2, (int)partition.leader, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
    }

    @Test
    public void testEligibleLeaderReplicas_UncleanShutdown() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2, 3}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");
        ctx.fenceBrokers(Set.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)));
        PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{2, 3}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        ctx.handleBrokersShutdown(false, 3);
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{2}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[0], (int[])partition.lastKnownElr, (String)partition.toString());
        ctx.handleBrokersShutdown(false, 0);
        partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])new int[]{2}, (int[])partition.elr, (String)partition.toString());
        Assertions.assertArrayEquals((int[])new int[]{0}, (int[])partition.lastKnownElr, (String)partition.toString());
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid topicId = Uuid.randomUuid();
        AlterPartitionRequestData request = new AlterPartitionRequestData().setBrokerId(0).setBrokerEpoch(100L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(topicId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0)))));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult result = replicationControl.alterPartition(requestContext, request);
        Errors expectedError = Errors.UNKNOWN_TOPIC_ID;
        AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(topicId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(expectedError.code())))));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
    }

    @Test
    public void testInvalidAlterPartitionRequests() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        int leaderId = 0;
        int notLeaderId = 1;
        Assertions.assertEquals((Object)OptionalInt.of(leaderId), (Object)ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);
        AlterPartitionRequestData.PartitionData invalidLeaderRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> invalidLeaderResult = this.sendAlterPartition(replicationControl, notLeaderId, ctx.currentBrokerEpoch(notLeaderId), topicIdPartition.topicId(), invalidLeaderRequest);
        this.assertAlterPartitionResponse(invalidLeaderResult, topicIdPartition, Errors.INVALID_REQUEST);
        AlterPartitionRequestData.PartitionData invalidBrokerEpochRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        Assertions.assertThrows(StaleBrokerEpochException.class, () -> this.sendAlterPartition(replicationControl, leaderId, brokerEpoch - 1L, topicIdPartition.topicId(), invalidBrokerEpochRequest));
        AlterPartitionRequestData.PartitionData invalidLeaderEpochRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        invalidLeaderEpochRequest.setLeaderEpoch(500);
        ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult = this.sendAlterPartition(replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidLeaderEpochRequest);
        this.assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, Errors.NOT_CONTROLLER);
        AlterPartitionRequestData.PartitionData invalidPartitionEpochRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        invalidPartitionEpochRequest.setPartitionEpoch(500);
        ControllerResult<AlterPartitionResponseData> invalidPartitionEpochResult = this.sendAlterPartition(replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidPartitionEpochRequest);
        this.assertAlterPartitionResponse(invalidPartitionEpochResult, topicIdPartition, Errors.NOT_CONTROLLER);
        AlterPartitionRequestData.PartitionData invalidIsrRequest1 = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 3), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> invalidIsrResult1 = this.sendAlterPartition(replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidIsrRequest1);
        this.assertAlterPartitionResponse(invalidIsrResult1, topicIdPartition, Errors.INVALID_REQUEST);
        AlterPartitionRequestData.PartitionData invalidIsrRequest2 = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> invalidIsrResult2 = this.sendAlterPartition(replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidIsrRequest2);
        this.assertAlterPartitionResponse(invalidIsrResult2, topicIdPartition, Errors.INVALID_REQUEST);
        AlterPartitionRequestData.PartitionData invalidIsrRecoveryRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERING);
        ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult = this.sendAlterPartition(replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidIsrRecoveryRequest);
        this.assertAlterPartitionResponse(invalidIsrRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
        AlterPartitionRequestData.PartitionData invalidRecoveryRequest = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERING);
        ControllerResult<AlterPartitionResponseData> invalidRecoveryResult = this.sendAlterPartition(replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidRecoveryRequest);
        this.assertAlterPartitionResponse(invalidRecoveryResult, topicIdPartition, Errors.INVALID_REQUEST);
    }

    private AlterPartitionRequestData.PartitionData newAlterPartition(ReplicationControlManager replicationControl, TopicIdPartition topicIdPartition, List<AlterPartitionRequestData.BrokerState> newIsrWithEpoch, LeaderRecoveryState leaderRecoveryState) {
        PartitionRegistration partitionControl = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        return new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setLeaderEpoch(partitionControl.leaderEpoch).setPartitionEpoch(partitionControl.partitionEpoch).setNewIsrWithEpochs(newIsrWithEpoch).setLeaderRecoveryState(leaderRecoveryState.value());
    }

    private ControllerResult<AlterPartitionResponseData> sendAlterPartition(ReplicationControlManager replicationControl, int brokerId, long brokerEpoch, Uuid topicId, AlterPartitionRequestData.PartitionData partitionData) {
        AlterPartitionRequestData request = new AlterPartitionRequestData().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
        AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicId(topicId);
        request.topics().add(topicData);
        topicData.partitions().add(partitionData);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        ControllerResult result = replicationControl.alterPartition(requestContext, request);
        RecordTestUtils.replayAll(replicationControl, result.records());
        return result;
    }

    private AlterPartitionResponseData.PartitionData assertAlterPartitionResponse(ControllerResult<AlterPartitionResponseData> alterPartitionResult, TopicIdPartition topicIdPartition, Errors expectedError) {
        AlterPartitionResponseData response = (AlterPartitionResponseData)alterPartitionResult.response();
        Assertions.assertEquals((int)1, (int)response.topics().size());
        AlterPartitionResponseData.TopicData topicData = (AlterPartitionResponseData.TopicData)response.topics().get(0);
        Assertions.assertEquals((Object)topicIdPartition.topicId(), (Object)topicData.topicId());
        Assertions.assertEquals((int)1, (int)topicData.partitions().size());
        AlterPartitionResponseData.PartitionData partitionData = (AlterPartitionResponseData.PartitionData)topicData.partitions().get(0);
        Assertions.assertEquals((int)topicIdPartition.partitionId(), (int)partitionData.partitionIndex());
        Assertions.assertEquals((Object)expectedError, (Object)Errors.forCode((short)partitionData.errorCode()));
        return partitionData;
    }

    private void assertConsistentAlterPartitionResponse(ReplicationControlManager replicationControl, TopicIdPartition topicIdPartition, AlterPartitionResponseData.PartitionData partitionData) {
        PartitionRegistration partitionControl = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertEquals((int)partitionControl.leader, (int)partitionData.leaderId());
        Assertions.assertEquals((int)partitionControl.leaderEpoch, (int)partitionData.leaderEpoch());
        Assertions.assertEquals((int)partitionControl.partitionEpoch, (int)partitionData.partitionEpoch());
        List expectedIsr = IntStream.of(partitionControl.isr).boxed().collect(Collectors.toList());
        Assertions.assertEquals(expectedIsr, (Object)partitionData.isr());
    }

    private void assertCreatedTopicConfigs(ReplicationControlTestContext ctx, String topic, CreateTopicsRequestData.CreatableTopicConfigCollection requestConfigs) {
        Map configs = ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Assertions.assertEquals((int)requestConfigs.size(), (int)configs.size());
        for (CreateTopicsRequestData.CreatableTopicConfig requestConfig : requestConfigs) {
            String value = (String)configs.get(requestConfig.name());
            Assertions.assertEquals((Object)requestConfig.value(), (Object)value);
        }
    }

    private void assertEmptyTopicConfigs(ReplicationControlTestContext ctx, String topic) {
        Map configs = ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Assertions.assertEquals(Map.of(), (Object)configs);
    }

    @Test
    public void testDeleteTopics() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        CreateTopicsRequestData.CreatableTopicConfigCollection requestConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
        requestConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopicConfig().setName("cleanup.policy").setValue("compact"));
        requestConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopicConfig().setName("min.cleanable.dirty.ratio").setValue("0.1"));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2).setConfigs(requestConfigs));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerRequestContext createTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createResult = replicationControl.createTopics(createTopicsRequestContext, request, Set.of("foo"));
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        Uuid topicId = ((CreateTopicsResponseData)createResult.response()).topics().find("foo").topicId();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setNumPartitions(3).setReplicationFactor((short)2).setErrorMessage(null).setErrorCode((short)0).setTopicId(topicId));
        Assertions.assertEquals((Object)expectedResponse, (Object)ReplicationControlManagerTest.withoutConfigs((CreateTopicsResponseData)createResult.response()));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 0));
        this.assertEmptyTopicConfigs(ctx, "foo");
        ctx.replay(createResult.records());
        Assertions.assertNotNull((Object)replicationControl.getPartition(topicId, 0));
        Assertions.assertNotNull((Object)replicationControl.getPartition(topicId, 1));
        Assertions.assertNotNull((Object)replicationControl.getPartition(topicId, 2));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 3));
        this.assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
        Assertions.assertEquals(Map.of(topicId, new ResultOrError((Object)"foo")), (Object)replicationControl.findTopicNames(Long.MAX_VALUE, Set.of(topicId)));
        Assertions.assertEquals(Map.of("foo", new ResultOrError((Object)topicId)), (Object)replicationControl.findTopicIds(Long.MAX_VALUE, Set.of("foo")));
        Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1L, topicId.getLeastSignificantBits());
        Assertions.assertEquals(Map.of(invalidId, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID))), (Object)replicationControl.findTopicNames(Long.MAX_VALUE, Set.of(invalidId)));
        Assertions.assertEquals(Map.of("bar", new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))), (Object)replicationControl.findTopicIds(Long.MAX_VALUE, Set.of("bar")));
        ControllerRequestContext deleteTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ControllerResult invalidDeleteResult = replicationControl.deleteTopics(deleteTopicsRequestContext, List.of(invalidId));
        Assertions.assertEquals((int)0, (int)invalidDeleteResult.records().size());
        Assertions.assertEquals(Map.of(invalidId, new ApiError(Errors.UNKNOWN_TOPIC_ID, null)), (Object)invalidDeleteResult.response());
        ControllerResult deleteResult = replicationControl.deleteTopics(deleteTopicsRequestContext, List.of(topicId));
        Assertions.assertTrue((boolean)deleteResult.isAtomic());
        Assertions.assertEquals(Map.of(topicId, new ApiError(Errors.NONE, null)), (Object)deleteResult.response());
        Assertions.assertEquals((int)1, (int)deleteResult.records().size());
        ctx.replay(deleteResult.records());
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 0));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 1));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 2));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 3));
        Assertions.assertEquals(Map.of(topicId, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID))), (Object)replicationControl.findTopicNames(Long.MAX_VALUE, Set.of(topicId)));
        Assertions.assertEquals(Map.of("foo", new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))), (Object)replicationControl.findTopicIds(Long.MAX_VALUE, Set.of("foo")));
        this.assertEmptyTopicConfigs(ctx, "foo");
    }

    @Test
    public void testDeleteTopicsWithMutationQuotaExceeded() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerRequestContext createTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createResult = replicationControl.createTopics(createTopicsRequestContext, request, Set.of("foo"));
        CreateTopicsResponseData.CreatableTopicResult createdTopic = ((CreateTopicsResponseData)createResult.response()).topics().find("foo");
        Assertions.assertEquals((short)Errors.NONE.code(), (short)createdTopic.errorCode());
        ctx.replay(createResult.records());
        ControllerRequestContext deleteTopicsRequestContext = ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.DELETE_TOPICS);
        Uuid topicId = createdTopic.topicId();
        ControllerResult deleteResult = replicationControl.deleteTopics(deleteTopicsRequestContext, List.of(topicId));
        Assertions.assertEquals(Map.of(topicId, new ApiError(Errors.THROTTLING_QUOTA_EXCEEDED, "Quota exceeded in test")), (Object)deleteResult.response());
        Assertions.assertEquals((int)0, (int)deleteResult.records().size());
    }

    @Test
    public void testCreatePartitions() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(4).setReplicationFactor((short)2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor((short)2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo2").setNumPartitions(2).setReplicationFactor((short)2));
        ctx.registerBrokersWithDirs(0, List.of(), 1, List.of(Uuid.fromString((String)"QMzamNQVQ7GnJK9DwQHG7Q"), Uuid.fromString((String)"loDxEBLETdedNnQGOKKENw")), 3, List.of(Uuid.fromString((String)"dxCDSgNjQvS4WuyqEKoCwA")));
        ctx.unfenceBrokers(0, 1, 3);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createTopicResult = replicationControl.createTopics(requestContext, request, new HashSet<String>(List.of("foo", "bar", "quux", "foo2")));
        ctx.replay(createTopicResult.records());
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(5).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("bar").setCount(3).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("baz").setCount(3).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("quux").setCount(2).setAssignments(null));
        ControllerResult createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        Assertions.assertEquals(List.of(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.NONE.code()).setErrorMessage(null), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("bar").setErrorCode(Errors.INVALID_PARTITIONS.code()).setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("baz").setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage(null), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("quux").setErrorCode(Errors.INVALID_PARTITIONS.code()).setErrorMessage("Topic already has 2 partition(s).")), (Object)createPartitionsResult.response());
        ctx.replay(createPartitionsResult.records());
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics2 = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(6).setAssignments(List.of(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(List.of(Integer.valueOf(1), Integer.valueOf(3))))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("bar").setCount(5).setAssignments(List.of(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(List.of(Integer.valueOf(1))))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("quux").setCount(4).setAssignments(List.of(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(List.of(Integer.valueOf(1), Integer.valueOf(0))))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo2").setCount(3).setAssignments(List.of(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(List.of(Integer.valueOf(2), Integer.valueOf(0))))));
        ControllerResult createPartitionsResult2 = replicationControl.createPartitions(requestContext, topics2);
        Assertions.assertEquals(List.of(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.NONE.code()).setErrorMessage(null), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("bar").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes a partition with 1 replica(s), but this is not consistent with previous partitions, which have 2 replica(s)."), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("quux").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo2").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 2, but no such broker is registered.")), (Object)createPartitionsResult2.response());
        ctx.replay(createPartitionsResult2.records());
        Assertions.assertArrayEquals((Object[])new Uuid[]{DirectoryId.UNASSIGNED, Uuid.fromString((String)"dxCDSgNjQvS4WuyqEKoCwA")}, (Object[])replicationControl.getPartition((Uuid)replicationControl.getTopicId((String)"foo"), (int)5).directories);
    }

    @Test
    public void testCreatePartitionsWithMutationQuotaExceeded() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerRequestContext createTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createResult = replicationControl.createTopics(createTopicsRequestContext, request, Set.of("foo"));
        CreateTopicsResponseData.CreatableTopicResult createdTopic = ((CreateTopicsResponseData)createResult.response()).topics().find("foo");
        Assertions.assertEquals((short)Errors.NONE.code(), (short)createdTopic.errorCode());
        ctx.replay(createResult.records());
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(5).setAssignments(null));
        ControllerRequestContext createPartitionsRequestContext = ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_PARTITIONS);
        ControllerResult createPartitionsResult = replicationControl.createPartitions(createPartitionsRequestContext, topics);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedThrottled = List.of(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()).setErrorMessage("Quota exceeded in test"));
        Assertions.assertEquals(expectedThrottled, (Object)createPartitionsResult.response());
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics2 = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(4).setAssignments(List.of(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(List.of(Integer.valueOf(1), Integer.valueOf(0))))));
        ControllerResult createPartitionsResult2 = replicationControl.createPartitions(createPartitionsRequestContext, topics2);
        Assertions.assertEquals(expectedThrottled, (Object)createPartitionsResult2.response());
    }

    @Test
    public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createTopicResult = replicationControl.createTopics(requestContext, request, new HashSet<String>(List.of("foo")));
        ctx.replay(createTopicResult.records());
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0);
        ctx.inControlledShutdownBrokers(0);
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(2).setAssignments(null));
        ControllerResult createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        Assertions.assertEquals(List.of(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("Unable to replicate the partition 2 time(s): All brokers are currently fenced or in controlled shutdown.")), (Object)createPartitionsResult.response());
    }

    @Test
    public void testCreatePartitionsISRInvariants() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)3));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1);
        ctx.inControlledShutdownBrokers(1);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = replicationControl.createTopics(requestContext, request, Set.of("foo"));
        ctx.replay(result.records());
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = List.of(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(2).setAssignments(null));
        ControllerResult createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        ctx.replay(createPartitionsResult.records());
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{0, 1, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00000DIRAAAA"), Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA")}).setIsr(new int[]{0}).setLeader(Integer.valueOf(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replicationControl.getPartition(((TopicRecord)((ApiMessageAndVersion)result.records().get(0)).message()).topicId(), 1));
    }

    @Test
    public void testValidateGoodManualPartitionAssignments() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(1, 2, 3);
        ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1))), OptionalInt.of(1));
        ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1))), OptionalInt.empty());
        ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))), OptionalInt.of(3));
        ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))), OptionalInt.empty());
    }

    @Test
    public void testValidateBadManualPartitionAssignments() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(1, 2);
        Assertions.assertEquals((Object)"The manual partition assignment includes an empty replica list.", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of()), OptionalInt.empty()))).getMessage());
        Assertions.assertEquals((Object)"The manual partition assignment includes broker 3, but no such broker is registered.", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))), OptionalInt.empty()))).getMessage());
        Assertions.assertEquals((Object)"The manual partition assignment includes the broker 2 more than once.", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(2))), OptionalInt.empty()))).getMessage());
        Assertions.assertEquals((Object)"The manual partition assignment includes a partition with 2 replica(s), but this is not consistent with previous partitions, which have 3 replica(s).", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(PartitionAssignmentTest.partitionAssignment(List.of(Integer.valueOf(1), Integer.valueOf(2))), OptionalInt.of(3)))).getMessage());
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testReassignPartitions(short version) {
        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(metadataVersion).build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {3, 2, 1}}).topicId();
        ctx.createTestTopic("bar", new int[][]{{1, 2, 3}}).topicId();
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(3), Integer.valueOf(2), Integer.valueOf(1))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(2), Integer.valueOf(1))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(2), Integer.valueOf(1))))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar"))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("Unable to find partition foo:2."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar"))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(List.of(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("foo").setPartitions(List.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(1).setRemovingReplicas(List.of(Integer.valueOf(3))).setAddingReplicas(List.of(Integer.valueOf(0))).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(2), Integer.valueOf(1), Integer.valueOf(3)))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(List.of(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2)))), Long.MAX_VALUE));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(List.of(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2)))), Long.MAX_VALUE));
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(null), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))));
        Assertions.assertEquals((Object)ControllerResult.atomicOf(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(fooId).setPartitionId(1).setReplicas(List.of(Integer.valueOf(2), Integer.valueOf(1), Integer.valueOf(3))).setDirectories(List.of(Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"))).setLeader(3).setRemovingReplicas(List.of()).setAddingReplicas(List.of()), MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorCode(Errors.NONE.code()).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("Unable to find partition foo:2."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null)))))), (Object)cancelResult);
        log.info("running final alterPartition...");
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(1).setPartitionEpoch(1).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(3, 0, 2, 1))))));
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterPartitionRequestData).build(version).data());
        Errors expectedError = Errors.NEW_LEADER_ELECTED;
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(1).setErrorCode(expectedError.code()))))), (Object)alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionDisallowReplicationFactorChange(short version) {
        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(metadataVersion).build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        ctx.createTestTopic("foo", new int[][]{{0, 1, 2}, {0, 1, 2}, {0, 1, 2}});
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))))))).setAllowReplicationFactorChange(false));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("The replication factor is changed from 3 to 2"), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("The replication factor is changed from 3 to 4"))))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(List.of(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("foo").setPartitions(List.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(List.of(Integer.valueOf(0))).setAddingReplicas(List.of(Integer.valueOf(3))).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(0)))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(List.of(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2)))), Long.MAX_VALUE));
        ControllerResult alterReassigningResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1))))))).setAllowReplicationFactorChange(false));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("The replication factor is changed from 3 to 2"))))), (Object)alterReassigningResult.response());
        ControllerResult alterReassigningResult2 = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(0), Integer.valueOf(2), Integer.valueOf(3))))))).setAllowReplicationFactorChange(false));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterReassigningResult2.response());
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short version) {
        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(metadataVersion).build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))))))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(List.of(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("foo").setPartitions(List.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(List.of(Integer.valueOf(0))).setAddingReplicas(List.of(Integer.valueOf(3))).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(0)))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(List.of(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2)))), Long.MAX_VALUE));
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))).setAllowReplicationFactorChange(false));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setAllowReplicationFactorChange(false).setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)cancelResult.response());
        ctx.replay(cancelResult.records());
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectFencedBrokers(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();
        ArrayList<Object> fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 4}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setIsr(new int[]{1, 2, 4}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(1)).build(), (Object)replication.getPartition(fooId, 0));
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData().setBrokerId(1).setBrokerEpoch(101L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(1).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4))))));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest).build(version).data());
        Errors expectedError = Errors.INELIGIBLE_REPLICA;
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(expectedError.code()))))), (Object)alterPartitionResult.response());
        fenceRecords = new ArrayList();
        replication.handleBrokerUnfenced(3, 103L, fenceRecords);
        ctx.replay(fenceRecords);
        alterPartitionResult = replication.alterPartition(requestContext, alterIsrRequest);
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(1).setLeaderEpoch(0).setIsr(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(4))).setPartitionEpoch(2).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();
        ctx.alterPartition(new TopicIdPartition(fooId, 0), 1, ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3), LeaderRecoveryState.RECOVERED);
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData().setBrokerId(1).setBrokerEpoch(101L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(1).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4))))));
        long newEpoch = ReplicationControlManagerTest.defaultBrokerEpoch(4) + 1000L;
        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null);
        brokerRecord.endPoints().add((ImplicitLinkedHashCollection.Element)new RegisterBrokerRecord.BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort(9096).setName("PLAINTEXT").setHost("localhost"));
        ctx.replay(List.of(new ApiMessageAndVersion((ApiMessage)brokerRecord, 0)));
        ControllerResult result = ctx.replicationControl.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setBrokerId(4).setBrokerEpoch(newEpoch).setCurrentMetadataOffset(1L).setWantFence(false).setWantShutDown(false), 0L);
        Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), (Object)result.response());
        ctx.replay(result.records());
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest).build(version).data());
        if (version >= 3) {
            Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(Errors.INELIGIBLE_REPLICA.code()))))), (Object)alterPartitionResult.response());
        } else {
            Assertions.assertEquals((short)Errors.NONE.code(), (short)((AlterPartitionResponseData)alterPartitionResult.response()).errorCode());
        }
    }

    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 4}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setIsr(new int[]{1, 2, 3, 4}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replication.getPartition(fooId, 0));
        ctx.inControlledShutdownBrokers(3);
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData().setBrokerId(1).setBrokerEpoch(101L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(0).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4))))));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest).build(version).data());
        Errors expectedError = Errors.INELIGIBLE_REPLICA;
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(expectedError.code()))))), (Object)alterPartitionResult.response());
    }

    @Test
    public void testCancelReassignPartitions() {
        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(metadataVersion).build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}, {0, 1, 2, 3}, {4, 3, 1, 0}, {2, 3, 4, 1}}).topicId();
        Uuid barId = ctx.createTestTopic("bar", new int[][]{{4, 3, 2}}).topicId();
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        ArrayList<ApiMessageAndVersion> fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 4}).setIsr(new int[]{1, 2, 4}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(1)).build(), (Object)replication.getPartition(fooId, 0));
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(4))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(0))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(List.of(Integer.valueOf(5), Integer.valueOf(6), Integer.valueOf(7))), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(3).setReplicas(List.of()))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(4), Integer.valueOf(0))))))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 5, but no such broker is registered."), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes an empty replica list."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 4}).setIsr(new int[]{1, 2, 4}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(1)).setPartitionEpoch(Integer.valueOf(2)).build(), (Object)replication.getPartition(fooId, 0));
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 0}).setIsr(new int[]{0, 1, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00000DIRAAAA")}).setLeader(Integer.valueOf(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(2)).build(), (Object)replication.getPartition(fooId, 1));
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 4, 0}).setIsr(new int[]{4, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA"), Uuid.fromString((String)"TESTBROKER00000DIRAAAA")}).setAddingReplicas(new int[]{0, 1}).setLeader(Integer.valueOf(4)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(2)).build(), (Object)replication.getPartition(barId, 0));
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(List.of(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("bar").setPartitions(List.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(List.of()).setAddingReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1))).setReplicas(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(4), Integer.valueOf(0)))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(List.of(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2)))), Long.MAX_VALUE));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(List.of(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2)))), Long.MAX_VALUE));
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(barId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(2).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(4, 1, 2, 0)))))));
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(4).setLeaderEpoch(0).setIsr(List.of(Integer.valueOf(4), Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(0))).setPartitionEpoch(3).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))));
        Assertions.assertEquals((Object)ControllerResult.atomicOf(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(barId).setPartitionId(0).setLeader(4).setReplicas(List.of(Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(4))).setDirectories(List.of(Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA"))).setRemovingReplicas(null).setAddingReplicas(List.of()), MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null)))))), (Object)cancelResult);
        ctx.replay(cancelResult.records());
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{2, 3, 4}).setIsr(new int[]{4, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setLeader(Integer.valueOf(4)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(1)).setPartitionEpoch(Integer.valueOf(3)).build(), (Object)replication.getPartition(barId, 0));
    }

    @Test
    public void testManualPartitionAssignmentOnAllFencedBrokers() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}, Errors.INVALID_REPLICA_ASSIGNMENT.code());
    }

    @Test
    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();
        ctx.createPartitions(2, "foo", new int[][]{{3, 4, 5}}, Errors.INVALID_REPLICA_ASSIGNMENT.code());
        ctx.createPartitions(2, "foo", new int[][]{{2, 4, 5}}, Errors.NONE.code());
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{2, 4, 5}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA"), Uuid.fromString((String)"TESTBROKER00005DIRAAAA")}).setIsr(new int[]{2}).setLeader(Integer.valueOf(2)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)ctx.replicationControl.getPartition(fooId, 1));
    }

    private void assertLeaderAndIsr(ReplicationControlManager replication, TopicIdPartition topicIdPartition, int leaderId, int[] isr) {
        PartitionRegistration registration = replication.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertArrayEquals((int[])isr, (int[])registration.isr);
        Assertions.assertEquals((int)leaderId, (int)registration.leader);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(MetadataVersion.IBP_3_6_IV1).build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
        ctx.fenceBrokers(Set.of(Integer.valueOf(2), Integer.valueOf(3)));
        ctx.fenceBrokers(Set.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)));
        this.assertLeaderAndIsr(replication, partition0, -1, new int[]{1});
        this.assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
        this.assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
        ElectLeadersRequestData request = this.buildElectLeadersRequest(ElectionType.UNCLEAN, electAllPartitions ? null : Map.of("foo", List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))));
        ControllerResult result1 = replication.electLeaders(request);
        Assertions.assertEquals(List.of(), (Object)result1.records());
        ElectLeadersResponseData expectedResponse1 = this.buildElectLeadersResponse(Errors.NONE, electAllPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE)), Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)), Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))}));
        this.assertElectLeadersResponse(expectedResponse1, (ElectLeadersResponseData)result1.response());
        ctx.unfenceBrokers(2);
        ctx.alterPartition(partition1, 4, ReplicationControlManagerTest.isrWithDefaultEpoch(2, 4), LeaderRecoveryState.RECOVERED);
        ControllerResult result = replication.electLeaders(request);
        Assertions.assertEquals((int)1, (int)result.records().size());
        ApiMessageAndVersion record = (ApiMessageAndVersion)result.records().get(0);
        Assertions.assertInstanceOf(PartitionChangeRecord.class, (Object)record.message());
        PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord)record.message();
        Assertions.assertEquals((int)0, (int)partitionChangeRecord.partitionId());
        Assertions.assertEquals((int)2, (int)partitionChangeRecord.leader());
        Assertions.assertEquals(List.of(Integer.valueOf(2)), (Object)partitionChangeRecord.isr());
        ctx.replay(result.records());
        this.assertLeaderAndIsr(replication, partition0, 2, new int[]{2});
        this.assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4});
        this.assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
        ElectLeadersResponseData expectedResponse = this.buildElectLeadersResponse(Errors.NONE, electAllPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)ApiError.NONE), Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)), Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))}));
        this.assertElectLeadersResponse(expectedResponse, (ElectLeadersResponseData)result.response());
    }

    @Test
    public void testPreferredElectionDoesNotTriggerUncleanElection() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(1, 2, 3, 4);
        ctx.unfenceBrokers(1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}}).topicId();
        TopicIdPartition partition = new TopicIdPartition(fooId, 0);
        ctx.fenceBrokers(Set.of(Integer.valueOf(2), Integer.valueOf(3)));
        ctx.fenceBrokers(Set.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)));
        ctx.unfenceBrokers(2);
        this.assertLeaderAndIsr(replication, partition, -1, new int[]{1});
        ctx.alterTopicConfig("foo", "unclean.leader.election.enable", "true");
        ElectLeadersRequestData request = this.buildElectLeadersRequest(ElectionType.PREFERRED, Map.of("foo", List.of(Integer.valueOf(0))));
        ControllerResult result = replication.electLeaders(request);
        Assertions.assertEquals(List.of(), (Object)result.records());
        ElectLeadersResponseData expectedResponse = this.buildElectLeadersResponse(Errors.NONE, false, Map.of(new TopicPartition("foo", 0), new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
    }

    private ElectLeadersRequestData buildElectLeadersRequest(ElectionType electionType, Map<String, List<Integer>> partitions) {
        ElectLeadersRequestData request = new ElectLeadersRequestData().setElectionType(electionType.value);
        if (partitions == null) {
            request.setTopicPartitions(null);
        } else {
            partitions.forEach((topic, partitionIds) -> request.topicPartitions().add((ImplicitLinkedHashCollection.Element)new ElectLeadersRequestData.TopicPartitions().setTopic(topic).setPartitions(partitionIds)));
        }
        return request;
    }

    @Test
    public void testFenceMultipleBrokers() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue((boolean)ctx.fencedBrokerIds().isEmpty());
        ctx.fenceBrokers(Set.of(Integer.valueOf(2), Integer.valueOf(3)));
        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
        PartitionRegistration partition1 = replication.getPartition(fooId, 1);
        PartitionRegistration partition2 = replication.getPartition(fooId, 2);
        Assertions.assertArrayEquals((int[])new int[]{1, 2, 3}, (int[])partition0.replicas);
        Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition0.isr);
        Assertions.assertEquals((int)1, (int)partition0.leader);
        Assertions.assertArrayEquals((int[])new int[]{2, 3, 4}, (int[])partition1.replicas);
        Assertions.assertArrayEquals((int[])new int[]{4}, (int[])partition1.isr);
        Assertions.assertEquals((int)4, (int)partition1.leader);
        Assertions.assertArrayEquals((int[])new int[]{0, 2, 1}, (int[])partition2.replicas);
        Assertions.assertArrayEquals((int[])new int[]{0, 1}, (int[])partition2.isr);
        Assertions.assertNotEquals((int)2, (int)partition2.leader);
    }

    @Test
    public void testElectPreferredLeaders() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(1, 2, 3, 4);
        ctx.inControlledShutdownBrokers(1);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        ElectLeadersRequestData request1 = new ElectLeadersRequestData().setElectionType(ElectionType.PREFERRED.value).setTopicPartitions(new ElectLeadersRequestData.TopicPartitionsCollection(List.of(new ElectLeadersRequestData.TopicPartitions().setTopic("foo").setPartitions(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))), new ElectLeadersRequestData.TopicPartitions().setTopic("bar").setPartitions(List.of(Integer.valueOf(0), Integer.valueOf(1)))).iterator()));
        ControllerResult election1Result = replication.electLeaders(request1);
        ElectLeadersResponseData expectedResponse1 = this.buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)), Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)), Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)), Utils.mkEntry((Object)new TopicPartition("bar", 0), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")), Utils.mkEntry((Object)new TopicPartition("bar", 1), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar"))}));
        this.assertElectLeadersResponse(expectedResponse1, (ElectLeadersResponseData)election1Result.response());
        Assertions.assertEquals(List.of(), (Object)election1Result.records());
        ctx.registerBrokers(1);
        ctx.unfenceBrokers(0, 1);
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(0).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3)), new AlterPartitionRequestData.PartitionData().setPartitionIndex(2).setPartitionEpoch(0).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 2, 1)))))));
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(2).setLeaderEpoch(0).setIsr(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))).setPartitionEpoch(1).setErrorCode(Errors.NONE.code()), new AlterPartitionResponseData.PartitionData().setPartitionIndex(2).setLeaderId(2).setLeaderEpoch(0).setIsr(List.of(Integer.valueOf(0), Integer.valueOf(2), Integer.valueOf(1))).setPartitionEpoch(1).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        ElectLeadersResponseData expectedResponse2 = this.buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)ApiError.NONE), Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)), Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)ApiError.NONE), Utils.mkEntry((Object)new TopicPartition("bar", 0), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")), Utils.mkEntry((Object)new TopicPartition("bar", 1), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar"))}));
        ctx.replay(alterPartitionResult.records());
        ControllerResult election2Result = replication.electLeaders(request1);
        this.assertElectLeadersResponse(expectedResponse2, (ElectLeadersResponseData)election2Result.response());
        Assertions.assertEquals(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(0).setTopicId(fooId).setLeader(1), MetadataVersion.latestTesting().partitionChangeRecordVersion()), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(2).setTopicId(fooId).setLeader(0), MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)election2Result.records());
    }

    @Test
    public void testBalancePartitionLeaders() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue((boolean)replication.arePartitionLeadersImbalanced());
        ctx.unfenceBrokers(1);
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(0).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3)))))));
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(2).setLeaderEpoch(0).setIsr(List.of(Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))).setPartitionEpoch(1).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        ControllerResult balanceResult = replication.maybeBalancePartitionLeaders();
        ctx.replay(balanceResult.records());
        PartitionChangeRecord expectedChangeRecord = new PartitionChangeRecord().setPartitionId(0).setTopicId(fooId).setLeader(1);
        Assertions.assertEquals(List.of(new ApiMessageAndVersion((ApiMessage)expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)balanceResult.records());
        Assertions.assertTrue((boolean)replication.arePartitionLeadersImbalanced());
        Assertions.assertFalse((boolean)((Boolean)balanceResult.response()));
        ctx.unfenceBrokers(0);
        alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102L).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(2).setPartitionEpoch(0).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 2, 1)))))));
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(2).setLeaderId(2).setLeaderEpoch(0).setIsr(List.of(Integer.valueOf(0), Integer.valueOf(2), Integer.valueOf(1))).setPartitionEpoch(1).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        balanceResult = replication.maybeBalancePartitionLeaders();
        ctx.replay(balanceResult.records());
        expectedChangeRecord = new PartitionChangeRecord().setPartitionId(2).setTopicId(fooId).setLeader(0);
        Assertions.assertEquals(List.of(new ApiMessageAndVersion((ApiMessage)expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)balanceResult.records());
        Assertions.assertFalse((boolean)replication.arePartitionLeadersImbalanced());
        Assertions.assertFalse((boolean)((Boolean)balanceResult.response()));
    }

    @ParameterizedTest
    @ValueSource(strings={"none", "static", "dynamic_cluster", "dynamic_node", "dynamic_topic"})
    public void testMaybeTriggerUncleanLeaderElectionForLeaderlessPartitions(String uncleanConfig) {
        ReplicationControlTestContext.Builder ctxBuilder = new ReplicationControlTestContext.Builder();
        if (uncleanConfig.equals("static")) {
            ctxBuilder.setStaticConfig("unclean.leader.election.enable", "true");
        }
        ReplicationControlTestContext ctx = ctxBuilder.build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 4}, {1, 3, 4}, {0, 2, 4}}).topicId();
        Assertions.assertFalse((boolean)replication.areSomePartitionsLeaderless());
        ctx.fenceBrokers(0, 1, 2, 3, 4);
        Assertions.assertTrue((boolean)replication.areSomePartitionsLeaderless());
        for (int partitionId : List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))) {
            Assertions.assertArrayEquals((int[])new int[]{4}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)partitionId).isr);
            Assertions.assertEquals((int)-1, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)partitionId).leader);
        }
        ctx.unfenceBrokers(2);
        if (uncleanConfig.equals("static")) {
            Assertions.assertArrayEquals((int[])new int[]{2}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)0).isr);
            Assertions.assertEquals((int)2, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)0).leader);
            Assertions.assertArrayEquals((int[])new int[]{4}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)1).isr);
            Assertions.assertEquals((int)-1, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)1).leader);
            Assertions.assertArrayEquals((int[])new int[]{2}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)2).isr);
            Assertions.assertEquals((int)2, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)2).leader);
        } else {
            for (int partitionId : List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))) {
                Assertions.assertArrayEquals((int[])new int[]{4}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)partitionId).isr);
                Assertions.assertEquals((int)-1, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)partitionId).leader);
            }
        }
        if (uncleanConfig.equals("dynamic_cluster")) {
            ctx.replay(ctx.configurationControl.incrementalAlterConfigs(Map.of(new ConfigResource(ConfigResource.Type.BROKER, ""), Map.of("unclean.leader.election.enable", new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, "true"))), true).records());
        } else if (uncleanConfig.equals("dynamic_node")) {
            ctx.replay(ctx.configurationControl.incrementalAlterConfigs(Map.of(new ConfigResource(ConfigResource.Type.BROKER, "0"), Map.of("unclean.leader.election.enable", new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, "true"))), true).records());
        } else if (uncleanConfig.equals("dynamic_topic")) {
            ctx.replay(ctx.configurationControl.incrementalAlterConfigs(Map.of(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), Map.of("unclean.leader.election.enable", new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, "true"))), true).records());
        }
        ControllerResult balanceResult = replication.maybeElectUncleanLeaders();
        Assertions.assertFalse((boolean)((Boolean)balanceResult.response()));
        if (uncleanConfig.equals("none") || uncleanConfig.equals("static")) {
            Assertions.assertEquals((int)0, (int)balanceResult.records().size(), (String)("Expected no records, but " + balanceResult.records().size() + " were found."));
        } else {
            Assertions.assertNotEquals((int)0, (int)balanceResult.records().size(), (String)"Expected some records, but none were found.");
            ctx.replay(balanceResult.records());
            Assertions.assertArrayEquals((int[])new int[]{2}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)0).isr);
            Assertions.assertEquals((int)2, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)0).leader);
            Assertions.assertArrayEquals((int[])new int[]{4}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)1).isr);
            Assertions.assertEquals((int)-1, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)1).leader);
            Assertions.assertArrayEquals((int[])new int[]{2}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)2).isr);
            Assertions.assertEquals((int)2, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)2).leader);
        }
    }

    private void assertElectLeadersResponse(ElectLeadersResponseData expected, ElectLeadersResponseData actual) {
        Assertions.assertEquals((Object)Errors.forCode((short)expected.errorCode()), (Object)Errors.forCode((short)actual.errorCode()));
        Assertions.assertEquals(this.collectElectLeadersErrors(expected), this.collectElectLeadersErrors(actual));
    }

    private Map<TopicPartition, ElectLeadersResponseData.PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
        HashMap<TopicPartition, ElectLeadersResponseData.PartitionResult> res = new HashMap<TopicPartition, ElectLeadersResponseData.PartitionResult>();
        response.replicaElectionResults().forEach(topicResult -> {
            String topic = topicResult.topic();
            topicResult.partitionResult().forEach(partitionResult -> {
                TopicPartition topicPartition = new TopicPartition(topic, partitionResult.partitionId());
                res.put(topicPartition, (ElectLeadersResponseData.PartitionResult)partitionResult);
            });
        });
        return res;
    }

    private ElectLeadersResponseData buildElectLeadersResponse(Errors topLevelError, boolean electAllPartitions, Map<TopicPartition, ApiError> errors) {
        Map<String, List<Map.Entry>> errorsByTopic = errors.entrySet().stream().collect(Collectors.groupingBy(entry -> ((TopicPartition)entry.getKey()).topic()));
        ElectLeadersResponseData response = new ElectLeadersResponseData().setErrorCode(topLevelError.code());
        errorsByTopic.forEach((topic, partitionErrors) -> {
            ElectLeadersResponseData.ReplicaElectionResult electionResult = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topic);
            electionResult.setPartitionResult(partitionErrors.stream().filter(entry -> !electAllPartitions || ((ApiError)entry.getValue()).error() != Errors.ELECTION_NOT_NEEDED).map(entry -> {
                TopicPartition topicPartition = (TopicPartition)entry.getKey();
                ApiError error = (ApiError)entry.getValue();
                return new ElectLeadersResponseData.PartitionResult().setPartitionId(topicPartition.partition()).setErrorCode(error.error().code()).setErrorMessage(error.message());
            }).collect(Collectors.toList()));
            response.replicaElectionResults().add(electionResult);
        });
        return response;
    }

    @Test
    public void testKRaftClusterDescriber() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokersWithDirs(0, List.of(), 1, List.of(), 2, List.of(Uuid.fromString((String)"ozwqsVMFSNiYQUPSJA3j0w")), 3, List.of(Uuid.fromString((String)"SSDgCZ4BTyec5QojGT65qg"), Uuid.fromString((String)"K8KwMrviRcOUvgI8FPOJWg")), 4, List.of());
        ctx.unfenceBrokers(2, 3, 4);
        ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        ctx.createTestTopic("bar", new int[][]{{2, 3, 4}, {3, 4, 2}}).topicId();
        ReplicationControlManager.KRaftClusterDescriber describer = replication.clusterDescriber;
        HashSet brokers = new HashSet();
        describer.usableBrokers().forEachRemaining(broker -> brokers.add(broker));
        Assertions.assertEquals(new HashSet<UsableBroker>(List.of(new UsableBroker(0, Optional.empty(), true), new UsableBroker(1, Optional.empty(), true), new UsableBroker(2, Optional.empty(), false), new UsableBroker(3, Optional.empty(), false), new UsableBroker(4, Optional.empty(), false))), brokers);
        Assertions.assertEquals((Object)DirectoryId.MIGRATING, (Object)describer.defaultDir(1));
        Assertions.assertEquals((Object)Uuid.fromString((String)"ozwqsVMFSNiYQUPSJA3j0w"), (Object)describer.defaultDir(2));
        Assertions.assertEquals((Object)DirectoryId.UNASSIGNED, (Object)describer.defaultDir(3));
    }

    @Test
    public void testProcessBrokerHeartbeatInControlledShutdown() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(MetadataVersion.MINIMUM_VERSION).build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid topicId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();
        BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData().setBrokerId(0).setBrokerEpoch(100L).setCurrentMetadataOffset(0L).setWantShutDown(true);
        ControllerResult result = ctx.replicationControl.processBrokerHeartbeat(heartbeatRequest, 0L);
        ArrayList<ApiMessageAndVersion> expectedRecords = new ArrayList<ApiMessageAndVersion>();
        expectedRecords.add(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerEpoch(100L).setBrokerId(0).setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()), 1));
        expectedRecords.add(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(0).setTopicId(topicId).setIsr(List.of(Integer.valueOf(1), Integer.valueOf(2))).setLeader(1), 0));
        Assertions.assertEquals(expectedRecords, (Object)result.records());
    }

    @Test
    public void testProcessExpiredBrokerHeartbeat() {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMockTime(mockTime).build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData().setBrokerId(0).setBrokerEpoch(100L).setCurrentMetadataOffset(123L).setWantShutDown(false);
        mockTime.sleep(100L);
        ctx.replicationControl.processExpiredBrokerHeartbeat(heartbeatRequest);
        Optional<BrokerHeartbeatManager.BrokerHeartbeatState> state = ctx.clusterControl.heartbeatManager().brokers().stream().filter(broker -> broker.id() == 0).findFirst();
        Assertions.assertTrue((boolean)state.isPresent());
        Assertions.assertEquals((int)0, (int)state.get().id());
        Assertions.assertEquals((long)123L, (long)state.get().metadataOffset());
    }

    @Test
    public void testReassignPartitionsHandlesNewReassignmentThatRemovesPreviouslyAddingReplicas() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2, 3, 4, 5);
        String topic = "topic-1";
        Uuid topicId = ctx.createTestTopic(topic, new int[][]{{0, 1}}).topicId();
        log.debug("Created topic with ID {}", (Object)topicId);
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        ControllerResult alterResultOne = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName(topic).setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(2), Integer.valueOf(3))))))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topic).setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResultOne.response());
        ctx.replay(alterResultOne.records());
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(List.of(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic).setPartitions(List.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1))).setAddingReplicas(List.of(Integer.valueOf(2), Integer.valueOf(3))).setReplicas(List.of(Integer.valueOf(2), Integer.valueOf(3), Integer.valueOf(0), Integer.valueOf(1)))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        PartitionRegistration partition = replication.getPartition(topicId, 0);
        AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().setBrokerId(partition.leader).setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(topicId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(partition.partitionEpoch).setLeaderEpoch(partition.leaderEpoch).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2))))));
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), ((AlterPartitionRequest)new AlterPartitionRequest.Builder(alterPartitionRequestData).build()).data());
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(topicId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setIsr(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2))).setPartitionEpoch(partition.partitionEpoch + 1).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        ElectLeadersRequestData request = this.buildElectLeadersRequest(ElectionType.PREFERRED, Map.of(topic, List.of(Integer.valueOf(0))));
        ControllerResult electLeaderTwoResult = replication.electLeaders(request);
        ElectLeadersResponseData.ReplicaElectionResult replicaElectionResult = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topic);
        replicaElectionResult.setPartitionResult(List.of(new ElectLeadersResponseData.PartitionResult().setPartitionId(0).setErrorCode(Errors.NONE.code()).setErrorMessage(null)));
        Assertions.assertEquals((Object)new ElectLeadersResponseData().setErrorCode(Errors.NONE.code()).setReplicaElectionResults(List.of(replicaElectionResult)), (Object)electLeaderTwoResult.response());
        ctx.replay(electLeaderTwoResult.records());
        partition = replication.getPartition(topicId, 0);
        Assertions.assertEquals((int)2, (int)partition.leader);
        ControllerResult alterResultTwo = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(List.of(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName(topic).setPartitions(List.of(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(Integer.valueOf(4), Integer.valueOf(5))))))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(List.of(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topic).setPartitions(List.of(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResultTwo.response());
        ctx.replay(alterResultTwo.records());
        currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(List.of(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic).setPartitions(List.of(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(List.of(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3))).setAddingReplicas(List.of(Integer.valueOf(4), Integer.valueOf(5))).setReplicas(List.of(Integer.valueOf(4), Integer.valueOf(5), Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        partition = replication.getPartition(topicId, 0);
        Assertions.assertEquals((int)2, (int)partition.leader);
        Assertions.assertTrue((boolean)Replicas.toSet((int[])partition.replicas).contains(partition.leader));
        AlterPartitionRequestData alterPartitionRequestDataTwo = new AlterPartitionRequestData().setBrokerId(partition.leader).setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).setTopics(List.of(new AlterPartitionRequestData.TopicData().setTopicId(topicId).setPartitions(List.of(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(partition.partitionEpoch).setLeaderEpoch(partition.leaderEpoch).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2, 3, 4, 5))))));
        ControllerResult alterPartitionResultTwo = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), ((AlterPartitionRequest)new AlterPartitionRequest.Builder(alterPartitionRequestDataTwo).build()).data());
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(List.of(new AlterPartitionResponseData.TopicData().setTopicId(topicId).setPartitions(List.of(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(Errors.NEW_LEADER_ELECTED.code()))))), (Object)alterPartitionResultTwo.response());
        ctx.replay(alterPartitionResultTwo.records());
        partition = replication.getPartition(topicId, 0);
        Assertions.assertEquals((int)4, (int)partition.leader);
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
    }

    private static AlterPartitionRequestData.BrokerState brokerState(int brokerId, Long brokerEpoch) {
        return new AlterPartitionRequestData.BrokerState().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch.longValue());
    }

    private static Long defaultBrokerEpoch(int brokerId) {
        return (long)brokerId + 100L;
    }

    private static List<AlterPartitionRequestData.BrokerState> isrWithDefaultEpoch(Integer ... isr) {
        return Arrays.stream(isr).map(brokerId -> ReplicationControlManagerTest.brokerState(brokerId, ReplicationControlManagerTest.defaultBrokerEpoch(brokerId))).collect(Collectors.toList());
    }

    @Test
    public void testDuplicateTopicIdReplay() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        replicationControl.replay(new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"Ktv3YkMQRe-MId4VkkrMyw")));
        Assertions.assertEquals((Object)"Found duplicate TopicRecord for foo with topic ID Ktv3YkMQRe-MId4VkkrMyw", (Object)((RuntimeException)Assertions.assertThrows(RuntimeException.class, () -> replicationControl.replay(new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"Ktv3YkMQRe-MId4VkkrMyw"))))).getMessage());
        Assertions.assertEquals((Object)"Found duplicate TopicRecord for foo with a different ID than before. Previous ID was Ktv3YkMQRe-MId4VkkrMyw and new ID is 8auUWq8zQqe_99H_m2LAmw", (Object)((RuntimeException)Assertions.assertThrows(RuntimeException.class, () -> replicationControl.replay(new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"8auUWq8zQqe_99H_m2LAmw"))))).getMessage());
    }

    @Test
    void testHandleAssignReplicasToDirsFailsOnOlderMv() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(MetadataVersion.IBP_3_7_IV1).build();
        Assertions.assertThrows(UnsupportedVersionException.class, () -> ctx.replicationControl.handleAssignReplicasToDirs(new AssignReplicasToDirsRequestData()));
    }

    @Test
    void testHandleAssignReplicasToDirs() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        final Uuid dir1b1 = Uuid.fromString((String)"hO2YI5bgRUmByNPHiHxjNQ");
        final Uuid dir2b1 = Uuid.fromString((String)"R3Gb1HLoTzuKMgAkH5Vtpw");
        Uuid dir1b2 = Uuid.fromString((String)"TBGa8UayQi6KguqF5nC0sw");
        final Uuid offlineDir = Uuid.fromString((String)"zvAf9BKZRyyrEWz4FX2nLA");
        ctx.registerBrokersWithDirs(1, List.of(dir1b1, dir2b1), 2, List.of(dir1b2));
        ctx.unfenceBrokers(1, 2);
        final Uuid topicA = ctx.createTestTopic("a", new int[][]{{1, 2}, {1, 2}, {1, 2}}).topicId();
        final Uuid topicB = ctx.createTestTopic("b", new int[][]{{1, 2}, {1, 2}}).topicId();
        final Uuid topicC = ctx.createTestTopic("c", new int[][]{{2}}).topicId();
        ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, (Map<TopicIdPartition, Uuid>)new HashMap<TopicIdPartition, Uuid>(){
            {
                this.put(new TopicIdPartition(topicA, 0), dir1b1);
                this.put(new TopicIdPartition(topicA, 1), dir2b1);
                this.put(new TopicIdPartition(topicA, 2), offlineDir);
                this.put(new TopicIdPartition(topicB, 0), dir1b1);
                this.put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
                this.put(new TopicIdPartition(Uuid.fromString((String)"nLU9hKNXSZuMe5PO2A4dVQ"), 1), dir2b1);
                this.put(new TopicIdPartition(topicA, 137), dir1b1);
                this.put(new TopicIdPartition(topicC, 0), dir1b1);
            }
        });
        Assertions.assertEquals((Object)AssignmentsHelper.normalize((AssignReplicasToDirsResponseData)AssignmentsHelper.buildResponseData((short)0, (int)0, (Map)new HashMap<Uuid, Map<TopicIdPartition, Errors>>(){
            {
                this.put(dir1b1, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicA, 0), Errors.NONE);
                        this.put(new TopicIdPartition(topicA, 137), Errors.UNKNOWN_TOPIC_OR_PARTITION);
                        this.put(new TopicIdPartition(topicB, 0), Errors.NONE);
                        this.put(new TopicIdPartition(topicC, 0), Errors.NOT_LEADER_OR_FOLLOWER);
                    }
                });
                this.put(dir2b1, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicA, 1), Errors.NONE);
                        this.put(new TopicIdPartition(Uuid.fromString((String)"nLU9hKNXSZuMe5PO2A4dVQ"), 1), Errors.UNKNOWN_TOPIC_ID);
                    }
                });
                this.put(offlineDir, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicA, 2), Errors.NONE);
                    }
                });
                this.put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicB, 1), Errors.NONE);
                    }
                });
            }
        })), (Object)AssignmentsHelper.normalize((AssignReplicasToDirsResponseData)((AssignReplicasToDirsResponseData)controllerResult.response())));
        short recordVersion = ctx.featureControl.metadataVersionOrThrow().partitionChangeRecordVersion();
        Assertions.assertEquals(ReplicationControlManagerTest.sortPartitionChangeRecords(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0).setDirectories(List.of(dir1b1, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1).setDirectories(List.of(dir2b1, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).setDirectories(List.of(offlineDir, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).setDirectories(List.of(dir1b1, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).setDirectories(List.of(DirectoryId.LOST, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).setIsr(List.of(Integer.valueOf(2))).setLeader(2), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).setIsr(List.of(Integer.valueOf(2))).setLeader(2), recordVersion))), ReplicationControlManagerTest.sortPartitionChangeRecords(controllerResult.records()));
        ctx.replay(controllerResult.records());
        Assertions.assertEquals((Object)new HashSet<TopicIdPartition>(){
            {
                this.add(new TopicIdPartition(topicA, 0));
                this.add(new TopicIdPartition(topicA, 1));
                this.add(new TopicIdPartition(topicB, 0));
            }
        }, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
        Assertions.assertEquals((Object)new HashSet<TopicIdPartition>(){
            {
                this.add(new TopicIdPartition(topicA, 2));
                this.add(new TopicIdPartition(topicB, 1));
                this.add(new TopicIdPartition(topicC, 0));
            }
        }, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(2, true)));
    }

    @Test
    void testHandleDirectoriesOffline() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        int b1 = 101;
        int b2 = 102;
        final Uuid dir1b1 = Uuid.fromString((String)"suitdzfTTdqoWcy8VqmkUg");
        final Uuid dir2b1 = Uuid.fromString((String)"yh3acnzGSeurSTj8aIhOjw");
        final Uuid dir1b2 = Uuid.fromString((String)"OmpmJ8RjQliQlEFht56DwQ");
        final Uuid dir2b2 = Uuid.fromString((String)"w05baLpsT5Oz0LvKTKXoDw");
        ctx.registerBrokersWithDirs(b1, List.of(dir1b1, dir2b1), b2, List.of(dir1b2, dir2b2));
        ctx.unfenceBrokers(b1, b2);
        final Uuid topicA = ctx.createTestTopic("a", new int[][]{{b1, b2}, {b1, b2}}).topicId();
        final Uuid topicB = ctx.createTestTopic("b", new int[][]{{b1, b2}, {b1, b2}}).topicId();
        ctx.assignReplicasToDirs(b1, (Map<TopicIdPartition, Uuid>)new HashMap<TopicIdPartition, Uuid>(){
            {
                this.put(new TopicIdPartition(topicA, 0), dir1b1);
                this.put(new TopicIdPartition(topicA, 1), dir2b1);
                this.put(new TopicIdPartition(topicB, 0), dir1b1);
                this.put(new TopicIdPartition(topicB, 1), dir2b1);
            }
        });
        ctx.assignReplicasToDirs(b2, (Map<TopicIdPartition, Uuid>)new HashMap<TopicIdPartition, Uuid>(){
            {
                this.put(new TopicIdPartition(topicA, 0), dir1b2);
                this.put(new TopicIdPartition(topicA, 1), dir2b2);
                this.put(new TopicIdPartition(topicB, 0), dir1b2);
                this.put(new TopicIdPartition(topicB, 1), dir2b2);
            }
        });
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ctx.replicationControl.handleDirectoriesOffline(b1, ReplicationControlManagerTest.defaultBrokerEpoch(b1).longValue(), List.of(dir1b1, dir1b2), records);
        Assertions.assertEquals(List.of(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(b1).setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(b1).longValue()).setLogDirs(List.of(dir2b1)), 2)), ReplicationControlManagerTest.filter(records, BrokerRegistrationChangeRecord.class));
        short partitionChangeRecordVersion = ctx.featureControl.metadataVersionOrThrow().partitionChangeRecordVersion();
        Assertions.assertEquals(ReplicationControlManagerTest.sortPartitionChangeRecords(List.of(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0).setLeader(b2).setIsr(List.of(Integer.valueOf(b2))), partitionChangeRecordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).setLeader(b2).setIsr(List.of(Integer.valueOf(b2))), partitionChangeRecordVersion))), ReplicationControlManagerTest.sortPartitionChangeRecords(ReplicationControlManagerTest.filter(records, PartitionChangeRecord.class)));
        Assertions.assertEquals((int)3, (int)records.size());
        ctx.replay(records);
        Assertions.assertEquals(List.of(dir2b1), (Object)ctx.clusterControl.registration(b1).directories());
    }

    private static List<ApiMessageAndVersion> sortPartitionChangeRecords(List<ApiMessageAndVersion> records) {
        records = new ArrayList<ApiMessageAndVersion>(records);
        records.sort(Comparator.comparing(record -> {
            PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord)record.message();
            return String.valueOf(partitionChangeRecord.topicId()) + "-" + partitionChangeRecord.partitionId();
        }));
        return records;
    }

    private static List<ApiMessageAndVersion> filter(List<ApiMessageAndVersion> records, Class<? extends ApiMessage> clazz) {
        return records.stream().filter(r -> clazz.equals(r.message().getClass())).collect(Collectors.toList());
    }

    @ParameterizedTest
    @CsvSource(value={"false, false", "false, true", "true, false", "true, true"})
    void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean useLegacyAlterConfigs) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).setStaticConfig("min.insync.replicas", "2").build();
        ctx.registerBrokers(1, 2, 3, 4);
        ctx.unfenceBrokers(1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 4}, {1, 3, 4}}).topicId();
        Uuid barId = ctx.createTestTopic("bar", new int[][]{{1, 2, 4}, {1, 3, 4}}).topicId();
        ctx.fenceBrokers(4);
        ctx.fenceBrokers(1);
        Assertions.assertArrayEquals((int[])new int[]{1}, (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)0).elr);
        Assertions.assertArrayEquals((int[])new int[]{1}, (int[])ctx.replicationControl.getPartition((Uuid)barId, (int)0).elr);
        ConfigResource configResource = clusterLevel ? new ConfigResource(ConfigResource.Type.BROKER, "") : new ConfigResource(ConfigResource.Type.TOPIC, "foo");
        if (useLegacyAlterConfigs) {
            ctx.replay(ctx.configurationControl.legacyAlterConfigs(Map.of(configResource, Map.of("min.insync.replicas", "1")), false).records());
        } else {
            ctx.replay(ctx.configurationControl.incrementalAlterConfigs(Map.of(configResource, Map.of("min.insync.replicas", new AbstractMap.SimpleImmutableEntry<AlterConfigOp.OpType, String>(AlterConfigOp.OpType.SET, "1"))), false).records());
        }
        Assertions.assertArrayEquals((int[])new int[0], (int[])ctx.replicationControl.getPartition((Uuid)fooId, (int)0).elr);
        if (clusterLevel) {
            Assertions.assertArrayEquals((int[])new int[0], (int[])ctx.replicationControl.getPartition((Uuid)barId, (int)0).elr);
        } else {
            Assertions.assertArrayEquals((int[])new int[]{1}, (int[])ctx.replicationControl.getPartition((Uuid)barId, (int)0).elr);
        }
    }

    @Test
    void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).setStaticConfig("min.insync.replicas", "2").build();
        ctx.registerBrokers(1, 2, 3, 4);
        ctx.unfenceBrokers(1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 4}, {1, 3, 4}}).topicId();
        int partitionEpoch = ctx.replicationControl.getPartition((Uuid)fooId, (int)0).partitionEpoch;
        ctx.replay(List.of(new ApiMessageAndVersion((ApiMessage)new ClearElrRecord(), MetadataRecordType.CLEAR_ELR_RECORD.highestSupportedVersion())));
        Assertions.assertEquals((int)partitionEpoch, (int)ctx.replicationControl.getPartition((Uuid)fooId, (int)0).partitionEpoch);
    }

    private static class ReplicationControlTestContext {
        final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
        final LogContext logContext = new LogContext();
        final MockTime time;
        final MockRandom random = new MockRandom();
        final FeatureControlManager featureControl;
        final ClusterControlManager clusterControl;
        final ConfigurationControlManager configurationControl;
        final ReplicationControlManager replicationControl;
        final OffsetControlManager offsetControlManager;

        void replay(List<ApiMessageAndVersion> records) {
            RecordTestUtils.replayAll(this.clusterControl, records);
            RecordTestUtils.replayAll(this.configurationControl, records);
            RecordTestUtils.replayAll(this.replicationControl, records);
        }

        private ReplicationControlTestContext(MetadataVersion metadataVersion, Optional<CreateTopicPolicy> createTopicPolicy, MockTime time, boolean isElrEnabled, Map<String, Object> staticConfig) {
            this.time = time;
            this.featureControl = new FeatureControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setQuorumFeatures(new QuorumFeatures(0, QuorumFeatures.defaultSupportedFeatureMap((boolean)true), List.of(Integer.valueOf(0)))).build();
            this.featureControl.replay(new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(metadataVersion.featureLevel()));
            this.featureControl.replay(new FeatureLevelRecord().setName("eligible.leader.replicas.version").setFeatureLevel(isElrEnabled ? EligibleLeaderReplicasVersion.ELRV_1.featureLevel() : EligibleLeaderReplicasVersion.ELRV_0.featureLevel()));
            this.clusterControl = new ClusterControlManager.Builder().setLogContext(this.logContext).setTime((Time)time).setSnapshotRegistry(this.snapshotRegistry).setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(1000L, TimeUnit.NANOSECONDS)).setReplicaPlacer((ReplicaPlacer)new StripedReplicaPlacer((Random)this.random)).setFeatureControlManager(this.featureControl).setBrokerShutdownHandler(this::handleBrokerShutdown).build();
            this.configurationControl = new ConfigurationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setFeatureControl(this.featureControl).setStaticConfig(staticConfig).setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).build();
            this.offsetControlManager = new OffsetControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).build();
            this.replicationControl = new ReplicationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setLogContext(this.logContext).setMaxElectionsPerImbalance(Integer.MAX_VALUE).setConfigurationControl(this.configurationControl).setClusterControl(this.clusterControl).setCreateTopicPolicy(createTopicPolicy).setFeatureControl(this.featureControl).build();
            this.clusterControl.activate();
        }

        void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
            this.replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
        }

        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode) {
            CreateTopicsRequestData request = new CreateTopicsRequestData();
            CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName(name);
            topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
            request.topics().add((ImplicitLinkedHashCollection.Element)topic);
            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
            ControllerResult result = this.replicationControl.createTopics(requestContext, request, Set.of(name));
            CreateTopicsResponseData.CreatableTopicResult topicResult = ((CreateTopicsResponseData)result.response()).topics().find(name);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals((short)expectedErrorCode, (short)topicResult.errorCode());
            if (expectedErrorCode == Errors.NONE.code()) {
                this.replay(result.records());
            }
            return topicResult;
        }

        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas) {
            return this.createTestTopic(name, replicas, Map.of(), (short)0);
        }

        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, short expectedErrorCode) {
            return this.createTestTopic(name, replicas, Map.of(), expectedErrorCode);
        }

        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, Map<String, String> configs, short expectedErrorCode) {
            Assertions.assertNotEquals((int)0, (int)replicas.length);
            CreateTopicsRequestData request = new CreateTopicsRequestData();
            CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName(name);
            topic.setNumPartitions(-1).setReplicationFactor((short)-1);
            for (int i = 0; i < replicas.length; ++i) {
                topic.assignments().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(i).setBrokerIds(Replicas.toList((int[])replicas[i])));
            }
            configs.forEach((key, value) -> topic.configs().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopicConfig().setName(key).setValue(value)));
            request.topics().add((ImplicitLinkedHashCollection.Element)topic);
            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
            ControllerResult result = this.replicationControl.createTopics(requestContext, request, Set.of(name));
            CreateTopicsResponseData.CreatableTopicResult topicResult = ((CreateTopicsResponseData)result.response()).topics().find(name);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals((short)expectedErrorCode, (short)topicResult.errorCode());
            if (expectedErrorCode == Errors.NONE.code()) {
                Assertions.assertEquals((int)replicas.length, (int)topicResult.numPartitions());
                Assertions.assertEquals((int)replicas[0].length, (int)topicResult.replicationFactor());
                this.replay(result.records());
            }
            return topicResult;
        }

        void deleteTopic(ControllerRequestContext context, Uuid topicId) {
            ControllerResult result = this.replicationControl.deleteTopics(context, Set.of(topicId));
            Assertions.assertEquals(Set.of(topicId), ((Map)result.response()).keySet());
            Assertions.assertEquals((Object)Errors.NONE, (Object)((ApiError)((Map)result.response()).get(topicId)).error());
            Assertions.assertEquals((int)1, (int)result.records().size());
            ApiMessageAndVersion removeRecordAndVersion = (ApiMessageAndVersion)result.records().get(0);
            Assertions.assertInstanceOf(RemoveTopicRecord.class, (Object)removeRecordAndVersion.message());
            RemoveTopicRecord removeRecord = (RemoveTopicRecord)removeRecordAndVersion.message();
            Assertions.assertEquals((Object)topicId, (Object)removeRecord.topicId());
            this.replay(result.records());
        }

        void createPartitions(int count, String name, int[][] replicas, short expectedErrorCode) {
            Assertions.assertNotEquals((int)0, (int)replicas.length);
            CreatePartitionsRequestData.CreatePartitionsTopic topic = new CreatePartitionsRequestData.CreatePartitionsTopic().setName(name).setCount(count);
            for (int[] replica : replicas) {
                topic.assignments().add(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Replicas.toList((int[])replica)));
            }
            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
            ControllerResult result = this.replicationControl.createPartitions(requestContext, List.of(topic));
            Assertions.assertEquals((int)1, (int)((List)result.response()).size());
            CreatePartitionsResponseData.CreatePartitionsTopicResult topicResult = (CreatePartitionsResponseData.CreatePartitionsTopicResult)((List)result.response()).get(0);
            Assertions.assertEquals((Object)name, (Object)topicResult.name());
            Assertions.assertEquals((short)expectedErrorCode, (short)topicResult.errorCode());
            this.replay(result.records());
        }

        void registerBrokers(Integer ... brokerIds) {
            Object[] brokersAndDirs = new Object[brokerIds.length * 2];
            for (int i = 0; i < brokerIds.length; ++i) {
                brokersAndDirs[i * 2] = brokerIds[i];
                brokersAndDirs[i * 2 + 1] = List.of(Uuid.fromString((String)("TESTBROKER" + Integer.toString(100000 + brokerIds[i]).substring(1) + "DIRAAAA")));
            }
            this.registerBrokersWithDirs(brokersAndDirs);
        }

        void registerBrokersWithDirs(Object ... brokerIdsAndDirs) {
            if (brokerIdsAndDirs.length % 2 != 0) {
                throw new IllegalArgumentException("uneven number of arguments");
            }
            for (int i = 0; i < brokerIdsAndDirs.length / 2; ++i) {
                int brokerId = (Integer)brokerIdsAndDirs[i * 2];
                List logDirs = (List)brokerIdsAndDirs[i * 2 + 1];
                RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue()).setBrokerId(brokerId).setRack(null).setLogDirs(logDirs);
                brokerRecord.endPoints().add((ImplicitLinkedHashCollection.Element)new RegisterBrokerRecord.BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort(9092 + brokerId).setName("PLAINTEXT").setHost("localhost"));
                this.replay(List.of(new ApiMessageAndVersion((ApiMessage)brokerRecord, 3)));
            }
        }

        void handleBrokersShutdown(boolean isCleanShutdown, Integer ... brokerIds) {
            ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
            Integer[] integerArray = brokerIds;
            int n = integerArray.length;
            for (int i = 0; i < n; ++i) {
                int brokerId = integerArray[i];
                this.replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
            }
            this.replay(records);
        }

        void alterPartition(TopicIdPartition topicIdPartition, int leaderId, List<AlterPartitionRequestData.BrokerState> isrWithEpoch, LeaderRecoveryState leaderRecoveryState) {
            BrokerRegistration registration = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(leaderId);
            Assertions.assertFalse((boolean)registration.fenced());
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            Assertions.assertNotNull((Object)partition);
            Assertions.assertEquals((int)leaderId, (int)partition.leader);
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData().setPartitionIndex(topicIdPartition.partitionId()).setPartitionEpoch(partition.partitionEpoch).setLeaderEpoch(partition.leaderEpoch).setLeaderRecoveryState(leaderRecoveryState.value()).setNewIsrWithEpochs(isrWithEpoch);
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicId(topicIdPartition.topicId()).setPartitions(List.of(partitionData));
            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
            ControllerResult alterPartition = this.replicationControl.alterPartition(requestContext, new AlterPartitionRequestData().setBrokerId(leaderId).setBrokerEpoch(registration.epoch()).setTopics(List.of(topicData)));
            this.replay(alterPartition.records());
        }

        void unfenceBrokers(Integer ... brokerIds) {
            Integer[] integerArray = brokerIds;
            int n = integerArray.length;
            for (int i = 0; i < n; ++i) {
                int brokerId = integerArray[i];
                this.clusterControl.trackBrokerHeartbeat(brokerId, ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue());
                ControllerResult result = this.replicationControl.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setBrokerId(brokerId).setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue()).setCurrentMetadataOffset(1L).setWantFence(false).setWantShutDown(false), 0L);
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), (Object)result.response());
                this.replay(result.records());
            }
        }

        void inControlledShutdownBrokers(Integer ... brokerIds) {
            Integer[] integerArray = brokerIds;
            int n = integerArray.length;
            for (int i = 0; i < n; ++i) {
                int brokerId = integerArray[i];
                BrokerRegistrationChangeRecord record = new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue()).setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
                this.replay(List.of(new ApiMessageAndVersion((ApiMessage)record, 1)));
            }
        }

        void alterTopicConfig(String topic, String configKey, String configValue) {
            ConfigRecord configRecord = new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName(topic).setName(configKey).setValue(configValue);
            this.replay(List.of(new ApiMessageAndVersion((ApiMessage)configRecord, 0)));
        }

        void fenceBrokers(Integer ... brokerIds) {
            this.fenceBrokers(Set.of(brokerIds));
        }

        void fenceBrokers(Set<Integer> brokerIds) {
            ControllerResult fenceResult;
            this.time.sleep(1000L);
            Set<Integer> unfencedBrokerIds = this.clusterControl.brokerRegistrations().keySet().stream().filter(brokerId -> !brokerIds.contains(brokerId)).collect(Collectors.toSet());
            this.unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0]));
            do {
                fenceResult = this.replicationControl.maybeFenceOneStaleBroker();
                this.replay(fenceResult.records());
            } while (((Boolean)fenceResult.response()).booleanValue());
            Assertions.assertEquals(brokerIds, this.fencedBrokerIds());
        }

        long currentBrokerEpoch(int brokerId) {
            Map registrations = this.clusterControl.brokerRegistrations();
            BrokerRegistration registration = (BrokerRegistration)registrations.get(brokerId);
            Assertions.assertNotNull((Object)registration, (String)("No current registration for broker " + brokerId));
            return registration.epoch();
        }

        OptionalInt currentLeader(TopicIdPartition topicIdPartition) {
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            return partition.leader < 0 ? OptionalInt.empty() : OptionalInt.of(partition.leader);
        }

        ControllerResult<AssignReplicasToDirsResponseData> assignReplicasToDirs(int brokerId, Map<TopicIdPartition, Uuid> assignment) {
            ControllerResult result = this.replicationControl.handleAssignReplicasToDirs(AssignmentsHelper.buildRequestData((int)brokerId, (long)ReplicationControlManagerTest.defaultBrokerEpoch(brokerId), assignment));
            Assertions.assertNotNull((Object)result.response());
            Assertions.assertEquals((short)Errors.NONE.code(), (short)((AssignReplicasToDirsResponseData)result.response()).errorCode());
            this.replay(result.records());
            return result;
        }

        Set<Integer> fencedBrokerIds() {
            return this.clusterControl.brokerRegistrations().values().stream().filter(BrokerRegistration::fenced).map(BrokerRegistration::id).collect(Collectors.toSet());
        }

        private static class Builder {
            private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
            private MetadataVersion metadataVersion = MetadataVersion.latestTesting();
            private MockTime mockTime = new MockTime();
            private boolean isElrEnabled = false;
            private final Map<String, Object> staticConfig = new HashMap<String, Object>();

            private Builder() {
            }

            Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
                this.createTopicPolicy = Optional.of(createTopicPolicy);
                return this;
            }

            Builder setMetadataVersion(MetadataVersion metadataVersion) {
                this.metadataVersion = metadataVersion;
                return this;
            }

            Builder setIsElrEnabled(boolean isElrEnabled) {
                this.isElrEnabled = isElrEnabled;
                return this;
            }

            Builder setStaticConfig(String key, Object value) {
                this.staticConfig.put(key, value);
                return this;
            }

            Builder setMockTime(MockTime mockTime) {
                this.mockTime = mockTime;
                return this;
            }

            ReplicationControlTestContext build() {
                return new ReplicationControlTestContext(this.metadataVersion, this.createTopicPolicy, this.mockTime, this.isElrEnabled, this.staticConfig);
            }
        }
    }

    private static class MockCreateTopicPolicy
    implements CreateTopicPolicy {
        private final List<CreateTopicPolicy.RequestMetadata> expecteds;
        private final AtomicLong index = new AtomicLong(0L);

        MockCreateTopicPolicy(List<CreateTopicPolicy.RequestMetadata> expecteds) {
            this.expecteds = expecteds;
        }

        public void validate(CreateTopicPolicy.RequestMetadata actual) throws PolicyViolationException {
            long curIndex = this.index.getAndIncrement();
            if (curIndex >= (long)this.expecteds.size()) {
                throw new PolicyViolationException("Unexpected topic creation: index out of range at " + curIndex);
            }
            CreateTopicPolicy.RequestMetadata expected = this.expecteds.get((int)curIndex);
            if (!expected.equals((Object)actual)) {
                throw new PolicyViolationException("Expected: " + String.valueOf(expected) + ". Got: " + String.valueOf(actual));
            }
        }

        public void close() {
        }

        public void configure(Map<String, ?> configs) {
        }
    }
}

