/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerReplicaExclusionRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ClusterLinkControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerRequestContextUtil;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.controller.MockControllerMetrics;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.TopicIdPartition;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.ConfluentPartitionsPerTopicListener;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.MockRandom;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class ReplicationControlManagerTest {
    // SLF4J logger bound to this test class.
    private static final Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
    // NOTE(review): presumably the broker session timeout (ms) handed to the test context's
    // cluster control; the usage is not visible in this part of the file -- confirm.
    private static final int BROKER_SESSION_TIMEOUT_MS = 1000;
    // NOTE(review): by name, a cap on the number of topics in the cluster enforced by the
    // test setup; the consumer of this constant is not visible here -- confirm.
    private static final int MAX_NUM_CLUSTER_TOPICS = 3;
    // NOTE(review): by name, a cap on partitions per tenant enforced by the test setup;
    // the consumer of this constant is not visible here -- confirm.
    private static final int MAX_NUM_TENANT_PARTITIONS = 20;
    // A ListPartitionReassignmentsResponseData whose error message is explicitly set to null.
    // NOTE(review): by name, the expected listing result when no partition is reassigning -- confirm.
    private static final ListPartitionReassignmentsResponseData NONE_REASSIGNING = new ListPartitionReassignmentsResponseData().setErrorMessage(null);

    /**
     * Issues the same create request for topic "foo" (default partition count and
     * replication factor) four times while the broker state changes:
     * before this test registers any broker, with only broker 0 unfenced but in
     * controlled shutdown, with brokers 0-2 all unfenced, and finally after the
     * topic already exists. Checks the response of each attempt, the partition
     * registration after replay, and the records exposed by the manager's iterator.
     */
    @Test
    public void testCreateTopics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        CreateTopicsRequestData.CreatableTopic fooSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short)-1);
        CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
        createFoo.topics().add(fooSpec);

        // Attempt 1: the test has not registered any broker yet.
        ControllerResult firstAttempt = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        CreateTopicsResponseData.CreatableTopicResult allFenced = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
            .setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced.");
        CreateTopicsResponseData wantFirst = new CreateTopicsResponseData();
        wantFirst.topics().add(allFenced);
        Assertions.assertEquals(wantFirst, firstAttempt.response());

        // Attempt 2: only broker 0 is unfenced, and it is in controlled shutdown.
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0);
        ctx.inControlledShutdownBrokers(0);
        ControllerResult secondAttempt = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        CreateTopicsResponseData.CreatableTopicResult fencedOrShuttingDown = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
            .setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced or in controlled shutdown.");
        CreateTopicsResponseData wantSecond = new CreateTopicsResponseData();
        wantSecond.topics().add(fencedOrShuttingDown);
        Assertions.assertEquals(wantSecond, secondAttempt.response());

        // Attempt 3: brokers 0, 1 and 2 are re-registered and all unfenced.
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ControllerResult thirdAttempt = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        // The topic id is generated by the manager, so it is copied from the actual response.
        Uuid fooId = ((CreateTopicsResponseData)thirdAttempt.response()).topics().find("foo").topicId();
        CreateTopicsResponseData.CreatableTopicResult created = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short)3)
            .setErrorMessage(null)
            .setErrorCode((short)0)
            .setTopicId(fooId);
        CreateTopicsResponseData wantThird = new CreateTopicsResponseData();
        wantThird.topics().add(created);
        Assertions.assertEquals(wantThird, thirdAttempt.response());
        ctx.replay(thirdAttempt.records());
        // The first emitted record must be the TopicRecord; its id locates the new partition.
        Uuid recordedTopicId = ((TopicRecord)((ApiMessageAndVersion)thirdAttempt.records().get(0)).message()).topicId();
        PartitionRegistration wantPartition = new PartitionRegistration(new int[]{1, 2, 0}, new int[]{1, 2, 0}, Replicas.NONE, Replicas.NONE, 1, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals(wantPartition, manager.getPartition(recordedTopicId, 0));

        // Attempt 4: the topic now exists.
        ControllerResult fourthAttempt = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        CreateTopicsResponseData.CreatableTopicResult duplicate = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
            .setErrorMessage("Topic 'foo' already exists.");
        CreateTopicsResponseData wantFourth = new CreateTopicsResponseData();
        wantFourth.topics().add(duplicate);
        Assertions.assertEquals(wantFourth, fourthAttempt.response());

        // The manager's iterator must expose exactly the partition and topic records for "foo".
        ApiMessageAndVersion partitionEntry = new ApiMessageAndVersion(new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(fooId)
            .setReplicas(Arrays.asList(1, 2, 0))
            .setIsr(Arrays.asList(1, 2, 0))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(1)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0), 0);
        ApiMessageAndVersion topicEntry = new ApiMessageAndVersion(new TopicRecord().setTopicId(fooId).setName("foo"), 0);
        List<ApiMessageAndVersion> fooBatch = Arrays.asList(partitionEntry, topicEntry);
        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(fooBatch), (Iterator<List<ApiMessageAndVersion>>)ctx.replicationControl.iterator(Long.MAX_VALUE));
    }

    /**
     * Creates topic "foo" while broker 0 is unfenced, broker 1 is unfenced but in
     * controlled shutdown, and broker 2 is registered but never unfenced. Expects
     * creation to succeed with replicas [1, 0, 2] while the ISR and leader contain
     * only broker 0.
     */
    @Test
    public void testCreateTopicsISRInvariants() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        CreateTopicsRequestData.CreatableTopic fooSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short)-1);
        CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
        createFoo.topics().add(fooSpec);

        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1);
        ctx.inControlledShutdownBrokers(1);

        ControllerResult outcome = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        // The generated topic id is taken from the actual response.
        Uuid fooId = ((CreateTopicsResponseData)outcome.response()).topics().find("foo").topicId();
        CreateTopicsResponseData.CreatableTopicResult created = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short)3)
            .setErrorMessage(null)
            .setErrorCode((short)0)
            .setTopicId(fooId);
        CreateTopicsResponseData wantResponse = new CreateTopicsResponseData();
        wantResponse.topics().add(created);
        Assertions.assertEquals(wantResponse, outcome.response());

        ctx.replay(outcome.records());
        // The first emitted record must be the TopicRecord; its id locates the new partition.
        Uuid recordedTopicId = ((TopicRecord)((ApiMessageAndVersion)outcome.records().get(0)).message()).topicId();
        PartitionRegistration wantPartition = new PartitionRegistration(new int[]{1, 0, 2}, new int[]{0}, Replicas.NONE, Replicas.NONE, 0, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals(wantPartition, manager.getPartition(recordedTopicId, 0));
    }

    /**
     * Creates topic "foo" with config entry foo=notNull and checks the value is
     * readable from the configuration manager after replay; then tries to create
     * topic "bar" with a null-valued config and expects INVALID_CONFIG with the
     * matching error message.
     */
    @Test
    public void testCreateTopicsWithConfigs() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        // Case 1: a non-null config value is accepted and stored.
        CreateTopicsRequestData.CreateableTopicConfigCollection goodConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        goodConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("foo").setValue("notNull"));
        CreateTopicsRequestData.CreatableTopic fooSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short)-1)
            .setConfigs(goodConfigs);
        CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
        createFoo.topics().add(fooSpec);
        ControllerResult fooOutcome = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        CreateTopicsResponseData.CreatableTopicResult fooResult = ((CreateTopicsResponseData)fooOutcome.response()).topics().find("foo");
        Assertions.assertEquals((short)0, fooResult.errorCode());
        ctx.replay(fooOutcome.records());
        ConfigResource fooResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
        Assertions.assertEquals("notNull", ctx.configurationControl.getConfigs(fooResource).get("foo"));

        // Case 2: a null config value is rejected.
        CreateTopicsRequestData.CreateableTopicConfigCollection badConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        badConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("foo").setValue(null));
        CreateTopicsRequestData.CreatableTopic barSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("bar")
            .setNumPartitions(-1)
            .setReplicationFactor((short)-1)
            .setConfigs(badConfigs);
        CreateTopicsRequestData createBar = new CreateTopicsRequestData();
        createBar.topics().add(barSpec);
        ControllerResult barOutcome = manager.createTopics(createBar, Collections.singleton("bar"), KafkaPrincipal.ANONYMOUS);
        Assertions.assertEquals(Errors.INVALID_CONFIG.code(), ((CreateTopicsResponseData)barOutcome.response()).topics().find("bar").errorCode());
        Assertions.assertEquals("Null value not supported for topic configs: foo", ((CreateTopicsResponseData)barOutcome.response()).topics().find("bar").errorMessage());
    }

    /**
     * Creates mirror topic "foo" over cluster link "test-link". The first attempt,
     * made before the link exists, must produce no records and CLUSTER_LINK_NOT_FOUND.
     * After the link is created the attempt succeeds, and the manager's iterator must
     * expose the partition, topic and mirror-topic records. Deleting the topic must
     * then clear the mirror bookkeeping and the topic name mapping.
     */
    @Test
    public void testCreateAndDeleteMirrorTopics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        Uuid sourceTopicId = Uuid.randomUuid();
        CreateTopicsRequestData.CreatableTopic mirrorSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short)-1)
            .setLinkName("test-link")
            .setSourceTopicId(sourceTopicId)
            .setMirrorTopic("foo-origin");
        CreateTopicsRequestData createMirror = new CreateTopicsRequestData();
        createMirror.topics().add(mirrorSpec);

        // The link does not exist yet, so the request is rejected without records.
        ControllerResult rejected = manager.createTopics(createMirror, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        Assertions.assertTrue(rejected.records().isEmpty());
        Assertions.assertEquals(Errors.CLUSTER_LINK_NOT_FOUND.code(), ((CreateTopicsResponseData)rejected.response()).topics().find("foo").errorCode());

        // With the link in place the same request succeeds.
        Uuid linkId = ctx.createClusterLink("test-link");
        ControllerResult accepted = manager.createTopics(createMirror, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        Assertions.assertFalse(accepted.records().isEmpty());
        Assertions.assertEquals(Errors.NONE.code(), ((CreateTopicsResponseData)accepted.response()).topics().find("foo").errorCode());
        ctx.replay(accepted.records());
        Uuid topicId = ((CreateTopicsResponseData)accepted.response()).topics().find("foo").topicId();

        ApiMessageAndVersion partitionEntry = new ApiMessageAndVersion(new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(topicId)
            .setReplicas(Arrays.asList(1, 2, 0))
            .setIsr(Arrays.asList(1, 2, 0))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(1)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .setLinkedLeaderEpoch(-1)
            .setLinkState(PartitionRegistration.LinkState.ACTIVE.levelCode), 0);
        ApiMessageAndVersion topicEntry = new ApiMessageAndVersion(new TopicRecord().setTopicId(topicId).setName("foo"), 0);
        ApiMessageAndVersion mirrorEntry = new ApiMessageAndVersion(new MirrorTopicRecord()
            .setTopicName("foo")
            .setTopicId(topicId)
            .setSourceTopicName("foo-origin")
            .setSourceTopicId(sourceTopicId)
            .setMirrorTopicState(MirrorTopic.State.MIRROR.stateName())
            .setClusterLinkId(linkId)
            .setClusterLinkName("test-link"), MetadataRecordType.MIRROR_TOPIC_RECORD.highestSupportedVersion());
        List<ApiMessageAndVersion> wantBatch = Arrays.asList(partitionEntry, topicEntry, mirrorEntry);
        RecordTestUtils.assertBatchIteratorContains(Collections.singletonList(wantBatch), (Iterator<List<ApiMessageAndVersion>>)ctx.replicationControl.iterator(Long.MAX_VALUE));

        // Deleting the topic must also remove every trace of the mirror relationship.
        ControllerResult deleted = ctx.replicationControl.deleteTopics(Collections.singletonList(topicId));
        Assertions.assertFalse(deleted.records().isEmpty());
        Assertions.assertEquals(Errors.NONE, ((ApiError)((Map)deleted.response()).get(topicId)).error());
        ctx.replay(deleted.records());
        Assertions.assertFalse(ctx.mirrorTopicControl.isMirrorTopic(topicId));
        Assertions.assertFalse(ctx.mirrorTopicControl.clusterLinkIdForTopicId(topicId).isPresent());
        Assertions.assertFalse(ctx.mirrorTopicControl.topicIdsForClusterLinkId(linkId, false).contains(topicId));
        Assertions.assertNull(ctx.replicationControl.getTopicId("foo"));
    }

    /**
     * Creates mirror topic "foo" over "test-link", then has the current leader of
     * partition 0 send an AlterPartition request whose cluster link state has
     * linkFailed set. Afterwards the manager's iterator must show the partition
     * with link state FAILED at partition epoch 1 and the mirror topic in state FAILED.
     */
    @Test
    public void testFailMirrorPartition() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid linkId = ctx.createClusterLink("test-link");
        Uuid sourceTopicId = Uuid.randomUuid();
        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", -1, (short)-1, Errors.NONE.code(), topic -> topic.setLinkName("test-link").setSourceTopicId(sourceTopicId).setMirrorTopic("foo-origin"));
        Uuid topicId = created.topicId();
        TopicIdPartition partition0 = new TopicIdPartition(topicId, 0);

        // Same ISR as before; the only change carried by the request is the failed link flag.
        AlterPartitionRequestData.PartitionData linkFailedRequest = this.newAlterPartition(manager, partition0, Arrays.asList(0, 1, 2), LeaderRecoveryState.RECOVERED);
        AlterPartitionRequestData.ClusterLinkState failedLink = new AlterPartitionRequestData.ClusterLinkState().setLinkFailed(true);
        linkFailedRequest.setClusterLinkState(failedLink);
        int leaderId = ctx.currentLeader(partition0).getAsInt();
        this.sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, linkFailedRequest);

        ApiMessageAndVersion partitionEntry = new ApiMessageAndVersion(new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(topicId)
            .setReplicas(Arrays.asList(1, 2, 0))
            .setIsr(Arrays.asList(0, 1, 2))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(1)
            .setLeaderEpoch(0)
            .setPartitionEpoch(1)
            .setLinkedLeaderEpoch(-1)
            .setLinkState(PartitionRegistration.LinkState.FAILED.levelCode), 0);
        ApiMessageAndVersion topicEntry = new ApiMessageAndVersion(new TopicRecord().setTopicId(topicId).setName("foo"), 0);
        ApiMessageAndVersion mirrorEntry = new ApiMessageAndVersion(new MirrorTopicRecord()
            .setTopicName("foo")
            .setTopicId(topicId)
            .setSourceTopicName("foo-origin")
            .setSourceTopicId(sourceTopicId)
            .setMirrorTopicState(MirrorTopic.State.FAILED.stateName())
            .setClusterLinkId(linkId)
            .setClusterLinkName("test-link"), MetadataRecordType.MIRROR_TOPIC_RECORD.highestSupportedVersion());
        List<ApiMessageAndVersion> wantBatch = Arrays.asList(partitionEntry, topicEntry, mirrorEntry);
        RecordTestUtils.assertBatchIteratorContains(Collections.singletonList(wantBatch), (Iterator<List<ApiMessageAndVersion>>)ctx.replicationControl.iterator(Long.MAX_VALUE));
    }

    /**
     * Verifies how AlterPartition requests carrying a linked leader epoch are handled
     * for a topic that was created without any cluster link.
     * <ul>
     *   <li>A request that keeps the ISR at [0, 1, 2] and only sets linkedLeaderEpoch
     *       must produce no records.</li>
     *   <li>A request that also changes the ISR to [0, 1] must produce exactly one
     *       record, a {@link PartitionChangeRecord}, and a successful response that is
     *       consistent with the partition state.</li>
     * </ul>
     */
    @Test
    public void testAlterIsrForNonMirrorPartition() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        Uuid topicId = createTopicResult.topicId();
        TopicIdPartition p0 = new TopicIdPartition(topicId, 0);

        // Only the linked leader epoch differs from the current state: nothing to record.
        AlterPartitionRequestData.PartitionData linkLeaderBump = this.newAlterPartition(replicationControl, p0, Arrays.asList(0, 1, 2), LeaderRecoveryState.RECOVERED);
        linkLeaderBump.setClusterLinkState(new AlterPartitionRequestData.ClusterLinkState().setLinkedLeaderEpoch(42));
        ControllerResult<AlterPartitionResponseData> result = this.sendAlterPartition(ctx, 0, ctx.currentBrokerEpoch(0), topicId, linkLeaderBump);
        Assertions.assertEquals(0, result.records().size(), "Should not generate any records for non-mirrored partition when handling only linkedLeaderEpoch");

        // The ISR is changed to [0, 1] as well: the ISR change must still be recorded.
        AlterPartitionRequestData.PartitionData linkedLeaderAndIsr = this.newAlterPartition(replicationControl, p0, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERED);
        linkedLeaderAndIsr.setClusterLinkState(new AlterPartitionRequestData.ClusterLinkState().setLinkedLeaderEpoch(42));
        result = this.sendAlterPartition(ctx, 0, ctx.currentBrokerEpoch(0), topicId, linkedLeaderAndIsr);
        Assertions.assertEquals(1, result.records().size(), "Should still generate record for ISR change for non-mirrored partition even if linkedLeaderEpoch is set");
        AlterPartitionResponseData.PartitionData linkedLeaderAndIsrResponse = this.assertAlterPartitionResponse(result, p0, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, p0, linkedLeaderAndIsrResponse);
        Assertions.assertEquals(PartitionChangeRecord.class, result.records().get(0).message().getClass());
    }

    /**
     * Creates mirror topic "foo" over cluster link "test-link", confirms it is tracked
     * as a mirror topic whose partition 0 has link state ACTIVE, then deletes the link
     * through the cluster link manager and replays the resulting records. Afterwards
     * the topic must no longer be a mirror topic and partition 0 must have link state
     * NOT_MIRROR.
     */
    @Test
    public void testUnlinkMirrorTopic() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        // The link must exist before the mirror topic can be created; its id is not needed here.
        ctx.createClusterLink("test-link");
        Uuid mirrorTopicId = Uuid.randomUuid();
        CreateTopicsResponseData.CreatableTopicResult result = ctx.createTestTopic("foo", -1, (short)-1, Errors.NONE.code(), creatableTopic -> creatableTopic.setLinkName("test-link").setSourceTopicId(mirrorTopicId).setMirrorTopic("foo-origin"));
        Uuid topicId = result.topicId();
        Assertions.assertTrue(ctx.mirrorTopicControl.mirrorTopic(topicId).isPresent());
        // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        Assertions.assertEquals(PartitionRegistration.LinkState.ACTIVE, ctx.replicationControl.getPartition(topicId, 0).linkState);

        // Collect the records generated by deleting the link, then apply them.
        List<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ctx.clusterLinkControl.deleteClusterLink("test-link", records::add, true);
        ctx.replay(records);

        Assertions.assertFalse(ctx.mirrorTopicControl.mirrorTopic(topicId).isPresent(), "Should not be a mirror topic after link was deleted");
        Assertions.assertEquals(PartitionRegistration.LinkState.NOT_MIRROR, ctx.replicationControl.getPartition(topicId, 0).linkState);
    }

    /**
     * Tracks the fenced and active broker count metrics while brokers 0, 1 and 2 are
     * registered and unfenced one after another, and again after brokers 0 and 2 are
     * unregistered.
     */
    @Test
    public void testBrokerCountMetrics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;

        // A freshly registered broker is counted as fenced, not active.
        ctx.registerBrokers(0);
        Assertions.assertEquals(1, ctx.metrics.fencedBrokerCount());
        Assertions.assertEquals(0, ctx.metrics.activeBrokerCount());

        // Unfencing moves it from the fenced count to the active count.
        ctx.unfenceBrokers(0);
        Assertions.assertEquals(0, ctx.metrics.fencedBrokerCount());
        Assertions.assertEquals(1, ctx.metrics.activeBrokerCount());

        ctx.registerBrokers(1);
        ctx.unfenceBrokers(1);
        Assertions.assertEquals(2, ctx.metrics.activeBrokerCount());

        ctx.registerBrokers(2);
        ctx.unfenceBrokers(2);
        Assertions.assertEquals(0, ctx.metrics.fencedBrokerCount());
        Assertions.assertEquals(3, ctx.metrics.activeBrokerCount());

        // Unregister brokers 0 and 2, replaying the generated records each time.
        ControllerResult removedZero = manager.unregisterBroker(0);
        ctx.replay(removedZero.records());
        ControllerResult removedTwo = manager.unregisterBroker(2);
        ctx.replay(removedTwo.records());
        Assertions.assertEquals(0, ctx.metrics.fencedBrokerCount());
        Assertions.assertEquals(1, ctx.metrics.activeBrokerCount());
    }

    /**
     * Sends a valid create request for "foo" (1 partition, replication factor 3) with
     * validateOnly set, and expects a response with error code 0 and no generated records.
     */
    @Test
    public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsRequestData.CreatableTopic fooSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short)3);
        CreateTopicsRequestData validateOnly = new CreateTopicsRequestData().setValidateOnly(true);
        validateOnly.topics().add(fooSpec);

        ControllerResult outcome = ctx.replicationControl.createTopics(validateOnly, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        Assertions.assertEquals(0, outcome.records().size());
        CreateTopicsResponseData.CreatableTopicResult fooResult = ((CreateTopicsResponseData)outcome.response()).topics().find("foo");
        Assertions.assertEquals((short)0, fooResult.errorCode());
    }

    /**
     * Sends a validateOnly create request for "foo" with replication factor 4 while only
     * brokers 0-2 are registered, and expects INVALID_REPLICATION_FACTOR with the matching
     * message and no generated records.
     */
    @Test
    public void testInvalidCreateTopicsWithValidateOnlyFlag() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsRequestData.CreatableTopic fooSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short)4);
        CreateTopicsRequestData validateOnly = new CreateTopicsRequestData().setValidateOnly(true);
        validateOnly.topics().add(fooSpec);

        ControllerResult outcome = ctx.replicationControl.createTopics(validateOnly, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        Assertions.assertEquals(0, outcome.records().size());

        CreateTopicsResponseData.CreatableTopicResult tooFewBrokers = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
            .setErrorMessage("Unable to replicate the partition 4 time(s): The target replication factor of 4 cannot be reached because only 3 broker(s) are registered.");
        CreateTopicsResponseData wantResponse = new CreateTopicsResponseData();
        wantResponse.topics().add(tooFewBrokers);
        Assertions.assertEquals(wantResponse, outcome.response());
    }

    /**
     * Installs a mock create-topic policy built from four request descriptions (foo, bar,
     * baz, quux) and creates the four topics. "foo" and "baz" are created with the same
     * parameters the policy was given and must succeed; "bar" (replication factor 3 rather
     * than 2) and "quux" (assignment [1, 2, 0] rather than [2, 1, 0]) differ and must fail
     * with POLICY_VIOLATION.
     */
    @Test
    public void testCreateTopicsWithPolicy() throws Exception {
        CreateTopicPolicy.RequestMetadata fooMeta = new CreateTopicPolicy.RequestMetadata("foo", 2, (short)2, null, Collections.emptyMap());
        CreateTopicPolicy.RequestMetadata barMeta = new CreateTopicPolicy.RequestMetadata("bar", 3, (short)2, null, Collections.emptyMap());
        CreateTopicPolicy.RequestMetadata bazMeta = new CreateTopicPolicy.RequestMetadata("baz", null, null, Collections.singletonMap(0, Arrays.asList(2, 1, 0)), Collections.singletonMap("segment.bytes", "12300000"));
        CreateTopicPolicy.RequestMetadata quuxMeta = new CreateTopicPolicy.RequestMetadata("quux", null, null, Collections.singletonMap(0, Arrays.asList(2, 1, 0)), Collections.emptyMap());
        MockCreateTopicPolicy policy = new MockCreateTopicPolicy(Arrays.asList(fooMeta, barMeta, bazMeta, quuxMeta));

        ReplicationControlTestContext ctx = new ReplicationControlTestContext(Optional.of(policy));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        ctx.createTestTopic("foo", 2, (short)2, Errors.NONE.code());
        ctx.createTestTopic("bar", 3, (short)3, Errors.POLICY_VIOLATION.code());
        ctx.createTestTopic("baz", new int[][]{{2, 1, 0}}, Collections.singletonMap("segment.bytes", "12300000"), Errors.NONE.code());
        ctx.createTestTopic("quux", new int[][]{{1, 2, 0}}, Errors.POLICY_VIOLATION.code());
    }

    /**
     * With a mock create-topic policy installed, creates "foo" (10 partitions) and "bar"
     * (9 partitions) successfully, then expects a 1-partition "baz" to fail with
     * POLICY_VIOLATION. After "foo" is deleted, "baz" (5 partitions) and "qux" (1 partition)
     * must succeed, while a 1-partition "thud" must again fail with POLICY_VIOLATION.
     * NOTE(review): the rejections presumably come from topic/partition limits configured
     * in the test context, which is not visible in this part of the file -- confirm.
     */
    @Test
    public void testCreateTopicsWithPartitionViolation() throws Exception {
        List<CreateTopicPolicy.RequestMetadata> policyRequests = Arrays.asList(
            new CreateTopicPolicy.RequestMetadata("foo", 10, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("bar", 9, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("baz", 1, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("baz", 5, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("qux", 1, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("thud", 1, (short)2, null, Collections.emptyMap()));
        MockCreateTopicPolicy policy = new MockCreateTopicPolicy(policyRequests);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext(Optional.of(policy));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult fooCreated = ctx.createTestTopic("foo", 10, (short)2, Errors.NONE.code());
        ctx.createTestTopic("bar", 9, (short)2, Errors.NONE.code());
        ctx.createTestTopic("baz", 1, (short)2, Errors.POLICY_VIOLATION.code());

        // Delete "foo" and apply the deletion before retrying.
        List<Uuid> doomed = new ArrayList<Uuid>();
        doomed.add(fooCreated.topicId());
        ControllerResult deletion = ctx.replicationControl.deleteTopics(doomed);
        RecordTestUtils.replayAll(ctx.replicationControl, deletion.records());

        ctx.createTestTopic("baz", 5, (short)2, Errors.NONE.code());
        ctx.createTestTopic("qux", 1, (short)2, Errors.NONE.code());
        ctx.createTestTopic("thud", 1, (short)2, Errors.POLICY_VIOLATION.code());
    }

    /**
     * Follows the global topic and partition count metrics while "foo" (1 partition),
     * "bar" (1 partition) and "baz" (2 partitions) are created, then while "foo" and
     * "baz" are deleted together and "bar" is deleted on its own.
     */
    @Test
    public void testGlobalTopicAndPartitionMetrics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
        createFoo.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)-1));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        // Create "foo" and remember its id for the first deletion round.
        List<Uuid> firstDeletion = new ArrayList<Uuid>();
        ControllerResult fooOutcome = manager.createTopics(createFoo, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        firstDeletion.add(((CreateTopicsResponseData)fooOutcome.response()).topics().find("foo").topicId());
        RecordTestUtils.replayAll(manager, fooOutcome.records());
        Assertions.assertEquals(1, ctx.metrics.globalTopicsCount());

        // Create "bar" and "baz" in a single request.
        CreateTopicsRequestData createBarBaz = new CreateTopicsRequestData();
        createBarBaz.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(1).setReplicationFactor((short)-1));
        createBarBaz.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor((short)-1));
        ControllerResult barBazOutcome = manager.createTopics(createBarBaz, new HashSet<String>(Arrays.asList("bar", "baz")), KafkaPrincipal.ANONYMOUS);
        RecordTestUtils.replayAll(manager, barBazOutcome.records());
        Assertions.assertEquals(3, ctx.metrics.globalTopicsCount());
        Assertions.assertEquals(4, ctx.metrics.globalPartitionCount());

        // Delete "foo" and "baz": only "bar" with its single partition remains.
        firstDeletion.add(((CreateTopicsResponseData)barBazOutcome.response()).topics().find("baz").topicId());
        ControllerResult firstDeleted = manager.deleteTopics(firstDeletion);
        RecordTestUtils.replayAll(manager, firstDeleted.records());
        Assertions.assertEquals(1, ctx.metrics.globalTopicsCount());
        Assertions.assertEquals(1, ctx.metrics.globalPartitionCount());

        // Delete "bar": both counters drop to zero.
        Uuid barId = ((CreateTopicsResponseData)barBazOutcome.response()).topics().find("bar").topicId();
        ControllerResult secondDeleted = manager.deleteTopics(Collections.singletonList(barId));
        RecordTestUtils.replayAll(manager, secondDeleted.records());
        Assertions.assertEquals(0, ctx.metrics.globalTopicsCount());
        Assertions.assertEquals(0, ctx.metrics.globalPartitionCount());
    }

    /**
     * Checks the offline-partition and preferred-replica-imbalance metrics while brokers are
     * unregistered one by one and the topics are then deleted.
     *
     * <p>Two topics are created over brokers 0-3: "foo" with 2 partitions and "zar" with 3
     * partitions. The expected metric values after each step are asserted inline.
     */
    @Test
    public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][]{{0, 2}, {0, 1}});
        CreateTopicsResponseData.CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][]{{0, 1, 2}, {1, 2, 3}, {1, 2, 0}});

        // Broker 0 gone: nothing is offline yet, three partitions are imbalanced.
        ControllerResult<?> result = replicationControl.unregisterBroker(0);
        ctx.replay(result.records());
        Assertions.assertEquals(0, (int)ctx.metrics.offlinePartitionCount());
        Assertions.assertEquals(3, (int)ctx.metrics.preferredReplicaImbalanceCount());

        // Broker 1 gone: the partition replicated only on {0, 1} goes offline.
        result = replicationControl.unregisterBroker(1);
        ctx.replay(result.records());
        Assertions.assertEquals(1, (int)ctx.metrics.offlinePartitionCount());
        Assertions.assertEquals(5, (int)ctx.metrics.preferredReplicaImbalanceCount());

        result = replicationControl.unregisterBroker(2);
        ctx.replay(result.records());
        Assertions.assertEquals(4, (int)ctx.metrics.offlinePartitionCount());

        // With every broker unregistered all five partitions are offline.
        result = replicationControl.unregisterBroker(3);
        ctx.replay(result.records());
        Assertions.assertEquals(5, (int)ctx.metrics.offlinePartitionCount());

        // Deleting a topic removes its partitions from the offline count.
        List<ApiMessageAndVersion> records = new ArrayList<>();
        replicationControl.deleteTopic(foo.topicId(), records);
        ctx.replay(records);
        Assertions.assertEquals(3, (int)ctx.metrics.offlinePartitionCount());

        records = new ArrayList<>();
        replicationControl.deleteTopic(zar.topicId(), records);
        ctx.replay(records);
        Assertions.assertEquals(0, (int)ctx.metrics.offlinePartitionCount());
    }

    /**
     * Checks that {@code validateNewTopicNames} records an INVALID_TOPIC_EXCEPTION for the empty
     * name and for ".", and records nothing for the name "woo".
     */
    @Test
    public void testValidateNewTopicNames() {
        Map<String, ApiError> topicErrors = new HashMap<>();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add(new CreateTopicsRequestData.CreatableTopic().setName(""));
        topics.add(new CreateTopicsRequestData.CreatableTopic().setName("woo"));
        topics.add(new CreateTopicsRequestData.CreatableTopic().setName("."));

        // No existing topics, so no name collisions are possible here.
        ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap());

        Map<String, ApiError> expectedTopicErrors = new HashMap<>();
        expectedTopicErrors.put("", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name is illegal, it can't be empty"));
        expectedTopicErrors.put(".", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name cannot be \".\" or \"..\""));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    /**
     * Checks that {@code validateNewTopicNames} rejects new topic names that collide with the
     * existing names supplied in the collision map, and that the error message names the first
     * entry of the matching (sorted) set of existing topics.
     */
    @Test
    public void testTopicNameCollision() {
        Map<String, ApiError> topicErrors = new HashMap<>();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add(new CreateTopicsRequestData.CreatableTopic().setName("foo.bar"));
        topics.add(new CreateTopicsRequestData.CreatableTopic().setName("woo.bar_foo"));

        // Keys are the underscore-normalized names; values are the existing topics sharing that key.
        Map<String, Set<String>> collisionMap = new HashMap<>();
        collisionMap.put("foo_bar", new TreeSet<>(Arrays.asList("foo_bar")));
        collisionMap.put("woo_bar_foo", new TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));

        ReplicationControlManager.validateNewTopicNames(topicErrors, topics, collisionMap);

        Map<String, ApiError> expectedTopicErrors = new HashMap<>();
        expectedTopicErrors.put("foo.bar", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic 'foo.bar' collides with existing topic: foo_bar"));
        expectedTopicErrors.put("woo.bar_foo", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    /**
     * Creates a four-partition topic in which broker 0 is listed first for partitions 0 and 3,
     * verifies that {@code brokersToIsrs().iterator(0, true)} yields exactly those two
     * partitions, then fences broker 0 and verifies the same iterator yields nothing.
     */
    @Test
    public void testRemoveLeaderships() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3);
        context.unfenceBrokers(0, 1, 2, 3);

        int[][] assignment = new int[][]{{0, 1, 2}, {1, 2, 3}, {2, 3, 0}, {0, 2, 1}};
        CreateTopicsResponseData.CreatableTopicResult fooTopic = context.createTestTopic("foo", assignment);

        HashSet<TopicIdPartition> beforeFencing = new HashSet<TopicIdPartition>();
        for (int partitionId : new int[]{0, 3}) {
            beforeFencing.add(new TopicIdPartition(fooTopic.topicId(), partitionId));
        }
        Assertions.assertEquals(beforeFencing, RecordTestUtils.iteratorToSet(manager.brokersToIsrs().iterator(0, true)));

        // Fence broker 0 and apply the resulting records.
        ArrayList<ApiMessageAndVersion> fencingRecords = new ArrayList<ApiMessageAndVersion>();
        manager.handleBrokerFenced(0, fencingRecords);
        context.replay(fencingRecords);

        Assertions.assertEquals(Collections.emptySet(), RecordTestUtils.iteratorToSet(manager.brokersToIsrs().iterator(0, true)));
    }

    /**
     * Shrinks the ISR of a three-replica partition to [0, 1] and then expands it back to
     * [0, 1, 2]. After each AlterPartition request the response must carry no error and must
     * agree with the partition state held by the replication control manager.
     */
    @Test
    public void testShrinkAndExpandIsr() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult fooTopic = context.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition partition = new TopicIdPartition(fooTopic.topicId(), 0);
        Assertions.assertEquals(OptionalInt.of(0), context.currentLeader(partition));
        long leaderBrokerEpoch = context.currentBrokerEpoch(0);

        // Shrink: drop broker 2 from the ISR.
        AlterPartitionRequestData.PartitionData shrink = newAlterPartition(manager, partition, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkOutcome = sendAlterPartition(context, 0, leaderBrokerEpoch, partition.topicId(), shrink);
        AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(shrinkOutcome, partition, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partition, shrinkResponse);

        // Expand: bring broker 2 back into the ISR.
        AlterPartitionRequestData.PartitionData expand = newAlterPartition(manager, partition, Arrays.asList(0, 1, 2), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> expandOutcome = sendAlterPartition(context, 0, leaderBrokerEpoch, partition.topicId(), expand);
        AlterPartitionResponseData.PartitionData expandResponse = assertAlterPartitionResponse(expandOutcome, partition, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partition, expandResponse);
    }

    /**
     * Sends an AlterPartition request for a topic that was never created and checks the error
     * returned for each request version.
     *
     * <p>Versions up to 1 identify the topic by name and expect UNKNOWN_TOPIC_OR_PARTITION;
     * later versions identify it by id and expect UNKNOWN_TOPIC_ID. The response is expected to
     * echo back whichever identifier the request used.
     *
     * @param version the ALTER_PARTITION request version under test
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        // Compute the version-dependent identifiers once so request and expectation cannot drift.
        boolean identifiedById = version > 1;
        String topicName = identifiedById ? "" : "foo";
        Uuid topicId = identifiedById ? Uuid.randomUuid() : Uuid.ZERO_UUID;
        Errors expectedError = identifiedById ? Errors.UNKNOWN_TOPIC_ID : Errors.UNKNOWN_TOPIC_OR_PARTITION;

        AlterPartitionRequestData request = new AlterPartitionRequestData().setBrokerId(0).setBrokerEpoch(100L).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicName(topicName).setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0)))));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult<AlterPartitionResponseData> result = replicationControl.alterPartition(requestContext, request);

        AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicName(topicName).setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(expectedError.code())))));
        Assertions.assertEquals(expectedResponse, result.response());
    }

    /**
     * Checks that cluster-link state carried on an AlterPartition request is turned into a
     * {@code PartitionChangeRecord} with the matching linked leader epoch and link state.
     *
     * <p>A mirror topic "foo" is created on link "test-link". The first request reports a
     * healthy link at linked leader epoch 10 and must yield link state ACTIVE; the second
     * reports a failed link at epoch 65 and must yield link state FAILED.
     */
    @Test
    public void testClusterLinkAlterIsr() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0);
        ctx.unfenceBrokers(0);
        ctx.createClusterLink("test-link");
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", 1, (short)1, Errors.NONE.code(), creatableTopic -> creatableTopic.setLinkName("test-link").setMirrorTopic("foo-origin"));
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        long brokerEpoch = ctx.currentBrokerEpoch(0);

        // Healthy link at linked leader epoch 10.
        AlterPartitionRequestData.PartitionData partitionData = newAlterPartition(replicationControl, topicIdPartition, Arrays.asList(0), LeaderRecoveryState.RECOVERED);
        AlterPartitionRequestData.ClusterLinkState clusterLinkState = new AlterPartitionRequestData.ClusterLinkState().setLinkedLeaderEpoch(10).setLinkFailed(false);
        partitionData.setClusterLinkState(clusterLinkState);
        ControllerResult<AlterPartitionResponseData> alterClusterLinkStateResult = sendAlterPartition(ctx, 0, brokerEpoch, createTopicResult.topicId(), partitionData);
        PartitionChangeRecord record = (PartitionChangeRecord)alterClusterLinkStateResult.records().get(0).message();
        PartitionChangeRecord expectedActive = new PartitionChangeRecord().setTopicId(createTopicResult.topicId()).setPartitionId(0).setLinkedLeaderEpoch(10).setLinkState(PartitionRegistration.LinkState.ACTIVE.levelCode);
        Assertions.assertEquals(expectedActive, record);

        // Failed link at linked leader epoch 65. The same ClusterLinkState object is reused and
        // mutated; the first request has already been processed by this point.
        AlterPartitionRequestData.PartitionData partitionData1 = newAlterPartition(replicationControl, topicIdPartition, Arrays.asList(0), LeaderRecoveryState.RECOVERED);
        clusterLinkState.setLinkedLeaderEpoch(65).setLinkFailed(true);
        partitionData1.setClusterLinkState(clusterLinkState);
        alterClusterLinkStateResult = sendAlterPartition(ctx, 0, brokerEpoch, createTopicResult.topicId(), partitionData1);
        record = (PartitionChangeRecord)alterClusterLinkStateResult.records().get(0).message();
        PartitionChangeRecord expectedFailed = new PartitionChangeRecord().setTopicId(createTopicResult.topicId()).setPartitionId(0).setLinkedLeaderEpoch(65).setLinkState(PartitionRegistration.LinkState.FAILED.levelCode);
        // Expected value goes first so a failure message labels the two sides correctly.
        Assertions.assertEquals(expectedFailed, record);
    }

    /**
     * Sends a series of malformed AlterPartition requests for a single partition of topic "foo"
     * (replicas 0, 1, 2; leader 0) and checks how each one is rejected. Each case builds a fresh
     * request from the current partition state before sending it.
     */
    @Test
    public void testInvalidAlterPartitionRequests() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult fooTopic = context.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition partition = new TopicIdPartition(fooTopic.topicId(), 0);
        final int leader = 0;
        final int follower = 1;
        Assertions.assertEquals(OptionalInt.of(leader), context.currentLeader(partition));
        final long leaderBrokerEpoch = context.currentBrokerEpoch(0);

        // Request sent by a broker that is not the partition leader.
        AlterPartitionRequestData.PartitionData fromFollower = newAlterPartition(manager, partition, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERED);
        assertAlterPartitionResponse(sendAlterPartition(context, follower, context.currentBrokerEpoch(follower), partition.topicId(), fromFollower), partition, Errors.INVALID_REQUEST);

        // Broker epoch one below the current one: the call itself throws.
        final AlterPartitionRequestData.PartitionData staleBrokerEpoch = newAlterPartition(manager, partition, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERED);
        Assertions.assertThrows(StaleBrokerEpochException.class, () -> sendAlterPartition(context, leader, leaderBrokerEpoch - 1L, partition.topicId(), staleBrokerEpoch));

        // Leader epoch that does not match the partition's leader epoch.
        AlterPartitionRequestData.PartitionData wrongLeaderEpoch = newAlterPartition(manager, partition, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERED).setLeaderEpoch(500);
        assertAlterPartitionResponse(sendAlterPartition(context, leader, context.currentBrokerEpoch(leader), partition.topicId(), wrongLeaderEpoch), partition, Errors.NOT_CONTROLLER);

        // Partition epoch that does not match the partition's epoch.
        AlterPartitionRequestData.PartitionData wrongPartitionEpoch = newAlterPartition(manager, partition, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERED).setPartitionEpoch(500);
        assertAlterPartitionResponse(sendAlterPartition(context, leader, context.currentBrokerEpoch(leader), partition.topicId(), wrongPartitionEpoch), partition, Errors.NOT_CONTROLLER);

        // ISR containing broker 3, which is not in the replica list passed to createTestTopic.
        AlterPartitionRequestData.PartitionData isrWithNonReplica = newAlterPartition(manager, partition, Arrays.asList(0, 1, 3), LeaderRecoveryState.RECOVERED);
        assertAlterPartitionResponse(sendAlterPartition(context, leader, context.currentBrokerEpoch(leader), partition.topicId(), isrWithNonReplica), partition, Errors.INVALID_REQUEST);

        // ISR that omits the leader itself.
        AlterPartitionRequestData.PartitionData isrWithoutLeader = newAlterPartition(manager, partition, Arrays.asList(1, 2), LeaderRecoveryState.RECOVERED);
        assertAlterPartitionResponse(sendAlterPartition(context, leader, context.currentBrokerEpoch(leader), partition.topicId(), isrWithoutLeader), partition, Errors.INVALID_REQUEST);

        // RECOVERING state combined with a two-member ISR.
        AlterPartitionRequestData.PartitionData recoveringWithTwoInIsr = newAlterPartition(manager, partition, Arrays.asList(0, 1), LeaderRecoveryState.RECOVERING);
        assertAlterPartitionResponse(sendAlterPartition(context, leader, context.currentBrokerEpoch(leader), partition.topicId(), recoveringWithTwoInIsr), partition, Errors.INVALID_REQUEST);

        // RECOVERING state with a single-member ISR is rejected as well.
        AlterPartitionRequestData.PartitionData recoveringSingleIsr = newAlterPartition(manager, partition, Arrays.asList(0), LeaderRecoveryState.RECOVERING);
        assertAlterPartitionResponse(sendAlterPartition(context, leader, context.currentBrokerEpoch(leader), partition.topicId(), recoveringSingleIsr), partition, Errors.INVALID_REQUEST);
    }

    /**
     * Builds the per-partition payload of an AlterPartition request, seeded with the leader epoch
     * and partition epoch currently registered for the partition.
     *
     * @param replicationControl manager from which the current partition registration is read
     * @param topicIdPartition partition the request targets; its partition id becomes the
     *        request's partition index
     * @param newIsr ISR to propose
     * @param leaderRecoveryState leader recovery state to propose
     * @return a new request payload; callers may override individual fields afterwards
     */
    private AlterPartitionRequestData.PartitionData newAlterPartition(ReplicationControlManager replicationControl, TopicIdPartition topicIdPartition, List<Integer> newIsr, LeaderRecoveryState leaderRecoveryState) {
        PartitionRegistration partitionControl = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        // Use the partition's own id rather than a fixed 0 so the payload always addresses the
        // same partition whose epochs were just read.
        return new AlterPartitionRequestData.PartitionData().setPartitionIndex(topicIdPartition.partitionId()).setLeaderEpoch(partitionControl.leaderEpoch).setPartitionEpoch(partitionControl.partitionEpoch).setNewIsr(newIsr).setLeaderRecoveryState(leaderRecoveryState.value());
    }

    /**
     * Wraps a single partition payload in an AlterPartition request, submits it to the
     * replication control manager and replays the records it produced into the test context.
     *
     * @param ctx test context whose replication control manager handles the request
     * @param brokerId id of the broker the request claims to come from
     * @param brokerEpoch broker epoch carried by the request
     * @param topicId id of the topic that owns the partition
     * @param partitionData the partition payload to send
     * @return the controller result, after its records have been replayed
     * @throws Exception whatever {@code alterPartition} or {@code replay} throws
     */
    private ControllerResult<AlterPartitionResponseData> sendAlterPartition(ReplicationControlTestContext ctx, int brokerId, long brokerEpoch, Uuid topicId, AlterPartitionRequestData.PartitionData partitionData) throws Exception {
        AlterPartitionRequestData request = new AlterPartitionRequestData().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
        AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicId(topicId);
        request.topics().add(topicData);
        topicData.partitions().add(partitionData);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        ControllerResult<AlterPartitionResponseData> result = ctx.replicationControl.alterPartition(requestContext, request);
        ctx.replay(result.records());
        return result;
    }

    /**
     * Asserts that an AlterPartition result holds exactly one topic with exactly one partition,
     * that both match {@code topicIdPartition}, and that the partition carries
     * {@code expectedError}.
     *
     * @return the single partition entry of the response, for further checks by the caller
     */
    private AlterPartitionResponseData.PartitionData assertAlterPartitionResponse(ControllerResult<AlterPartitionResponseData> alterPartitionResult, TopicIdPartition topicIdPartition, Errors expectedError) {
        AlterPartitionResponseData body = alterPartitionResult.response();

        // Exactly one topic, and it is the one that was asked about.
        Assertions.assertEquals(1, body.topics().size());
        AlterPartitionResponseData.TopicData onlyTopic = body.topics().get(0);
        Assertions.assertEquals(topicIdPartition.topicId(), onlyTopic.topicId());

        // Exactly one partition, with the expected index and error.
        Assertions.assertEquals(1, onlyTopic.partitions().size());
        AlterPartitionResponseData.PartitionData onlyPartition = onlyTopic.partitions().get(0);
        Assertions.assertEquals(topicIdPartition.partitionId(), onlyPartition.partitionIndex());
        Errors actualError = Errors.forCode(onlyPartition.errorCode());
        Assertions.assertEquals(expectedError, actualError);
        return onlyPartition;
    }

    /**
     * Asserts that the leader, leader epoch, partition epoch and ISR reported in an
     * AlterPartition response equal the values currently registered for the partition.
     */
    private void assertConsistentAlterPartitionResponse(ReplicationControlManager replicationControl, TopicIdPartition topicIdPartition, AlterPartitionResponseData.PartitionData partitionData) {
        PartitionRegistration registered = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());

        Assertions.assertEquals(registered.leader, partitionData.leaderId());
        Assertions.assertEquals(registered.leaderEpoch, partitionData.leaderEpoch());
        Assertions.assertEquals(registered.partitionEpoch, partitionData.partitionEpoch());

        // The registration stores the ISR as an array; convert it before comparing with the list.
        List<Integer> registeredIsr = ReplicationControlManagerTest.arrayToList(registered.isr);
        Assertions.assertEquals(registeredIsr, partitionData.isr());
    }

    /**
     * Asserts that the configs stored for {@code topic} are exactly the ones in
     * {@code requestConfigs}: same number of entries, and the same value under every requested
     * name.
     *
     * @param ctx test context whose configuration control manager is queried
     * @param topic name of the topic to look up
     * @param requestConfigs configs that were supplied when the topic was created
     */
    private void assertCreatedTopicConfigs(ReplicationControlTestContext ctx, String topic, CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs) {
        Map<String, String> configs = ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Assertions.assertEquals(requestConfigs.size(), configs.size());
        for (CreateTopicsRequestData.CreateableTopicConfig requestConfig : requestConfigs) {
            Assertions.assertEquals(requestConfig.value(), configs.get(requestConfig.name()));
        }
    }

    /**
     * Asserts that no configs are stored for {@code topic}.
     *
     * @param ctx test context whose configuration control manager is queried
     * @param topic name of the topic to look up
     */
    private void assertEmptyTopicConfigs(ReplicationControlTestContext ctx, String topic) {
        Map<String, String> configs = ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Assertions.assertEquals(Collections.emptyMap(), configs);
    }

    /**
     * Exercises the create / look up / delete life cycle of a topic.
     *
     * <p>Topic "foo" is created with 3 partitions and two configs. The test checks that nothing
     * is visible until the creation records are replayed, that name/id lookups succeed for the
     * real topic and fail for unknown ones, that deleting an unknown id produces an error and no
     * records, and that deleting the real topic removes its partitions, its lookups and its
     * configs once the single deletion record is replayed.
     */
    @Test
    public void testDeleteTopics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("cleanup.policy").setValue("compact"));
        requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("min.cleanable.dirty.ratio").setValue("0.1"));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2).setConfigs(requestConfigs));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);

        ControllerResult<CreateTopicsResponseData> createResult = replicationControl.createTopics(request, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        // The topic id is generated by the controller, so copy it into the expectation.
        Uuid topicId = createResult.response().topics().find("foo").topicId();
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setNumPartitions(3).setReplicationFactor((short)2).setErrorMessage(null).setErrorCode((short)0).setTopicId(topicId));
        Assertions.assertEquals(expectedResponse, createResult.response());

        // Until the records are replayed, neither the partitions nor the configs exist.
        Assertions.assertNull(replicationControl.getPartition(topicId, 0));
        assertEmptyTopicConfigs(ctx, "foo");
        ctx.replay(createResult.records());
        Assertions.assertNotNull(replicationControl.getPartition(topicId, 0));
        Assertions.assertNotNull(replicationControl.getPartition(topicId, 1));
        Assertions.assertNotNull(replicationControl.getPartition(topicId, 2));
        Assertions.assertNull(replicationControl.getPartition(topicId, 3));
        assertCreatedTopicConfigs(ctx, "foo", requestConfigs);

        // Lookups by id and by name, for both the real topic and unknown ones.
        Assertions.assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")), replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
        Assertions.assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
        Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1L, topicId.getLeastSignificantBits());
        Assertions.assertEquals(Collections.singletonMap(invalidId, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
        Assertions.assertEquals(Collections.singletonMap("bar", new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));

        // Deleting an unknown id yields an error entry and no records.
        ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.deleteTopics(Collections.singletonList(invalidId));
        Assertions.assertEquals(0, invalidDeleteResult.records().size());
        Assertions.assertEquals(Collections.singletonMap(invalidId, new ApiError(Errors.UNKNOWN_TOPIC_ID, null)), invalidDeleteResult.response());

        // Deleting the real topic yields one atomic record.
        ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicId));
        Assertions.assertTrue(deleteResult.isAtomic());
        Assertions.assertEquals(Collections.singletonMap(topicId, new ApiError(Errors.NONE, null)), deleteResult.response());
        Assertions.assertEquals(1, deleteResult.records().size());
        ctx.replay(deleteResult.records());

        // After replay the topic is gone from every view.
        Assertions.assertNull(replicationControl.getPartition(topicId, 0));
        Assertions.assertNull(replicationControl.getPartition(topicId, 1));
        Assertions.assertNull(replicationControl.getPartition(topicId, 2));
        Assertions.assertNull(replicationControl.getPartition(topicId, 3));
        Assertions.assertEquals(Collections.singletonMap(topicId, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
        Assertions.assertEquals(Collections.singletonMap("foo", new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
        assertEmptyTopicConfigs(ctx, "foo");
    }

    /**
     * Exercises {@code createPartitions} with both automatic and manual assignments.
     *
     * <p>Four topics are created on brokers 0 and 1 with replication factor 2: "foo" (3
     * partitions), "bar" (4), "quux" (2) and "foo2" (2). The first batch requests new partition
     * counts without assignments; the second supplies manual assignments. The per-topic errors
     * and messages of each batch are compared against the expected lists.
     */
    @Test
    public void testCreatePartitions() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(4).setReplicationFactor((short)2));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor((short)2));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("foo2").setNumPartitions(2).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerResult<CreateTopicsResponseData> createTopicResult = replicationControl.createTopics(request, new HashSet<>(Arrays.asList("foo", "bar", "quux", "foo2")), KafkaPrincipal.ANONYMOUS);
        ctx.replay(createTopicResult.records());

        // Batch 1: no manual assignments. "baz" was never created.
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(5).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("bar").setCount(3).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("baz").setCount(3).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("quux").setCount(2).setAssignments(null));
        ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitionsResult = replicationControl.createPartitions(topics, KafkaPrincipal.ANONYMOUS);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedResults = Arrays.asList(
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.NONE.code()).setErrorMessage(null),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("bar").setErrorCode(Errors.INVALID_PARTITIONS.code()).setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("baz").setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage(null),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("quux").setErrorCode(Errors.INVALID_PARTITIONS.code()).setErrorMessage("Topic already has 2 partition(s)."));
        Assertions.assertEquals(expectedResults, createPartitionsResult.response());
        ctx.replay(createPartitionsResult.records());

        // Batch 2: manual assignments. "foo" now has 5 partitions, so 6 adds exactly one.
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics2 = new ArrayList<>();
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(6).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("bar").setCount(5).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("quux").setCount(4).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo2").setCount(3).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
        ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitionsResult2 = replicationControl.createPartitions(topics2, KafkaPrincipal.ANONYMOUS);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedResults2 = Arrays.asList(
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.NONE.code()).setErrorMessage(null),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("bar").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes a partition with 1 replica(s), but this is not consistent with previous partitions, which have 2 replica(s)."),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("quux").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo2").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 2, but no such broker is registered."));
        Assertions.assertEquals(expectedResults2, createPartitionsResult2.response());
        ctx.replay(createPartitionsResult2.records());
    }

    /**
     * Checks that adding a partition to a replication-factor-2 topic fails with
     * INVALID_REPLICATION_FACTOR when no broker is available for placement.
     *
     * <p>Topic "foo" is created while brokers 0 and 1 are unfenced. Both brokers are then
     * registered again, only broker 0 is unfenced, and broker 0 is put into controlled shutdown
     * before the partition count is raised from 1 to 2.
     */
    @Test
    public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerResult<CreateTopicsResponseData> createTopicResult = replicationControl.createTopics(request, new HashSet<>(Arrays.asList("foo")), KafkaPrincipal.ANONYMOUS);
        ctx.replay(createTopicResult.records());

        // NOTE(review): presumably re-registering returns both brokers to the fenced state, so
        // that broker 1 stays fenced and broker 0 ends up in controlled shutdown - confirm
        // against ReplicationControlTestContext.registerBrokers.
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0);
        ctx.inControlledShutdownBrokers(0);

        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(2).setAssignments(null));
        ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitionsResult = replicationControl.createPartitions(topics, KafkaPrincipal.ANONYMOUS);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedResults = Arrays.asList(
            new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("Unable to replicate the partition 2 time(s): All brokers are currently fenced or in controlled shutdown."));
        Assertions.assertEquals(expectedResults, createPartitionsResult.response());
    }

    /**
     * Checks the replica set, ISR and leader given to a partition added through
     * {@code createPartitions} when not every broker is fully available.
     *
     * <p>Brokers 0, 1 and 2 are registered, only 0 and 1 are unfenced, and broker 1 is put into
     * controlled shutdown. The partition added to "foo" is expected to be placed on all three
     * brokers but to start with an ISR containing only broker 0, which is also the leader.
     */
    @Test
    public void testCreatePartitionsISRInvariants() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;

        CreateTopicsRequestData.CreatableTopic fooTopic = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short) 3);
        CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
        createRequest.topics().add((ImplicitLinkedHashCollection.Element) fooTopic);

        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1);
        ctx.inControlledShutdownBrokers(1);

        ControllerResult createTopicResult = manager.createTopics(createRequest, Collections.singleton("foo"), KafkaPrincipal.ANONYMOUS);
        ctx.replay(createTopicResult.records());

        // Grow "foo" from 1 to 2 partitions with controller-chosen placement.
        CreatePartitionsRequestData.CreatePartitionsTopic growFoo = new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("foo")
            .setCount(2)
            .setAssignments(null);
        List<CreatePartitionsRequestData.CreatePartitionsTopic> partitionRequests = Arrays.asList(growFoo);
        ControllerResult createPartitionsResult = manager.createPartitions(partitionRequests, KafkaPrincipal.ANONYMOUS);
        ctx.replay(createPartitionsResult.records());

        // The topic id is read from the first record produced by the createTopics call.
        ApiMessageAndVersion firstTopicRecord = (ApiMessageAndVersion) createTopicResult.records().get(0);
        Uuid fooId = ((TopicRecord) firstTopicRecord.message()).topicId();
        PartitionRegistration expectedNewPartition = new PartitionRegistration(new int[]{0, 1, 2}, new int[]{0}, Replicas.NONE, Replicas.NONE, 0, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals((Object) expectedNewPartition, (Object) manager.getPartition(fooId, 1));
    }

    /**
     * Calls {@code validateManualPartitionAssignment} with assignments that are expected to be
     * accepted: one- and three-replica lists drawn from registered brokers 1-3, with and without
     * an expected replication factor, while broker 4 is the only excluded broker. The test
     * passes as long as none of the calls throws.
     */
    @Test
    public void testValidateGoodManualPartitionAssignments() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        Set<Integer> excluded = Collections.singleton(4);
        ctx.registerBrokers(1, 2, 3, 4);
        ReplicationControlManager manager = ctx.replicationControl;

        // Single replica, with a matching replication factor and with none specified.
        manager.validateManualPartitionAssignment(Collections.singletonList(1), OptionalInt.of(1), excluded);
        manager.validateManualPartitionAssignment(Collections.singletonList(1), OptionalInt.empty(), excluded);

        // Three replicas, with a matching replication factor and with none specified.
        manager.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), OptionalInt.of(3), excluded);
        manager.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), OptionalInt.empty(), excluded);
    }

    /**
     * Calls {@code validateManualPartitionAssignment} with five invalid assignments and checks
     * that each raises {@link InvalidReplicaAssignmentException} with the exact message expected
     * for that failure: empty replica list, unregistered broker, duplicated broker, excluded
     * broker, and a replica count that disagrees with the expected replication factor.
     */
    @Test
    public void testValidateBadManualPartitionAssignments() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        Set<Integer> excludedBrokers = Collections.singleton(3);
        ctx.registerBrokers(1, 2, 3);

        InvalidReplicaAssignmentException emptyReplicas = Assertions.assertThrows(
            InvalidReplicaAssignmentException.class,
            () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(new Integer[0]), OptionalInt.empty(), excludedBrokers));
        Assertions.assertEquals((Object) "The manual partition assignment includes an empty replica list.", (Object) emptyReplicas.getMessage());

        // This case excludes broker 1 instead of broker 3; broker 4 was never registered.
        InvalidReplicaAssignmentException unknownBroker = Assertions.assertThrows(
            InvalidReplicaAssignmentException.class,
            () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(2, 3, 4), OptionalInt.empty(), Collections.singleton(1)));
        Assertions.assertEquals((Object) "The manual partition assignment includes broker 4, but no such broker is registered.", (Object) unknownBroker.getMessage());

        InvalidReplicaAssignmentException duplicateBroker = Assertions.assertThrows(
            InvalidReplicaAssignmentException.class,
            () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2), OptionalInt.empty(), excludedBrokers));
        Assertions.assertEquals((Object) "The manual partition assignment includes the broker 2 more than once.", (Object) duplicateBroker.getMessage());

        InvalidReplicaAssignmentException excludedBroker = Assertions.assertThrows(
            InvalidReplicaAssignmentException.class,
            () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), OptionalInt.empty(), excludedBrokers));
        Assertions.assertEquals((Object) "The manual partition assignment includes the broker 3 which is excluded from replica placement.", (Object) excludedBroker.getMessage());

        InvalidReplicaAssignmentException wrongReplicaCount = Assertions.assertThrows(
            InvalidReplicaAssignmentException.class,
            () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2), OptionalInt.of(3), excludedBrokers));
        Assertions.assertEquals((Object) "The manual partition assignment includes a partition with 2 replica(s), but this is not consistent with previous partitions, which have 3 replica(s).", (Object) wrongReplicaCount.getMessage());
    }

    /**
     * Exercises starting, listing and cancelling partition reassignments, followed by an
     * AlterPartition request, once per ALTER_PARTITION API version.
     *
     * @param version the ALTER_PARTITION version under test; the request and expected response
     *                below carry the topic name for versions up to 1 and the topic id otherwise
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testReassignPartitions(short version) throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        // "foo" has two partitions, "bar" has one; only foo's topic id is needed later.
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {3, 2, 1}}).topicId();
        ctx.createTestTopic("bar", new int[][]{{1, 2, 3}}).topicId();
        // Nothing is being reassigned before any request has been made.
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
        // Replay a replica-exclusion record for broker 1 (reason "test") straight into the cluster control state.
        ctx.clusterControl.replay(new BrokerReplicaExclusionRecord().setBrokerExclusions(Collections.singletonList(new BrokerReplicaExclusionRecord.BrokerReplicaExclusion().setBrokerId(1).setReason("test"))));
        // Request new replica lists for foo-0, foo-1 and foo-2, plus a "bar" entry that names no partitions.
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(3, 2, 1)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(Arrays.asList(0, 2, 1)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(Arrays.asList(0, 2, 1)))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar"))));
        // foo-0 and foo-1 are expected to be accepted; foo-2 is expected to fail with UNKNOWN_TOPIC_OR_PARTITION.
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("Unable to find partition foo:2."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar"))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        // Only foo-1 is expected to be listed as ongoing: broker 0 being added, broker 3 being removed.
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("foo").setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(1).setRemovingReplicas(Arrays.asList(3)).setAddingReplicas(Arrays.asList(0)).setReplicas(Arrays.asList(0, 2, 1, 3))))));
        // The same listing is expected for an unfiltered query and for a query restricted to "foo"; a "bar" query lists nothing.
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        // Submit null replica lists for foo-0, foo-1, foo-2 and bar-0 (the cancellation request checked below).
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(null), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))));
        // Expected outcome: a single change record for foo-1 (replicas [2, 1, 3], leader 3, empty adding/removing lists),
        // NO_REASSIGNMENT_IN_PROGRESS for foo-0 and bar-0, and UNKNOWN_TOPIC_OR_PARTITION for foo-2.
        Assertions.assertEquals((Object)ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(fooId).setPartitionId(1).setReplicas(Arrays.asList(2, 1, 3)).setLeader(3).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()), 0)), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorCode(Errors.NONE.code()).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("Unable to find partition foo:2."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null)))))), (Object)cancelResult);
        log.info("running final alterPartition...");
        // Note: cancelResult's records are not replayed before the AlterPartition request below is issued.
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        // Broker 3 (epoch 103) proposes ISR [3, 0, 2, 1] for foo-1 using partition epoch 1 and leader epoch 0.
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103L).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicName(version <= 1 ? "foo" : "").setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(1).setPartitionEpoch(1).setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
        // The expected per-partition error depends on the request version.
        Errors expectedError = version > 1 ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicName(version <= 1 ? "foo" : "").setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(1).setErrorCode(expectedError.code()))))), (Object)alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        // After replaying the AlterPartition records no reassignment is expected to remain.
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
    }

    /**
     * Verifies that an AlterPartition request proposing an ISR that contains a fenced broker is
     * rejected, and that the identical request is accepted once that broker has been unfenced.
     *
     * @param version the ALTER_PARTITION version under test; the request and expected responses
     *                carry the topic name for versions up to 1 and the topic id otherwise
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();

        // Declared with the element type the records actually have. Generic types are invariant,
        // so an ArrayList<ApiMessageAndVersion> cannot be assigned to an ArrayList<Object>.
        List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        // Fencing broker 3 is expected to drop it from the ISR of foo-0.
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 4}, new int[]{1, 2, 4}, new int[0], new int[0], 1, LeaderRecoveryState.RECOVERED, 1, 1), (Object)replication.getPartition(fooId, 0));

        // Versions up to 1 identify the topic by name, later versions by id.
        String topicName = version <= 1 ? "foo" : "";
        Uuid topicId = version > 1 ? fooId : Uuid.ZERO_UUID;

        // Broker 1 (epoch 101) asks to put the still-fenced broker 3 back into the ISR.
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData().setBrokerId(1).setBrokerEpoch(101L).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicName(topicName).setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(1).setLeaderEpoch(1).setNewIsr(Arrays.asList(1, 2, 3, 4))))));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, alterIsrRequest);
        // The expected rejection code depends on the request version.
        Errors expectedError = version <= 1 ? Errors.OPERATION_NOT_ATTEMPTED : Errors.INELIGIBLE_REPLICA;
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicName(topicName).setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(expectedError.code()))))), (Object)alterPartitionResult.response());

        // Unfence broker 3 (epoch 103) and resend the same request; it is now expected to succeed.
        fenceRecords = new ArrayList<>();
        replication.handleBrokerUnfenced(3, 103L, fenceRecords);
        ctx.replay(fenceRecords);
        alterPartitionResult = replication.alterPartition(requestContext, alterIsrRequest);
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicName(topicName).setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(1).setLeaderEpoch(1).setIsr(Arrays.asList(1, 2, 3, 4)).setPartitionEpoch(2).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
    }

    /**
     * Verifies that an AlterPartition request whose proposed ISR contains a broker that is in
     * controlled shutdown is rejected.
     *
     * @param version the ALTER_PARTITION version under test; the request and expected response
     *                carry the topic name for versions up to 1 and the topic id otherwise
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();

        // Freshly created: every replica is in the ISR and broker 1 leads.
        PartitionRegistration initialState = new PartitionRegistration(new int[]{1, 2, 3, 4}, new int[]{1, 2, 3, 4}, new int[0], new int[0], 1, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals((Object) initialState, (Object) manager.getPartition(fooId, 0));

        ctx.inControlledShutdownBrokers(3);

        // Versions up to 1 identify the topic by name, later versions by id.
        boolean addressedByName = version <= 1;
        String topicName = addressedByName ? "foo" : "";
        Uuid topicId = addressedByName ? Uuid.ZERO_UUID : fooId;

        // Broker 1 (epoch 101) proposes an ISR that still contains the shutting-down broker 3.
        AlterPartitionRequestData.PartitionData requestedPartition = new AlterPartitionRequestData.PartitionData()
            .setPartitionIndex(0)
            .setPartitionEpoch(0)
            .setLeaderEpoch(0)
            .setNewIsr(Arrays.asList(1, 2, 3, 4));
        AlterPartitionRequestData.TopicData requestedTopic = new AlterPartitionRequestData.TopicData()
            .setTopicName(topicName)
            .setTopicId(topicId)
            .setPartitions(Arrays.asList(requestedPartition));
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
            .setBrokerId(1)
            .setBrokerEpoch(101L)
            .setTopics(Arrays.asList(requestedTopic));

        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterPartitionResult = manager.alterPartition(requestContext, alterIsrRequest);

        // The expected rejection code depends on the request version.
        Errors expectedError = addressedByName ? Errors.OPERATION_NOT_ATTEMPTED : Errors.INELIGIBLE_REPLICA;
        AlterPartitionResponseData.PartitionData rejectedPartition = new AlterPartitionResponseData.PartitionData()
            .setPartitionIndex(0)
            .setErrorCode(expectedError.code());
        AlterPartitionResponseData.TopicData rejectedTopic = new AlterPartitionResponseData.TopicData()
            .setTopicName(topicName)
            .setTopicId(topicId)
            .setPartitions(Arrays.asList(rejectedPartition));
        AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(rejectedTopic));
        Assertions.assertEquals((Object) expectedResponse, (Object) alterPartitionResult.response());
    }

    /**
     * Starts a batch of reassignments (two of them invalid) while broker 3 is fenced, checks
     * which one remains in progress, and then cancels it and verifies the resulting change
     * record and partition state.
     */
    @Test
    public void testCancelReassignPartitions() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        // "foo" has four partitions with four replicas each; "bar" has one partition with three replicas.
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}, {0, 1, 2, 3}, {4, 3, 1, 0}, {2, 3, 4, 1}}).topicId();
        Uuid barId = ctx.createTestTopic("bar", new int[][]{{4, 3, 2}}).topicId();
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
        // Fence broker 3; foo-0 is then expected to have ISR {1, 2, 4}.
        ArrayList<ApiMessageAndVersion> fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 4}, new int[]{1, 2, 4}, new int[0], new int[0], 1, LeaderRecoveryState.RECOVERED, 1, 1), (Object)replication.getPartition(fooId, 0));
        // Requested targets: foo-0 -> [1, 2, 3], foo-1 -> [1, 2, 3, 0], foo-2 -> [5, 6, 7], foo-3 -> empty list, bar-0 -> [1, 2, 3, 4, 0].
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 2, 3)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(Arrays.asList(1, 2, 3, 0)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(Arrays.asList(5, 6, 7)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(3).setReplicas(Arrays.asList(new Integer[0])))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
        // foo-2 (unregistered broker 5) and foo-3 (empty replica list) are expected to be rejected; the others accepted.
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 5, but no such broker is registered."), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes an empty replica list."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        // Expected state after replay: foo-0 and foo-1 already show their new replica lists with no
        // adding/removing replicas, while bar-0 still lists brokers 0 and 1 as adding replicas.
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3}, new int[]{1, 2}, new int[0], new int[0], 1, LeaderRecoveryState.RECOVERED, 2, 2), (Object)replication.getPartition(fooId, 0));
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 0}, new int[]{0, 1, 2}, new int[0], new int[0], 0, LeaderRecoveryState.RECOVERED, 1, 2), (Object)replication.getPartition(fooId, 1));
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 4, 0}, new int[]{4, 2}, new int[0], new int[]{0, 1}, 4, LeaderRecoveryState.RECOVERED, 1, 2), (Object)replication.getPartition(barId, 0));
        // Only bar-0 is expected to be listed as an ongoing reassignment.
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("bar").setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Arrays.asList(0, 1)).setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        // Broker 4 (epoch 104) proposes ISR [4, 1, 2, 0] for bar-0; the response is checked, but its records are not replayed here.
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104L).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicId(barId).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(2).setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 0)))))));
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(4).setLeaderEpoch(1).setIsr(Arrays.asList(4, 1, 2, 0)).setPartitionEpoch(3).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        // Submit null replica lists for foo-0 and bar-0 (the cancellation request checked below).
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))));
        // Expected outcome: one change record for bar-0 (replicas [2, 3, 4], leader 4, null removing list, empty adding list);
        // foo-0 reports NO_REASSIGNMENT_IN_PROGRESS.
        Assertions.assertEquals((Object)ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(barId).setPartitionId(0).setLeader(4).setReplicas(Arrays.asList(2, 3, 4)).setRemovingReplicas(null).setAddingReplicas(Collections.emptyList()), 0)), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null)))))), (Object)cancelResult);
        ctx.replay(cancelResult.records());
        // After replaying the cancellation, nothing is reassigning and bar-0 has replicas {2, 3, 4} with ISR {4, 2}.
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{2, 3, 4}, new int[]{4, 2}, new int[0], new int[0], 4, LeaderRecoveryState.RECOVERED, 2, 3), (Object)replication.getPartition(barId, 0));
    }

    /**
     * Checks that creating a topic with a manual assignment is answered with
     * {@code INVALID_REPLICA_ASSIGNMENT} when the brokers have only been registered and none of
     * them has been unfenced by the test.
     */
    @Test
    public void testManualPartitionAssignmentOnAllFencedBrokers() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2, 3);

        // One partition placed on brokers 0, 1 and 2; no unfenceBrokers call precedes this.
        int[][] manualAssignment = new int[][]{{0, 1, 2}};
        short expectedErrorCode = Errors.INVALID_REPLICA_ASSIGNMENT.code();
        ctx.createTestTopic("foo", manualAssignment, expectedErrorCode);
    }

    /**
     * Checks manual assignments passed to {@code createPartitions} when only brokers 0-2 have
     * been unfenced: an assignment made solely of brokers 3-5 is expected to be rejected with
     * {@code INVALID_REPLICA_ASSIGNMENT}, while one that includes broker 2 is expected to
     * succeed with broker 2 as the only ISR member and the leader.
     */
    @Test
    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();

        // Brokers 3, 4 and 5 were registered but never unfenced.
        int[][] onlyNeverUnfenced = new int[][]{{3, 4, 5}};
        ctx.createPartitions(2, "foo", onlyNeverUnfenced, Errors.INVALID_REPLICA_ASSIGNMENT.code());

        // Including the unfenced broker 2 makes the same request acceptable.
        int[][] oneUnfenced = new int[][]{{2, 4, 5}};
        ctx.createPartitions(2, "foo", oneUnfenced, Errors.NONE.code());

        PartitionRegistration expectedNewPartition = new PartitionRegistration(new int[]{2, 4, 5}, new int[]{2}, Replicas.NONE, Replicas.NONE, 2, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals((Object) expectedNewPartition, (Object) ctx.replicationControl.getPartition(fooId, 1));
    }

    /**
     * Runs topic creation and two {@code createPartitions} calls against a context configured
     * with a {@code MockCreateTopicPolicy}, then checks the registrations of partitions 0 and 1.
     *
     * <p>The mock is built from three {@code RequestMetadata} entries, one per operation in the
     * order they are performed below. NOTE(review): presumably the mock compares each validated
     * request against these entries in order; confirm against {@code MockCreateTopicPolicy}.
     */
    @Test
    public void testCreatePartitionsHonorsCreateTopicsPolicy() throws Exception {
        // Topic creation with a manual assignment for partition 0.
        CreateTopicPolicy.RequestMetadata topicCreation = new CreateTopicPolicy.RequestMetadata("foo", null, null, Collections.singletonMap(0, Arrays.asList(0, 1, 2)), Collections.emptyMap());
        // Manual assignment for the added partition 1.
        CreateTopicPolicy.RequestMetadata manualGrowth = new CreateTopicPolicy.RequestMetadata("foo", null, null, Collections.singletonMap(1, Arrays.asList(2, 1, 0)), Collections.emptyMap());
        // Growth without an assignment: a partition count and replication factor instead.
        CreateTopicPolicy.RequestMetadata automaticGrowth = new CreateTopicPolicy.RequestMetadata("foo", Integer.valueOf(2), Short.valueOf((short) 3), null, Collections.emptyMap());
        MockCreateTopicPolicy createTopicPolicy = new MockCreateTopicPolicy(Arrays.asList(topicCreation, manualGrowth, automaticGrowth));

        ReplicationControlTestContext ctx = new ReplicationControlTestContext(Optional.of(createTopicPolicy));
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();
        ctx.createPartitions(2, "foo", new int[][]{{2, 1, 0}}, Errors.NONE.code());

        PartitionRegistration expectedPartition0 = new PartitionRegistration(new int[]{0, 1, 2}, new int[]{0, 1, 2}, Replicas.NONE, Replicas.NONE, 0, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals((Object) expectedPartition0, (Object) ctx.replicationControl.getPartition(fooId, 0));
        PartitionRegistration expectedPartition1 = new PartitionRegistration(new int[]{2, 1, 0}, new int[]{2, 1, 0}, Replicas.NONE, Replicas.NONE, 2, LeaderRecoveryState.RECOVERED, 0, 0);
        Assertions.assertEquals((Object) expectedPartition1, (Object) ctx.replicationControl.getPartition(fooId, 1));

        // Grow to 4 partitions with no manual assignment; only the returned error code is checked.
        ctx.createPartitions(4, "foo", null, Errors.NONE.code());
    }

    /**
     * Asserts that the partition registered for {@code topicIdPartition} has the given ISR and
     * leader. The ISR is checked first, so an ISR mismatch is reported before a leader mismatch.
     *
     * @param replication      the manager whose partition state is inspected
     * @param topicIdPartition the topic id and partition index to look up
     * @param leaderId         the expected leader
     * @param isr              the expected ISR, compared element by element in order
     */
    private void assertLeaderAndIsr(ReplicationControlManager replication, TopicIdPartition topicIdPartition, int leaderId, int[] isr) {
        Uuid topicId = topicIdPartition.topicId();
        int partitionIndex = topicIdPartition.partitionId();
        PartitionRegistration actual = replication.getPartition(topicId, partitionIndex);
        Assertions.assertArrayEquals(isr, actual.isr);
        Assertions.assertEquals(leaderId, actual.leader);
    }

    /**
     * Exercises unclean leader election through {@code electLeaders}: first while no replica of
     * the leaderless partition is available, then again after one replica has been unfenced.
     *
     * @param electAllPartitions when true the request is built with a null partition map;
     *                           otherwise it names partitions 0, 1 and 2 of "foo" explicitly
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
        TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
        TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
        // Fence in two steps: brokers 2 and 3 first, then 1, 2 and 3.
        ctx.fenceBrokers(Utils.mkSet((Object[])new Integer[]{2, 3}));
        ctx.fenceBrokers(Utils.mkSet((Object[])new Integer[]{1, 2, 3}));
        // Expected afterwards: partition 0 has leader -1 with ISR {1}; partitions 1 and 2 are led by
        // brokers 4 and 0, each alone in its ISR.
        this.assertLeaderAndIsr(replication, partition0, -1, new int[]{1});
        this.assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
        this.assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
        ElectLeadersRequestData request = this.buildElectLeadersRequest(ElectionType.UNCLEAN, electAllPartitions ? null : Collections.singletonMap("foo", Arrays.asList(0, 1, 2)));
        // First attempt: no records are expected, partition 0 reports ELIGIBLE_LEADERS_NOT_AVAILABLE
        // and the other two report ELECTION_NOT_NEEDED.
        ControllerResult result1 = replication.electLeaders(request);
        Assertions.assertEquals(Collections.emptyList(), (Object)result1.records());
        ElectLeadersResponseData expectedResponse1 = this.buildElectLeadersResponse(Errors.NONE, electAllPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE)), Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)), Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))}));
        this.assertElectLeadersResponse(expectedResponse1, (ElectLeadersResponseData)result1.response());
        // Unfence broker 2 and alter partition 1 so that its ISR becomes [2, 4] under leader 4.
        ctx.unfenceBrokers(Utils.mkSet((Object[])new Integer[]{2}));
        ctx.alterPartition(partition1, 4, Arrays.asList(2, 4), LeaderRecoveryState.RECOVERED);
        // Second attempt with the same request: exactly one PartitionChangeRecord is expected,
        // making broker 2 the leader and sole ISR member of partition 0.
        ControllerResult result = replication.electLeaders(request);
        Assertions.assertEquals((int)1, (int)result.records().size());
        ApiMessageAndVersion record = (ApiMessageAndVersion)result.records().get(0);
        Assertions.assertTrue((boolean)(record.message() instanceof PartitionChangeRecord));
        PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord)record.message();
        Assertions.assertEquals((int)0, (int)partitionChangeRecord.partitionId());
        Assertions.assertEquals((int)2, (int)partitionChangeRecord.leader());
        Assertions.assertEquals(Collections.singletonList(2), partitionChangeRecord.isr());
        ctx.replay(result.records());
        // After replay partition 0 is led by broker 2; partitions 1 and 2 keep their leaders.
        this.assertLeaderAndIsr(replication, partition0, 2, new int[]{2});
        this.assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4});
        this.assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
        // The response reports success for partition 0 and ELECTION_NOT_NEEDED for the other two.
        ElectLeadersResponseData expectedResponse = this.buildElectLeadersResponse(Errors.NONE, electAllPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)ApiError.NONE), Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)), Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))}));
        this.assertElectLeadersResponse(expectedResponse, (ElectLeadersResponseData)result.response());
    }

    /**
     * Checks that a PREFERRED election request does not produce any records for a leaderless
     * partition, even after {@code unclean.leader.election.enable} has been set to true on the
     * topic; the partition is expected to report {@code PREFERRED_LEADER_NOT_AVAILABLE}.
     */
    @Test
    public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(1, 2, 3, 4);
        ctx.unfenceBrokers(1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}}).topicId();
        TopicIdPartition fooPartition0 = new TopicIdPartition(fooId, 0);

        // Fence brokers 2 and 3, then 1, 2 and 3, and finally bring broker 2 back.
        ctx.fenceBrokers(Utils.mkSet((Object[])new Integer[]{2, 3}));
        ctx.fenceBrokers(Utils.mkSet((Object[])new Integer[]{1, 2, 3}));
        ctx.unfenceBrokers(Utils.mkSet((Object[])new Integer[]{2}));
        // The partition is expected to have leader -1 with ISR {1} at this point.
        this.assertLeaderAndIsr(manager, fooPartition0, -1, new int[]{1});

        ctx.alterTopicConfig("foo", "unclean.leader.election.enable", "true");

        Map<String, List<Integer>> requestedPartitions = Collections.singletonMap("foo", Collections.singletonList(0));
        ElectLeadersRequestData electRequest = this.buildElectLeadersRequest(ElectionType.PREFERRED, requestedPartitions);
        ControllerResult electionResult = manager.electLeaders(electRequest);
        Assertions.assertEquals(Collections.emptyList(), (Object) electionResult.records());

        ElectLeadersResponseData expectedResponse = this.buildElectLeadersResponse(
            Errors.NONE,
            false,
            Collections.singletonMap(new TopicPartition("foo", 0), new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)));
        Assertions.assertEquals((Object) expectedResponse, (Object) electionResult.response());
    }

    /**
     * Builds an ElectLeaders request for the given election type.
     *
     * @param electionType election type whose {@code value} is stored in the request
     * @param partitions   partition ids keyed by topic name; when {@code null}, the request's
     *                     topic partitions are explicitly set to {@code null}
     * @return the populated request
     */
    private ElectLeadersRequestData buildElectLeadersRequest(ElectionType electionType, Map<String, List<Integer>> partitions) {
        ElectLeadersRequestData electRequest = new ElectLeadersRequestData().setElectionType(electionType.value);
        if (partitions == null) {
            electRequest.setTopicPartitions(null);
            return electRequest;
        }
        for (Map.Entry<String, List<Integer>> topicEntry : partitions.entrySet()) {
            ElectLeadersRequestData.TopicPartitions topicPartitions = new ElectLeadersRequestData.TopicPartitions()
                .setTopic(topicEntry.getKey())
                .setPartitions(topicEntry.getValue());
            electRequest.topicPartitions().add(topicPartitions);
        }
        return electRequest;
    }

    /**
     * Fences brokers 2 and 3 in a single call and checks the resulting replicas, ISR and leader
     * of each of the three partitions of topic "foo".
     */
    @Test
    public void testFenceMultipleBrokers() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue(context.clusterControl.fencedBrokerIds().isEmpty());

        context.fenceBrokers(Utils.mkSet(2, 3));

        PartitionRegistration first = manager.getPartition(topicId, 0);
        PartitionRegistration second = manager.getPartition(topicId, 1);
        PartitionRegistration third = manager.getPartition(topicId, 2);

        // Partition 0: replicas unchanged, only broker 1 left in the ISR and leading.
        Assertions.assertArrayEquals(new int[]{1, 2, 3}, first.replicas);
        Assertions.assertArrayEquals(new int[]{1}, first.isr);
        Assertions.assertEquals(1, (int) first.leader);

        // Partition 1: replicas unchanged, only broker 4 left in the ISR and leading.
        Assertions.assertArrayEquals(new int[]{2, 3, 4}, second.replicas);
        Assertions.assertArrayEquals(new int[]{4}, second.isr);
        Assertions.assertEquals(4, (int) second.leader);

        // Partition 2: broker 2 dropped from the ISR and must not be the leader.
        Assertions.assertArrayEquals(new int[]{0, 2, 1}, third.replicas);
        Assertions.assertArrayEquals(new int[]{0, 1}, third.isr);
        Assertions.assertNotEquals(2, (int) third.leader);
    }

    /**
     * Runs the same PREFERRED election request twice. The first round is expected to generate no
     * records; after brokers 0 and 1 are unfenced and the ISRs of partitions 0 and 2 are expanded
     * through AlterPartition, the second round is expected to move leadership of partition 0 to
     * broker 1 and of partition 2 to broker 0. Topic "bar" is never created in this test and is
     * expected to be reported as UNKNOWN_TOPIC_OR_PARTITION in both rounds.
     */
    @Test
    public void testElectPreferredLeaders() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(1, 2, 3, 4);
        context.inControlledShutdownBrokers(1);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();

        // A single request instance is reused for both election rounds.
        ElectLeadersRequestData electRequest = new ElectLeadersRequestData()
            .setElectionType(ElectionType.PREFERRED.value)
            .setTopicPartitions(new ElectLeadersRequestData.TopicPartitionsCollection(Arrays.asList(
                new ElectLeadersRequestData.TopicPartitions().setTopic("foo").setPartitions(Arrays.asList(0, 1, 2)),
                new ElectLeadersRequestData.TopicPartitions().setTopic("bar").setPartitions(Arrays.asList(0, 1))).iterator()));

        // Round 1: nothing can be elected.
        ControllerResult firstElection = manager.electLeaders(electRequest);
        ElectLeadersResponseData firstExpected = buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap(
            Utils.mkEntry(new TopicPartition("foo", 0), new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)),
            Utils.mkEntry(new TopicPartition("foo", 1), new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry(new TopicPartition("foo", 2), new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)),
            Utils.mkEntry(new TopicPartition("bar", 0), new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")),
            Utils.mkEntry(new TopicPartition("bar", 1), new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar"))));
        assertElectLeadersResponse(firstExpected, (ElectLeadersResponseData) firstElection.response());
        Assertions.assertEquals(Collections.emptyList(), (Object) firstElection.records());

        // Re-register broker 1, unfence 0 and 1, then expand the ISRs of partitions 0 and 2.
        context.registerBrokers(1);
        context.unfenceBrokers(0, 1);
        ControllerResult isrExpansion = manager.alterPartition(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION),
            new AlterPartitionRequestData()
                .setBrokerId(2)
                .setBrokerEpoch(102L)
                .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                    .setTopicId(topicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionRequestData.PartitionData()
                            .setPartitionIndex(0)
                            .setPartitionEpoch(0)
                            .setLeaderEpoch(0)
                            .setNewIsr(Arrays.asList(1, 2, 3)),
                        new AlterPartitionRequestData.PartitionData()
                            .setPartitionIndex(2)
                            .setPartitionEpoch(0)
                            .setLeaderEpoch(0)
                            .setNewIsr(Arrays.asList(0, 2, 1)))))));
        AlterPartitionResponseData isrExpansionExpected = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(
                    new AlterPartitionResponseData.PartitionData()
                        .setPartitionIndex(0)
                        .setLeaderId(2)
                        .setLeaderEpoch(0)
                        .setIsr(Arrays.asList(1, 2, 3))
                        .setPartitionEpoch(1)
                        .setErrorCode(Errors.NONE.code()),
                    new AlterPartitionResponseData.PartitionData()
                        .setPartitionIndex(2)
                        .setLeaderId(2)
                        .setLeaderEpoch(0)
                        .setIsr(Arrays.asList(0, 2, 1))
                        .setPartitionEpoch(1)
                        .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals((Object) isrExpansionExpected, (Object) isrExpansion.response());

        // Round 2: partitions 0 and 2 can now move to their preferred replicas.
        ElectLeadersResponseData secondExpected = buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap(
            Utils.mkEntry(new TopicPartition("foo", 0), ApiError.NONE),
            Utils.mkEntry(new TopicPartition("foo", 1), new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry(new TopicPartition("foo", 2), ApiError.NONE),
            Utils.mkEntry(new TopicPartition("bar", 0), new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")),
            Utils.mkEntry(new TopicPartition("bar", 1), new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar"))));
        context.replay(isrExpansion.records());
        ControllerResult secondElection = manager.electLeaders(electRequest);
        assertElectLeadersResponse(secondExpected, (ElectLeadersResponseData) secondElection.response());

        PartitionChangeRecord partition0Change = new PartitionChangeRecord().setPartitionId(0).setTopicId(topicId).setLeader(1);
        PartitionChangeRecord partition2Change = new PartitionChangeRecord().setPartitionId(2).setTopicId(topicId).setLeader(0);
        Assertions.assertEquals(
            Arrays.asList(
                new ApiMessageAndVersion(partition0Change, (short) 0),
                new ApiMessageAndVersion(partition2Change, (short) 0)),
            (Object) secondElection.records());
    }

    /**
     * Creates a mirror topic over a cluster link, attaches a cluster link state to partition 0,
     * fences the partition's leader and then checks that {@code maybeBalancePartitionLeaders}
     * returns {@code false} with no records even though leaders are reported as imbalanced.
     */
    @Test
    public void testBalanceLinkedTopicPartitionLeaders() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);

        String linkName = "foo_link";
        String topicName = "bar";
        String sourceTopicName = "source_bar";
        context.createClusterLink(linkName);
        CreateTopicsResponseData.CreatableTopicResult mirrorTopic =
            context.createLinkTopic(linkName, topicName, sourceTopicName, Uuid.randomUuid());
        Uuid mirrorTopicId = mirrorTopic.topicId();

        PartitionRegistration mirrorPartition = manager.getPartition(mirrorTopicId, 0);
        Assertions.assertTrue(mirrorPartition.isMirror());

        int linkedLeaderEpoch = 25;
        AlterPartitionRequestData.ClusterLinkState linkState = new AlterPartitionRequestData.ClusterLinkState()
            .setLinkedLeaderEpoch(linkedLeaderEpoch)
            .setLinkFailed(false);
        context.setClusterLinkState(new TopicIdPartition(mirrorTopicId, 0), linkState);

        // Uses the leader read before the link state was applied.
        int originalLeader = mirrorPartition.leader;
        context.fenceBrokers(Collections.singleton(originalLeader));
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());

        ControllerResult rebalance = manager.maybeBalancePartitionLeaders();
        Assertions.assertFalse((Boolean) rebalance.response());
        Assertions.assertEquals(Collections.emptyList(), (Object) rebalance.records());
    }

    /**
     * Walks the leader balancer through two rounds. With only brokers 2, 3 and 4 unfenced, two
     * partitions are counted as imbalanced. After broker 1 is unfenced and rejoins the ISR of
     * partition 0, the balancer moves that partition to broker 1; after broker 0 is unfenced and
     * rejoins the ISR of partition 2, the balancer moves that partition to broker 0, leaving no
     * imbalance.
     */
    @Test
    public void testBalancePartitionLeaders() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());
        Assertions.assertEquals(2, (int) context.metrics.preferredReplicaImbalanceCount());

        // Round 1: broker 1 comes back and is added to the ISR of partition 0.
        context.unfenceBrokers(1);
        ControllerResult firstIsrChange = manager.alterPartition(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION),
            new AlterPartitionRequestData()
                .setBrokerId(2)
                .setBrokerEpoch(102L)
                .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                    .setTopicId(topicId)
                    .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                        .setPartitionIndex(0)
                        .setPartitionEpoch(0)
                        .setLeaderEpoch(0)
                        .setNewIsr(Arrays.asList(1, 2, 3)))))));
        AlterPartitionResponseData firstIsrChangeExpected = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setLeaderId(2)
                    .setLeaderEpoch(0)
                    .setIsr(Arrays.asList(1, 2, 3))
                    .setPartitionEpoch(1)
                    .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals((Object) firstIsrChangeExpected, (Object) firstIsrChange.response());
        context.replay(firstIsrChange.records());

        ControllerResult firstBalance = manager.maybeBalancePartitionLeaders();
        context.replay(firstBalance.records());
        PartitionChangeRecord partition0Change = new PartitionChangeRecord().setPartitionId(0).setTopicId(topicId).setLeader(1);
        Assertions.assertEquals(
            Arrays.asList(new ApiMessageAndVersion(partition0Change, (short) 0)),
            (Object) firstBalance.records());
        // One partition is still led by a non-preferred replica.
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());
        Assertions.assertEquals(1, (int) context.metrics.preferredReplicaImbalanceCount());
        Assertions.assertFalse((Boolean) firstBalance.response());

        // Round 2: broker 0 comes back and is added to the ISR of partition 2.
        context.unfenceBrokers(0);
        ControllerResult secondIsrChange = manager.alterPartition(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION),
            new AlterPartitionRequestData()
                .setBrokerId(2)
                .setBrokerEpoch(102L)
                .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                    .setTopicId(topicId)
                    .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                        .setPartitionIndex(2)
                        .setPartitionEpoch(0)
                        .setLeaderEpoch(0)
                        .setNewIsr(Arrays.asList(0, 2, 1)))))));
        AlterPartitionResponseData secondIsrChangeExpected = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(2)
                    .setLeaderId(2)
                    .setLeaderEpoch(0)
                    .setIsr(Arrays.asList(0, 2, 1))
                    .setPartitionEpoch(1)
                    .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals((Object) secondIsrChangeExpected, (Object) secondIsrChange.response());
        context.replay(secondIsrChange.records());

        ControllerResult secondBalance = manager.maybeBalancePartitionLeaders();
        context.replay(secondBalance.records());
        PartitionChangeRecord partition2Change = new PartitionChangeRecord().setPartitionId(2).setTopicId(topicId).setLeader(0);
        Assertions.assertEquals(
            Arrays.asList(new ApiMessageAndVersion(partition2Change, (short) 0)),
            (Object) secondBalance.records());
        Assertions.assertFalse(manager.arePartitionLeadersImbalanced());
        Assertions.assertEquals(0, (int) context.metrics.preferredReplicaImbalanceCount());
        Assertions.assertFalse((Boolean) secondBalance.response());
    }

    /**
     * Asserts that two ElectLeaders responses agree on the top-level error and on the
     * per-partition results, ignoring the order in which topics and partitions are listed.
     *
     * @param expected the response the test expects
     * @param actual   the response produced by the code under test
     */
    private void assertElectLeadersResponse(ElectLeadersResponseData expected, ElectLeadersResponseData actual) {
        Errors expectedTopLevel = Errors.forCode(expected.errorCode());
        Errors actualTopLevel = Errors.forCode(actual.errorCode());
        Assertions.assertEquals((Object) expectedTopLevel, (Object) actualTopLevel);

        // Compared as maps keyed by partition, so listing order does not matter.
        Map<TopicPartition, ElectLeadersResponseData.PartitionResult> expectedResults = collectElectLeadersErrors(expected);
        Map<TopicPartition, ElectLeadersResponseData.PartitionResult> actualResults = collectElectLeadersErrors(actual);
        Assertions.assertEquals(expectedResults, actualResults);
    }

    /**
     * Flattens an ElectLeaders response into a map from topic partition to its partition result.
     * If the same partition appears more than once, the last occurrence wins.
     *
     * @param response the response to flatten
     * @return a new mutable map with one entry per distinct topic partition in the response
     */
    private Map<TopicPartition, ElectLeadersResponseData.PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
        Map<TopicPartition, ElectLeadersResponseData.PartitionResult> resultsByPartition = new HashMap<>();
        response.replicaElectionResults().forEach(topicResult ->
            topicResult.partitionResult().forEach(partitionResult ->
                resultsByPartition.put(
                    new TopicPartition(topicResult.topic(), partitionResult.partitionId()),
                    partitionResult)));
        return resultsByPartition;
    }

    /**
     * Builds the ElectLeaders response a test expects from a map of per-partition errors.
     *
     * @param topLevelError      error whose code becomes the response's top-level error code
     * @param electAllPartitions when {@code true}, partitions whose error is ELECTION_NOT_NEEDED
     *                           are left out of the response; when {@code false}, every entry of
     *                           {@code errors} is included
     * @param errors             expected error for each topic partition
     * @return a response with one election result per topic found in {@code errors}
     */
    private ElectLeadersResponseData buildElectLeadersResponse(Errors topLevelError, boolean electAllPartitions, Map<TopicPartition, ApiError> errors) {
        // Fully parameterized entries: no raw Map.Entry, so no unchecked casts in the lambdas below.
        Map<String, List<Map.Entry<TopicPartition, ApiError>>> errorsByTopic = errors.entrySet().stream()
            .collect(Collectors.groupingBy(entry -> entry.getKey().topic()));
        ElectLeadersResponseData response = new ElectLeadersResponseData().setErrorCode(topLevelError.code());
        errorsByTopic.forEach((topic, partitionErrors) -> {
            ElectLeadersResponseData.ReplicaElectionResult electionResult = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topic);
            electionResult.setPartitionResult(partitionErrors.stream()
                .filter(entry -> !electAllPartitions || entry.getValue().error() != Errors.ELECTION_NOT_NEEDED)
                .map(entry -> {
                    TopicPartition topicPartition = entry.getKey();
                    ApiError error = entry.getValue();
                    return new ElectLeadersResponseData.PartitionResult()
                        .setPartitionId(topicPartition.partition())
                        .setErrorCode(error.error().code())
                        .setErrorMessage(error.message());
                })
                .collect(Collectors.toList()));
            response.replicaElectionResults().add(electionResult);
        });
        return response;
    }

    /**
     * Checks what the manager's {@code clusterDescriber} reports: the set of topic names, the
     * replica assignment of each topic (empty for an unknown topic), and the usable brokers with
     * their fenced flag. Brokers 0 and 1 are registered but never unfenced in this test.
     */
    @Test
    public void testKRaftClusterDescriber() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(2, 3, 4);
        // The creation results are not needed here; only the side effect of creating the topics is.
        ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}});
        ctx.createTestTopic("bar", new int[][]{{2, 3, 4}, {3, 4, 2}});
        ReplicationControlManager.KRaftClusterDescriber describer = replication.clusterDescriber;

        Set<String> topicNames = new HashSet<>();
        describer.topicNames().forEachRemaining(name -> topicNames.add(name));
        Assertions.assertEquals(new HashSet<String>(Arrays.asList("foo", "bar")), topicNames);

        Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4), Arrays.asList(0, 2, 1)), (Object)describer.replicasForTopicName("foo"));
        Assertions.assertEquals(Arrays.asList(Arrays.asList(2, 3, 4), Arrays.asList(3, 4, 2)), (Object)describer.replicasForTopicName("bar"));
        // A topic that was never created yields an empty assignment list.
        Assertions.assertEquals(Collections.emptyList(), (Object)describer.replicasForTopicName("baz"));

        Set<UsableBroker> brokers = new HashSet<>();
        describer.usableBrokers().forEachRemaining(broker -> brokers.add(broker));
        Assertions.assertEquals(new HashSet<UsableBroker>(Arrays.asList(
            new UsableBroker(0, Optional.empty(), true),
            new UsableBroker(1, Optional.empty(), true),
            new UsableBroker(2, Optional.empty(), false),
            new UsableBroker(3, Optional.empty(), false),
            new UsableBroker(4, Optional.empty(), false))), brokers);
    }

    /**
     * Sends a heartbeat with {@code wantShutDown} set for broker 0 and checks the generated
     * records: a broker registration change (only when the metadata version supports the
     * in-controlled-shutdown state) followed by a partition change that shrinks the ISR to
     * [1, 2] and moves the leader to broker 1.
     */
    @ParameterizedTest
    @EnumSource(value=MetadataVersion.class, names={"IBP_3_3_IV2", "IBP_3_3_IV3"})
    public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext(metadataVersion);
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        Uuid fooId = context.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();

        BrokerHeartbeatRequestData shutdownHeartbeat = new BrokerHeartbeatRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setCurrentMetadataOffset(0L)
            .setWantShutDown(true);
        ControllerResult heartbeatResult = context.replicationControl.processBrokerHeartbeat(shutdownHeartbeat, 0L);

        List<ApiMessageAndVersion> expectedRecords = new ArrayList<>();
        if (metadataVersion.isInControlledShutdownStateSupported()) {
            BrokerRegistrationChangeRecord registrationChange = new BrokerRegistrationChangeRecord()
                .setBrokerEpoch(100L)
                .setBrokerId(0)
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
            expectedRecords.add(new ApiMessageAndVersion(registrationChange, (short) 1));
        }
        PartitionChangeRecord leaderChange = new PartitionChangeRecord()
            .setPartitionId(0)
            .setTopicId(fooId)
            .setIsr(Arrays.asList(1, 2))
            .setLeader(1);
        expectedRecords.add(new ApiMessageAndVersion(leaderChange, (short) 0));

        Assertions.assertEquals(expectedRecords, (Object) heartbeatResult.records());
    }

    /**
     * Boxes an {@code int} array into a list, preserving element order.
     *
     * @param array the values to copy; must not be {@code null}
     * @return a new list holding the boxed values
     */
    private static List<Integer> arrayToList(int[] array) {
        List<Integer> boxed = new ArrayList<>(array.length);
        for (int value : array) {
            boxed.add(value);
        }
        return boxed;
    }

    /**
     * Create-topic policy used by the tests. It accepts creation requests only in the exact order
     * given to the constructor, and additionally rejects requests once the partition and topic
     * counters fed through {@link #partialUpdate} reach fixed limits.
     */
    private static class MockCreateTopicPolicy
    implements CreateTopicPolicy,
    ConfluentPartitionsPerTopicListener {
        /** A request is rejected when requested plus tracked partitions reach this value. */
        private static final int TENANT_PARTITION_LIMIT = 20;
        /** A request is rejected when the tracked topic count reaches this value. */
        private static final int CLUSTER_TOPIC_LIMIT = 3;

        private final List<CreateTopicPolicy.RequestMetadata> expecteds;
        // Position of the next expected request; advanced on every validate() call, pass or fail.
        private final AtomicLong index = new AtomicLong(0L);
        // Running totals, changed only by partialUpdate().
        private int numTenantPartitions = 0;
        private int numClusterTopics = 0;

        /**
         * @param expecteds the requests this policy expects to see, in order
         */
        MockCreateTopicPolicy(List<CreateTopicPolicy.RequestMetadata> expecteds) {
            this.expecteds = expecteds;
        }

        /**
         * Validates one creation request against the next expected request and the limits.
         *
         * @param actual the request being validated
         * @throws PolicyViolationException if more requests arrive than were expected, if the
         *         request differs from the expected one, or if a partition or topic limit is hit
         */
        public void validate(CreateTopicPolicy.RequestMetadata actual) throws PolicyViolationException {
            long curIndex = this.index.getAndIncrement();
            if (curIndex >= (long)this.expecteds.size()) {
                throw new PolicyViolationException("Unexpected topic creation: index out of range at " + curIndex);
            }
            CreateTopicPolicy.RequestMetadata expected = this.expecteds.get((int)curIndex);
            if (!expected.equals((Object)actual)) {
                throw new PolicyViolationException("Expected: " + expected + ". Got: " + actual);
            }
            // A request without an explicit partition count contributes nothing to the total.
            Integer requestedPartitions = actual.numPartitions();
            int partitionsPassed = requestedPartitions == null ? 0 : requestedPartitions;
            if (partitionsPassed + this.numTenantPartitions >= TENANT_PARTITION_LIMIT) {
                throw new PolicyViolationException("Exceeds tenant partitions");
            }
            if (this.numClusterTopics >= CLUSTER_TOPIC_LIMIT) {
                throw new PolicyViolationException("Exceeds cluster topics");
            }
        }

        /** No resources to release. */
        public void close() throws Exception {
        }

        /** Configuration is ignored by this mock. */
        public void configure(Map<String, ?> configs) {
        }

        /** Full updates are ignored by this mock; only partial updates change the counters. */
        public void fullUpdate(Iterator<Map.Entry<String, Integer>> iterator) {
        }

        /**
         * Applies a delta to the tracked topic and partition counters.
         *
         * @param topicName                  unused by this mock
         * @param numPartitionsAddedOrDeleted amount added to the partition counter
         * @param numTopicsAddedOrDeleted     amount added to the topic counter
         */
        public void partialUpdate(String topicName, int numPartitionsAddedOrDeleted, int numTopicsAddedOrDeleted) {
            this.numClusterTopics += numTopicsAddedOrDeleted;
            this.numTenantPartitions += numPartitionsAddedOrDeleted;
        }
    }

    private static class ReplicationControlTestContext {
        // Shared fixtures. The initializers run in declaration order, so each manager below may
        // only reference the fields declared above it.
        final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
        final LogContext logContext = new LogContext();
        final MockTime time = new MockTime();
        final MockRandom random = new MockRandom();
        final ControllerMetrics metrics = new MockControllerMetrics();
        // Always built with MetadataVersion.latest(). NOTE(review): the two-argument constructor
        // builds a second, local FeatureControlManager with the requested version and hands that
        // one to replicationControl, while clusterControl below keeps using this field — confirm
        // that the mismatch is intended.
        final FeatureControlManager featureControl = new FeatureControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), QuorumFeatures.defaultFeatureMap(), Collections.singletonList(0))).setMetadataVersion(MetadataVersion.latest()).build();
        // NOTE(review): TimeUnit.MILLISECONDS.convert(1000L, TimeUnit.NANOSECONDS) converts 1000 ns
        // into milliseconds, which truncates to 0, so the session timeout passed here is 0. The
        // arguments look reversed (1000 ms expressed in ns was probably intended) — confirm against
        // the fencing helpers before changing, since tests may rely on the current value.
        final ClusterControlManager clusterControl = new ClusterControlManager.Builder().setLogContext(this.logContext).setTime((Time)this.time).setSnapshotRegistry(this.snapshotRegistry).setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(1000L, TimeUnit.NANOSECONDS)).setReplicaPlacer((ReplicaPlacer)new StripedReplicaPlacer((Random)this.random)).setControllerMetrics(this.metrics).setFeatureControlManager(this.featureControl).build();
        final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).build();
        // The two resolver method references read replicationControl and clusterLinkControl, which
        // are assigned later in the constructor.
        final MirrorTopicControlManager mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, this.logContext, (Time)this.time, this::resolveTopicId, this::resolveClusterLinkId);
        // Assigned in the two-argument constructor.
        final ReplicationControlManager replicationControl;
        final ClusterLinkControlManager clusterLinkControl;

        /**
         * Replays the given records into every manager held by this context, in a fixed order:
         * cluster, configuration, replication, cluster link, then mirror topic control.
         *
         * @param records the records to replay
         */
        void replay(List<ApiMessageAndVersion> records) throws Exception {
            RecordTestUtils.replayAll(clusterControl, records);
            RecordTestUtils.replayAll(configurationControl, records);
            RecordTestUtils.replayAll(replicationControl, records);
            RecordTestUtils.replayAll(clusterLinkControl, records);
            RecordTestUtils.replayAll(mirrorTopicControl, records);
        }

        /** Creates a context with the latest metadata version and no create-topic policy. */
        ReplicationControlTestContext() {
            this(Optional.empty());
        }

        /**
         * Creates a context with the given metadata version and no create-topic policy.
         *
         * @param metadataVersion the metadata version passed on to the two-argument constructor
         */
        ReplicationControlTestContext(MetadataVersion metadataVersion) {
            this(metadataVersion, Optional.empty());
        }

        /**
         * Creates a context with the latest metadata version and the given create-topic policy.
         *
         * @param createTopicPolicy the optional policy passed on to the two-argument constructor
         */
        ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
            this(MetadataVersion.latest(), createTopicPolicy);
        }

        /**
         * Builds the replication and cluster link managers and activates cluster control.
         *
         * @param metadataVersion   metadata version given to the feature control manager that is
         *                          wired into {@code replicationControl}
         * @param createTopicPolicy optional policy handed to {@code replicationControl}
         */
        ReplicationControlTestContext(MetadataVersion metadataVersion, Optional<CreateTopicPolicy> createTopicPolicy) {
            // Separate from the featureControl field: this instance carries the requested version.
            FeatureControlManager versionedFeatureControl = new FeatureControlManager.Builder()
                .setSnapshotRegistry(this.snapshotRegistry)
                .setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), QuorumFeatures.defaultFeatureMap(), Collections.singletonList(0)))
                .setMetadataVersion(metadataVersion)
                .build();
            this.replicationControl = new ReplicationControlManager.Builder()
                .setSnapshotRegistry(this.snapshotRegistry)
                .setLogContext(this.logContext)
                .setMaxElectionsPerImbalance(Integer.MAX_VALUE)
                .setConfigurationControl(this.configurationControl)
                .setClusterControl(this.clusterControl)
                .setControllerMetrics(this.metrics)
                .setCreateTopicPolicy(createTopicPolicy)
                .setMirrorTopicControl(this.mirrorTopicControl)
                .setFeatureControl(versionedFeatureControl)
                .build();
            this.clusterLinkControl = new ClusterLinkControlManager(
                this.snapshotRegistry,
                this.logContext,
                this.configurationControl,
                this.mirrorTopicControl,
                mirrorTopic -> this.replicationControl.unlinkMirrorTopic(mirrorTopic),
                "clusterId",
                Optional.empty());
            this.clusterControl.activate();
        }

        /**
         * Looks up a topic id by name through {@code replicationControl}.
         *
         * @param topicName the topic name to resolve
         * @return the topic id, or empty when the lookup returns {@code null}
         */
        Optional<Uuid> resolveTopicId(String topicName) {
            Uuid topicId = replicationControl.getTopicId(topicName);
            return Optional.ofNullable(topicId);
        }

        /**
         * Looks up a cluster link id by name through {@code clusterLinkControl}.
         *
         * @param linkName the link name to resolve
         * @return whatever {@code clusterLinkControl.getClusterLinkId} returns for that name
         */
        Optional<Uuid> resolveClusterLinkId(String linkName) {
            Optional<Uuid> linkId = clusterLinkControl.getClusterLinkId(linkName);
            return linkId;
        }

        /**
         * Creates a topic by partition count and replication factor, with no pre-creation hook.
         *
         * @param name              topic name
         * @param numPartitions     requested partition count
         * @param replicationFactor requested replication factor
         * @param expectedErrorCode error code the creation result must carry
         * @return the creation result for the topic
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode) throws Exception {
            Consumer<CreateTopicsRequestData.CreatableTopic> noHook = ignored -> { };
            return createTestTopic(name, numPartitions, replicationFactor, expectedErrorCode, noHook);
        }

        /**
         * Creates a topic by partition count and replication factor, letting the caller adjust
         * the request entry before it is submitted. The resulting records are replayed only when
         * the expected error code is NONE.
         *
         * @param name              topic name
         * @param numPartitions     requested partition count
         * @param replicationFactor requested replication factor
         * @param expectedErrorCode error code the creation result must carry
         * @param preCreationHook   invoked with the topic entry before it is added to the request
         * @return the creation result for the topic
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode, Consumer<CreateTopicsRequestData.CreatableTopic> preCreationHook) throws Exception {
            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            CreateTopicsRequestData.CreatableTopic creatableTopic = new CreateTopicsRequestData.CreatableTopic().setName(name);
            creatableTopic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
            preCreationHook.accept(creatableTopic);
            createRequest.topics().add(creatableTopic);

            ControllerResult creation = replicationControl.createTopics(createRequest, Collections.singleton(name), KafkaPrincipal.ANONYMOUS);
            CreateTopicsResponseData.CreatableTopicResult created = ((CreateTopicsResponseData) creation.response()).topics().find(name);
            Assertions.assertNotNull((Object) created);
            Assertions.assertEquals((Object) Errors.forCode(expectedErrorCode), (Object) Errors.forCode(created.errorCode()));

            boolean expectSuccess = expectedErrorCode == Errors.NONE.code();
            if (expectSuccess) {
                replay(creation.records());
            }
            return created;
        }

        /**
         * Creates a topic from an explicit replica assignment, with no configs, expecting error
         * code 0.
         *
         * @param name     topic name
         * @param replicas one broker id array per partition
         * @return the creation result for the topic
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas) throws Exception {
            Map<String, String> noConfigs = Collections.emptyMap();
            return createTestTopic(name, replicas, noConfigs, (short) 0);
        }

        /**
         * Creates a topic from an explicit replica assignment, with no configs.
         *
         * @param name              topic name
         * @param replicas          one broker id array per partition
         * @param expectedErrorCode error code the creation result must carry
         * @return the creation result for the topic
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, short expectedErrorCode) throws Exception {
            Map<String, String> noConfigs = Collections.emptyMap();
            return createTestTopic(name, replicas, noConfigs, expectedErrorCode);
        }

        /**
         * Creates a mirror topic over the given cluster link, asserts that creation succeeded and
         * replays the resulting records. Partition count and replication factor are both sent
         * as -1.
         *
         * @param linkName        name of the cluster link
         * @param topicName       name of the topic to create
         * @param mirrorTopicName name set as the request's mirror topic
         * @param mirrorTopicId   id set as the request's source topic id
         * @return the creation result for the topic
         */
        CreateTopicsResponseData.CreatableTopicResult createLinkTopic(String linkName, String topicName, String mirrorTopicName, Uuid mirrorTopicId) throws Exception {
            CreateTopicsRequestData.CreatableTopic mirrorTopic = new CreateTopicsRequestData.CreatableTopic()
                .setName(topicName)
                .setMirrorTopic(mirrorTopicName)
                .setSourceTopicId(mirrorTopicId)
                .setLinkName(linkName)
                .setNumPartitions(-1)
                .setReplicationFactor((short) -1);
            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            createRequest.topics().add(mirrorTopic);

            ControllerResult creation = replicationControl.createTopics(createRequest, Collections.singleton(topicName), KafkaPrincipal.ANONYMOUS);
            CreateTopicsResponseData.CreatableTopicResult created = ((CreateTopicsResponseData) creation.response()).topics().find(topicName);
            Assertions.assertNotNull((Object) created);
            Assertions.assertEquals((Object) Errors.NONE, (Object) Errors.forCode(created.errorCode()));
            replay(creation.records());
            return created;
        }

        /**
         * Creates a topic from an explicit replica assignment and topic configs. On success
         * (expected error code NONE) the reported partition count and replication factor are
         * checked against {@code replicas} and the resulting records are replayed.
         *
         * @param name              topic name
         * @param replicas          one broker id array per partition; must not be empty
         * @param configs           topic configs to send with the request
         * @param expectedErrorCode error code the creation result must carry
         * @return the creation result for the topic
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, Map<String, String> configs, short expectedErrorCode) throws Exception {
            Assertions.assertFalse(replicas.length == 0);

            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            CreateTopicsRequestData.CreatableTopic creatableTopic = new CreateTopicsRequestData.CreatableTopic().setName(name);
            // -1 for both: the assignment below determines partitions and replication.
            creatableTopic.setNumPartitions(-1).setReplicationFactor((short) -1);
            for (int partitionIndex = 0; partitionIndex < replicas.length; ++partitionIndex) {
                CreateTopicsRequestData.CreatableReplicaAssignment assignment = new CreateTopicsRequestData.CreatableReplicaAssignment()
                    .setPartitionIndex(partitionIndex)
                    .setBrokerIds(Replicas.toList(replicas[partitionIndex]));
                creatableTopic.assignments().add(assignment);
            }
            for (Map.Entry<String, String> config : configs.entrySet()) {
                creatableTopic.configs().add(new CreateTopicsRequestData.CreateableTopicConfig()
                    .setName(config.getKey())
                    .setValue(config.getValue()));
            }
            createRequest.topics().add(creatableTopic);

            ControllerResult creation = replicationControl.createTopics(createRequest, Collections.singleton(name), KafkaPrincipal.ANONYMOUS);
            CreateTopicsResponseData.CreatableTopicResult created = ((CreateTopicsResponseData) creation.response()).topics().find(name);
            Assertions.assertNotNull((Object) created);
            Assertions.assertEquals(expectedErrorCode, (short) created.errorCode());
            if (expectedErrorCode == Errors.NONE.code()) {
                Assertions.assertEquals(replicas.length, (int) created.numPartitions());
                Assertions.assertEquals(replicas[0].length, (int) created.replicationFactor());
                replay(creation.records());
            }
            return created;
        }

        /**
         * Issues a create-partitions request for one topic, checks the single result's name and
         * error code, and replays the resulting records regardless of the error code.
         *
         * @param count             value set as the request's partition count
         * @param name              topic name
         * @param replicas          broker id arrays for the new partitions; {@code null} sends a
         *                          {@code null} assignment list, otherwise it must not be empty
         * @param expectedErrorCode error code the topic result must carry
         */
        void createPartitions(int count, String name, int[][] replicas, short expectedErrorCode) throws Exception {
            CreatePartitionsRequestData.CreatePartitionsTopic partitionsTopic = new CreatePartitionsRequestData.CreatePartitionsTopic()
                .setName(name)
                .setCount(count);
            if (replicas == null) {
                partitionsTopic.setAssignments(null);
            } else {
                Assertions.assertFalse(replicas.length == 0);
                for (int[] brokerIds : replicas) {
                    partitionsTopic.assignments().add(new CreatePartitionsRequestData.CreatePartitionsAssignment()
                        .setBrokerIds(Replicas.toList(brokerIds)));
                }
            }

            ControllerResult creation = replicationControl.createPartitions(Collections.singletonList(partitionsTopic), KafkaPrincipal.ANONYMOUS);
            List<?> topicResults = (List<?>) creation.response();
            Assertions.assertEquals(1, topicResults.size());
            CreatePartitionsResponseData.CreatePartitionsTopicResult onlyResult =
                (CreatePartitionsResponseData.CreatePartitionsTopicResult) topicResults.get(0);
            Assertions.assertEquals((Object) name, (Object) onlyResult.name());
            Assertions.assertEquals(expectedErrorCode, (short) onlyResult.errorCode());
            replay(creation.records());
        }

        /**
         * Registers each given broker by replaying a registration record for it. Broker {@code n}
         * gets epoch {@code n + 100}, no rack, and a single PLAINTEXT endpoint on localhost at
         * port {@code 9092 + n}.
         *
         * @param brokerIds ids of the brokers to register
         */
        void registerBrokers(Integer ... brokerIds) throws Exception {
            for (int brokerId : brokerIds) {
                RegisterBrokerRecord registration = new RegisterBrokerRecord()
                    .setBrokerEpoch(brokerId + 100)
                    .setBrokerId(brokerId)
                    .setRack(null);
                RegisterBrokerRecord.BrokerEndpoint endpoint = new RegisterBrokerRecord.BrokerEndpoint()
                    .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
                    .setPort(9092 + brokerId)
                    .setName("PLAINTEXT")
                    .setHost("localhost");
                registration.endPoints().add(endpoint);
                replay(Collections.singletonList(new ApiMessageAndVersion(registration, (short) 0)));
            }
        }

        /**
         * Sends an alter-partition request, on behalf of the partition's current leader, that
         * keeps the current ISR, epochs and leader recovery state and sets the given
         * cluster link state.
         *
         * @param topicIdPartition partition to update; it must exist
         * @param clusterLinkState cluster link state to place in the request
         */
        void setClusterLinkState(TopicIdPartition topicIdPartition, AlterPartitionRequestData.ClusterLinkState clusterLinkState) throws Exception {
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            Assertions.assertNotNull((Object)partition);
            BrokerRegistration registration = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(partition.leader);
            // The lookup yields null when the leader id has no registration (for example a
            // leaderless partition); fail with a readable message instead of a bare NPE.
            Assertions.assertNotNull((Object)registration, (String)("No current registration for leader " + partition.leader + " of partition " + topicIdPartition));
            Assertions.assertFalse((boolean)registration.fenced());
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(topicIdPartition.partitionId())
                .setPartitionEpoch(partition.partitionEpoch)
                .setLeaderEpoch(partition.leaderEpoch)
                .setLeaderRecoveryState(partition.leaderRecoveryState.value())
                .setNewIsr(ReplicationControlManagerTest.arrayToList(partition.isr))
                .setClusterLinkState(clusterLinkState);
            this.alterPartition(topicIdPartition, partitionData, partition.leader, registration.epoch());
        }

        /**
         * Wraps the given partition data in a single-topic alter-partition request issued by
         * {@code leaderId} and replays the records the controller returns.
         *
         * @param topicIdPartition partition the request targets; its topic name is looked up by id
         * @param partitionData    partition payload of the request
         * @param leaderId         broker id placed in the request
         * @param brokerEpoch      broker epoch placed in the request
         */
        private void alterPartition(TopicIdPartition topicIdPartition, AlterPartitionRequestData.PartitionData partitionData, int leaderId, long brokerEpoch) throws Exception {
            String topicName = this.replicationControl.getTopic(topicIdPartition.topicId()).name();
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
                .setTopicName(topicName)
                .setTopicId(topicIdPartition.topicId())
                .setPartitions(Collections.singletonList(partitionData));
            ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
            AlterPartitionRequestData request = new AlterPartitionRequestData()
                .setBrokerId(leaderId)
                .setBrokerEpoch(brokerEpoch)
                .setTopics(Collections.singletonList(topicData));
            ControllerResult outcome = this.replicationControl.alterPartition(context, request);
            this.replay(outcome.records());
        }

        /**
         * Sends an alter-partition request from {@code leaderId} proposing a new ISR and
         * leader recovery state, using the partition's current partition and leader epochs.
         *
         * @param topicIdPartition    partition to alter; it must exist and be led by {@code leaderId}
         * @param leaderId            broker issuing the request; must be registered and unfenced
         * @param isr                 ISR to propose
         * @param leaderRecoveryState leader recovery state to propose
         */
        void alterPartition(TopicIdPartition topicIdPartition, int leaderId, List<Integer> isr, LeaderRecoveryState leaderRecoveryState) throws Exception {
            BrokerRegistration registration = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(leaderId);
            // An unregistered leader id yields null here; report it clearly rather than as an NPE.
            Assertions.assertNotNull((Object)registration, (String)("No current registration for broker " + leaderId));
            Assertions.assertFalse((boolean)registration.fenced());
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            Assertions.assertNotNull((Object)partition);
            Assertions.assertEquals((int)leaderId, (int)partition.leader);
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(topicIdPartition.partitionId())
                .setPartitionEpoch(partition.partitionEpoch)
                .setLeaderEpoch(partition.leaderEpoch)
                .setLeaderRecoveryState(leaderRecoveryState.value())
                .setNewIsr(isr);
            this.alterPartition(topicIdPartition, partitionData, leaderId, registration.epoch());
        }

        /**
         * Varargs convenience overload: collects the ids into a set via {@code Utils.mkSet}
         * and delegates to {@link #unfenceBrokers(Set)}.
         *
         * @param brokerIds ids of the brokers to send heartbeats for
         */
        void unfenceBrokers(Integer ... brokerIds) throws Exception {
            Set<Integer> idSet = Utils.mkSet(brokerIds);
            this.unfenceBrokers(idSet);
        }

        /**
         * Processes one heartbeat per broker with {@code wantFence} and {@code wantShutDown}
         * both {@code false}, asserts the reply equals
         * {@code new BrokerHeartbeatReply(true, false, false, false)}, and replays the
         * resulting records.
         * The heartbeat uses epoch {@code brokerId + 100}, the same value
         * {@code registerBrokers} assigns.
         *
         * @param brokerIds ids of the brokers to send heartbeats for
         */
        void unfenceBrokers(Set<Integer> brokerIds) throws Exception {
            for (int brokerId : brokerIds) {
                BrokerHeartbeatRequestData heartbeat = new BrokerHeartbeatRequestData()
                    .setBrokerId(brokerId)
                    .setBrokerEpoch((long)(brokerId + 100))
                    .setCurrentMetadataOffset(1L)
                    .setWantFence(false)
                    .setWantShutDown(false);
                ControllerResult heartbeatResult = this.replicationControl.processBrokerHeartbeat(heartbeat, 0L);
                BrokerHeartbeatReply expectedReply = new BrokerHeartbeatReply(true, false, false, false);
                Assertions.assertEquals(expectedReply, heartbeatResult.response());
                this.replay(heartbeatResult.records());
            }
        }

        /**
         * Varargs convenience overload: collects the ids into a set via {@code Utils.mkSet}
         * and delegates to {@link #inControlledShutdownBrokers(Set)}.
         *
         * @param brokerIds ids of the brokers to mark as in controlled shutdown
         */
        void inControlledShutdownBrokers(Integer ... brokerIds) throws Exception {
            Set<Integer> idSet = Utils.mkSet(brokerIds);
            this.inControlledShutdownBrokers(idSet);
        }

        /**
         * Replays, for each broker id, a {@code BrokerRegistrationChangeRecord} with epoch
         * {@code brokerId + 100} whose in-controlled-shutdown field is set to
         * {@code IN_CONTROLLED_SHUTDOWN}.
         *
         * @param brokerIds ids of the brokers to mark as in controlled shutdown
         */
        void inControlledShutdownBrokers(Set<Integer> brokerIds) throws Exception {
            for (int brokerId : brokerIds) {
                BrokerRegistrationChangeRecord change = new BrokerRegistrationChangeRecord()
                    .setBrokerId(brokerId)
                    .setBrokerEpoch(brokerId + 100)
                    .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
                ApiMessageAndVersion versioned = new ApiMessageAndVersion((ApiMessage)change, 1);
                this.replay(Collections.singletonList(versioned));
            }
        }

        /**
         * Replays a single topic-scoped {@code ConfigRecord} setting {@code configKey} to
         * {@code configValue} for the named topic.
         *
         * @param topic       resource name written to the record
         * @param configKey   config name written to the record
         * @param configValue config value written to the record
         */
        void alterTopicConfig(String topic, String configKey, String configValue) throws Exception {
            ConfigRecord topicConfig = new ConfigRecord()
                .setResourceType(ConfigResource.Type.TOPIC.id())
                .setResourceName(topic)
                .setName(configKey)
                .setValue(configValue);
            ApiMessageAndVersion versioned = new ApiMessageAndVersion((ApiMessage)topicConfig, 0);
            this.replay(Collections.singletonList(versioned));
        }

        /**
         * Drives the cluster into a state where exactly {@code brokerIds} are fenced.
         * It calls {@code time.sleep(1000L)}, heartbeats every registered broker that is not
         * in {@code brokerIds}, then fences stale brokers one at a time until the heartbeat
         * manager reports none, and finally asserts the fenced set equals {@code brokerIds}.
         *
         * @param brokerIds ids of the brokers expected to end up fenced
         */
        void fenceBrokers(Set<Integer> brokerIds) throws Exception {
            this.time.sleep(1000L);

            // Every registered broker that should stay unfenced gets a fresh heartbeat.
            Set<Integer> brokersToKeepAlive = this.clusterControl.brokerRegistrations().keySet().stream()
                .filter(id -> !brokerIds.contains(id))
                .collect(Collectors.toSet());
            this.unfenceBrokers(brokersToKeepAlive.toArray(new Integer[0]));

            // Fence one stale broker per pass until no stale broker remains.
            while (this.clusterControl.heartbeatManager().findOneStaleBroker().isPresent()) {
                ControllerResult fenced = this.replicationControl.maybeFenceOneStaleBroker();
                this.replay(fenced.records());
            }

            Assertions.assertEquals(brokerIds, this.clusterControl.fencedBrokerIds());
        }

        /**
         * Returns the epoch of the broker's current registration, failing the test if the
         * broker has no registration.
         *
         * @param brokerId id of the broker to look up
         * @return the epoch reported by the broker's registration
         */
        long currentBrokerEpoch(int brokerId) {
            BrokerRegistration current = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(brokerId);
            Assertions.assertNotNull(current, "No current registration for broker " + brokerId);
            return current.epoch();
        }

        /**
         * Returns the current leader of the given partition.
         * Fails the test with a descriptive message if the partition is not found.
         *
         * @param topicIdPartition partition to look up
         * @return the leader id, or an empty {@code OptionalInt} when the stored leader is negative
         */
        OptionalInt currentLeader(TopicIdPartition topicIdPartition) {
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            // Sibling helpers assert the same lookup is non-null; do the same here instead of
            // letting a missing partition surface as a bare NPE.
            Assertions.assertNotNull((Object)partition, (String)("No partition registration for " + topicIdPartition));
            return partition.leader < 0 ? OptionalInt.empty() : OptionalInt.of(partition.leader);
        }

        /**
         * Creates a cluster link named {@code linkName} with cluster id "remote-cluster",
         * replays the records emitted during creation, and asserts that creation produced
         * a result rather than an error.
         *
         * @param linkName name of the link to create
         * @return the result of the create call, cast to {@code Uuid}
         */
        Uuid createClusterLink(String linkName) throws Exception {
            List<ApiMessageAndVersion> emitted = new ArrayList<>();
            CreateClusterLinksRequestData.EntryData entry = new CreateClusterLinksRequestData.EntryData()
                .setLinkName(linkName)
                .setClusterId("remote-cluster");
            ResultOrError outcome = this.clusterLinkControl.createClusterLink(entry, emitted::add, null);
            // Records are replayed before the outcome is checked, as in the original flow.
            this.replay(emitted);
            Assertions.assertTrue(outcome.isResult());
            return (Uuid)outcome.result();
        }
    }
}

