/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.BrokerComponent;
import org.apache.kafka.common.CellState;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.PartitionPlacementStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerReplicaExclusionRecord;
import org.apache.kafka.common.metadata.CellRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TenantRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.controller.AclControlManager;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.CellControlManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ClusterLinkControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerRequestContextUtil;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.controller.QuorumControllerTestEnv;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.TenantControlManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.AssignmentsHelper;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.ClusterLink;
import org.apache.kafka.metadata.ConfluentPartitionsPerTopicListener;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.TopicPlacement;
import org.apache.kafka.metadata.TopicType;
import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.PartitionAssignmentTest;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.server.util.MockRandom;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class ReplicationControlManagerTest {
    // SLF4J logger bound to this test class.
    private static final Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
    // NOTE(review): the numeric limits below are not referenced in the part of the file reviewed
    // here; presumably they configure the test context defined further down — confirm before changing.
    // Unit is milliseconds per the name.
    private static final int BROKER_SESSION_TIMEOUT_MS = 1000;
    private static final int MAX_NUM_CLUSTER_TOPICS = 3;
    private static final int MAX_NUM_TENANT_PARTITIONS = 20;
    private static final short REQ_REPLICATION_FACTOR = 3;
    // A ListPartitionReassignmentsResponseData whose error message is explicitly null and whose
    // other fields are left at their defaults.
    private static final ListPartitionReassignmentsResponseData NONE_REASSIGNING = new ListPartitionReassignmentsResponseData().setErrorMessage(null);

    /**
     * Exercises name collisions between a virtual topic and a regular topic.
     *
     * <p>Creates the virtual topic "foo" and expects a successful response reporting zero
     * partitions and a replication factor of zero, with no partition 0 registered for it. A regular
     * topic with the same name is then expected to be rejected with TOPIC_ALREADY_EXISTS. After
     * "foo" is deleted, a regular "foo" (3 partitions, replication factor 3) is expected to be
     * created, and a further virtual "foo" is expected to be rejected with TOPIC_ALREADY_EXISTS.
     */
    @Test
    public void testCreateVirtualTopics() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder()
            .setVirtualTopicsEnabledInConfig(true)
            .build();
        ReplicationControlManager manager = context.replicationControl;
        final short alreadyExistsCode = Errors.TOPIC_ALREADY_EXISTS.code();
        final String alreadyExistsMessage = "Topic 'foo' already exists.";

        // The virtual topic is requested before any broker has been registered.
        CreateTopicsResponseData.CreatableTopicResult virtualResult = context.createVirtualTopic("foo");
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        Uuid virtualTopicId = virtualResult.topicId();

        CreateTopicsResponseData.CreatableTopicResult expectedVirtual =
            new CreateTopicsResponseData.CreatableTopicResult()
                .setName("foo")
                .setNumPartitions(0)
                .setReplicationFactor((short) 0)
                .setErrorMessage(null)
                .setErrorCode((short) 0)
                .setTopicId(virtualTopicId);
        Assertions.assertEquals(expectedVirtual, virtualResult);
        Assertions.assertNull(manager.getPartition(virtualTopicId, 0));

        // A regular topic may not reuse the name of the existing virtual topic.
        CreateTopicsResponseData.CreatableTopicResult regularWhileVirtualExists =
            context.createTestTopic("foo", 3, (short) 3, alreadyExistsCode);
        CreateTopicsResponseData.CreatableTopicResult expectedRegularRejection =
            new CreateTopicsResponseData.CreatableTopicResult()
                .setName("foo")
                .setErrorCode(alreadyExistsCode)
                .setErrorMessage(alreadyExistsMessage);
        Assertions.assertEquals(expectedRegularRejection, regularWhileVirtualExists);

        // Once the virtual topic is deleted, the name is available for a regular topic.
        ControllerRequestContext deleteContext =
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        context.deleteTopic(deleteContext, virtualTopicId);
        CreateTopicsResponseData.CreatableTopicResult regularResult =
            context.createTestTopic("foo", 3, (short) 3, (short) 0);
        CreateTopicsResponseData.CreatableTopicResult expectedRegular =
            new CreateTopicsResponseData.CreatableTopicResult()
                .setName("foo")
                .setNumPartitions(3)
                .setReplicationFactor((short) 3)
                .setErrorMessage(null)
                .setErrorCode((short) 0)
                .setTopicId(regularResult.topicId());
        Assertions.assertEquals(expectedRegular, regularResult);

        // And a virtual topic may not reuse the name of the existing regular topic.
        CreateTopicsResponseData.CreatableTopicResult virtualWhileRegularExists =
            context.createVirtualTopic("foo", alreadyExistsCode);
        CreateTopicsResponseData.CreatableTopicResult expectedVirtualRejection =
            new CreateTopicsResponseData.CreatableTopicResult()
                .setName("foo")
                .setErrorCode(alreadyExistsCode)
                .setErrorMessage(alreadyExistsMessage);
        Assertions.assertEquals(expectedVirtualRejection, virtualWhileRegularExists);
    }

    /**
     * Submits a single create-topics request for "foo" (5000 partitions), "bar" (5000 partitions)
     * and "baz" (1 partition), each with replication factor 1, and expects the whole request to be
     * rejected with a {@link PolicyViolationException} carrying the
     * "Excessively large number of partitions per request." message.
     */
    @Test
    public void testExcessiveNumberOfTopicsCannotBeCreated() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;

        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(5000)
            .setReplicationFactor((short) 1));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("bar")
            .setNumPartitions(5000)
            .setReplicationFactor((short) 1));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("baz")
            .setNumPartitions(1)
            .setReplicationFactor((short) 1));

        ControllerRequestContext requestContext =
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        PolicyViolationException error = Assertions.assertThrows(
            PolicyViolationException.class,
            () -> replicationControl.createTopics(
                requestContext,
                request,
                Stream.of("foo", "bar", "baz").collect(Collectors.toSet())));

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
        // report the literal as "expected" and the thrown message as "actual".
        Assertions.assertEquals("Excessively large number of partitions per request.", error.getMessage());
    }

    /**
     * Calls {@code ReplicationControlManager.validateTotalNumberOfPartitions} directly with a limit
     * of 9999 on a request that mixes a topic using the -1 partition-count placeholder ("foo") with
     * a topic that supplies two manual replica assignments ("baz"), and expects a
     * {@link PolicyViolationException} carrying the
     * "Excessively large number of partitions per request." message.
     */
    @Test
    public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short) 1));

        // "baz" carries explicit assignments for partition indexes 1 and 2 instead of a partition count.
        CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments =
            new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
        assignments.add(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1));
        assignments.add(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(2));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("baz")
            .setAssignments(assignments));

        PolicyViolationException error = Assertions.assertThrows(
            PolicyViolationException.class,
            () -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999));

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
        // report the literal as "expected" and the thrown message as "actual".
        Assertions.assertEquals("Excessively large number of partitions per request.", error.getMessage());
    }

    /**
     * Replays the same create request for topic "foo" (partition count and replication factor both
     * left at -1) against four successive cluster states and checks the response each time:
     * <ol>
     *   <li>no brokers registered: INVALID_REPLICATION_FACTOR, "all brokers fenced" message;</li>
     *   <li>brokers 0-2 registered, broker 0 unfenced and then put in controlled shutdown:
     *       INVALID_REPLICATION_FACTOR, "fenced or in controlled shutdown" message;</li>
     *   <li>brokers 0-2 registered and unfenced: success with 1 partition and replication factor 3,
     *       and, after the records are replayed, the expected registration for partition 0;</li>
     *   <li>same request again: TOPIC_ALREADY_EXISTS.</li>
     * </ol>
     */
    @Test
    public void testCreateTopics() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
        createFoo.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1));
        ControllerRequestContext requestContext =
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);

        // 1. Nothing is registered yet.
        ControllerResult<CreateTopicsResponseData> noBrokers =
            manager.createTopics(requestContext, createFoo, Collections.singleton("foo"));
        CreateTopicsResponseData expectedNoBrokers = new CreateTopicsResponseData();
        expectedNoBrokers.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
            .setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced."));
        Assertions.assertEquals(expectedNoBrokers, noBrokers.response());

        // 2. The only unfenced broker is in controlled shutdown.
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0);
        context.inControlledShutdownBrokers(0);
        ControllerResult<CreateTopicsResponseData> shuttingDown =
            manager.createTopics(requestContext, createFoo, Collections.singleton("foo"));
        CreateTopicsResponseData expectedShuttingDown = new CreateTopicsResponseData();
        expectedShuttingDown.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
            .setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced or in controlled shutdown."));
        Assertions.assertEquals(expectedShuttingDown, shuttingDown.response());

        // 3. All three brokers are registered again and unfenced.
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        ControllerResult<CreateTopicsResponseData> created =
            manager.createTopics(requestContext, createFoo, Collections.singleton("foo"));
        CreateTopicsResponseData createdResponse = created.response();
        CreateTopicsResponseData expectedCreated = new CreateTopicsResponseData();
        expectedCreated.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short) 3)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(createdResponse.topics().find("foo").topicId()));
        Assertions.assertEquals(expectedCreated, createdResponse);

        context.replay(created.records());
        PartitionRegistration expectedPartition = new PartitionRegistration.Builder()
            .setReplicas(new int[] {1, 2, 0})
            .setDirectories(new Uuid[] {
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00002DIRAAAA"),
                Uuid.fromString("TESTBROKER00000DIRAAAA")
            })
            .setIsr(new int[] {1, 2, 0})
            .setLeader(1)
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .build();
        // The first emitted record is read as the TopicRecord to obtain the new topic's id.
        TopicRecord fooRecord = (TopicRecord) created.records().get(0).message();
        Assertions.assertEquals(expectedPartition, manager.getPartition(fooRecord.topicId(), 0));

        // 4. The topic now exists, so the identical request is rejected.
        ControllerResult<CreateTopicsResponseData> duplicate =
            manager.createTopics(requestContext, createFoo, Collections.singleton("foo"));
        CreateTopicsResponseData expectedDuplicate = new CreateTopicsResponseData();
        expectedDuplicate.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
            .setErrorMessage("Topic 'foo' already exists."));
        Assertions.assertEquals(expectedDuplicate, duplicate.response());
    }

    /**
     * Uses a request context built by
     * {@code anonymousContextWithMutationQuotaExceededFor(CREATE_TOPICS)} and expects both a regular
     * topic ("foo") and a topic carrying the virtual topic-type config ("bar") to be rejected with
     * THROTTLING_QUOTA_EXCEEDED and the message "Quota exceeded in test".
     */
    @Test
    public void testCreateTopicsWithMutationQuotaExceeded() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder()
            .setVirtualTopicsEnabledInConfig(true)
            .build();
        ReplicationControlManager manager = context.replicationControl;

        CreateTopicsRequestData regularRequest = new CreateTopicsRequestData();
        regularRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1));
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        ControllerRequestContext throttledContext =
            ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);

        // Regular topic.
        ControllerResult<CreateTopicsResponseData> regularResult =
            manager.createTopics(throttledContext, regularRequest, Collections.singleton("foo"));
        CreateTopicsResponseData expectedRegular = new CreateTopicsResponseData();
        expectedRegular.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code())
            .setErrorMessage("Quota exceeded in test"));
        Assertions.assertEquals(expectedRegular, regularResult.response());

        // Topic whose only config marks it as virtual.
        CreateTopicsRequestData.CreateableTopicConfigCollection virtualTypeConfig =
            new CreateTopicsRequestData.CreateableTopicConfigCollection();
        virtualTypeConfig.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.topic.type")
            .setValue(TopicType.VIRTUAL.logConfigValue()));
        CreateTopicsRequestData virtualRequest = new CreateTopicsRequestData();
        virtualRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("bar")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1)
            .setConfigs(virtualTypeConfig));
        ControllerResult<CreateTopicsResponseData> virtualResult =
            manager.createTopics(throttledContext, virtualRequest, Collections.singleton("bar"));
        CreateTopicsResponseData expectedVirtual = new CreateTopicsResponseData();
        expectedVirtual.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("bar")
            .setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code())
            .setErrorMessage("Quota exceeded in test"));
        Assertions.assertEquals(expectedVirtual, virtualResult.response());
    }

    /**
     * Creates topic "foo" while broker 0 is unfenced, broker 1 is unfenced but in controlled
     * shutdown, and broker 2 is registered but never unfenced. The creation is expected to succeed
     * with replicas [1, 0, 2], while the ISR is expected to contain only broker 0, which is also
     * the expected leader.
     */
    @Test
    public void testCreateTopicsISRInvariants() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;

        CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
        createFoo.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1));

        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1);
        context.inControlledShutdownBrokers(1);

        ControllerRequestContext requestContext =
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<CreateTopicsResponseData> created =
            manager.createTopics(requestContext, createFoo, Collections.singleton("foo"));
        CreateTopicsResponseData response = created.response();

        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short) 3)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(response.topics().find("foo").topicId()));
        Assertions.assertEquals(expectedResponse, response);

        context.replay(created.records());
        PartitionRegistration expectedPartition = new PartitionRegistration.Builder()
            .setReplicas(new int[] {1, 0, 2})
            .setDirectories(new Uuid[] {
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00000DIRAAAA"),
                Uuid.fromString("TESTBROKER00002DIRAAAA")
            })
            .setIsr(new int[] {0})
            .setLeader(0)
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .build();
        // The first emitted record is read as the TopicRecord to obtain the new topic's id.
        TopicRecord fooRecord = (TopicRecord) created.records().get(0).message();
        Assertions.assertEquals(expectedPartition, manager.getPartition(fooRecord.topicId(), 0));
    }

    /**
     * Covers topic creation with per-topic configs in four scenarios:
     * <ol>
     *   <li>"foo" with config foo=notNull: succeeds and emits, in order, a TopicRecord, a
     *       ConfigRecord and a PartitionRecord; after replay the config value is readable;</li>
     *   <li>"bar" with a null config value: INVALID_CONFIG with the expected message;</li>
     *   <li>"baz" with replication factor -2: INVALID_REPLICATION_FACTOR and no records;</li>
     *   <li>a batch of one valid and one invalid topic: only the valid topic produces records.</li>
     * </ol>
     */
    @Test
    public void testCreateTopicsWithConfigs() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);

        CreateTopicsRequestData.CreateableTopicConfigCollection validConfigs =
            new CreateTopicsRequestData.CreateableTopicConfigCollection();
        validConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("foo")
            .setValue("notNull"));

        // Scenario 1: valid config on a valid topic.
        CreateTopicsRequestData fooRequest = new CreateTopicsRequestData();
        fooRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1)
            .setConfigs(validConfigs));
        ControllerRequestContext requestContext =
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<CreateTopicsResponseData> fooResult =
            manager.createTopics(requestContext, fooRequest, Collections.singleton("foo"));
        Assertions.assertEquals((short) 0, fooResult.response().topics().find("foo").errorCode());

        List<ApiMessageAndVersion> fooRecords = fooResult.records();
        Assertions.assertEquals(3, fooRecords.size());
        List<Class<?>> expectedRecordTypes =
            Arrays.asList(TopicRecord.class, ConfigRecord.class, PartitionRecord.class);
        for (int i = 0; i < expectedRecordTypes.size(); i++) {
            Assertions.assertEquals(expectedRecordTypes.get(i), fooRecords.get(i).message().getClass());
        }
        context.replay(fooResult.records());
        ConfigResource fooResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
        Assertions.assertEquals("notNull", context.configurationControl.getConfigs(fooResource).get("foo"));

        // Scenario 2: a null config value is rejected.
        CreateTopicsRequestData.CreateableTopicConfigCollection invalidConfigs =
            new CreateTopicsRequestData.CreateableTopicConfigCollection();
        invalidConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("foo")
            .setValue(null));
        CreateTopicsRequestData barRequest = new CreateTopicsRequestData();
        barRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("bar")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1)
            .setConfigs(invalidConfigs));
        ControllerResult<CreateTopicsResponseData> barResult =
            manager.createTopics(requestContext, barRequest, Collections.singleton("bar"));
        Assertions.assertEquals(
            Errors.INVALID_CONFIG.code(),
            barResult.response().topics().find("bar").errorCode());
        Assertions.assertEquals(
            "Null value not supported for topic configs: foo",
            barResult.response().topics().find("bar").errorMessage());

        // Scenario 3: an invalid replication factor yields an error and no records.
        CreateTopicsRequestData bazRequest = new CreateTopicsRequestData();
        bazRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("baz")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -2)
            .setConfigs(validConfigs));
        ControllerResult<CreateTopicsResponseData> bazResult =
            manager.createTopics(requestContext, bazRequest, Collections.singleton("baz"));
        Assertions.assertEquals(
            Errors.INVALID_REPLICATION_FACTOR.code(),
            bazResult.response().topics().find("baz").errorCode());
        Assertions.assertEquals(Collections.emptyList(), bazResult.records());

        // Scenario 4: one valid and one invalid topic in the same request.
        String validName = "batched-topic-1";
        String invalidName = "batched-topic2";
        CreateTopicsRequestData batchRequest = new CreateTopicsRequestData();
        batchRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(validName)
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1)
            .setConfigs(validConfigs));
        batchRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(invalidName)
            .setNumPartitions(-1)
            .setReplicationFactor((short) -2)
            .setConfigs(validConfigs));
        Set<String> batchNames = new HashSet<>(Arrays.asList(validName, invalidName));
        ControllerResult<CreateTopicsResponseData> batchResult =
            manager.createTopics(requestContext, batchRequest, batchNames);
        Assertions.assertEquals(
            Errors.NONE.code(),
            batchResult.response().topics().find(validName).errorCode());
        Assertions.assertEquals(
            Errors.INVALID_REPLICATION_FACTOR.code(),
            batchResult.response().topics().find(invalidName).errorCode());

        // Only the valid topic contributes records: topic, config, partition.
        Assertions.assertEquals(3, batchResult.records().size());
        Assertions.assertEquals(TopicRecord.class, batchResult.records().get(0).message().getClass());
        TopicRecord validTopicRecord = (TopicRecord) batchResult.records().get(0).message();
        Assertions.assertEquals(validName, validTopicRecord.name());
        ConfigRecord expectedConfigRecord = new ConfigRecord()
            .setResourceName(validName)
            .setResourceType(ConfigResource.Type.TOPIC.id())
            .setName("foo")
            .setValue("notNull");
        Assertions.assertEquals(expectedConfigRecord, batchResult.records().get(1).message());
        Assertions.assertEquals(PartitionRecord.class, batchResult.records().get(2).message().getClass());
        PartitionRecord validPartitionRecord = (PartitionRecord) batchResult.records().get(2).message();
        Assertions.assertEquals(validTopicRecord.topicId(), validPartitionRecord.topicId());
    }

    /**
     * Creates topics on behalf of tenant "lkc-abcd" through a {@link MultiTenantPrincipal}.
     *
     * <p>A regular tenant-prefixed topic is expected to be created with 1 partition and replication
     * factor 3, with a TopicRecord as the first emitted record. A second tenant-prefixed topic that
     * carries the virtual topic-type config is expected to be created with zero partitions and a
     * replication factor of zero.
     */
    @Test
    public void testCreateTopicByTenant() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder()
            .setVirtualTopicsEnabledInConfig(true)
            .build();
        context.registerBrokers(0, 1, 2, 3, 4, 5);
        context.unfenceBrokers(0, 1, 2, 3, 4, 5);
        ReplicationControlManager manager = context.replicationControl;

        String tenantId = "lkc-abcd";
        String regularTopic = tenantId + "_abcd";
        KafkaPrincipal tenantPrincipal =
            new MultiTenantPrincipal(tenantId, new TenantMetadata(tenantId, tenantId));
        RequestHeaderData header = new RequestHeaderData()
            .setRequestApiKey(ApiKeys.CREATE_TOPICS.id)
            .setRequestApiVersion(ApiKeys.CREATE_TOPICS.latestVersion());
        ControllerRequestContext requestContext =
            new ControllerRequestContext(header, tenantPrincipal, OptionalLong.empty());

        // Regular topic owned by the tenant.
        CreateTopicsRequestData regularRequest = new CreateTopicsRequestData();
        regularRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(regularTopic)
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1));
        ControllerResult<CreateTopicsResponseData> regularResult =
            manager.createTopics(requestContext, regularRequest, Collections.singleton(regularTopic));
        CreateTopicsResponseData regularResponse = regularResult.response();
        Uuid regularTopicId = regularResponse.topics().find(regularTopic).topicId();
        CreateTopicsResponseData expectedRegular = new CreateTopicsResponseData();
        expectedRegular.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName(regularTopic)
            .setTopicId(regularTopicId)
            .setErrorMessage(null)
            .setNumPartitions(1)
            .setReplicationFactor((short) 3));
        Assertions.assertEquals(expectedRegular, regularResponse);

        ApiMessageAndVersion firstRecord = regularResult.records().get(0);
        ApiMessageAndVersion expectedFirstRecord = new ApiMessageAndVersion(
            new TopicRecord().setName(regularTopic).setTopicId(regularTopicId),
            MetadataRecordType.TOPIC_RECORD.highestSupportedVersion());
        Assertions.assertEquals(expectedFirstRecord, firstRecord);

        // Virtual topic owned by the same tenant.
        String virtualTopic = tenantId + "_wxyz";
        CreateTopicsRequestData.CreateableTopicConfigCollection virtualTypeConfig =
            new CreateTopicsRequestData.CreateableTopicConfigCollection();
        virtualTypeConfig.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.topic.type")
            .setValue(TopicType.VIRTUAL.logConfigValue()));
        CreateTopicsRequestData virtualRequest = new CreateTopicsRequestData();
        virtualRequest.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(virtualTopic)
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1)
            .setConfigs(virtualTypeConfig));
        ControllerResult<CreateTopicsResponseData> virtualResult =
            manager.createTopics(requestContext, virtualRequest, Collections.singleton(virtualTopic));
        CreateTopicsResponseData virtualResponse = virtualResult.response();
        CreateTopicsResponseData expectedVirtual = new CreateTopicsResponseData();
        expectedVirtual.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName(virtualTopic)
            .setNumPartitions(0)
            .setReplicationFactor((short) 0)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(virtualResponse.topics().find(virtualTopic).topicId()));
        Assertions.assertEquals(expectedVirtual, virtualResponse);
    }

    /**
     * With the TENANT_IN_CELL placement strategy and two READY cells (brokers 0-2 in cell 0,
     * brokers 3-5 in cell 1), creates a topic for tenant "lkc-abcd" and expects a successful
     * response with 1 partition and replication factor 3, and expects the first emitted record to
     * be a TenantRecord assigning the tenant to cell 0.
     */
    @Test
    public void testCreateTopicsWithCells() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder()
            .setPartitionPlacementStrategy(PartitionPlacementStrategy.TENANT_IN_CELL)
            .build();
        context.registerBrokers(0, 1, 2, 3, 4, 5);
        context.unfenceBrokers(0, 1, 2, 3, 4, 5);

        // Two cells of three brokers each, both READY.
        CellRecord cellZero = new CellRecord()
            .setCellId(0)
            .setMinSize((short) 3)
            .setState(CellState.READY.code())
            .setBrokers(Arrays.asList(0, 1, 2));
        context.cellControl.replay(cellZero);
        CellRecord cellOne = new CellRecord()
            .setCellId(1)
            .setMinSize((short) 3)
            .setState(CellState.READY.code())
            .setBrokers(Arrays.asList(3, 4, 5));
        context.cellControl.replay(cellOne);

        ReplicationControlManager manager = context.replicationControl;
        String tenantId = "lkc-abcd";
        String topicName = tenantId + "_abcd";
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(topicName)
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1));

        KafkaPrincipal tenantPrincipal =
            new MultiTenantPrincipal(tenantId, new TenantMetadata(tenantId, tenantId));
        RequestHeaderData header = new RequestHeaderData()
            .setRequestApiKey(ApiKeys.CREATE_TOPICS.id)
            .setRequestApiVersion(ApiKeys.CREATE_TOPICS.latestVersion());
        ControllerRequestContext requestContext =
            new ControllerRequestContext(header, tenantPrincipal, OptionalLong.empty());

        ControllerResult<CreateTopicsResponseData> result =
            manager.createTopics(requestContext, request, Collections.singleton(topicName));
        CreateTopicsResponseData response = result.response();
        Uuid topicId = response.topics().find(topicName).topicId();
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName(topicName)
            .setTopicId(topicId)
            .setErrorMessage(null)
            .setNumPartitions(1)
            .setReplicationFactor((short) 3));
        Assertions.assertEquals(expectedResponse, response);

        // The tenant-to-cell assignment is expected to be the first record emitted.
        ApiMessageAndVersion firstRecord = result.records().get(0);
        ApiMessageAndVersion expectedTenantRecord = new ApiMessageAndVersion(
            new TenantRecord().setTenantId(tenantId).setCellId(0),
            MetadataRecordType.TENANT_RECORD.highestSupportedVersion());
        Assertions.assertEquals(expectedTenantRecord, firstRecord);
    }

    /**
     * Same setup as the cell-based create-topics test, but the principal's tenant metadata is built
     * with {@code healthcheckTenant(true)}. Verifies the response and that exactly two records are
     * generated: a TopicRecord followed by a PartitionRecord.
     */
    @Test
    public void testCreateTopicsWithCellsWithHealthcheckTenant() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setPartitionPlacementStrategy(PartitionPlacementStrategy.TENANT_IN_CELL).build();
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2, 3, 4, 5);
        // Two READY cells: brokers 0-2 in cell 0 and brokers 3-5 in cell 1.
        ctx.cellControl.replay(new CellRecord().setCellId(0).setMinSize((short)3).setState(CellState.READY.code()).setBrokers(Arrays.asList(0, 1, 2)));
        ctx.cellControl.replay(new CellRecord().setCellId(1).setMinSize((short)3).setState(CellState.READY.code()).setBrokers(Arrays.asList(3, 4, 5)));
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        String tenantId = "lkc-abcd";
        String topicName = tenantId + "_abcd";
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(topicName).setNumPartitions(-1).setReplicationFactor((short)-1));
        MultiTenantPrincipal healthcheckTenantPrincipal = new MultiTenantPrincipal(tenantId, new TenantMetadata.Builder("tenant1", null).healthcheckTenant(true).build());
        ControllerRequestContext requestContext = new ControllerRequestContext(new RequestHeaderData().setRequestApiKey(ApiKeys.CREATE_TOPICS.id).setRequestApiVersion(ApiKeys.CREATE_TOPICS.latestVersion()), (KafkaPrincipal)healthcheckTenantPrincipal, OptionalLong.empty());
        ControllerResult result = replicationControl.createTopics(requestContext, request, Collections.singleton(topicName));
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        // The topic id is generated by the controller, so copy it from the actual response.
        Uuid topicId = ((CreateTopicsResponseData)result.response()).topics().find(topicName).topicId();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName(topicName).setTopicId(topicId).setErrorMessage(null).setNumPartitions(1).setReplicationFactor((short)3));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
        // Check the record count before indexing so that an unexpected count is reported as an
        // assertion failure rather than an IndexOutOfBoundsException.
        Assertions.assertEquals((int)2, (int)result.records().size());
        ApiMessageAndVersion topicRecord = (ApiMessageAndVersion)result.records().get(0);
        ApiMessageAndVersion partitionRecord = (ApiMessageAndVersion)result.records().get(1);
        Assertions.assertTrue((boolean)(topicRecord.message() instanceof TopicRecord));
        Assertions.assertTrue((boolean)(partitionRecord.message() instanceof PartitionRecord));
    }

    /**
     * Submits one create-topics request holding a topic with a valid
     * "confluent.placement.constraints" value and a topic with a malformed one, using a mocked
     * replica placer that returns a fixed assignment. Verifies the per-topic results, the resulting
     * partition registration, the stored topic config, and the records written from the metadata
     * image.
     */
    @Test
    public void testCreateTopicsWithTopicPlacement() throws Exception {
        // The mocked placer returns this single-partition assignment for every placement request.
        ReplicaPlacer replicaPlacer = Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment partitionAssignment = PartitionAssignmentTest.partitionAssignment(Arrays.asList(0, 2, 1, 3), Arrays.asList(1, 3));
        List<PartitionAssignment> partitionAssignments = Arrays.asList(partitionAssignment);
        TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
        Mockito.when(replicaPlacer.place((PlacementSpec) ArgumentMatchers.any(), (ClusterDescriber) ArgumentMatchers.any()))
            .thenReturn(topicAssignment);

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setReplicaPlacer(replicaPlacer)
            .build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);

        // First topic: well-formed placement constraint JSON.
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfigCollection configs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        configs.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(topicPlacementJson));
        String topicName = "placement-topic";
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(topicName)
            .setNumPartitions(1)
            .setConfigs(configs)
            .setReplicationFactor((short) -1));

        // Second topic: the placement value is not parseable JSON.
        String invalidTopicPlacementJson = "{";
        CreateTopicsRequestData.CreateableTopicConfigCollection invalidConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        invalidConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(invalidTopicPlacementJson));
        String invalidTopicName = "invalid-placement-topic";
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(invalidTopicName)
            .setNumPartitions(1)
            .setConfigs(invalidConfigs)
            .setReplicationFactor((short) -1));

        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = replicationControl.createTopics(requestContext, request, Collections.singleton(topicName));
        CreateTopicsResponseData actualResponse = (CreateTopicsResponseData) result.response();
        Uuid topicId = actualResponse.topics().find(topicName).topicId();
        log.info("Created topic with ID {}.", topicId);

        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName(topicName)
            .setNumPartitions(1)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(topicId)
            .setReplicationFactor((short) partitionAssignment.replicas().size()));
        // NOTE(review): error code 40 is presumably Errors.INVALID_CONFIG - confirm.
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName(invalidTopicName)
            .setErrorMessage("Invalid topic placement.")
            .setErrorCode((short) 40));
        Assertions.assertEquals(expectedResponse, actualResponse);

        ctx.replay(result.records());

        // The expected registration has the non-observer replicas (0 and 2) as the ISR.
        PartitionRegistration expectedRegistration = new PartitionRegistration.Builder()
            .setReplicas(new int[]{0, 2, 1, 3})
            .setDirectories(DirectoryId.migratingArray(4))
            .setObservers(new int[]{1, 3})
            .setIsr(new int[]{0, 2})
            .setLeader(0)
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .build();
        Assertions.assertEquals(expectedRegistration, replicationControl.getPartition(topicId, 0));

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        Assertions.assertEquals(topicPlacementJson, ctx.configurationControl.getConfigs(configResource).get("confluent.placement.constraints"));

        List<ApiMessageAndVersion> records = this.writeRecordsFromImage(ctx.metadataDelta.apply(MetadataProvenance.EMPTY));
        PartitionRecord expectedPartitionRecord = new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(topicId)
            .setReplicas(Arrays.asList(0, 2, 1, 3))
            .setDirectories(Arrays.asList(DirectoryId.migratingArray(4)))
            .setIsr(Arrays.asList(0, 2))
            .setObservers(Arrays.asList(1, 3))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(0)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0);
        Assertions.assertTrue(records.contains(new ApiMessageAndVersion(expectedPartitionRecord, 2)));
        TopicRecord expectedTopicRecord = new TopicRecord().setTopicId(topicId).setName(topicName);
        Assertions.assertTrue(records.contains(new ApiMessageAndVersion(expectedTopicRecord, 0)));
    }

    /**
     * Creates a topic carrying a "confluent.placement.constraints" config under two metadata
     * versions. For a version below IBP_3_5_IV0 the expected result is INVALID_REQUEST with the
     * message "Topic placement is not supported."; otherwise the topic is expected to be created
     * with one partition and replication factor 2.
     *
     * @param metadataVersion the metadata version the test context is built with
     */
    @ParameterizedTest
    @EnumSource(value=MetadataVersion.class, names={"IBP_3_4_IV0", "IBP_3_5_IV0"})
    public void testTopicPlacementSupportHandling(MetadataVersion metadataVersion) throws Exception {
        // The mocked placer returns replicas [0, 1] with the second list [1] for every request.
        ReplicaPlacer replicaPlacer = Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment partitionAssignment = new PartitionAssignment(Arrays.asList(0, 1), Arrays.asList(1), unused -> DirectoryId.MIGRATING);
        TopicAssignment topicAssignment = new TopicAssignment(Arrays.asList(partitionAssignment));
        Mockito.when(replicaPlacer.place((PlacementSpec) ArgumentMatchers.any(), (ClusterDescriber) ArgumentMatchers.any()))
            .thenReturn(topicAssignment);

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setMetadataVersion(metadataVersion)
            .setReplicaPlacer(replicaPlacer)
            .build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfigCollection configs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        configs.add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(topicPlacementJson));
        String topicName = "placement-topic";
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(topicName)
            .setNumPartitions(1)
            .setConfigs(configs)
            .setReplicationFactor((short) -1));

        ReplicationControlManager replicationControl = ctx.replicationControl;
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = replicationControl.createTopics(requestContext, request, Collections.singleton(topicName));
        CreateTopicsResponseData actualResponse = (CreateTopicsResponseData) result.response();

        // Build the expected result; only its name is common to both outcomes.
        CreateTopicsResponseData.CreatableTopicResult expectedResult = new CreateTopicsResponseData.CreatableTopicResult().setName(topicName);
        if (!metadataVersion.isLessThan(MetadataVersion.IBP_3_5_IV0)) {
            // Success: the generated topic id has to be copied from the actual response.
            Uuid topicId = actualResponse.topics().find(topicName).topicId();
            expectedResult
                .setNumPartitions(1)
                .setErrorMessage(null)
                .setErrorCode((short) 0)
                .setTopicId(topicId)
                .setReplicationFactor((short) 2);
        } else {
            expectedResult
                .setErrorCode(Errors.INVALID_REQUEST.code())
                .setErrorMessage("Topic placement is not supported.");
        }

        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add(expectedResult);
        Assertions.assertEquals(expectedResponse, actualResponse);
    }

    /**
     * Writes the given metadata image into a {@link RecordListWriter}, using
     * {@code MetadataVersion.latestTesting()} as the metadata version, and returns the records the
     * writer collected.
     *
     * @param image the metadata image to write out
     * @return the records produced by writing the image
     */
    @Deprecated
    private List<ApiMessageAndVersion> writeRecordsFromImage(MetadataImage image) {
        RecordListWriter recordSink = new RecordListWriter();
        ImageWriterOptions writeOptions = new ImageWriterOptions.Builder()
            .setMetadataVersion(MetadataVersion.latestTesting())
            .build();
        image.write(recordSink, writeOptions);
        return recordSink.records();
    }

    /**
     * Creates a topic with a "confluent.placement.constraints" config and then adds a second
     * partition without a manual assignment. The mocked replica placer returns one assignment for
     * the initial creation and a different one for the added partition. Verifies both partition
     * registrations and the records written from the metadata image.
     */
    @Test
    public void testCreatePartitionsWithTopicPlacement() throws Exception {
        ReplicaPlacer replicaPlacer = (ReplicaPlacer)Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment initialPartitionAssignment = new PartitionAssignment(Arrays.asList(0, 1, 2, 3), Arrays.asList(2, 3), __ -> DirectoryId.MIGRATING);
        List<PartitionAssignment> initialPartitionAssignmentList = Arrays.asList(initialPartitionAssignment);
        TopicAssignment initialTopicAssignment = new TopicAssignment(initialPartitionAssignmentList);
        PartitionAssignment createPartitionAssignment = new PartitionAssignment(Arrays.asList(1, 0, 3, 2), Arrays.asList(3, 2), __ -> DirectoryId.MIGRATING);
        List<PartitionAssignment> createPartitionAssignmentList = Arrays.asList(createPartitionAssignment);
        TopicAssignment createPartitionTopicAssignment = new TopicAssignment(createPartitionAssignmentList);
        // First place() call yields the initial assignment; the next one yields the create-partitions assignment.
        Mockito.when((Object)replicaPlacer.place((PlacementSpec)ArgumentMatchers.any(), (ClusterDescriber)ArgumentMatchers.any())).thenReturn((Object)initialTopicAssignment, (Object[])new TopicAssignment[]{createPartitionTopicAssignment});
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setReplicaPlacer(replicaPlacer).build();
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        String topicName = "placement-topic";
        CreateTopicsResponseData.CreatableTopicResult creatableTopicResult = ctx.createTestTopic(topicName, 1, Collections.singletonMap("confluent.placement.constraints", topicPlacementJson), Errors.NONE.code());
        Uuid topicId = creatableTopicResult.topicId();
        // The message has a single placeholder, so exactly one argument is supplied.
        log.info("Created topic with ID {}.", (Object)topicId);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        Assertions.assertEquals((Object)topicPlacementJson, ctx.configurationControl.getConfigs(configResource).get("confluent.placement.constraints"));
        ReplicationControlManager replicationControl = ctx.replicationControl;
        // Partition 0 as registered right after topic creation.
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{0, 1, 2, 3}).setDirectories(DirectoryId.migratingArray((int)4)).setObservers(new int[]{2, 3}).setIsr(new int[]{0, 1}).setLeader(Integer.valueOf(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replicationControl.getPartition(topicId, 0));
        // Grow the topic to two partitions, passing null assignments.
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName(topicName).setCount(2).setAssignments(null));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
        ControllerResult createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        Assertions.assertEquals(Arrays.asList(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName(topicName).setErrorCode(Errors.NONE.code()).setErrorMessage(null)), (Object)createPartitionsResult.response());
        ctx.replay(createPartitionsResult.records());
        // Partition 0 must be unchanged; partition 1 must match the second mocked assignment.
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{0, 1, 2, 3}).setDirectories(DirectoryId.migratingArray((int)4)).setObservers(new int[]{2, 3}).setIsr(new int[]{0, 1}).setLeader(Integer.valueOf(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replicationControl.getPartition(topicId, 0));
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 0, 3, 2}).setDirectories(DirectoryId.migratingArray((int)4)).setObservers(new int[]{3, 2}).setIsr(new int[]{1, 0}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(0)).build(), (Object)replicationControl.getPartition(topicId, 1));
        List<ApiMessageAndVersion> records = this.writeRecordsFromImage(ctx.metadataDelta.apply(MetadataProvenance.EMPTY));
        Assertions.assertTrue((boolean)records.containsAll(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(1).setTopicId(topicId).setReplicas(Arrays.asList(1, 0, 3, 2)).setDirectories(Arrays.asList(DirectoryId.migratingArray((int)4))).setIsr(Arrays.asList(1, 0)).setObservers(Arrays.asList(3, 2)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0), 2), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(0).setTopicId(topicId).setReplicas(Arrays.asList(0, 1, 2, 3)).setDirectories(Arrays.asList(DirectoryId.migratingArray((int)4))).setIsr(Arrays.asList(0, 1)).setObservers(Arrays.asList(2, 3)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0), 2), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setTopicId(topicId).setName(topicName), 0))));
    }

    /**
     * Creates a topic with a "confluent.placement.constraints" config and then attempts to add a
     * partition using a manual broker assignment. Expects no records and an INVALID_REQUEST result
     * whose message quotes the topic's placement constraint.
     */
    @Test
    public void testInvalidCreatePartitionsWithManualAssignmentAndTopicPlacementSpecified() throws Exception {
        ReplicaPlacer replicaPlacer = Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment initialPartitionAssignment = new PartitionAssignment(Arrays.asList(0, 1, 2, 3), Arrays.asList(2, 3), unused -> DirectoryId.MIGRATING);
        TopicAssignment initialTopicAssignment = new TopicAssignment(Arrays.asList(initialPartitionAssignment));
        Mockito.when(replicaPlacer.place((PlacementSpec) ArgumentMatchers.any(), (ClusterDescriber) ArgumentMatchers.any()))
            .thenReturn(initialTopicAssignment);

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setReplicaPlacer(replicaPlacer)
            .build();
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);

        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        String topicName = "placement-topic";
        ctx.createTestTopic(topicName, 1, Collections.singletonMap("confluent.placement.constraints", topicPlacementJson), Errors.NONE.code());

        // Request a second partition with an explicit broker list.
        CreatePartitionsRequestData.CreatePartitionsAssignment manualAssignment = new CreatePartitionsRequestData.CreatePartitionsAssignment()
            .setBrokerIds(Arrays.asList(0, 1));
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName(topicName)
            .setCount(2)
            .setAssignments(Arrays.asList(manualAssignment)));

        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
        ControllerResult createPartitionsResult = ctx.replicationControl.createPartitions(requestContext, topics);
        Assertions.assertEquals(0, createPartitionsResult.records().size());

        // The expected message embeds the same placement JSON that was set on the topic.
        String expectedMessage = "A manual partition assignment was specified, but a topic placement " + topicPlacementJson + " exists for the topic.";
        CreatePartitionsResponseData.CreatePartitionsTopicResult expectedResult = new CreatePartitionsResponseData.CreatePartitionsTopicResult()
            .setName(topicName)
            .setErrorCode(Errors.INVALID_REQUEST.code())
            .setErrorMessage(expectedMessage);
        Assertions.assertEquals(Arrays.asList(expectedResult), createPartitionsResult.response());
    }

    /**
     * With virtual topics enabled in the config, tries four invalid ways of creating a topic whose
     * "confluent.topic.type" is the virtual type and checks the INVALID_REQUEST error message
     * returned for each: a manual assignment, an explicit partition count, an explicit replication
     * factor, and an extra "cleanup.policy" config.
     */
    @Test
    public void testInvalidCreateVirtualTopics() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setVirtualTopicsEnabledInConfig(true)
            .build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        String topicName = "foo";
        String typeKey = "confluent.topic.type";
        CreateTopicsResponseData.CreatableTopicResult topicResult;

        // Manual partition assignment.
        topicResult = ctx.createTestTopic(
            topicName,
            new int[][]{{0, 1, 2}},
            Collections.singletonMap(typeKey, TopicType.VIRTUAL.logConfigValue()),
            Errors.INVALID_REQUEST.code());
        Assertions.assertEquals("A manual partition assignment was specified but the topic type does not support it", topicResult.errorMessage());

        // Explicit partition count.
        topicResult = ctx.createTestTopic(
            topicName,
            3,
            Collections.singletonMap(typeKey, TopicType.VIRTUAL.logConfigValue()),
            Errors.INVALID_REQUEST.code());
        Assertions.assertEquals("A partition count was specified but the topic type does not support it", topicResult.errorMessage());

        // Explicit replication factor.
        topicResult = ctx.createTestTopic(
            topicName,
            -1,
            (short) 3,
            Collections.singletonMap(typeKey, TopicType.VIRTUAL.logConfigValue()),
            Errors.INVALID_REQUEST.code());
        Assertions.assertEquals("A replication factor was specified but the topic type does not support it", topicResult.errorMessage());

        // An additional config alongside the topic type.
        Map<String, String> configs = new HashMap<String, String>();
        configs.put(typeKey, TopicType.VIRTUAL.logConfigValue());
        configs.put("cleanup.policy", "delete");
        topicResult = ctx.createTestTopic(topicName, -1, (short) 3, configs, Errors.INVALID_REQUEST.code());
        Assertions.assertEquals("Virtual topics do not support configs: cleanup.policy", topicResult.errorMessage());
    }

    /**
     * Exercises the mirror-topic lifecycle: creation fails with CLUSTER_LINK_NOT_FOUND while the
     * link does not exist, succeeds once the link has been created, produces the expected
     * partition, topic, and mirror-topic records in the metadata image, and deleting the topic
     * removes the mirror-topic bookkeeping and the topic name mapping.
     */
    @Test
    public void testCreateAndDeleteMirrorTopics() throws Exception {
        MockTime mockTime = new MockTime(0L, 1000L, 1000000L);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setMockTime(mockTime)
            .build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsRequestData request = new CreateTopicsRequestData();
        Uuid mirrorTopicId = Uuid.randomUuid();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(-1)
            .setReplicationFactor((short) -1)
            .setLinkName("test-link")
            .setSourceTopicId(mirrorTopicId)
            .setMirrorTopic("foo-origin"));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);

        // First attempt: the cluster link has not been created yet.
        ControllerResult createResult = replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
        Assertions.assertTrue(createResult.records().isEmpty());
        CreateTopicsResponseData failedResponse = (CreateTopicsResponseData) createResult.response();
        Assertions.assertEquals(Errors.CLUSTER_LINK_NOT_FOUND.code(), failedResponse.topics().find("foo").errorCode());

        // Second attempt, after creating the link, is expected to succeed.
        Uuid linkId = ctx.createClusterLink("test-link");
        createResult = replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
        Assertions.assertFalse(createResult.records().isEmpty());
        CreateTopicsResponseData createdResponse = (CreateTopicsResponseData) createResult.response();
        Assertions.assertEquals(Errors.NONE.code(), createdResponse.topics().find("foo").errorCode());
        ctx.replay(createResult.records());
        Uuid topicId = createdResponse.topics().find("foo").topicId();

        List<ApiMessageAndVersion> records = this.writeRecordsFromImage(ctx.metadataDelta.apply(MetadataProvenance.EMPTY));
        PartitionRecord expectedPartition = new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(topicId)
            .setReplicas(Arrays.asList(1, 2, 0))
            .setIsr(Arrays.asList(1, 2, 0))
            .setDirectories(Arrays.asList(
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00002DIRAAAA"),
                Uuid.fromString("TESTBROKER00000DIRAAAA")))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(1)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .setLinkedLeaderEpoch(-1)
            .setLinkState(PartitionRegistration.LinkState.ACTIVE.levelCode);
        TopicRecord expectedTopic = new TopicRecord().setTopicId(topicId).setName("foo");
        MirrorTopicRecord expectedMirror = new MirrorTopicRecord()
            .setTopicName("foo")
            .setTopicId(topicId)
            .setSourceTopicName("foo-origin")
            .setSourceTopicId(mirrorTopicId)
            .setMirrorStartOffsets(Collections.emptyList())
            .setMirrorTopicState(MirrorTopic.State.MIRROR.stateName())
            .setClusterLinkId(linkId)
            .setTimeMs(mockTime.milliseconds())
            .setClusterLinkName("test-link");
        Assertions.assertTrue(records.containsAll(Arrays.asList(
            new ApiMessageAndVersion(expectedPartition, 2),
            new ApiMessageAndVersion(expectedTopic, 0),
            new ApiMessageAndVersion(expectedMirror, MetadataRecordType.MIRROR_TOPIC_RECORD.highestSupportedVersion()))));

        // Delete the topic and confirm that all mirror-topic state for it is gone.
        ControllerRequestContext deleteTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ControllerResult deleteResult = ctx.replicationControl.deleteTopics(deleteTopicsRequestContext, Collections.singletonList(topicId));
        Assertions.assertFalse(deleteResult.records().isEmpty());
        Map<?, ?> deleteResponse = (Map<?, ?>) deleteResult.response();
        ApiError deleteError = (ApiError) deleteResponse.get(topicId);
        Assertions.assertEquals(Errors.NONE, deleteError.error());
        ctx.replay(deleteResult.records());

        Assertions.assertFalse(ctx.mirrorTopicControl.isMirrorTopic(topicId));
        Assertions.assertFalse(ctx.mirrorTopicControl.clusterLinkIdForTopicId(topicId).isPresent());
        Assertions.assertFalse(ctx.mirrorTopicControl.topicIdsForClusterLinkId(linkId, false).contains(topicId));
        Assertions.assertNull(ctx.replicationControl.getTopicId("foo"));
    }

    /**
     * Creates a mirror topic and sends an AlterPartition request whose cluster link state has
     * {@code linkFailed} set and which carries a NON_MONOTONIC_LOG_APPEND_EPOCH mirror topic error.
     * Verifies that the metadata image then contains a partition record in the FAILED link state
     * and a mirror-topic record in the FAILED state with that error.
     */
    @Test
    public void testFailMirrorPartition() throws Exception {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setMockTime(mockTime)
            .build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid linkId = ctx.createClusterLink("test-link");
        Uuid mirrorTopicId = Uuid.randomUuid();
        CreateTopicsResponseData.CreatableTopicResult result = ctx.createTestTopic(
            "foo",
            -1,
            (short) -1,
            Errors.NONE.code(),
            topic -> topic.setLinkName("test-link").setSourceTopicId(mirrorTopicId).setMirrorTopic("foo-origin"));
        Uuid topicId = result.topicId();
        TopicIdPartition p0 = new TopicIdPartition(topicId, 0);

        // The request keeps the ISR at {0, 1, 2} and flags the link as failed.
        AlterPartitionRequestData.PartitionData linkFailedRequest = this.newAlterPartition(
            replicationControl, p0, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2), LeaderRecoveryState.RECOVERED);
        linkFailedRequest
            .setClusterLinkState(new AlterPartitionRequestData.ClusterLinkState().setLinkFailed(true))
            .setMirrorTopicError(MirrorTopicError.NON_MONOTONIC_LOG_APPEND_EPOCH.code());
        int brokerId = ctx.currentLeader(p0).getAsInt();
        this.sendAlterPartition(ctx, brokerId, ctx.currentBrokerEpoch(brokerId), topicId, linkFailedRequest);

        List<ApiMessageAndVersion> records = this.writeRecordsFromImage(ctx.metadataDelta.apply(MetadataProvenance.EMPTY));
        PartitionRecord expectedPartition = new PartitionRecord()
            .setPartitionId(0)
            .setTopicId(topicId)
            .setReplicas(Arrays.asList(1, 2, 0))
            .setIsr(Arrays.asList(0, 1, 2))
            .setDirectories(Arrays.asList(
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00002DIRAAAA"),
                Uuid.fromString("TESTBROKER00000DIRAAAA")))
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList())
            .setLeader(1)
            .setLeaderEpoch(0)
            .setPartitionEpoch(2)
            .setLinkedLeaderEpoch(-1)
            .setLinkState(PartitionRegistration.LinkState.FAILED.levelCode);
        TopicRecord expectedTopic = new TopicRecord().setTopicId(topicId).setName("foo");
        MirrorTopicRecord expectedMirror = new MirrorTopicRecord()
            .setTopicName("foo")
            .setTopicId(topicId)
            .setSourceTopicName("foo-origin")
            .setSourceTopicId(mirrorTopicId)
            .setMirrorTopicState(MirrorTopic.State.FAILED.stateName())
            .setMirrorTopicError(MirrorTopicError.NON_MONOTONIC_LOG_APPEND_EPOCH.code())
            .setClusterLinkId(linkId)
            .setClusterLinkName("test-link");
        Assertions.assertTrue(records.containsAll(Arrays.asList(
            new ApiMessageAndVersion(expectedPartition, 2),
            new ApiMessageAndVersion(expectedTopic, 0),
            new ApiMessageAndVersion(expectedMirror, MetadataRecordType.MIRROR_TOPIC_RECORD.highestSupportedVersion()))));
    }

    /**
     * Sends AlterPartition requests that carry a linked leader epoch for a topic that is not a
     * mirror topic. A request that leaves the ISR at {0, 1, 2} must generate no records, while a
     * request that changes the ISR to {0, 1} must still generate a single PartitionChangeRecord.
     */
    @Test
    public void testAlterIsrForNonMirrorPartition() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        Uuid topicId = createTopicResult.topicId();
        TopicIdPartition p0 = new TopicIdPartition(topicId, 0);
        // Case 1: ISR stays {0, 1, 2}; only the linked leader epoch is set.
        AlterPartitionRequestData.PartitionData linkLeaderBump = this.newAlterPartition(replicationControl, p0, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2), LeaderRecoveryState.RECOVERED);
        linkLeaderBump.setClusterLinkState(new AlterPartitionRequestData.ClusterLinkState().setLinkedLeaderEpoch(42));
        ControllerResult<AlterPartitionResponseData> result = this.sendAlterPartition(ctx, 0, ctx.currentBrokerEpoch(0), topicId, linkLeaderBump);
        Assertions.assertEquals((int)0, (int)result.records().size(), (String)"Should not generate any records for non-mirrored partition when handling only linkedLeaderEpoch");
        // Case 2: ISR shrinks to {0, 1} while the linked leader epoch is also set.
        AlterPartitionRequestData.PartitionData linkedLeaderAndIsr = this.newAlterPartition(replicationControl, p0, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        linkedLeaderAndIsr.setClusterLinkState(new AlterPartitionRequestData.ClusterLinkState().setLinkedLeaderEpoch(42));
        result = this.sendAlterPartition(ctx, 0, ctx.currentBrokerEpoch(0), topicId, linkedLeaderAndIsr);
        Assertions.assertEquals((int)1, (int)result.records().size(), (String)"Should still generate record for ISR change for non-mirrored partition even if linkedLeaderEpoch is set");
        AlterPartitionResponseData.PartitionData shrinkIsrResponse = this.assertAlterPartitionResponse(result, p0, Errors.NONE);
        this.assertConsistentAlterPartitionResponse(replicationControl, p0, shrinkIsrResponse);
        Assertions.assertEquals(PartitionChangeRecord.class, ((ApiMessageAndVersion)result.records().get(0)).message().getClass());
    }

    /**
     * Creates a mirror topic on the cluster link "test-link", deletes the link, replays the
     * resulting records, and verifies that the topic is no longer a mirror topic and that the link
     * state of partition 0 changes from ACTIVE to NOT_MIRROR.
     */
    @Test
    public void testUnlinkMirrorTopic() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        // The returned link id is not needed here; the link is referred to by name below.
        ctx.createClusterLink("test-link");
        Uuid mirrorTopicId = Uuid.randomUuid();
        CreateTopicsResponseData.CreatableTopicResult result = ctx.createTestTopic("foo", -1, (short)-1, Errors.NONE.code(), creatableTopic -> creatableTopic.setLinkName("test-link").setSourceTopicId(mirrorTopicId).setMirrorTopic("foo-origin"));
        Uuid topicId = result.topicId();
        Assertions.assertTrue(ctx.mirrorTopicControl.mirrorTopic(topicId).isPresent());
        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        Assertions.assertEquals(PartitionRegistration.LinkState.ACTIVE, ctx.replicationControl.getPartition(topicId, 0).linkState);
        // Collect the records generated by the link deletion and apply them to the context.
        List<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        ctx.clusterLinkControl.deleteClusterLink("test-link", records::add, true);
        ctx.replay(records);
        Assertions.assertFalse(ctx.mirrorTopicControl.mirrorTopic(topicId).isPresent(), "Should not be a mirror topic after link was deleted");
        Assertions.assertEquals(PartitionRegistration.LinkState.NOT_MIRROR, ctx.replicationControl.getPartition(topicId, 0).linkState);
    }

    /**
     * Sends a validate-only create-topics request for topic "foo" and checks that no records
     * are produced. The per-topic error code is {@code THROTTLING_QUOTA_EXCEEDED} when the
     * request context reports the mutation quota as exceeded, and {@code NONE} otherwise.
     *
     * @param mutationQuotaExceeded whether the request context is built with the mutation
     *                              quota already exceeded for CREATE_TOPICS
     */
    @ParameterizedTest(name="testCreateTopicsWithValidateOnlyFlag with mutationQuotaExceeded: {0}")
    @ValueSource(booleans={true, false})
    public void testCreateTopicsWithValidateOnlyFlag(boolean mutationQuotaExceeded) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsRequestData validateOnlyRequest = new CreateTopicsRequestData().setValidateOnly(true);
        CreateTopicsRequestData.CreatableTopic fooTopic = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short)3);
        validateOnlyRequest.topics().add(fooTopic);

        ControllerRequestContext context;
        if (mutationQuotaExceeded) {
            context = ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
        } else {
            context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        }

        ControllerResult outcome = ctx.replicationControl.createTopics(context, validateOnlyRequest, Collections.singleton("foo"));
        Assertions.assertEquals(0, outcome.records().size());

        CreateTopicsResponseData response = (CreateTopicsResponseData)outcome.response();
        CreateTopicsResponseData.CreatableTopicResult fooResult = response.topics().find("foo");
        Errors expectedError = mutationQuotaExceeded ? Errors.THROTTLING_QUOTA_EXCEEDED : Errors.NONE;
        Assertions.assertEquals(expectedError.code(), fooResult.errorCode());
    }

    /**
     * Sends a validate-only create-topics request asking for replication factor 4 with three
     * registered brokers, and checks that it yields no records and an
     * {@code INVALID_REPLICATION_FACTOR} error with the expected message.
     */
    @Test
    public void testInvalidCreateTopicsWithValidateOnlyFlag() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsRequestData.CreatableTopic overReplicatedTopic = new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short)4);
        CreateTopicsRequestData validateOnlyRequest = new CreateTopicsRequestData().setValidateOnly(true);
        validateOnlyRequest.topics().add(overReplicatedTopic);

        ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult outcome = ctx.replicationControl.createTopics(context, validateOnlyRequest, Collections.singleton("foo"));
        Assertions.assertEquals(0, outcome.records().size());

        CreateTopicsResponseData.CreatableTopicResult expectedTopicResult = new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
            .setErrorMessage("Unable to replicate the partition 4 time(s): The target replication factor of 4 cannot be reached because only 3 broker(s) are registered.");
        CreateTopicsResponseData expected = new CreateTopicsResponseData();
        expected.topics().add(expectedTopicResult);
        Assertions.assertEquals((Object)expected, (Object)outcome.response());
    }

    /**
     * Checks that a create-topics request carrying {@code confluent.placement.constraints} is
     * rejected with {@code INVALID_REQUEST}, and produces no records, when it is combined with
     * either an explicit replication factor (first request) or a manual replica assignment
     * (second request).
     */
    @Test
    public void testInvalidCreateTopicsWithReplicationFactorAndManualAssignmentWithTopicPlacementSpecified() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfigCollection configs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        configs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreateableTopicConfig().setName("confluent.placement.constraints").setValue(topicPlacementJson));

        // Case 1: placement constraints together with an explicit replication factor.
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("placement-topic").setNumPartitions(1).setReplicationFactor((short)1).setConfigs(configs));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult result = ctx.replicationControl.createTopics(requestContext, request, Collections.singleton("placement-topic"));
        // The original asserted this twice in a row; once is sufficient.
        Assertions.assertEquals(0, result.records().size());
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("placement-topic").setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Both replicationFactor and confluent.placement.constraints are set. Both cannot be used at the same time."));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());

        // Case 2: placement constraints together with a manual replica assignment.
        request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("placement-topic").setNumPartitions(-1).setReplicationFactor((short)-1).setConfigs(configs).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(Arrays.asList(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(Arrays.asList(0))).iterator())));
        ControllerResult resultTwo = ctx.replicationControl.createTopics(requestContext, request, Collections.singleton("placement-topic"));
        Assertions.assertEquals(0, resultTwo.records().size());
        CreateTopicsResponseData expectedResponseTwo = new CreateTopicsResponseData();
        expectedResponseTwo.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("placement-topic").setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Both assignments and confluent.placement.constraints are set. Both cannot be used at the same time."));
        Assertions.assertEquals((Object)expectedResponseTwo, (Object)resultTwo.response());
    }

    /**
     * Installs a {@code MockCreateTopicPolicy} built from four request-metadata entries
     * ("foo", "bar", "baz", "quux") and creates the "foo" and "baz" topics, each expected to
     * succeed with {@code Errors.NONE}.
     */
    @Test
    public void testCreateTopicsWithPolicy() {
        CreateTopicPolicy.RequestMetadata fooMetadata =
            new CreateTopicPolicy.RequestMetadata("foo", 2, (short)2, null, Collections.emptyMap());
        CreateTopicPolicy.RequestMetadata barMetadata =
            new CreateTopicPolicy.RequestMetadata("bar", 3, (short)2, null, Collections.emptyMap());
        CreateTopicPolicy.RequestMetadata bazMetadata =
            new CreateTopicPolicy.RequestMetadata("baz", null, null, Collections.singletonMap(0, Arrays.asList(2, 1, 0)), Collections.singletonMap("segment.bytes", "12300000"));
        CreateTopicPolicy.RequestMetadata quuxMetadata =
            new CreateTopicPolicy.RequestMetadata("quux", null, null, Collections.singletonMap(0, Arrays.asList(2, 1, 0)), Collections.emptyMap());
        MockCreateTopicPolicy policy = new MockCreateTopicPolicy(Arrays.asList(fooMetadata, barMetadata, bazMetadata, quuxMetadata));

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setCreateTopicPolicy(policy)
            .build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        ctx.createTestTopic("foo", 2, (short)2, Errors.NONE.code());
        ctx.createTestTopic("baz", new int[][]{{2, 1, 0}}, Collections.singletonMap("segment.bytes", "12300000"), Errors.NONE.code());
    }

    /**
     * Installs a {@code MockCreateTopicPolicy} with a single entry for "virtualTopic"
     * (0 partitions, {@code confluent.topic.type} set to the virtual type) and creates that
     * virtual topic with virtual topics enabled in config.
     */
    @Test
    public void testCreateVirtualTopicWithPolicy() {
        CreateTopicPolicy.RequestMetadata virtualTopicMetadata = new CreateTopicPolicy.RequestMetadata(
            "virtualTopic",
            0,
            null,
            Collections.emptyMap(),
            Collections.singletonMap("confluent.topic.type", TopicType.VIRTUAL.logConfigValue()));
        MockCreateTopicPolicy policy = new MockCreateTopicPolicy(Collections.singletonList(virtualTopicMetadata));

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setCreateTopicPolicy(policy)
            .setVirtualTopicsEnabledInConfig(true)
            .build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        ctx.createVirtualTopic("virtualTopic");
    }

    /**
     * Creates topics against a {@code MockCreateTopicPolicy} and checks the expected error per
     * attempt: "foo" (10 partitions) and "bar" (9) succeed, "baz" (1) is a policy violation;
     * after "foo" is deleted, "baz" (5) and "qux" (1) succeed and "thud" (1) is again a policy
     * violation.
     */
    @Test
    public void testCreateTopicsWithPartitionViolation() throws Exception {
        List<CreateTopicPolicy.RequestMetadata> expectedRequests = Arrays.asList(
            new CreateTopicPolicy.RequestMetadata("foo", 10, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("bar", 9, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("baz", 1, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("baz", 5, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("qux", 1, (short)2, null, Collections.emptyMap()),
            new CreateTopicPolicy.RequestMetadata("thud", 1, (short)2, null, Collections.emptyMap()));
        MockCreateTopicPolicy policy = new MockCreateTopicPolicy(expectedRequests);

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
            .setCreateTopicPolicy(policy)
            .build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult fooResult = ctx.createTestTopic("foo", 10, (short)2, Errors.NONE.code());
        ctx.createTestTopic("bar", 9, (short)2, Errors.NONE.code());
        ctx.createTestTopic("baz", 1, (short)2, Errors.POLICY_VIOLATION.code());

        // Delete "foo" and replay the resulting records before retrying creations.
        ArrayList<Uuid> doomedTopicIds = new ArrayList<>();
        doomedTopicIds.add(fooResult.topicId());
        ControllerRequestContext deleteContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ControllerResult deletion = ctx.replicationControl.deleteTopics(deleteContext, doomedTopicIds);
        RecordTestUtils.replayAll(ctx.replicationControl, deletion.records());

        ctx.createTestTopic("baz", 5, (short)2, Errors.NONE.code());
        ctx.createTestTopic("qux", 1, (short)2, Errors.NONE.code());
        ctx.createTestTopic("thud", 1, (short)2, Errors.POLICY_VIOLATION.code());
    }

    /**
     * Creates "foo.bar" with 2 partitions, deletes it, recreates it with 4 partitions, and
     * checks that the recreated topic has a different id and the new partition count.
     */
    @Test
    public void testCreateTopicWithCollisionChars() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult first = ctx.createTestTopic("foo.bar", 2, (short)2, Errors.NONE.code());
        int firstPartitionCount = ctx.replicationControl.getTopic(first.topicId()).numPartitions(Long.MAX_VALUE);
        Assertions.assertEquals(2, firstPartitionCount);

        ControllerRequestContext deleteContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ctx.deleteTopic(deleteContext, first.topicId());

        CreateTopicsResponseData.CreatableTopicResult second = ctx.createTestTopic("foo.bar", 4, (short)2, Errors.NONE.code());
        Assertions.assertNotEquals((Object)first.topicId(), (Object)second.topicId());
        int secondPartitionCount = ctx.replicationControl.getTopic(second.topicId()).numPartitions(Long.MAX_VALUE);
        Assertions.assertEquals(4, secondPartitionCount);
    }

    /**
     * Runs {@code validateNewTopicNames} over the names "", "woo" and "." with an empty
     * collision map, and checks that only "" and "." are reported, each with
     * {@code INVALID_TOPIC_EXCEPTION} and the expected message.
     */
    @Test
    public void testValidateNewTopicNames() {
        // Parameterized instead of a raw HashMap so the call below is type-checked.
        Map<String, ApiError> topicErrors = new HashMap<>();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(""));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("woo"));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("."));
        ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap());

        Map<String, ApiError> expectedTopicErrors = new HashMap<>();
        expectedTopicErrors.put("", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name is invalid: the empty string is not allowed"));
        expectedTopicErrors.put(".", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name is invalid: '.' is not allowed"));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    /**
     * Runs {@code validateNewTopicNames} over "foo.bar" and "woo.bar_foo" with a collision map
     * that already holds entries under "foo_bar" and "woo_bar_foo", and checks that both new
     * names are reported with {@code INVALID_TOPIC_EXCEPTION} and a message naming the
     * colliding existing topic.
     */
    @Test
    public void testTopicNameCollision() {
        // Parameterized instead of a raw HashMap so the call below is type-checked.
        Map<String, ApiError> topicErrors = new HashMap<>();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo.bar"));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("woo.bar_foo"));

        // Declared against the Set interface: a HashMap<String, TreeSet<String>> is not
        // assignable to a Map<String, Set<String>> parameter. TreeSet values keep the
        // colliding names sorted.
        Map<String, Set<String>> collisionMap = new HashMap<>();
        collisionMap.put("foo_bar", new TreeSet<String>(Arrays.asList("foo_bar")));
        collisionMap.put("woo_bar_foo", new TreeSet<String>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));
        ReplicationControlManager.validateNewTopicNames(topicErrors, topics, collisionMap);

        Map<String, ApiError> expectedTopicErrors = new HashMap<>();
        expectedTopicErrors.put("foo.bar", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic 'foo.bar' collides with existing topic: foo_bar"));
        expectedTopicErrors.put("woo.bar_foo", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    /**
     * Creates a four-partition topic across brokers 0-3 and checks that
     * {@code brokersToIsrs().iterator(0, true)} yields partitions 0 and 3. After broker 0 is
     * fenced and the resulting records are replayed, the same iterator must be empty.
     */
    @Test
    public void testRemoveLeaderships() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);

        int[][] assignment = new int[][]{{0, 1, 2}, {1, 2, 3}, {2, 3, 0}, {0, 2, 1}};
        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", assignment);

        HashSet<TopicIdPartition> expectedBeforeFence = new HashSet<>();
        expectedBeforeFence.add(new TopicIdPartition(created.topicId(), 0));
        expectedBeforeFence.add(new TopicIdPartition(created.topicId(), 3));
        Assertions.assertEquals(expectedBeforeFence, RecordTestUtils.iteratorToSet(manager.brokersToIsrs().iterator(0, true)));

        ArrayList<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
        manager.handleBrokerFenced(0, fenceRecords);
        ctx.replay(fenceRecords);

        Assertions.assertEquals(Collections.emptySet(), RecordTestUtils.iteratorToSet(manager.brokersToIsrs().iterator(0, true)));
    }

    /**
     * Sends an AlterPartition request that shrinks the ISR of foo-0 to [0, 1], then one that
     * expands it back to [0, 1, 2]; each must succeed with {@code Errors.NONE} and produce a
     * response that passes {@code assertConsistentAlterPartitionResponse}.
     */
    @Test
    public void testShrinkAndExpandIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));
        long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);

        // Shrink: drop broker 2 from the ISR.
        AlterPartitionRequestData.PartitionData shrinkRequest =
            newAlterPartition(manager, partitionZero, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkResult =
            sendAlterPartition(ctx, 0, leaderBrokerEpoch, partitionZero.topicId(), shrinkRequest);
        AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(shrinkResult, partitionZero, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partitionZero, shrinkResponse);

        // Expand: bring broker 2 back into the ISR.
        AlterPartitionRequestData.PartitionData expandRequest =
            newAlterPartition(manager, partitionZero, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> expandResult =
            sendAlterPartition(ctx, 0, leaderBrokerEpoch, partitionZero.topicId(), expandRequest);
        AlterPartitionResponseData.PartitionData expandResponse = assertAlterPartitionResponse(expandResult, partitionZero, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partitionZero, expandResponse);
    }

    /**
     * With ELR enabled and {@code min.insync.replicas=2} on "foo", shrinks the ISR of foo-0 to
     * [0] and expects elr [1, 2]; then expands the ISR to [0, 1] and expects an empty elr.
     * {@code lastKnownElr} is expected to stay empty throughout. The ELR assertions only run
     * when {@code isElrEnabled()} reports true.
     */
    @Test
    public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));
        long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);
        ctx.alterTopicConfig("foo", "min.insync.replicas", "2");

        // Shrink the ISR down to the leader only.
        AlterPartitionRequestData.PartitionData shrinkRequest =
            newAlterPartition(manager, partitionZero, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkResult =
            sendAlterPartition(ctx, 0, leaderBrokerEpoch, partitionZero.topicId(), shrinkRequest);
        AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(shrinkResult, partitionZero, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partitionZero, shrinkResponse);

        PartitionRegistration registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[]{1, 2}, registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }

        // Expand the ISR to [0, 1].
        AlterPartitionRequestData.PartitionData expandRequest =
            newAlterPartition(manager, partitionZero, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> expandResult =
            sendAlterPartition(ctx, 0, leaderBrokerEpoch, partitionZero.topicId(), expandRequest);
        AlterPartitionResponseData.PartitionData expandResponse = assertAlterPartitionResponse(expandResult, partitionZero, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partitionZero, expandResponse);

        registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[0], registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }
    }

    /**
     * With ELR enabled and {@code min.insync.replicas=3} on a four-replica partition, fences
     * brokers {2, 3} and expects elr [3]; fences {1, 2, 3} and expects elr [1, 3]; then
     * unfences all brokers and expects elr to remain [1, 3]. {@code lastKnownElr} is expected
     * to stay empty. The ELR assertions only run when {@code isElrEnabled()} reports true.
     */
    @Test
    public void testEligibleLeaderReplicas_BrokerFence() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2, 3}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");

        ctx.fenceBrokers(Utils.mkSet(2, 3));
        PartitionRegistration registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[]{3}, registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }

        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
        registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[]{1, 3}, registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }

        ctx.unfenceBrokers(0, 1, 2, 3);
        registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[]{1, 3}, registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }
    }

    /**
     * With ELR enabled and {@code min.insync.replicas=2}, shrinks the ISR of foo-0 to [0] so
     * that elr becomes [1, 2], then deletes the topic and checks that neither
     * {@code brokersToElrs()} (for broker 1) nor {@code brokersToIsrs()} (for broker 0) still
     * lists any partition.
     */
    @Test
    public void testEligibleLeaderReplicas_DeleteTopic() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));
        long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);
        ctx.alterTopicConfig("foo", "min.insync.replicas", "2");

        AlterPartitionRequestData.PartitionData shrinkRequest =
            newAlterPartition(manager, partitionZero, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> shrinkResult =
            sendAlterPartition(ctx, 0, leaderBrokerEpoch, partitionZero.topicId(), shrinkRequest);
        AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(shrinkResult, partitionZero, Errors.NONE);
        assertConsistentAlterPartitionResponse(manager, partitionZero, shrinkResponse);

        PartitionRegistration registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        Assertions.assertArrayEquals(new int[]{1, 2}, registration.elr, registration.toString());
        Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        Assertions.assertTrue(manager.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());

        ControllerRequestContext deleteContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ctx.deleteTopic(deleteContext, created.topicId());

        Assertions.assertFalse(manager.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());
        Assertions.assertFalse(manager.brokersToIsrs().partitionsWithBrokerInIsr(0).hasNext());
    }

    /**
     * Sets {@code min.insync.replicas=5} on a topic whose single partition has three replicas
     * and checks that {@code getTopicEffectiveMinIsr("foo")} returns 3.
     */
    @Test
    public void testEligibleLeaderReplicas_EffectiveMinIsr() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));

        ctx.alterTopicConfig("foo", "min.insync.replicas", "5");
        Assertions.assertEquals((int)3, (int)manager.getTopicEffectiveMinIsr("foo"));
    }

    /**
     * With ELR enabled and {@code min.insync.replicas=3} on a four-replica partition, fences
     * brokers {1, 2, 3} and expects elr [2, 3]; then unfences broker 2, fences {0, 1}, and
     * expects elr [0, 3], isr [2] and leader 2. {@code lastKnownElr} is expected to stay
     * empty. The ELR assertions only run when {@code isElrEnabled()} reports true.
     */
    @Test
    public void testEligibleLeaderReplicas_CleanElection() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2, 3}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");

        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
        PartitionRegistration registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[]{2, 3}, registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }

        ctx.unfenceBrokers(2);
        ctx.fenceBrokers(Utils.mkSet(0, 1));
        registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        if (manager.isElrEnabled()) {
            Assertions.assertArrayEquals(new int[]{0, 3}, registration.elr, registration.toString());
            Assertions.assertArrayEquals(new int[]{2}, registration.isr, registration.toString());
            Assertions.assertEquals(2, registration.leader, registration.toString());
            Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());
        }
    }

    /**
     * With ELR enabled and {@code min.insync.replicas=3} on a four-replica partition, fences
     * brokers {1, 2, 3} and expects elr [2, 3]. An unclean shutdown of broker 3 is expected to
     * leave elr [2] with an empty {@code lastKnownElr}; a further unclean shutdown of broker 0
     * is expected to leave elr [2] and {@code lastKnownElr} [0].
     */
    @Test
    public void testEligibleLeaderReplicas_UncleanShutdown() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);

        CreateTopicsResponseData.CreatableTopicResult created = ctx.createTestTopic("foo", new int[][]{{0, 1, 2, 3}});
        TopicIdPartition partitionZero = new TopicIdPartition(created.topicId(), 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(partitionZero));
        ctx.alterTopicConfig("foo", "min.insync.replicas", "3");

        ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
        PartitionRegistration registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        Assertions.assertArrayEquals(new int[]{2, 3}, registration.elr, registration.toString());
        Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());

        ctx.handleBrokersUncleanShutdown(3);
        registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        Assertions.assertArrayEquals(new int[]{2}, registration.elr, registration.toString());
        Assertions.assertArrayEquals(new int[0], registration.lastKnownElr, registration.toString());

        ctx.handleBrokersUncleanShutdown(0);
        registration = manager.getPartition(partitionZero.topicId(), partitionZero.partitionId());
        Assertions.assertArrayEquals(new int[]{2}, registration.elr, registration.toString());
        Assertions.assertArrayEquals(new int[]{0}, registration.lastKnownElr, registration.toString());
    }

    /**
     * Checks how partition-level APIs respond for a virtual topic: AlterPartition and
     * AlterPartitionReassignments both report {@code UNKNOWN_TOPIC_OR_PARTITION} for partition
     * 0, and ListPartitionReassignments returns no ongoing reassignments.
     *
     * @param version the ALTER_PARTITION API version; versions up to 1 identify the topic by
     *                name in the request/response, later versions by topic id
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionHandlesVirtualTopics(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setVirtualTopicsEnabledInConfig(true).build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        String topicName = "foo";
        CreateTopicsResponseData.CreatableTopicResult created = ctx.createVirtualTopic(topicName);
        Uuid topicId = created.topicId();

        // Topic identification used in both the request and the expected response.
        String wireTopicName = version <= 1 ? topicName : "";
        Uuid wireTopicId = version > 1 ? topicId : Uuid.ZERO_UUID;

        // AlterPartition against the virtual topic.
        AlterPartitionRequestData.PartitionData requestPartition = new AlterPartitionRequestData.PartitionData().setPartitionIndex(0);
        AlterPartitionRequestData.TopicData requestTopic = new AlterPartitionRequestData.TopicData()
            .setTopicName(wireTopicName)
            .setTopicId(wireTopicId)
            .setPartitions(Arrays.asList(requestPartition));
        AlterPartitionRequestData alterRequest = new AlterPartitionRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setTopics(Arrays.asList(requestTopic));
        ControllerRequestContext alterContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterResult = manager.alterPartition(alterContext, alterRequest);

        AlterPartitionResponseData.PartitionData expectedPartition = new AlterPartitionResponseData.PartitionData()
            .setPartitionIndex(0)
            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
        AlterPartitionResponseData.TopicData expectedTopic = new AlterPartitionResponseData.TopicData()
            .setTopicName(wireTopicName)
            .setTopicId(wireTopicId)
            .setPartitions(Arrays.asList(expectedPartition));
        AlterPartitionResponseData expectedAlterResponse = new AlterPartitionResponseData().setTopics(Arrays.asList(expectedTopic));
        Assertions.assertEquals((Object)expectedAlterResponse, (Object)alterResult.response());

        // AlterPartitionReassignments against the virtual topic, issued as a tenant principal.
        MultiTenantPrincipal tenantPrincipal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));
        AlterPartitionReassignmentsRequestData.ReassignablePartition reassignPartition = new AlterPartitionReassignmentsRequestData.ReassignablePartition()
            .setPartitionIndex(0)
            .setReplicas(Arrays.asList(0, 1));
        AlterPartitionReassignmentsRequestData.ReassignableTopic reassignTopic = new AlterPartitionReassignmentsRequestData.ReassignableTopic()
            .setName(topicName)
            .setPartitions(Arrays.asList(reassignPartition));
        AlterPartitionReassignmentsRequestData reassignRequest = new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(reassignTopic));
        ControllerResult reassignResult = manager.alterPartitionReassignments(reassignRequest, (KafkaPrincipal)tenantPrincipal);

        AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse expectedReassignPartition = new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
            .setPartitionIndex(0)
            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
            .setErrorMessage("Virtual topics have no partitions.");
        AlterPartitionReassignmentsResponseData.ReassignableTopicResponse expectedReassignTopic = new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
            .setName(topicName)
            .setPartitions(Arrays.asList(expectedReassignPartition));
        AlterPartitionReassignmentsResponseData expectedReassignResponse = new AlterPartitionReassignmentsResponseData()
            .setResponses(Arrays.asList(expectedReassignTopic))
            .setErrorMessage(null);
        Assertions.assertEquals((Object)expectedReassignResponse, (Object)reassignResult.response());

        // ListPartitionReassignments must report nothing in progress for the virtual topic.
        ListPartitionReassignmentsResponseData expectedListing = new ListPartitionReassignmentsResponseData()
            .setErrorMessage(null)
            .setTopics(Collections.emptyList());
        ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics listedTopic = new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
            .setName(topicName)
            .setPartitionIndexes(Arrays.asList(0, 1, 2));
        Assertions.assertEquals((Object)expectedListing, (Object)manager.listPartitionReassignments(Arrays.asList(listedTopic), Long.MAX_VALUE));
    }

    /**
     * Checks the error returned by {@code alterPartition} when the request refers to a topic
     * that was never created, once per supported AlterPartition version.
     *
     * <p>For {@code version <= 1} the request carries the topic name together with the zero
     * topic id and {@code UNKNOWN_TOPIC_OR_PARTITION} is expected. For later versions it carries
     * a random topic id together with an empty name and {@code UNKNOWN_TOPIC_ID} is expected.
     *
     * @param version the AlterPartition request version under test
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);

        String topicName = "foo";
        Uuid topicId = Uuid.randomUuid();
        // Exactly one of (name, id) is populated, depending on the request version.
        boolean addressedById = version > 1;
        String nameOnWire = addressedById ? "" : topicName;
        Uuid idOnWire = addressedById ? topicId : Uuid.ZERO_UUID;

        AlterPartitionRequestData.PartitionData requestPartition =
            new AlterPartitionRequestData.PartitionData().setPartitionIndex(0);
        AlterPartitionRequestData.TopicData requestTopic = new AlterPartitionRequestData.TopicData()
            .setTopicName(nameOnWire)
            .setTopicId(idOnWire)
            .setPartitions(Arrays.asList(requestPartition));
        AlterPartitionRequestData request = new AlterPartitionRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setTopics(Arrays.asList(requestTopic));

        ControllerRequestContext requestContext =
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult<?> result = replicationControl.alterPartition(requestContext, request);

        Errors expectedError;
        if (addressedById) {
            expectedError = Errors.UNKNOWN_TOPIC_ID;
        } else {
            expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
        }
        AlterPartitionResponseData.PartitionData responsePartition =
            new AlterPartitionResponseData.PartitionData()
                .setPartitionIndex(0)
                .setErrorCode(expectedError.code());
        AlterPartitionResponseData.TopicData responseTopic = new AlterPartitionResponseData.TopicData()
            .setTopicName(nameOnWire)
            .setTopicId(idOnWire)
            .setPartitions(Arrays.asList(responsePartition));
        AlterPartitionResponseData expectedResponse =
            new AlterPartitionResponseData().setTopics(Arrays.asList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());
    }

    /**
     * Verifies that the cluster-link state attached to an AlterPartition request is emitted as
     * the first {@link PartitionChangeRecord} of the result.
     *
     * <p>A mirror topic "foo" is created on link "test-link". The first request reports linked
     * leader epoch 10 with the link not failed and expects link state {@code ACTIVE}; the second
     * reports linked leader epoch 65 with the link failed and expects link state {@code FAILED}.
     *
     * @throws Exception if test setup or request handling throws
     */
    @Test
    public void testClusterLinkAlterIsr() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0);
        ctx.unfenceBrokers(0);
        ctx.createClusterLink("test-link");
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", 1, (short)1, Errors.NONE.code(), creatableTopic -> creatableTopic.setLinkName("test-link").setMirrorTopic("foo-origin"));
        TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
        long brokerEpoch = ctx.currentBrokerEpoch(0);

        // First request: healthy link at linked leader epoch 10.
        AlterPartitionRequestData.PartitionData partitionData = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
        AlterPartitionRequestData.ClusterLinkState clusterLinkState = new AlterPartitionRequestData.ClusterLinkState().setLinkedLeaderEpoch(10).setLinkFailed(false);
        partitionData.setClusterLinkState(clusterLinkState);
        ControllerResult<AlterPartitionResponseData> alterClusterLinkStateResult = this.sendAlterPartition(ctx, 0, brokerEpoch, createTopicResult.topicId(), partitionData);
        PartitionChangeRecord record = (PartitionChangeRecord)((ApiMessageAndVersion)alterClusterLinkStateResult.records().get(0)).message();
        Assertions.assertEquals(
            new PartitionChangeRecord()
                .setTopicId(createTopicResult.topicId())
                .setPartitionId(0)
                .setLinkedLeaderEpoch(10)
                .setLinkState(PartitionRegistration.LinkState.ACTIVE.levelCode),
            record);

        // Second request: the same state object is updated to a failed link at epoch 65.
        AlterPartitionRequestData.PartitionData partitionData1 = this.newAlterPartition(replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
        clusterLinkState.setLinkedLeaderEpoch(65).setLinkFailed(true);
        partitionData1.setClusterLinkState(clusterLinkState);
        alterClusterLinkStateResult = this.sendAlterPartition(ctx, 0, brokerEpoch, createTopicResult.topicId(), partitionData1);
        record = (PartitionChangeRecord)((ApiMessageAndVersion)alterClusterLinkStateResult.records().get(0)).message();
        // Expected value goes first so that a failure message labels the two sides correctly.
        Assertions.assertEquals(
            new PartitionChangeRecord()
                .setTopicId(createTopicResult.topicId())
                .setPartitionId(0)
                .setLinkedLeaderEpoch(65)
                .setLinkState(PartitionRegistration.LinkState.FAILED.levelCode),
            record);
    }

    /**
     * Walks through a series of invalid AlterPartition requests against partition 0 of topic
     * "foo" (replicas {0, 1, 2}, leader asserted to be broker 0) and checks the error produced
     * for each. Every request is freshly built from the partition's current epochs and then
     * altered in exactly one way.
     */
    @Test
    public void testInvalidAlterPartitionRequests() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult fooResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        TopicIdPartition topicIdPartition = new TopicIdPartition(fooResult.topicId(), 0);
        int leaderId = 0;
        int notLeaderId = 1;
        Assertions.assertEquals(OptionalInt.of(leaderId), ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);

        // Sent by broker 1, which is not the leader.
        AlterPartitionRequestData.PartitionData fromNonLeader = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, notLeaderId, ctx.currentBrokerEpoch(notLeaderId), topicIdPartition.topicId(), fromNonLeader),
            topicIdPartition, Errors.INVALID_REQUEST);

        // Broker epoch one below the current one: the call itself throws.
        AlterPartitionRequestData.PartitionData staleBrokerEpoch = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        Assertions.assertThrows(
            StaleBrokerEpochException.class,
            () -> sendAlterPartition(ctx, leaderId, brokerEpoch - 1L, topicIdPartition.topicId(), staleBrokerEpoch));

        // Leader epoch overridden to 500.
        AlterPartitionRequestData.PartitionData wrongLeaderEpoch = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        wrongLeaderEpoch.setLeaderEpoch(500);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), wrongLeaderEpoch),
            topicIdPartition, Errors.NOT_CONTROLLER);

        // Partition epoch overridden to 500.
        AlterPartitionRequestData.PartitionData wrongPartitionEpoch = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
        wrongPartitionEpoch.setPartitionEpoch(500);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), wrongPartitionEpoch),
            topicIdPartition, Errors.NOT_CONTROLLER);

        // ISR names broker 3, which is not among the replicas {0, 1, 2}.
        AlterPartitionRequestData.PartitionData isrWithNonReplica = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 3), LeaderRecoveryState.RECOVERED);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), isrWithNonReplica),
            topicIdPartition, Errors.INVALID_REQUEST);

        // ISR {1, 2} leaves out the leader, broker 0.
        AlterPartitionRequestData.PartitionData isrWithoutLeader = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2), LeaderRecoveryState.RECOVERED);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), isrWithoutLeader),
            topicIdPartition, Errors.INVALID_REQUEST);

        // RECOVERING state combined with a two-member ISR.
        AlterPartitionRequestData.PartitionData recoveringWithWideIsr = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERING);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), recoveringWithWideIsr),
            topicIdPartition, Errors.INVALID_REQUEST);

        // RECOVERING state with only the leader in the ISR is rejected as well.
        // NOTE(review): presumably because the partition is not currently recovering - confirm.
        AlterPartitionRequestData.PartitionData recoveringLeaderOnly = newAlterPartition(
            replicationControl, topicIdPartition, ReplicationControlManagerTest.isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERING);
        assertAlterPartitionResponse(
            sendAlterPartition(ctx, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), recoveringLeaderOnly),
            topicIdPartition, Errors.INVALID_REQUEST);
    }

    /**
     * Builds the partition entry of an AlterPartition request for {@code topicIdPartition},
     * pre-filled with the leader epoch and partition epoch currently registered for that
     * partition in {@code replicationControl}.
     *
     * <p>The partition index is taken from {@code topicIdPartition} so that the entry addresses
     * the same partition whose epochs were read.
     *
     * @param replicationControl  manager used to look up the current partition registration
     * @param topicIdPartition    the partition the request entry targets
     * @param newIsrWithEpoch     the ISR (with broker epochs) to propose
     * @param leaderRecoveryState the leader recovery state to propose
     * @return a new request partition entry; callers may still override individual fields
     */
    private AlterPartitionRequestData.PartitionData newAlterPartition(ReplicationControlManager replicationControl, TopicIdPartition topicIdPartition, List<AlterPartitionRequestData.BrokerState> newIsrWithEpoch, LeaderRecoveryState leaderRecoveryState) {
        int partitionId = topicIdPartition.partitionId();
        PartitionRegistration partitionControl = replicationControl.getPartition(topicIdPartition.topicId(), partitionId);
        return new AlterPartitionRequestData.PartitionData()
            .setPartitionIndex(partitionId)
            .setLeaderEpoch(partitionControl.leaderEpoch)
            .setPartitionEpoch(partitionControl.partitionEpoch)
            .setNewIsrWithEpochs(newIsrWithEpoch)
            .setLeaderRecoveryState(leaderRecoveryState.value());
    }

    /**
     * Wraps {@code partitionData} in an AlterPartition request containing a single topic
     * (identified by {@code topicId}), submits it through {@code ctx.replicationControl} using an
     * anonymous request context, and replays the records of the result into {@code ctx}.
     *
     * @param ctx           the test context to send the request through and replay records into
     * @param brokerId      the broker id placed on the request
     * @param brokerEpoch   the broker epoch placed on the request
     * @param topicId       the id of the topic the partition belongs to
     * @param partitionData the partition entry to send
     * @return the controller result, after its records have been replayed
     */
    private ControllerResult<AlterPartitionResponseData> sendAlterPartition(ReplicationControlTestContext ctx, int brokerId, long brokerEpoch, Uuid topicId, AlterPartitionRequestData.PartitionData partitionData) {
        AlterPartitionRequestData request = new AlterPartitionRequestData().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
        AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setTopicId(topicId);
        request.topics().add(topicData);
        topicData.partitions().add(partitionData);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        // Typed local (instead of a raw ControllerResult) so the return needs no unchecked conversion.
        ControllerResult<AlterPartitionResponseData> result = ctx.replicationControl.alterPartition(requestContext, request);
        ctx.replay(result.records());
        return result;
    }

    /**
     * Asserts that an AlterPartition result holds exactly one topic containing exactly one
     * partition, that the topic id and partition index match {@code topicIdPartition}, and that
     * the partition's error code maps to {@code expectedError}.
     *
     * @param alterPartitionResult the result to inspect
     * @param topicIdPartition     the partition the response is expected to describe
     * @param expectedError        the error expected on the partition entry
     * @return the single partition entry of the response
     */
    private AlterPartitionResponseData.PartitionData assertAlterPartitionResponse(ControllerResult<AlterPartitionResponseData> alterPartitionResult, TopicIdPartition topicIdPartition, Errors expectedError) {
        List<AlterPartitionResponseData.TopicData> topicEntries = alterPartitionResult.response().topics();
        Assertions.assertEquals(1, topicEntries.size());

        AlterPartitionResponseData.TopicData onlyTopic = topicEntries.get(0);
        Assertions.assertEquals(topicIdPartition.topicId(), onlyTopic.topicId());

        List<AlterPartitionResponseData.PartitionData> partitionEntries = onlyTopic.partitions();
        Assertions.assertEquals(1, partitionEntries.size());

        AlterPartitionResponseData.PartitionData onlyPartition = partitionEntries.get(0);
        Assertions.assertEquals(topicIdPartition.partitionId(), onlyPartition.partitionIndex());
        Assertions.assertEquals(expectedError, Errors.forCode(onlyPartition.errorCode()));
        return onlyPartition;
    }

    /**
     * Asserts that the leader, leader epoch, partition epoch and ISR reported in an
     * AlterPartition response entry equal the values currently registered for the partition in
     * {@code replicationControl}.
     *
     * @param replicationControl manager used to look up the registered partition state
     * @param topicIdPartition   the partition to look up
     * @param partitionData      the response entry to compare against the registration
     */
    private void assertConsistentAlterPartitionResponse(ReplicationControlManager replicationControl, TopicIdPartition topicIdPartition, AlterPartitionResponseData.PartitionData partitionData) {
        PartitionRegistration registered = replicationControl.getPartition(
            topicIdPartition.topicId(), topicIdPartition.partitionId());

        Assertions.assertEquals(registered.leader, partitionData.leaderId());
        Assertions.assertEquals(registered.leaderEpoch, partitionData.leaderEpoch());
        Assertions.assertEquals(registered.partitionEpoch, partitionData.partitionEpoch());
        Assertions.assertEquals(
            ReplicationControlManagerTest.arrayToList(registered.isr),
            partitionData.isr());
    }

    /**
     * Asserts that the configs stored for {@code topic} have the same number of entries as
     * {@code requestConfigs} and that every requested name maps to the requested value.
     *
     * @param ctx            test context whose configuration control is queried
     * @param topic          the topic name
     * @param requestConfigs the configs that were supplied when the topic was created
     */
    private void assertCreatedTopicConfigs(ReplicationControlTestContext ctx, String topic, CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs) {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Map<?, ?> storedConfigs = ctx.configurationControl.getConfigs(topicResource);
        Assertions.assertEquals(requestConfigs.size(), storedConfigs.size());
        requestConfigs.forEach(requested -> {
            String storedValue = (String) storedConfigs.get(requested.name());
            Assertions.assertEquals(requested.value(), storedValue);
        });
    }

    /**
     * Asserts that no configs are stored for {@code topic}.
     *
     * @param ctx   test context whose configuration control is queried
     * @param topic the topic name
     */
    private void assertEmptyTopicConfigs(ReplicationControlTestContext ctx, String topic) {
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Assertions.assertEquals(Collections.emptyMap(), ctx.configurationControl.getConfigs(topicResource));
    }

    /**
     * End-to-end check of topic deletion.
     *
     * <p>Creates topic "foo" (3 partitions, replication factor 2, two configs) and asserts that
     * its partitions and configs only become visible once the creation records are replayed.
     * It then checks name/id lookups for known and unknown topics, that deleting an unknown id
     * yields {@code UNKNOWN_TOPIC_ID} with no records, and that deleting "foo" yields one record
     * in an atomic result whose replay removes the partitions, lookups and configs.
     */
    @Test
    public void testDeleteTopics() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;

        // Build the creation request for "foo".
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("cleanup.policy").setValue("compact"));
        requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("min.cleanable.dirty.ratio").setValue("0.1"));
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(3)
            .setReplicationFactor((short) 2)
            .setConfigs(requestConfigs));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);

        ControllerRequestContext createTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<?> createResult = replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        Uuid topicId = ((CreateTopicsResponseData) createResult.response()).topics().find("foo").topicId();
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName("foo")
            .setNumPartitions(3)
            .setReplicationFactor((short) 2)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(topicId));
        Assertions.assertEquals(expectedResponse, createResult.response());

        // Nothing is visible until the records are replayed.
        Assertions.assertNull(replicationControl.getPartition(topicId, 0));
        assertEmptyTopicConfigs(ctx, "foo");
        ctx.replay(createResult.records());
        for (int partition = 0; partition < 3; partition++) {
            Assertions.assertNotNull(replicationControl.getPartition(topicId, partition));
        }
        Assertions.assertNull(replicationControl.getPartition(topicId, 3));
        assertCreatedTopicConfigs(ctx, "foo", requestConfigs);

        // Lookups by id and by name, for both existing and unknown topics.
        Assertions.assertEquals(
            Collections.singletonMap(topicId, new ResultOrError((Object)"foo")),
            replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
        Assertions.assertEquals(
            Collections.singletonMap("foo", new ResultOrError((Object)topicId)),
            replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
        Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1L, topicId.getLeastSignificantBits());
        Assertions.assertEquals(
            Collections.singletonMap(invalidId, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID))),
            replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
        Assertions.assertEquals(
            Collections.singletonMap("bar", new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))),
            replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));

        // Deleting an unknown id produces an error and no records.
        ControllerRequestContext deleteTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.DELETE_TOPICS);
        ControllerResult<?> invalidDeleteResult = replicationControl.deleteTopics(deleteTopicsRequestContext, Collections.singletonList(invalidId));
        Assertions.assertEquals(0, invalidDeleteResult.records().size());
        Assertions.assertEquals(
            Collections.singletonMap(invalidId, new ApiError(Errors.UNKNOWN_TOPIC_ID, null)),
            invalidDeleteResult.response());

        // Deleting "foo" succeeds with a single record.
        ControllerResult<?> deleteResult = replicationControl.deleteTopics(deleteTopicsRequestContext, Collections.singletonList(topicId));
        Assertions.assertTrue(deleteResult.isAtomic());
        Assertions.assertEquals(
            Collections.singletonMap(topicId, new ApiError(Errors.NONE, null)),
            deleteResult.response());
        Assertions.assertEquals(1, deleteResult.records().size());
        ctx.replay(deleteResult.records());

        // After replay the topic is gone everywhere.
        for (int partition = 0; partition <= 3; partition++) {
            Assertions.assertNull(replicationControl.getPartition(topicId, partition));
        }
        Assertions.assertEquals(
            Collections.singletonMap(topicId, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID))),
            replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
        Assertions.assertEquals(
            Collections.singletonMap("foo", new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))),
            replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
        assertEmptyTopicConfigs(ctx, "foo");
    }

    /**
     * Verifies that {@code deleteTopics} issued with a request context whose mutation quota is
     * exceeded reports {@code THROTTLING_QUOTA_EXCEEDED} ("Quota exceeded in test") for the topic
     * and produces no records.
     */
    @Test
    public void testDeleteTopicsWithMutationQuotaExceeded() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);

        // Create "foo" with a normal context so there is something to delete.
        ControllerRequestContext createTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createResult = replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
        CreateTopicsResponseData.CreatableTopicResult createdTopic = ((CreateTopicsResponseData)createResult.response()).topics().find("foo");
        Assertions.assertEquals((short)Errors.NONE.code(), (short)createdTopic.errorCode());
        ctx.replay(createResult.records());

        // The delete is attempted with a context that reports the mutation quota as exceeded.
        ControllerRequestContext deleteTopicsRequestContext = ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.DELETE_TOPICS);
        Uuid topicId = createdTopic.topicId();
        ControllerResult deleteResult = replicationControl.deleteTopics(deleteTopicsRequestContext, Collections.singletonList(topicId));
        Assertions.assertEquals(Collections.singletonMap(topicId, new ApiError(Errors.THROTTLING_QUOTA_EXCEEDED, "Quota exceeded in test")), (Object)deleteResult.response());
        Assertions.assertEquals((int)0, (int)deleteResult.records().size());
    }

    /**
     * Exercises {@code createPartitions} in two rounds against topics "foo" (3 partitions),
     * "bar" (4), "quux" (2) and "foo2" (2), all with replication factor 2, on brokers 0, 1 and 3.
     *
     * <p>Round one uses automatic assignment and checks the per-topic results for a valid
     * increase, a decrease, an unknown topic and an unchanged count. Round two uses manual
     * assignments and checks a valid assignment plus three invalid ones. Finally the directory
     * assignment of the manually placed partition 5 of "foo" is checked.
     */
    @Test
    public void testCreatePartitions() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(MetadataVersion.latestTesting()).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;

        // Topics to create up front, as (name, partition count) pairs.
        String[] initialNames = {"foo", "bar", "quux", "foo2"};
        int[] initialCounts = {3, 4, 2, 2};
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        for (int i = 0; i < initialNames.length; i++) {
            request.topics().add(new CreateTopicsRequestData.CreatableTopic()
                .setName(initialNames[i])
                .setNumPartitions(initialCounts[i])
                .setReplicationFactor((short) 2));
        }
        // Broker 0 has no directories, broker 1 has two, broker 3 has one.
        ctx.registerBrokersWithDirs(
            0, Collections.emptyList(),
            1, Arrays.asList(Uuid.fromString("QMzamNQVQ7GnJK9DwQHG7Q"), Uuid.fromString("loDxEBLETdedNnQGOKKENw")),
            3, Collections.singletonList(Uuid.fromString("dxCDSgNjQvS4WuyqEKoCwA")));
        ctx.unfenceBrokers(0, 1, 3);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<?> createTopicResult = replicationControl.createTopics(requestContext, request, new HashSet<String>(Arrays.asList("foo", "bar", "quux", "foo2")));
        ctx.replay(createTopicResult.records());

        // Round one: automatic assignment.
        String[] autoNames = {"foo", "bar", "baz", "quux"};
        int[] autoCounts = {5, 3, 3, 2};
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<>();
        for (int i = 0; i < autoNames.length; i++) {
            topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
                .setName(autoNames[i])
                .setCount(autoCounts[i])
                .setAssignments(null));
        }
        ControllerResult<?> createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedAuto = Arrays.asList(
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("foo")
                .setErrorCode(Errors.NONE.code())
                .setErrorMessage(null),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("bar")
                .setErrorCode(Errors.INVALID_PARTITIONS.code())
                .setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("baz")
                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                .setErrorMessage(null),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("quux")
                .setErrorCode(Errors.INVALID_PARTITIONS.code())
                .setErrorMessage("Topic already has 2 partition(s)."));
        Assertions.assertEquals(expectedAuto, createPartitionsResult.response());
        ctx.replay(createPartitionsResult.records());

        // Round two: manual assignment, one assignment entry per topic.
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics2 = new ArrayList<>();
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("foo")
            .setCount(6)
            .setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 3)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("bar")
            .setCount(5)
            .setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("quux")
            .setCount(4)
            .setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("foo2")
            .setCount(3)
            .setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
        ControllerResult<?> createPartitionsResult2 = replicationControl.createPartitions(requestContext, topics2);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedManual = Arrays.asList(
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("foo")
                .setErrorCode(Errors.NONE.code())
                .setErrorMessage(null),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("bar")
                .setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code())
                .setErrorMessage("The manual partition assignment includes a partition with 1 replica(s), but this is not consistent with previous partitions, which have 2 replica(s)."),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("quux")
                .setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code())
                .setErrorMessage("Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."),
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("foo2")
                .setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code())
                .setErrorMessage("The manual partition assignment includes broker 2, but no such broker is registered."));
        Assertions.assertEquals(expectedManual, createPartitionsResult2.response());
        ctx.replay(createPartitionsResult2.records());

        // Partition 5 of "foo" was placed on brokers (1, 3): broker 1's directory is expected to
        // be UNASSIGNED and broker 3's to be its only registered directory.
        Uuid[] expectedDirectories = {DirectoryId.UNASSIGNED, Uuid.fromString("dxCDSgNjQvS4WuyqEKoCwA")};
        Assertions.assertArrayEquals(
            expectedDirectories,
            replicationControl.getPartition(replicationControl.getTopicId("foo"), 5).directories);
    }

    /**
     * Verifies that {@code createPartitions} issued with a request context whose mutation quota
     * is exceeded reports {@code THROTTLING_QUOTA_EXCEEDED} ("Quota exceeded in test") for the
     * topic, both for automatic assignment and for manual assignment.
     */
    @Test
    public void testCreatePartitionsWithMutationQuotaExceeded() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(3)
            .setReplicationFactor((short) 2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);

        // Create "foo" with a normal context.
        ControllerRequestContext createTopicsRequestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<?> createResult = replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
        CreateTopicsResponseData.CreatableTopicResult createdTopic = ((CreateTopicsResponseData) createResult.response()).topics().find("foo");
        Assertions.assertEquals(Errors.NONE.code(), createdTopic.errorCode());
        ctx.replay(createResult.records());

        // Automatic assignment under an exceeded quota.
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("foo")
            .setCount(5)
            .setAssignments(null));
        ControllerRequestContext createPartitionsRequestContext = ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_PARTITIONS);
        ControllerResult<?> createPartitionsResult = replicationControl.createPartitions(createPartitionsRequestContext, topics);
        List<CreatePartitionsResponseData.CreatePartitionsTopicResult> expectedThrottled = Collections.singletonList(
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("foo")
                .setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code())
                .setErrorMessage("Quota exceeded in test"));
        Assertions.assertEquals(expectedThrottled, createPartitionsResult.response());

        // Manual assignment under the same context gives the same answer.
        CreatePartitionsRequestData.CreatePartitionsAssignment manualAssignment =
            new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0));
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics2 = new ArrayList<>();
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("foo")
            .setCount(4)
            .setAssignments(Arrays.asList(manualAssignment)));
        ControllerResult<?> createPartitionsResult2 = replicationControl.createPartitions(createPartitionsRequestContext, topics2);
        Assertions.assertEquals(expectedThrottled, createPartitionsResult2.response());
    }

    /**
     * Verifies that adding a partition to "foo" (replication factor 2) fails with
     * {@code INVALID_REPLICATION_FACTOR} once brokers 0 and 1 have been registered again, only
     * broker 0 has been unfenced, and broker 0 has been put into controlled shutdown.
     */
    @Test
    public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short) 2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<?> createTopicResult = replicationControl.createTopics(requestContext, request, new HashSet<String>(Collections.singletonList("foo")));
        ctx.replay(createTopicResult.records());

        // Register both brokers again; afterwards only broker 0 is unfenced, and it is then
        // placed in controlled shutdown.
        // NOTE(review): presumably re-registration leaves broker 1 fenced - confirm.
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0);
        ctx.inControlledShutdownBrokers(0);

        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic()
            .setName("foo")
            .setCount(2)
            .setAssignments(null));
        ControllerResult<?> createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        CreatePartitionsResponseData.CreatePartitionsTopicResult expectedFailure =
            new CreatePartitionsResponseData.CreatePartitionsTopicResult()
                .setName("foo")
                .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
                .setErrorMessage("Unable to replicate the partition 2 time(s): All brokers are currently fenced or in controlled shutdown.");
        Assertions.assertEquals(Arrays.asList(expectedFailure), createPartitionsResult.response());
    }

    /**
     * Verifies the registration of a partition added through {@code createPartitions} when, of
     * the three registered brokers, only 0 and 1 were unfenced and broker 1 is in controlled
     * shutdown: the new partition 1 of "foo" is expected to have replicas {0, 1, 2}, an ISR of
     * just {0}, and broker 0 as leader.
     */
    @Test
    public void testCreatePartitionsISRInvariants() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName("foo")
            .setNumPartitions(1)
            .setReplicationFactor((short) 3));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1);
        ctx.inControlledShutdownBrokers(1);
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<?> result = replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
        ctx.replay(result.records());

        // Grow "foo" from one partition to two with automatic assignment.
        List<CreatePartitionsRequestData.CreatePartitionsTopic> topics = Arrays.asList(
            new CreatePartitionsRequestData.CreatePartitionsTopic()
                .setName("foo")
                .setCount(2)
                .setAssignments(null));
        ControllerResult<?> createPartitionsResult = replicationControl.createPartitions(requestContext, topics);
        ctx.replay(createPartitionsResult.records());

        PartitionRegistration expectedRegistration = new PartitionRegistration.Builder()
            .setReplicas(new int[]{0, 1, 2})
            .setDirectories(new Uuid[]{
                Uuid.fromString("TESTBROKER00000DIRAAAA"),
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00002DIRAAAA")})
            .setIsr(new int[]{0})
            .setLeader(0)
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(0)
            .setPartitionEpoch(0)
            .build();
        // The topic id is read from the first record of the create-topics result.
        Uuid fooId = ((TopicRecord) ((ApiMessageAndVersion) result.records().get(0)).message()).topicId();
        Assertions.assertEquals(expectedRegistration, replicationControl.getPartition(fooId, 1));
    }

    /**
     * Confirms that {@code validateManualPartitionAssignment} accepts well-formed manual
     * assignments: every call below is expected to return without throwing.
     *
     * <p>Brokers 1-4 are registered and broker 4 is passed as excluded. The last call runs
     * after a cell with ID 0 covering brokers 0-4 has been replayed and passes cell ID 0
     * instead of -1.
     */
    @Test
    public void testValidateGoodManualPartitionAssignments() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = ctx.replicationControl;
        Set<Integer> excluded = Collections.singleton(4);
        ctx.registerBrokers(1, 2, 3, 4);

        // Single replica, with and without an expected replication factor.
        manager.validateManualPartitionAssignment(
            PartitionAssignmentTest.partitionAssignment(Collections.singletonList(1)),
            OptionalInt.of(1), excluded, -1, Optional.empty());
        manager.validateManualPartitionAssignment(
            PartitionAssignmentTest.partitionAssignment(Collections.singletonList(1)),
            OptionalInt.empty(), excluded, -1, Optional.empty());

        // Three replicas, with and without an expected replication factor.
        manager.validateManualPartitionAssignment(
            PartitionAssignmentTest.partitionAssignment(Arrays.asList(1, 2, 3)),
            OptionalInt.of(3), excluded, -1, Optional.empty());
        manager.validateManualPartitionAssignment(
            PartitionAssignmentTest.partitionAssignment(Arrays.asList(1, 2, 3)),
            OptionalInt.empty(), excluded, -1, Optional.empty());

        // Same three replicas again, now validated against cell 0.
        CellRecord cellZero = new CellRecord().setCellId(0).setBrokers(Arrays.asList(0, 1, 2, 3, 4));
        ctx.cellControl.replay(cellZero);
        manager.validateManualPartitionAssignment(
            PartitionAssignmentTest.partitionAssignment(Arrays.asList(1, 2, 3)),
            OptionalInt.empty(), excluded, 0, Optional.empty());
    }

    /**
     * Confirms that {@code validateManualPartitionAssignment} rejects malformed manual
     * assignments with an {@link InvalidReplicaAssignmentException} whose message matches
     * the expected text exactly.
     *
     * <p>Brokers 1, 2, 3, 5, 6 and 7 are registered; broker 3 is the excluded broker unless a
     * case passes its own exclusion set. The cases after the cell record is replayed
     * cover cell membership and observer / topic-placement validation.
     */
    @Test
    public void testValidateBadManualPartitionAssignments() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        Set<Integer> excludedBrokers = Collections.singleton(3);
        ctx.registerBrokers(1, 2, 3, 5, 6, 7);

        // Empty replica list.
        Assertions.assertEquals(
            "The manual partition assignment includes an empty replica list.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    PartitionAssignmentTest.partitionAssignment(Arrays.asList(new Integer[0])),
                    OptionalInt.empty(), excludedBrokers, -1, Optional.empty())).getMessage());

        // Broker 4 was never registered.
        Assertions.assertEquals(
            "The manual partition assignment includes broker 4, but no such broker is registered.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    PartitionAssignmentTest.partitionAssignment(Arrays.asList(2, 3, 4)),
                    OptionalInt.empty(), Collections.singleton(1), -1, Optional.empty())).getMessage());

        // Duplicate broker in the replica list.
        Assertions.assertEquals(
            "The manual partition assignment includes the broker 2 more than once.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    PartitionAssignmentTest.partitionAssignment(Arrays.asList(1, 2, 2)),
                    OptionalInt.empty(), excludedBrokers, -1, Optional.empty())).getMessage());

        // Replica list contains the excluded broker.
        Assertions.assertEquals(
            "The manual partition assignment includes the broker 3 which is excluded from replica placement.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    PartitionAssignmentTest.partitionAssignment(Arrays.asList(1, 2, 3)),
                    OptionalInt.empty(), excludedBrokers, -1, Optional.empty())).getMessage());

        // Replica count disagrees with the expected replication factor.
        Assertions.assertEquals(
            "The manual partition assignment includes a partition with 2 replica(s), but this is not consistent with previous partitions, which have 3 replica(s).",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    PartitionAssignmentTest.partitionAssignment(Arrays.asList(1, 2)),
                    OptionalInt.of(3), excludedBrokers, -1, Optional.empty())).getMessage());

        // Replicas outside the brokers of the requested cell.
        ctx.cellControl.replay(new CellRecord().setCellId(0).setBrokers(Arrays.asList(0, 1, 2)));
        Assertions.assertEquals(
            "The manual partition assignment includes brokers 5, 6, 7, but they must be from 0, 1, 2.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    PartitionAssignmentTest.partitionAssignment(Arrays.asList(5, 6, 7)),
                    // Parameterized rather than raw, matching the Set<Integer> passed by the sibling cases.
                    OptionalInt.empty(), new HashSet<Integer>(), 0, Optional.empty())).getMessage());

        // Observer validation against a placement with one sync replica (rack0) and one observer (rack1).
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        Assertions.assertEquals(
            "The assignment contains observers but the replicas suffix doesn't match the observers.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    new PartitionAssignment(Arrays.asList(1, 2, 3), Arrays.asList(2), __ -> DirectoryId.MIGRATING),
                    OptionalInt.empty(), Collections.emptySet(), -1, TopicPlacement.parse(topicPlacementJson))).getMessage());
        Assertions.assertEquals(
            "Observers must be a subset of replicas.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    new PartitionAssignment(Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3), __ -> DirectoryId.MIGRATING),
                    OptionalInt.empty(), Collections.emptySet(), -1, TopicPlacement.parse(topicPlacementJson))).getMessage());
        Assertions.assertEquals(
            "Assignment contains observers but there is no topic placement configured for the topic.",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    new PartitionAssignment(Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3), __ -> DirectoryId.MIGRATING),
                    OptionalInt.empty(), Collections.emptySet(), -1, Optional.empty())).getMessage());
        Assertions.assertEquals(
            "Replicas ([1]) do not match the sync replicas constraints ([ConstraintCount(count=1,constraints={rack=rack0})]). The following replicas matched: [].",
            Assertions.assertThrows(InvalidReplicaAssignmentException.class, () ->
                ctx.replicationControl.validateManualPartitionAssignment(
                    new PartitionAssignment(Arrays.asList(1, 2), Arrays.asList(2), __ -> DirectoryId.MIGRATING),
                    OptionalInt.empty(), Collections.emptySet(), -1, TopicPlacement.parse(topicPlacementJson))).getMessage());
    }

    /**
     * Walks topic "foo" through a reassignment lifecycle: start reassignments, list them,
     * cancel them (the cancel records are compared but not replayed), and finally send an
     * alterPartition request from broker 3, after which no reassignment is expected to remain.
     *
     * @param version ALTER_PARTITION request version supplied by the parameterized source;
     *                versions above 1 identify the topic by ID, older ones by name
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testReassignPartitions(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {3, 2, 1}}).topicId();
        ctx.createTestTopic("bar", new int[][]{{1, 2, 3}}).topicId();
        Assertions.assertEquals(NONE_REASSIGNING, manager.listPartitionReassignments(null, Long.MAX_VALUE));

        // Mark broker 1 as excluded from replica placement.
        BrokerReplicaExclusionRecord.BrokerReplicaExclusion excludeBrokerOne =
            new BrokerReplicaExclusionRecord.BrokerReplicaExclusion().setBrokerId(1).setReason("test");
        ctx.clusterControl.replay(
            new BrokerReplicaExclusionRecord().setBrokerExclusions(Collections.singletonList(excludeBrokerOne)));
        KafkaPrincipal principal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));

        // Request reassignments for foo:0, foo:1 and the non-existent foo:2; "bar" is named without partitions.
        AlterPartitionReassignmentsRequestData startRequest = new AlterPartitionReassignmentsRequestData()
            .setTopics(Arrays.asList(
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName("foo")
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0).setReplicas(Arrays.asList(3, 2, 1)),
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(1).setReplicas(Arrays.asList(0, 2, 1)),
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(2).setReplicas(Arrays.asList(0, 2, 1)))),
                new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar")));
        ControllerResult alterResult = manager.alterPartitionReassignments(startRequest, principal);
        AlterPartitionReassignmentsResponseData expectedStartResponse = new AlterPartitionReassignmentsResponseData()
            .setErrorMessage(null)
            .setResponses(Arrays.asList(
                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                    .setName("foo")
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(0).setErrorMessage(null),
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(1).setErrorMessage(null),
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(2)
                            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                            .setErrorMessage("Unable to find partition foo:2."))),
                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar")));
        Assertions.assertEquals(expectedStartResponse, alterResult.response());
        ctx.replay(alterResult.records());

        // Only foo:1 is expected to be listed as reassigning.
        ListPartitionReassignmentsResponseData ongoing = new ListPartitionReassignmentsResponseData()
            .setErrorMessage(null)
            .setTopics(Arrays.asList(
                new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
                    .setName("foo")
                    .setPartitions(Arrays.asList(
                        new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
                            .setPartitionIndex(1)
                            .setRemovingReplicas(Arrays.asList(3))
                            .setAddingReplicas(Arrays.asList(0))
                            .setReplicas(Arrays.asList(0, 2, 1, 3))))));
        Assertions.assertEquals(ongoing, manager.listPartitionReassignments(null, Long.MAX_VALUE));
        Assertions.assertEquals(
            NONE_REASSIGNING,
            manager.listPartitionReassignments(
                Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
                    .setName("bar").setPartitionIndexes(Arrays.asList(0, 1, 2))),
                Long.MAX_VALUE));
        Assertions.assertEquals(
            ongoing,
            manager.listPartitionReassignments(
                Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics()
                    .setName("foo").setPartitionIndexes(Arrays.asList(0, 1, 2))),
                Long.MAX_VALUE));

        // Cancel by sending null replica lists.
        AlterPartitionReassignmentsRequestData cancelRequest = new AlterPartitionReassignmentsRequestData()
            .setTopics(Arrays.asList(
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName("foo")
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0).setReplicas(null),
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(1).setReplicas(null),
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(2).setReplicas(null))),
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName("bar")
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0).setReplicas(null)))));
        ControllerResult cancelResult = manager.alterPartitionReassignments(cancelRequest, principal);
        PartitionChangeRecord cancelRecord = new PartitionChangeRecord()
            .setTopicId(fooId)
            .setPartitionId(1)
            .setReplicas(Arrays.asList(2, 1, 3))
            .setDirectories(Arrays.asList(
                Uuid.fromString("TESTBROKER00002DIRAAAA"),
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00003DIRAAAA")))
            .setLeader(3)
            .setRemovingReplicas(Collections.emptyList())
            .setAddingReplicas(Collections.emptyList());
        AlterPartitionReassignmentsResponseData expectedCancelResponse = new AlterPartitionReassignmentsResponseData()
            .setErrorMessage(null)
            .setResponses(Arrays.asList(
                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                    .setName("foo")
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(0)
                            .setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code())
                            .setErrorMessage(null),
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(1)
                            .setErrorCode(Errors.NONE.code())
                            .setErrorMessage(null),
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(2)
                            .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                            .setErrorMessage("Unable to find partition foo:2."))),
                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                    .setName("bar")
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(0)
                            .setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code())
                            .setErrorMessage(null)))));
        Assertions.assertEquals(
            ControllerResult.atomicOf(
                Collections.singletonList(new ApiMessageAndVersion(
                    cancelRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())),
                expectedCancelResponse),
            cancelResult);

        log.info("running final alterPartition...");
        // Topic name and ID placed on the wire depend on the request version.
        boolean useTopicIds = version > 1;
        String wireTopicName = useTopicIds ? "" : "foo";
        Uuid wireTopicId = useTopicIds ? fooId : Uuid.ZERO_UUID;
        ControllerRequestContext alterContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        AlterPartitionRequestData isrRequest = new AlterPartitionRequestData()
            .setBrokerId(3)
            .setBrokerEpoch(103L)
            .setTopics(Arrays.asList(
                new AlterPartitionRequestData.TopicData()
                    .setTopicName(wireTopicName)
                    .setTopicId(wireTopicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionRequestData.PartitionData()
                            .setPartitionIndex(1)
                            .setPartitionEpoch(1)
                            .setLeaderEpoch(0)
                            .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(3, 0, 2, 1))))));
        ControllerResult isrResult = manager.alterPartition(
            alterContext, new AlterPartitionRequest.Builder(isrRequest, useTopicIds).build(version).data());
        Errors expectedError = useTopicIds ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
        AlterPartitionResponseData expectedIsrResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(
                new AlterPartitionResponseData.TopicData()
                    .setTopicName(wireTopicName)
                    .setTopicId(wireTopicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionResponseData.PartitionData()
                            .setPartitionIndex(1)
                            .setErrorCode(expectedError.code())))));
        Assertions.assertEquals(expectedIsrResponse, isrResult.response());
        ctx.replay(isrResult.records());
        Assertions.assertEquals(NONE_REASSIGNING, manager.listPartitionReassignments(null, Long.MAX_VALUE));
    }

    /**
     * Builds the broker-to-rack layout shared by the observer tests.
     *
     * @return a new mutable map in which brokers 0 and 1 map to "rack0" and brokers 2 and 3
     *         map to "rack1"
     */
    private Map<Integer, String> brokersWithRacks() {
        Map<Integer, String> rackByBroker = new HashMap<>();
        for (int brokerId = 0; brokerId < 4; brokerId++) {
            // The lower two broker IDs share rack0, the upper two share rack1.
            rackByBroker.put(brokerId, brokerId < 2 ? "rack0" : "rack1");
        }
        return rackByBroker;
    }

    /**
     * Exercises reassignment of a partition that carries an observer.
     *
     * <p>The mocked placer creates "placement-topic" with replicas [0, 2] and observer [2].
     * The test then reassigns to replicas [1, 3] / observer [3], completes that with an
     * alterPartition request from broker 0, starts a reassignment back to [0, 2] / [2], and
     * cancels it, checking the listed reassignments and the cancel record at each step.
     *
     * @param version ALTER_PARTITION request version supplied by the parameterized source;
     *                versions above 1 identify the topic by ID, older ones by name
     * @throws Exception if stubbing the placer or any controller call throws
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testReassignPartitionsWithObservers(short version) throws Exception {
        // Stub the placer so topic creation always yields replicas [0, 2] with observer [2].
        ReplicaPlacer replicaPlacer = Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment partitionAssignment = new PartitionAssignment(Arrays.asList(0, 2), Arrays.asList(2), __ -> DirectoryId.MIGRATING);
        List<PartitionAssignment> partitionAssignments = Arrays.asList(partitionAssignment);
        TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
        Mockito.when(replicaPlacer.place((PlacementSpec) ArgumentMatchers.any(), (ClusterDescriber) ArgumentMatchers.any()))
            .thenReturn(topicAssignment);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setReplicaPlacer(replicaPlacer).build();
        ReplicationControlManager replication = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        Map<Integer, String> brokers = this.brokersWithRacks();
        ctx.registerBrokers(brokers);
        ctx.unfenceBrokers(brokers.keySet());

        // Create the topic with a placement constraint: one sync replica on rack0, one observer on rack1.
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfigCollection configs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        configs.add(new CreateTopicsRequestData.CreateableTopicConfig().setName("confluent.placement.constraints").setValue(topicPlacementJson));
        String topicName = "placement-topic";
        request.topics().add(new CreateTopicsRequestData.CreatableTopic()
            .setName(topicName)
            .setNumPartitions(1)
            .setConfigs(configs)
            .setReplicationFactor((short) -1));
        ControllerResult createTopicResult = replication.createTopics(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS), request, Collections.singleton(topicName));
        Uuid topicId = ((CreateTopicsResponseData) createTopicResult.response()).topics().find(topicName).topicId();
        // One placeholder, one argument (the original passed topicId twice).
        log.info("Created topic with ID {}.", topicId);
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add(new CreateTopicsResponseData.CreatableTopicResult()
            .setName(topicName)
            .setNumPartitions(1)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(topicId)
            .setReplicationFactor((short) partitionAssignment.replicas().size()));
        Assertions.assertEquals(expectedResponse, createTopicResult.response());
        ctx.replay(createTopicResult.records());

        // First reassignment: move to replicas [1, 3] with observer [3].
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));
        ControllerResult alterResult = replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0)
                            .setReplicas(Arrays.asList(1, 3))
                            .setObservers(Arrays.asList(3)))))),
            (KafkaPrincipal) multiTenantPrincipal);
        Assertions.assertEquals(
            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(0)
                            .setErrorMessage(null))))),
            alterResult.response());
        ctx.replay(alterResult.records());
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData()
            .setErrorMessage(null)
            .setTopics(Arrays.asList(
                new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
                            .setPartitionIndex(0)
                            .setRemovingReplicas(Arrays.asList(0, 2))
                            .setAddingReplicas(Arrays.asList(1, 3))
                            .setReplicas(Arrays.asList(1, 0, 3, 2))
                            .setObservers(Arrays.asList(3, 2))))));
        Assertions.assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));

        // Broker 0 reports ISR [0, 1]; afterwards no reassignment is expected to remain.
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setTopics(Arrays.asList(
                new AlterPartitionRequestData.TopicData()
                    .setTopicName(version <= 1 ? topicName : "")
                    .setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionRequestData.PartitionData()
                            .setPartitionIndex(0)
                            .setPartitionEpoch(1)
                            .setLeaderEpoch(0)
                            .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1))))));
        ControllerResult alterPartitionResult = replication.alterPartition(
            requestContext, new AlterPartitionRequest.Builder(alterIsrRequest, version > 1).build(version).data());
        Errors expectedError = version > 1 ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
        Assertions.assertEquals(
            new AlterPartitionResponseData().setTopics(Arrays.asList(
                new AlterPartitionResponseData.TopicData()
                    .setTopicName(version <= 1 ? topicName : "")
                    .setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionResponseData.PartitionData()
                            .setPartitionIndex(0)
                            .setErrorCode(expectedError.code()))))),
            alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        Assertions.assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));

        // Second reassignment: move back to replicas [0, 2] with observer [2].
        alterResult = replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0)
                            .setReplicas(Arrays.asList(0, 2))
                            .setObservers(Arrays.asList(2)))))),
            (KafkaPrincipal) multiTenantPrincipal);
        Assertions.assertEquals(
            new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
                new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                            .setPartitionIndex(0)
                            .setErrorMessage(null))))),
            alterResult.response());
        ctx.replay(alterResult.records());
        currentReassigning = new ListPartitionReassignmentsResponseData()
            .setErrorMessage(null)
            .setTopics(Arrays.asList(
                new ListPartitionReassignmentsResponseData.OngoingTopicReassignment()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment()
                            .setPartitionIndex(0)
                            .setRemovingReplicas(Arrays.asList(1, 3))
                            .setAddingReplicas(Arrays.asList(0, 2))
                            .setReplicas(Arrays.asList(0, 1, 2, 3))
                            .setObservers(Arrays.asList(2, 3))))));
        Assertions.assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));

        // Cancel the second reassignment; the partition is expected to revert to [1, 3] / observer [3].
        ControllerResult cancelResult = replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0)
                            .setReplicas(null))))),
            (KafkaPrincipal) multiTenantPrincipal);
        Assertions.assertEquals(
            ControllerResult.atomicOf(
                Collections.singletonList(new ApiMessageAndVersion(
                    (ApiMessage) new PartitionChangeRecord()
                        .setTopicId(topicId)
                        .setPartitionId(0)
                        .setReplicas(Arrays.asList(1, 3))
                        .setDirectories(Arrays.asList(DirectoryId.migratingArray(2)))
                        .setObservers(Arrays.asList(3))
                        .setLeader(1)
                        .setRemovingReplicas(Collections.emptyList())
                        .setAddingReplicas(Collections.emptyList())
                        .setRemovingObservers(Collections.emptyList())
                        .setAddingObservers(Collections.emptyList()),
                    2)),
                new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
                    new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                        .setName(topicName)
                        .setPartitions(Arrays.asList(
                            new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                                .setPartitionIndex(0)
                                .setErrorCode(Errors.NONE.code())
                                .setErrorMessage(null)))))),
            cancelResult);
        ctx.replay(cancelResult.records());
        Assertions.assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
    }

    /**
     * Follows a partition with sync replicas [0, 1] and observers [2, 3] through a
     * fence/unfence cycle of the sync replicas.
     *
     * <p>After the ISR is expanded to all four brokers, brokers 0 and 1 are fenced and the
     * leader is expected to be 2 or 3. After 0 and 1 are unfenced the ISR is expected to be
     * {2, 3}. That leader then proposes an ISR of all four brokers; the response is expected
     * to carry a version-dependent error, and the partition is expected to end with leader 0
     * and ISR {0, 1, 2, 3}.
     *
     * @param version ALTER_PARTITION request version supplied by the parameterized source;
     *                versions above 1 identify the topic by ID, older ones by name
     * @throws Exception if stubbing the placer or any controller call throws
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionsDemotesObserver(short version) throws Exception {
        // Stub the placer so the topic is created with replicas [0, 1, 2, 3] and observers [2, 3].
        ReplicaPlacer placer = Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment stubbedAssignment = new PartitionAssignment(
            Arrays.asList(0, 1, 2, 3), Arrays.asList(2, 3), ignored -> DirectoryId.MIGRATING);
        List<PartitionAssignment> stubbedPartitions = Arrays.asList(stubbedAssignment);
        TopicAssignment stubbedTopic = new TopicAssignment(stubbedPartitions);
        Mockito.when(placer.place((PlacementSpec) ArgumentMatchers.any(), (ClusterDescriber) ArgumentMatchers.any()))
            .thenReturn(stubbedTopic);

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setReplicaPlacer(placer).build();
        ReplicationControlManager manager = ctx.replicationControl;
        Map<Integer, String> rackByBroker = this.brokersWithRacks();
        ctx.registerBrokers(rackByBroker);
        ctx.unfenceBrokers(rackByBroker.keySet());

        // Create the topic with a placement of two sync replicas on rack0 and two observers on rack1.
        String topicName = "placement-topic";
        String placementJson = "{\"version\":1,\"replicas\":[{\"count\":2,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":2,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfig placementConfig = new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(placementJson);
        CreateTopicsRequestData.CreateableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        topicConfigs.add(placementConfig);
        CreateTopicsRequestData.CreatableTopic topicSpec = new CreateTopicsRequestData.CreatableTopic()
            .setName(topicName)
            .setNumPartitions(1)
            .setConfigs(topicConfigs)
            .setReplicationFactor((short) -1);
        CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
        createRequest.topics().add(topicSpec);
        ControllerResult createResult = manager.createTopics(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS), createRequest, Collections.singleton(topicName));
        Uuid topicId = ((CreateTopicsResponseData) createResult.response()).topics().find(topicName).topicId();
        log.info("Created topic with ID {}.", topicId);
        CreateTopicsResponseData.CreatableTopicResult expectedTopicResult = new CreateTopicsResponseData.CreatableTopicResult()
            .setName(topicName)
            .setNumPartitions(1)
            .setErrorMessage(null)
            .setErrorCode((short) 0)
            .setTopicId(topicId)
            .setReplicationFactor((short) stubbedAssignment.replicas().size());
        CreateTopicsResponseData expectedCreateResponse = new CreateTopicsResponseData();
        expectedCreateResponse.topics().add(expectedTopicResult);
        Assertions.assertEquals(expectedCreateResponse, createResult.response());
        ctx.replay(createResult.records());

        // Topic name and ID placed on the wire depend on the request version.
        boolean useTopicIds = version > 1;
        String wireTopicName = useTopicIds ? "" : topicName;
        Uuid wireTopicId = useTopicIds ? topicId : Uuid.ZERO_UUID;

        // Broker 0, the current leader, expands the ISR to all four replicas.
        AlterPartitionRequestData expandFromSyncLeader = new AlterPartitionRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setTopics(Arrays.asList(
                new AlterPartitionRequestData.TopicData()
                    .setTopicName(wireTopicName)
                    .setTopicId(wireTopicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionRequestData.PartitionData()
                            .setPartitionIndex(0)
                            .setPartitionEpoch(0)
                            .setLeaderEpoch(0)
                            .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2, 3))))));
        ControllerResult syncLeaderResult = manager.alterPartition(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version),
            new AlterPartitionRequest.Builder(expandFromSyncLeader, useTopicIds).build(version).data());
        AlterPartitionResponseData expectedSyncLeaderResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(
                new AlterPartitionResponseData.TopicData()
                    .setTopicName(wireTopicName)
                    .setTopicId(wireTopicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionResponseData.PartitionData()
                            .setPartitionIndex(0)
                            .setPartitionEpoch(1)
                            .setIsr(Arrays.asList(0, 1, 2, 3))
                            .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals(expectedSyncLeaderResponse, syncLeaderResult.response());
        ctx.replay(syncLeaderResult.records());
        PartitionRegistration beforeFencing = manager.getPartition(topicId, 0);
        Assertions.assertEquals(0, beforeFencing.leader);

        // Fence both sync replicas: leadership is expected to land on one of the observers.
        ctx.fenceBrokers(Utils.mkSet(0, 1));
        PartitionRegistration afterFencing = manager.getPartition(topicId, 0);
        int leader = afterFencing.leader;
        Assertions.assertTrue(leader == 2 || leader == 3);

        // Unfence them again: the ISR is still expected to hold only the observers.
        ctx.unfenceBrokers(Utils.mkSet(0, 1));
        PartitionRegistration afterUnfencing = manager.getPartition(topicId, 0);
        Assertions.assertEquals(new HashSet<Integer>(Arrays.asList(2, 3)), Replicas.toSet(afterUnfencing.isr));

        // The current leader proposes an ISR of all four brokers, each with its registered epoch.
        BrokerRegistration leaderRegistration = ctx.clusterControl.registration(leader);
        List proposedIsr = Stream.of(0, 1, 2, 3)
            .map(brokerId -> ReplicationControlManagerTest.brokerState(
                brokerId, ctx.clusterControl.registration(brokerId.intValue()).epoch()))
            .collect(Collectors.toList());
        AlterPartitionRequestData expandFromObserver = new AlterPartitionRequestData()
            .setBrokerId(leader)
            .setBrokerEpoch(leaderRegistration.epoch())
            .setTopics(Arrays.asList(
                new AlterPartitionRequestData.TopicData()
                    .setTopicName(wireTopicName)
                    .setTopicId(wireTopicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionRequestData.PartitionData()
                            .setPartitionIndex(0)
                            .setPartitionEpoch(afterUnfencing.partitionEpoch)
                            .setLeaderEpoch(afterUnfencing.leaderEpoch)
                            .setNewIsrWithEpochs(proposedIsr)))));
        ControllerResult observerLeaderResult = manager.alterPartition(
            ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version),
            new AlterPartitionRequest.Builder(expandFromObserver, useTopicIds).build(version).data());
        Errors expectedError = useTopicIds ? Errors.NEW_LEADER_ELECTED : Errors.FENCED_LEADER_EPOCH;
        AlterPartitionResponseData expectedObserverLeaderResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(
                new AlterPartitionResponseData.TopicData()
                    .setTopicName(wireTopicName)
                    .setTopicId(wireTopicId)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionResponseData.PartitionData()
                            .setErrorCode(expectedError.code())
                            .setPartitionIndex(0)
                            .setLeaderId(0)
                            .setLeaderEpoch(0)
                            .setPartitionEpoch(0)))));
        Assertions.assertEquals(expectedObserverLeaderResponse, observerLeaderResult.response());
        ctx.replay(observerLeaderResult.records());

        // Leadership is expected back on broker 0 with all four brokers in the ISR.
        PartitionRegistration afterObserverExpansion = manager.getPartition(topicId, 0);
        Assertions.assertEquals(0, afterObserverExpansion.leader);
        Assertions.assertEquals(
            new HashSet<Integer>(Arrays.asList(0, 1, 2, 3)),
            Replicas.toSet(afterObserverExpansion.isr));
    }

    /**
     * Creates a one-partition topic with an observer via a mocked {@link ReplicaPlacer}, reassigns
     * the partition to a different replica/observer set, and checks that the single emitted
     * {@link PartitionChangeRecord} leaves the deprecated tagged fields 1003 and 1004 {@code null}.
     *
     * @param version ALTER_PARTITION API version supplied by the parameter source; the test body
     *                does not reference it
     * @throws Exception if topic creation or the reassignment call fails unexpectedly
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testReassignPartitionsWithObserversDoesNotSetSyncReplicaFields(short version) throws Exception {
        // The mocked placer always returns the original assignment: replicas [0, 2], observer [2].
        ReplicaPlacer replicaPlacer = (ReplicaPlacer)Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment originalAssignment = new PartitionAssignment(Arrays.asList(0, 2), Arrays.asList(2), __ -> DirectoryId.MIGRATING);
        List<PartitionAssignment> partitionAssignments = Arrays.asList(originalAssignment);
        TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
        Mockito.when((Object)replicaPlacer.place((PlacementSpec)ArgumentMatchers.any(), (ClusterDescriber)ArgumentMatchers.any())).thenReturn((Object)topicAssignment);

        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setReplicaPlacer(replicaPlacer).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        Map<Integer, String> brokers = this.brokersWithRacks();
        ctx.registerBrokers(brokers);
        ctx.unfenceBrokers(brokers.keySet());

        // Create the topic with a placement constraint that asks for one replica and one observer.
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfigCollection configs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        configs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(topicPlacementJson));
        String topicName = "placement-topic";
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic()
            .setName(topicName)
            .setNumPartitions(1)
            .setConfigs(configs)
            .setReplicationFactor((short)-1));
        ControllerResult createTopicResult = replicationControl.createTopics(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS), request, Collections.singleton(topicName));
        Uuid topicId = ((CreateTopicsResponseData)createTopicResult.response()).topics().find(topicName).topicId();
        // One placeholder, one argument (the original passed topicId twice).
        log.info("Created topic with ID {}.", (Object)topicId);

        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult()
            .setName(topicName)
            .setNumPartitions(1)
            .setErrorMessage(null)
            .setErrorCode((short)0)
            .setTopicId(topicId)
            .setReplicationFactor((short)originalAssignment.replicas().size()));
        Assertions.assertEquals((Object)expectedResponse, (Object)createTopicResult.response());
        ctx.replay(createTopicResult.records());

        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));
        PartitionAssignment reassignmentAssignment = new PartitionAssignment(Arrays.asList(1, 3), Arrays.asList(3), __ -> DirectoryId.MIGRATING);

        // Sanity-check the fixture: sync replica 1 is being added and sync replica 0 removed.
        Set<Integer> addingSyncReplicas = new HashSet<Integer>(reassignmentAssignment.syncReplicas());
        addingSyncReplicas.removeAll(originalAssignment.syncReplicas());
        Assertions.assertEquals(Collections.singleton(1), addingSyncReplicas);
        Set<Integer> removingSyncReplicas = new HashSet<Integer>(originalAssignment.syncReplicas());
        // Fixed: the difference must be taken on removingSyncReplicas, not on addingSyncReplicas.
        removingSyncReplicas.removeAll(reassignmentAssignment.syncReplicas());
        Assertions.assertEquals(Collections.singleton(0), removingSyncReplicas);

        ControllerResult alterResult = replicationControl.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
                new AlterPartitionReassignmentsRequestData.ReassignableTopic()
                    .setName(topicName)
                    .setPartitions(Arrays.asList(
                        new AlterPartitionReassignmentsRequestData.ReassignablePartition()
                            .setPartitionIndex(0)
                            .setReplicas(reassignmentAssignment.replicas())
                            .setObservers(reassignmentAssignment.observers()))))),
            (KafkaPrincipal)multiTenantPrincipal);
        Assertions.assertEquals(
            (Object)new AlterPartitionReassignmentsResponseData()
                .setErrorMessage(null)
                .setResponses(Arrays.asList(
                    new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse()
                        .setName(topicName)
                        .setPartitions(Arrays.asList(
                            new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse()
                                .setPartitionIndex(0)
                                .setErrorMessage(null))))),
            (Object)alterResult.response());

        // Exactly one PartitionChangeRecord is expected, with the deprecated tags left null.
        List alterPartitionReassignmentRecords = alterResult.records();
        Assertions.assertEquals((int)1, (int)alterPartitionReassignmentRecords.size());
        ApiMessageAndVersion alterResultRecord = (ApiMessageAndVersion)alterPartitionReassignmentRecords.get(0);
        Assertions.assertTrue((boolean)(alterResultRecord.message() instanceof PartitionChangeRecord));
        PartitionChangeRecord actualPartitionChangeRecord = (PartitionChangeRecord)alterResultRecord.message();
        PartitionChangeRecord expectedPartitionChangeRecord = new PartitionChangeRecord()
            .setTopicId(topicId)
            .setPartitionId(0)
            .setIsr(null)
            .setLeader(-2)
            .setReplicas(Arrays.asList(1, 0, 3, 2))
            .setDirectories(Arrays.asList(DirectoryId.migratingArray((int)4)))
            .setObservers(Arrays.asList(3, 2))
            .setAddingReplicas(Arrays.asList(1, 3))
            .setRemovingReplicas(Arrays.asList(0, 2))
            .setDeprecatedDoNotUseTag1004(null)
            .setDeprecatedDoNotUseTag1003(null)
            .setAddingObservers(Arrays.asList(3))
            .setRemovingObservers(Arrays.asList(2))
            .setLeaderRecoveryState((byte)-1)
            .setLinkedLeaderEpoch(-1)
            .setLinkState(PartitionRegistration.LinkState.NOT_MIRROR.levelCode);
        Assertions.assertEquals((Object)expectedPartitionChangeRecord, (Object)actualPartitionChangeRecord);
    }

    /**
     * Checks that an AlterPartition request which lists a fenced broker in the new ISR is rejected,
     * and that the same request is accepted after that broker has been unfenced.
     *
     * @param version ALTER_PARTITION API version under test; versions up to 1 address the topic by
     *                name and expect OPERATION_NOT_ATTEMPTED, later versions address it by id and
     *                expect INELIGIBLE_REPLICA
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectFencedBrokers(short version) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();

        // Fence broker 3; it is expected to drop out of the ISR and bump the partition epoch to 1.
        // Typed consistently with the other fence-record lists in this class.
        ArrayList<ApiMessageAndVersion> fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        Assertions.assertEquals(
            (Object)new PartitionRegistration.Builder()
                .setReplicas(new int[]{1, 2, 3, 4})
                .setDirectories(new Uuid[]{
                    Uuid.fromString("TESTBROKER00001DIRAAAA"),
                    Uuid.fromString("TESTBROKER00002DIRAAAA"),
                    Uuid.fromString("TESTBROKER00003DIRAAAA"),
                    Uuid.fromString("TESTBROKER00004DIRAAAA")})
                .setIsr(new int[]{1, 2, 4})
                .setLeader(Integer.valueOf(1))
                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
                .setLeaderEpoch(Integer.valueOf(0))
                .setPartitionEpoch(Integer.valueOf(1))
                .build(),
            (Object)replication.getPartition(fooId, 0));

        // Topic addressing depends on the request version: by name up to v1, by id afterwards.
        String wireTopicName = version <= 1 ? "foo" : "";
        Uuid wireTopicId = version > 1 ? fooId : Uuid.ZERO_UUID;

        // The leader (broker 1) tries to put the still-fenced broker 3 back into the ISR.
        AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
            .setBrokerId(1)
            .setBrokerEpoch(101L)
            .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                    .setPartitionIndex(0)
                    .setPartitionEpoch(1)
                    .setLeaderEpoch(0)
                    .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4))))));
        ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        ControllerResult alterPartitionResult = replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest, version > 1).build(version).data());
        Errors expectedError = version <= 1 ? Errors.OPERATION_NOT_ATTEMPTED : Errors.INELIGIBLE_REPLICA;
        Assertions.assertEquals(
            (Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setErrorCode(expectedError.code()))))),
            (Object)alterPartitionResult.response());

        // Unfence broker 3 and resend the same request data; it is now expected to succeed.
        fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerUnfenced(3, 103L, fenceRecords);
        ctx.replay(fenceRecords);
        alterPartitionResult = replication.alterPartition(requestContext, alterIsrRequest);
        Assertions.assertEquals(
            (Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setLeaderId(1)
                    .setLeaderEpoch(0)
                    .setIsr(Arrays.asList(1, 2, 3, 4))
                    .setPartitionEpoch(2)
                    .setErrorCode(Errors.NONE.code()))))),
            (Object)alterPartitionResult.response());
    }

    /**
     * Builds an AlterPartition request that adds broker 4 to the ISR, then re-registers broker 4
     * with a newer epoch before the request is processed. From version 3 on the request is
     * expected to be rejected with INELIGIBLE_REPLICA; older versions expect a top-level NONE.
     *
     * @param version ALTER_PARTITION API version under test
     * @throws Exception if any controller interaction fails unexpectedly
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short version) throws Exception {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = testContext.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();

        // Start from an ISR of {1, 2, 3} so that broker 4 has to be added back.
        testContext.alterPartition(
            new TopicIdPartition(topicId, 0),
            1,
            ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3),
            LeaderRecoveryState.RECOVERED);

        // The leader prepares its request while broker 4 still has its default epoch.
        AlterPartitionRequestData.PartitionData partitionRequest = new AlterPartitionRequestData.PartitionData()
            .setPartitionIndex(0)
            .setPartitionEpoch(1)
            .setLeaderEpoch(0)
            .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4));
        AlterPartitionRequestData.TopicData topicRequest = new AlterPartitionRequestData.TopicData()
            .setTopicName(version <= 1 ? "foo" : "")
            .setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
            .setPartitions(Arrays.asList(partitionRequest));
        AlterPartitionRequestData staleRequest = new AlterPartitionRequestData()
            .setBrokerId(1)
            .setBrokerEpoch(101L)
            .setTopics(Arrays.asList(topicRequest));

        // Broker 4 registers again with a larger epoch.
        long bumpedEpoch = ReplicationControlManagerTest.defaultBrokerEpoch(4) + 1000L;
        RegisterBrokerRecord registration = new RegisterBrokerRecord()
            .setBrokerEpoch(bumpedEpoch)
            .setBrokerId(4)
            .setRack(null);
        RegisterBrokerRecord.BrokerEndpoint endpoint = new RegisterBrokerRecord.BrokerEndpoint()
            .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
            .setPort(9096)
            .setName("PLAINTEXT")
            .setHost("localhost");
        registration.endPoints().add((ImplicitLinkedHashCollection.Element)endpoint);
        testContext.replay(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)registration, (short)0)));

        // A heartbeat that does not ask for fencing brings broker 4 back under its new epoch.
        BrokerHeartbeatRequestData heartbeat = new BrokerHeartbeatRequestData()
            .setBrokerId(4)
            .setBrokerEpoch(bumpedEpoch)
            .setCurrentMetadataOffset(1L)
            .setWantFence(false)
            .setWantShutDown(false);
        ControllerResult heartbeatResult = testContext.replicationControl.processBrokerHeartbeat(heartbeat, 0L);
        Assertions.assertEquals(new BrokerHeartbeatReply(true, false, false, false), heartbeatResult.response());
        testContext.replay(heartbeatResult.records());

        // Now the late request arrives at the controller.
        ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        AlterPartitionRequestData wireData = new AlterPartitionRequest.Builder(staleRequest, version > 1).build(version).data();
        ControllerResult lateResult = manager.alterPartition(context, wireData);

        if (version < 3) {
            // Older versions only have the top-level error code checked here.
            Assertions.assertEquals(
                Errors.NONE.code(),
                ((AlterPartitionResponseData)lateResult.response()).errorCode());
        } else {
            AlterPartitionResponseData.PartitionData rejectedPartition = new AlterPartitionResponseData.PartitionData()
                .setPartitionIndex(0)
                .setErrorCode(Errors.INELIGIBLE_REPLICA.code());
            AlterPartitionResponseData rejected = new AlterPartitionResponseData()
                .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                    .setTopicName("")
                    .setTopicId(topicId)
                    .setPartitions(Arrays.asList(rejectedPartition))));
            Assertions.assertEquals(rejected, lateResult.response());
        }
    }

    /**
     * Marks broker 3 as being in controlled shutdown and checks that an AlterPartition request
     * whose new ISR still lists broker 3 is rejected.
     *
     * @param version ALTER_PARTITION API version under test; versions up to 1 expect
     *                OPERATION_NOT_ATTEMPTED, later versions expect INELIGIBLE_REPLICA
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = testContext.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();

        // Freshly created partition: all four replicas in sync, both epochs at 0.
        Uuid[] expectedDirectories = new Uuid[]{
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA")};
        PartitionRegistration expectedInitial = new PartitionRegistration.Builder()
            .setReplicas(new int[]{1, 2, 3, 4})
            .setDirectories(expectedDirectories)
            .setIsr(new int[]{1, 2, 3, 4})
            .setLeader(Integer.valueOf(1))
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(Integer.valueOf(0))
            .setPartitionEpoch(Integer.valueOf(0))
            .build();
        Assertions.assertEquals(expectedInitial, manager.getPartition(topicId, 0));

        testContext.inControlledShutdownBrokers(3);

        // Topic addressing depends on the request version: by name up to v1, by id afterwards.
        String wireTopicName = version <= 1 ? "foo" : "";
        Uuid wireTopicId = version > 1 ? topicId : Uuid.ZERO_UUID;

        AlterPartitionRequestData.PartitionData partitionRequest = new AlterPartitionRequestData.PartitionData()
            .setPartitionIndex(0)
            .setPartitionEpoch(0)
            .setLeaderEpoch(0)
            .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4));
        AlterPartitionRequestData requestData = new AlterPartitionRequestData()
            .setBrokerId(1)
            .setBrokerEpoch(101L)
            .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(partitionRequest))));

        ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        AlterPartitionRequestData wireData = new AlterPartitionRequest.Builder(requestData, version > 1).build(version).data();
        ControllerResult outcome = manager.alterPartition(context, wireData);

        Errors rejection;
        if (version <= 1) {
            rejection = Errors.OPERATION_NOT_ATTEMPTED;
        } else {
            rejection = Errors.INELIGIBLE_REPLICA;
        }
        AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setErrorCode(rejection.code())))));
        Assertions.assertEquals(expectedResponse, outcome.response());
    }

    /**
     * Creates a topic with an explicit three-partition assignment while brokers 0 and 2 carry
     * component degradations, then checks each partition's replica list and leader. The expected
     * leaders (1, 4 and 3) are in every case the first listed replica that is not degraded.
     *
     * @throws Exception if topic creation fails unexpectedly
     */
    @Test
    public void testCreateTopicWithBrokerDegradationsExplicitAssignment() throws Exception {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);

        testContext.addDegradationsToBrokers(new HashSet<Integer>(Arrays.asList(0, 2)));
        Assertions.assertEquals(
            Utils.mkSet(0, 2),
            testContext.clusterControl.activeBrokerComponentDegradations().keySet());

        int[][] manualAssignment = new int[][]{{0, 1, 2}, {2, 0, 4}, {3, 2, 1}};
        Uuid topicId = testContext.createTestTopic("foo", manualAssignment).topicId();

        PartitionRegistration first = manager.getPartition(topicId, 0);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, first.replicas);
        Assertions.assertEquals(1, first.leader);

        PartitionRegistration second = manager.getPartition(topicId, 1);
        Assertions.assertArrayEquals(new int[]{2, 0, 4}, second.replicas);
        Assertions.assertEquals(4, second.leader);

        PartitionRegistration third = manager.getPartition(topicId, 2);
        Assertions.assertArrayEquals(new int[]{3, 2, 1}, third.replicas);
        Assertions.assertEquals(3, third.leader);
    }

    /**
     * Creates a three-partition, replication-factor-3 topic with controller-chosen placement while
     * brokers 0 and 2 carry component degradations, then checks the resulting replica lists and
     * leaders. None of the expected leaders is a degraded broker.
     *
     * @throws Exception if topic creation fails unexpectedly
     */
    @Test
    public void testCreateTopicWithBrokerDegradationsDefaultAssignment() throws Exception {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);

        testContext.addDegradationsToBrokers(new HashSet<Integer>(Arrays.asList(0, 2)));
        Assertions.assertEquals(
            Utils.mkSet(0, 2),
            testContext.clusterControl.activeBrokerComponentDegradations().keySet());

        Uuid topicId = testContext.createTestTopic("foo", 3, (short)3, Errors.NONE.code()).topicId();

        PartitionRegistration first = manager.getPartition(topicId, 0);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, first.replicas);
        Assertions.assertEquals(1, first.leader);

        PartitionRegistration second = manager.getPartition(topicId, 1);
        Assertions.assertArrayEquals(new int[]{1, 2, 3}, second.replicas);
        Assertions.assertEquals(1, second.leader);

        PartitionRegistration third = manager.getPartition(topicId, 2);
        Assertions.assertArrayEquals(new int[]{2, 3, 4}, third.replicas);
        Assertions.assertEquals(3, third.leader);
    }

    /**
     * Grows topic "foo" from one to three partitions using an explicit assignment while brokers 0
     * and 2 carry component degradations, and checks replicas and leaders of the original and the
     * added partitions. None of the expected leaders is a degraded broker.
     *
     * @throws Exception if topic or partition creation fails unexpectedly
     */
    @Test
    public void testCreatePartitionsWithBrokerDegradationsExplicitAssignment() throws Exception {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);

        testContext.addDegradationsToBrokers(new HashSet<Integer>(Arrays.asList(0, 2)));
        Assertions.assertEquals(
            Utils.mkSet(0, 2),
            testContext.clusterControl.activeBrokerComponentDegradations().keySet());

        // Single partition first, placed by the controller.
        Uuid topicId = testContext.createTestTopic("foo", 1, (short)3, Errors.NONE.code()).topicId();
        PartitionRegistration original = manager.getPartition(topicId, 0);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, original.replicas);
        Assertions.assertEquals(1, original.leader);

        // Add partitions 1 and 2 with caller-supplied replica lists.
        testContext.createPartitions(3, "foo", new int[][]{{2, 0, 4}, {3, 2, 1}}, Errors.NONE.code());

        PartitionRegistration firstAdded = manager.getPartition(topicId, 1);
        Assertions.assertArrayEquals(new int[]{2, 0, 4}, firstAdded.replicas);
        Assertions.assertEquals(4, firstAdded.leader);

        PartitionRegistration secondAdded = manager.getPartition(topicId, 2);
        Assertions.assertArrayEquals(new int[]{3, 2, 1}, secondAdded.replicas);
        Assertions.assertEquals(3, secondAdded.leader);
    }

    /**
     * Grows topic "foo" from one to three partitions with controller-chosen placement (a
     * {@code null} assignment) while brokers 0 and 2 carry component degradations, and checks
     * replicas and leaders of the original and the added partitions.
     *
     * @throws Exception if topic or partition creation fails unexpectedly
     */
    @Test
    public void testCreatePartitionsWithBrokerDegradationsDefaultAssignment() throws Exception {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);

        testContext.addDegradationsToBrokers(new HashSet<Integer>(Arrays.asList(0, 2)));
        Assertions.assertEquals(
            Utils.mkSet(0, 2),
            testContext.clusterControl.activeBrokerComponentDegradations().keySet());

        // Single partition first, placed by the controller.
        Uuid topicId = testContext.createTestTopic("foo", 1, (short)3, Errors.NONE.code()).topicId();
        PartitionRegistration original = manager.getPartition(topicId, 0);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, original.replicas);
        Assertions.assertEquals(1, original.leader);

        // Passing null leaves placement of partitions 1 and 2 to the controller.
        testContext.createPartitions(3, "foo", null, Errors.NONE.code());

        PartitionRegistration firstAdded = manager.getPartition(topicId, 1);
        Assertions.assertArrayEquals(new int[]{2, 3, 4}, firstAdded.replicas);
        Assertions.assertEquals(3, firstAdded.leader);

        PartitionRegistration secondAdded = manager.getPartition(topicId, 2);
        Assertions.assertArrayEquals(new int[]{3, 4, 0}, secondAdded.replicas);
        Assertions.assertEquals(3, secondAdded.leader);
    }

    /**
     * Adds component degradations to brokers 1 and 2 after the topic exists, then sends an
     * AlterPartition request from broker 1 using the pre-degradation epochs and checks that it is
     * answered with an error.
     *
     * @param version ALTER_PARTITION API version under test; versions up to 1 expect
     *                FENCED_LEADER_EPOCH, later versions expect NEW_LEADER_ELECTED
     * @throws Exception if any controller interaction fails unexpectedly
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.ALTER_PARTITION)
    public void testAlterPartitionWithBrokerDegradations(short version) throws Exception {
        ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = testContext.replicationControl;
        testContext.registerBrokers(0, 1, 2, 3, 4);
        testContext.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = testContext.createTestTopic("foo", new int[][]{{1, 2, 3, 4}}).topicId();

        // Freshly created partition: broker 1 leads, all four replicas in sync, both epochs at 0.
        Uuid[] expectedDirectories = new Uuid[]{
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA")};
        PartitionRegistration expectedInitial = new PartitionRegistration.Builder()
            .setReplicas(new int[]{1, 2, 3, 4})
            .setDirectories(expectedDirectories)
            .setIsr(new int[]{1, 2, 3, 4})
            .setLeader(Integer.valueOf(1))
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(Integer.valueOf(0))
            .setPartitionEpoch(Integer.valueOf(0))
            .build();
        Assertions.assertEquals(expectedInitial, manager.getPartition(topicId, 0));

        testContext.addDegradationsToBrokers(new HashSet<Integer>(Arrays.asList(1, 2)));

        // Topic addressing depends on the request version: by name up to v1, by id afterwards.
        String wireTopicName = version <= 1 ? "foo" : "";
        Uuid wireTopicId = version > 1 ? topicId : Uuid.ZERO_UUID;

        AlterPartitionRequestData.PartitionData partitionRequest = new AlterPartitionRequestData.PartitionData()
            .setPartitionIndex(0)
            .setPartitionEpoch(0)
            .setLeaderEpoch(0)
            .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3, 4));
        AlterPartitionRequestData requestData = new AlterPartitionRequestData()
            .setBrokerId(1)
            .setBrokerEpoch(101L)
            .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(partitionRequest))));

        ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
        AlterPartitionRequestData wireData = new AlterPartitionRequest.Builder(requestData, version > 1).build(version).data();
        ControllerResult outcome = manager.alterPartition(context, wireData);

        Errors rejection;
        if (version <= 1) {
            rejection = Errors.FENCED_LEADER_EPOCH;
        } else {
            rejection = Errors.NEW_LEADER_ELECTED;
        }
        AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicName(wireTopicName)
                .setTopicId(wireTopicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setErrorCode(rejection.code())))));
        Assertions.assertEquals(expectedResponse, outcome.response());
    }

    /**
     * End-to-end scenario for starting and then cancelling partition reassignments.
     *
     * <p>The test creates topics "foo" (four partitions) and "bar" (one partition), fences broker 3,
     * submits a batch of reassignments (two of them invalid), checks which ones are reported as
     * in progress, and finally cancels the reassignment of bar-0 and verifies the reverted state.
     *
     * @throws Exception if any controller interaction fails unexpectedly
     */
    @Test
    public void testCancelReassignPartitions() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        // Principal handed to both alterPartitionReassignments calls below.
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}, {0, 1, 2, 3}, {4, 3, 1, 0}, {2, 3, 4, 1}}).topicId();
        Uuid barId = ctx.createTestTopic("bar", new int[][]{{4, 3, 2}}).topicId();
        // Nothing is being reassigned right after topic creation.
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        // Fence broker 3; foo-0 is expected to shrink its ISR to {1, 2, 4} at partition epoch 1.
        ArrayList<ApiMessageAndVersion> fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 4}).setIsr(new int[]{1, 2, 4}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(1)).build(), (Object)replication.getPartition(fooId, 0));
        // Request five reassignments: foo-0 -> [1, 2, 4], foo-1 -> [1, 2, 3, 0], foo-2 -> [5, 6, 7],
        // foo-3 -> empty list, bar-0 -> [1, 2, 3, 4, 0].
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 2, 4)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(Arrays.asList(1, 2, 3, 0)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(Arrays.asList(5, 6, 7)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(3).setReplicas(Arrays.asList(new Integer[0])))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))), (KafkaPrincipal)multiTenantPrincipal);
        // foo-2 (unregistered broker 5) and foo-3 (empty replica list) are expected to fail with
        // INVALID_REPLICA_ASSIGNMENT; the other three are expected to be accepted.
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 5, but no such broker is registered."), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes an empty replica list."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        // State after replay: foo-0 already has the target replica set [1, 2, 4].
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 4}).setIsr(new int[]{1, 2, 4}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setLeader(Integer.valueOf(1)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(1)).setPartitionEpoch(Integer.valueOf(2)).build(), (Object)replication.getPartition(fooId, 1 - 1));
        // foo-1 carries the reordered replica list [1, 2, 3, 0] with no adding/removing replicas set.
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 0}).setIsr(new int[]{0, 1, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00000DIRAAAA")}).setLeader(Integer.valueOf(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(2)).build(), (Object)replication.getPartition(fooId, 1));
        // bar-0 is still in progress: replicas 0 and 1 are recorded as adding replicas.
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{1, 2, 3, 4, 0}).setIsr(new int[]{4, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00001DIRAAAA"), Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA"), Uuid.fromString((String)"TESTBROKER00000DIRAAAA")}).setAddingReplicas(new int[]{0, 1}).setLeader(Integer.valueOf(4)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(0)).setPartitionEpoch(Integer.valueOf(2)).build(), (Object)replication.getPartition(barId, 0));
        // Only bar-0 is expected in the list of ongoing reassignments, whether unfiltered or
        // filtered by topic; filtering on "foo" yields nothing.
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("bar").setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Arrays.asList(0, 1)).setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(Arrays.asList(0, 1, 2))), Long.MAX_VALUE));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(Arrays.asList(0, 1, 2))), Long.MAX_VALUE));
        // Broker 4 (leader of bar-0) proposes ISR {4, 1, 2, 0}; the response reports partition epoch 3.
        // Note that the records of this result are never replayed in this test.
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104L).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicId(barId).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(2).setLeaderEpoch(0).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(4, 1, 2, 0)))))));
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setLeaderId(4).setLeaderEpoch(0).setIsr(Arrays.asList(4, 1, 2, 0)).setPartitionEpoch(3).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        // Cancel (replicas == null) for foo-0 and bar-0.
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))), (KafkaPrincipal)multiTenantPrincipal);
        // Expected: foo-0 reports NO_REASSIGNMENT_IN_PROGRESS; bar-0 yields a single change record
        // that restores replicas [2, 3, 4] and clears the adding replicas.
        Assertions.assertEquals((Object)ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(barId).setPartitionId(0).setLeader(4).setReplicas(Arrays.asList(2, 3, 4)).setDirectories(Arrays.asList(Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA"))).setRemovingReplicas(null).setAddingReplicas(Collections.emptyList()), MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null)))))), (Object)cancelResult);
        ctx.replay(cancelResult.records());
        // After the cancellation is replayed nothing is reassigning and bar-0 is back on [2, 3, 4]
        // with ISR {4, 2}, leader epoch 1 and partition epoch 3.
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        Assertions.assertEquals((Object)new PartitionRegistration.Builder().setReplicas(new int[]{2, 3, 4}).setIsr(new int[]{4, 2}).setDirectories(new Uuid[]{Uuid.fromString((String)"TESTBROKER00002DIRAAAA"), Uuid.fromString((String)"TESTBROKER00003DIRAAAA"), Uuid.fromString((String)"TESTBROKER00004DIRAAAA")}).setLeader(Integer.valueOf(4)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(Integer.valueOf(1)).setPartitionEpoch(Integer.valueOf(3)).build(), (Object)replication.getPartition(barId, 0));
    }

    /**
     * Registers brokers 0-3 without ever unfencing them, then asks for topic "foo" with the
     * manual assignment {0, 1, 2} and expects the INVALID_REPLICA_ASSIGNMENT error code.
     */
    @Test
    public void testManualPartitionAssignmentOnAllFencedBrokers() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        context.registerBrokers(0, 1, 2, 3);

        int[][] manualAssignment = new int[][]{{0, 1, 2}};
        context.createTestTopic("foo", manualAssignment, Errors.INVALID_REPLICA_ASSIGNMENT.code());
    }

    /**
     * Adds a second partition to "foo" twice: first with replicas {3, 4, 5}, none of which this
     * test unfences (expects INVALID_REPLICA_ASSIGNMENT), then with {2, 4, 5}, where broker 2 was
     * unfenced (expects NONE). Finally checks the registration stored for partition 1.
     */
    @Test
    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        context.registerBrokers(0, 1, 2, 3, 4, 5);
        context.unfenceBrokers(0, 1, 2);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();

        // Brokers 3, 4 and 5 are registered above but never passed to unfenceBrokers.
        context.createPartitions(2, "foo", new int[][]{{3, 4, 5}}, Errors.INVALID_REPLICA_ASSIGNMENT.code());
        // Same request, but with broker 2 (unfenced above) as the first replica.
        context.createPartitions(2, "foo", new int[][]{{2, 4, 5}}, Errors.NONE.code());

        Uuid[] expectedDirectories = new Uuid[]{
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA"),
            Uuid.fromString("TESTBROKER00005DIRAAAA")
        };
        PartitionRegistration expectedPartition = new PartitionRegistration.Builder()
            .setReplicas(new int[]{2, 4, 5})
            .setDirectories(expectedDirectories)
            .setIsr(new int[]{2})
            .setLeader(Integer.valueOf(2))
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(Integer.valueOf(0))
            .setPartitionEpoch(Integer.valueOf(0))
            .build();
        Assertions.assertEquals((Object)expectedPartition, (Object)context.replicationControl.getPartition(topicId, 1));
    }

    /**
     * Creates topic "foo" and grows it twice while a MockCreateTopicPolicy is installed, then
     * verifies the registrations of partitions 0 and 1.
     *
     * NOTE(review): the mock is built from three RequestMetadata values and this test issues three
     * create calls; presumably the mock checks them in order - confirm against MockCreateTopicPolicy.
     */
    @Test
    public void testCreatePartitionsHonorsCreateTopicsPolicy() throws Exception {
        CreateTopicPolicy.RequestMetadata initialAssignment = new CreateTopicPolicy.RequestMetadata("foo", null, null, Collections.singletonMap(0, Arrays.asList(0, 1, 2)), Collections.emptyMap());
        CreateTopicPolicy.RequestMetadata manualAddition = new CreateTopicPolicy.RequestMetadata("foo", null, null, Collections.singletonMap(1, Arrays.asList(2, 1, 0)), Collections.emptyMap());
        CreateTopicPolicy.RequestMetadata automaticAddition = new CreateTopicPolicy.RequestMetadata("foo", Integer.valueOf(2), Short.valueOf((short)3), null, Collections.emptyMap());
        MockCreateTopicPolicy createTopicPolicy = new MockCreateTopicPolicy(Arrays.asList(initialAssignment, manualAddition, automaticAddition));

        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder()
            .setCreateTopicPolicy(createTopicPolicy)
            .build();
        context.registerBrokers(0, 1, 2, 3, 4, 5);
        context.unfenceBrokers(0, 1, 2);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();
        context.createPartitions(2, "foo", new int[][]{{2, 1, 0}}, Errors.NONE.code());

        // Directory ids of brokers 0, 1 and 2, spelled exactly as in the original expressions.
        Uuid broker0Dir = Uuid.fromString((String)("TESTBROKER" + Integer.toString(100000).substring(1) + "DIRAAAA"));
        Uuid broker1Dir = Uuid.fromString((String)("TESTBROKER" + Integer.toString(100001).substring(1) + "DIRAAAA"));
        Uuid broker2Dir = Uuid.fromString((String)("TESTBROKER" + Integer.toString(100002).substring(1) + "DIRAAAA"));

        PartitionRegistration expectedPartition0 = new PartitionRegistration.Builder()
            .setReplicas(new int[]{0, 1, 2})
            .setDirectories(new Uuid[]{broker0Dir, broker1Dir, broker2Dir})
            .setIsr(new int[]{0, 1, 2})
            .setLeader(Integer.valueOf(0))
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(Integer.valueOf(0))
            .setPartitionEpoch(Integer.valueOf(0))
            .build();
        Assertions.assertEquals((Object)expectedPartition0, (Object)context.replicationControl.getPartition(topicId, 0));

        PartitionRegistration expectedPartition1 = new PartitionRegistration.Builder()
            .setReplicas(new int[]{2, 1, 0})
            .setDirectories(new Uuid[]{broker2Dir, broker1Dir, broker0Dir})
            .setIsr(new int[]{2, 1, 0})
            .setLeader(Integer.valueOf(2))
            .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
            .setLeaderEpoch(Integer.valueOf(0))
            .setPartitionEpoch(Integer.valueOf(0))
            .build();
        Assertions.assertEquals((Object)expectedPartition1, (Object)context.replicationControl.getPartition(topicId, 1));

        // Grow to four partitions without a manual assignment.
        context.createPartitions(4, "foo", null, Errors.NONE.code());
    }

    /**
     * Looks up the registration of the given partition and asserts its ISR and leader.
     *
     * @param replication      manager to read the partition registration from
     * @param topicIdPartition topic id and partition index to look up
     * @param leaderId         expected value of the registration's leader field
     * @param isr              expected contents of the registration's isr array
     */
    private void assertLeaderAndIsr(ReplicationControlManager replication, TopicIdPartition topicIdPartition, int leaderId, int[] isr) {
        Uuid topicId = topicIdPartition.topicId();
        int partitionId = topicIdPartition.partitionId();
        PartitionRegistration actual = replication.getPartition(topicId, partitionId);
        // The ISR is checked first, so an ISR mismatch is reported before a leader mismatch.
        Assertions.assertArrayEquals(isr, actual.isr);
        Assertions.assertEquals(leaderId, actual.leader);
    }

    /**
     * Creates a regular topic "foo" plus a topic "bar" whose confluent.topic.type config is the
     * virtual topic type, fences broker 0 and runs a preferred election with a null partition
     * list. The expected response lists only the three "foo" partitions, and no records are
     * expected.
     */
    @Test
    public void testElectIgnoresVirtualTopics() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder()
            .setVirtualTopicsEnabledInConfig(true)
            .build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        context.createTestTopic("foo", new int[][]{{0, 1, 2}, {1, 2, 0}, {2, 0, 1}}).topicId();

        // Create "bar" through createTopics, tagged with the virtual topic type.
        CreateTopicsRequestData.CreateableTopicConfigCollection virtualConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        virtualConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.topic.type")
            .setValue(TopicType.VIRTUAL.logConfigValue()));
        CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
        createRequest.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic()
            .setName("bar")
            .setNumPartitions(-1)
            .setReplicationFactor((short)-1)
            .setConfigs(virtualConfigs));
        ControllerRequestContext createContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult createResult = manager.createTopics(createContext, createRequest, Collections.singleton("bar"));
        context.replay(createResult.records());

        context.fenceBrokers(Collections.singleton(0));

        // A null partition list means the request is not restricted to specific partitions.
        ElectLeadersRequestData electRequest = buildElectLeadersRequest(ElectionType.PREFERRED, null);
        ControllerResult electionResult = manager.electLeaders(electRequest);
        ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(Errors.NONE, true, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))
        }));
        assertElectLeadersResponse(expectedResponse, (ElectLeadersResponseData)electionResult.response());
        Assertions.assertEquals(Collections.emptyList(), (Object)electionResult.records());
    }

    /**
     * Exercises unclean leader election on a context built with MetadataVersion.IBP_3_6_IV1.
     *
     * After fencing brokers 1, 2 and 3, partition 0 of "foo" is leaderless with ISR {1}. A first
     * unclean election is expected to produce no records and ELIGIBLE_LEADERS_NOT_AVAILABLE for
     * partition 0. After broker 2 is unfenced, a second election is expected to produce a single
     * PartitionChangeRecord making broker 2 the leader and sole ISR member of partition 0.
     *
     * @param electAllPartitions when true the request carries a null partition list; otherwise it
     *                           names partitions 0, 1 and 2 of "foo" explicitly
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build(MetadataVersion.IBP_3_6_IV1);
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        TopicIdPartition foo0 = new TopicIdPartition(topicId, 0);
        TopicIdPartition foo1 = new TopicIdPartition(topicId, 1);
        TopicIdPartition foo2 = new TopicIdPartition(topicId, 2);

        // Fence in two steps: first brokers 2 and 3, then 1, 2 and 3.
        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{2, 3}));
        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{1, 2, 3}));
        assertLeaderAndIsr(manager, foo0, -1, new int[]{1});
        assertLeaderAndIsr(manager, foo1, 4, new int[]{4});
        assertLeaderAndIsr(manager, foo2, 0, new int[]{0});

        Map<String, List<Integer>> requestedPartitions = null;
        if (!electAllPartitions) {
            requestedPartitions = Collections.singletonMap("foo", Arrays.asList(0, 1, 2));
        }
        ElectLeadersRequestData electRequest = buildElectLeadersRequest(ElectionType.UNCLEAN, requestedPartitions);

        // First attempt: no records are expected.
        ControllerResult firstAttempt = manager.electLeaders(electRequest);
        Assertions.assertEquals(Collections.emptyList(), (Object)firstAttempt.records());
        ElectLeadersResponseData expectedFirstResponse = buildElectLeadersResponse(Errors.NONE, electAllPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE)),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))
        }));
        assertElectLeadersResponse(expectedFirstResponse, (ElectLeadersResponseData)firstAttempt.response());

        // Bring broker 2 back and put it into partition 1's ISR before retrying.
        context.unfenceBrokers(Utils.mkSet((Object[])new Integer[]{2}));
        context.alterPartition(foo1, 4, ReplicationControlManagerTest.isrWithDefaultEpoch(2, 4), LeaderRecoveryState.RECOVERED);

        ControllerResult secondAttempt = manager.electLeaders(electRequest);
        Assertions.assertEquals(1, secondAttempt.records().size());
        ApiMessageAndVersion onlyRecord = (ApiMessageAndVersion)secondAttempt.records().get(0);
        Assertions.assertInstanceOf(PartitionChangeRecord.class, (Object)onlyRecord.message());
        PartitionChangeRecord change = (PartitionChangeRecord)onlyRecord.message();
        Assertions.assertEquals(0, change.partitionId());
        Assertions.assertEquals(2, change.leader());
        Assertions.assertEquals(Collections.singletonList(2), (Object)change.isr());

        context.replay(secondAttempt.records());
        assertLeaderAndIsr(manager, foo0, 2, new int[]{2});
        assertLeaderAndIsr(manager, foo1, 4, new int[]{2, 4});
        assertLeaderAndIsr(manager, foo2, 0, new int[]{0});
        ElectLeadersResponseData expectedSecondResponse = buildElectLeadersResponse(Errors.NONE, electAllPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)ApiError.NONE),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))
        }));
        assertElectLeadersResponse(expectedSecondResponse, (ElectLeadersResponseData)secondAttempt.response());
    }

    /**
     * Leaves partition 0 of "foo" leaderless with ISR {1}, enables
     * unclean.leader.election.enable on the topic and then requests a PREFERRED election.
     * The election is expected to emit no records and to report PREFERRED_LEADER_NOT_AVAILABLE.
     */
    @Test
    public void testPreferredElectionDoesNotTriggerUncleanElection() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(1, 2, 3, 4);
        context.unfenceBrokers(1, 2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}}).topicId();
        TopicIdPartition foo0 = new TopicIdPartition(topicId, 0);

        // Fence 2 and 3, then all three replicas, then bring back only broker 2.
        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{2, 3}));
        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{1, 2, 3}));
        context.unfenceBrokers(Utils.mkSet((Object[])new Integer[]{2}));
        assertLeaderAndIsr(manager, foo0, -1, new int[]{1});

        context.alterTopicConfig("foo", "unclean.leader.election.enable", "true");

        Map<String, List<Integer>> requestedPartitions = Collections.singletonMap("foo", Collections.singletonList(0));
        ElectLeadersRequestData electRequest = buildElectLeadersRequest(ElectionType.PREFERRED, requestedPartitions);
        ControllerResult electionResult = manager.electLeaders(electRequest);
        Assertions.assertEquals(Collections.emptyList(), (Object)electionResult.records());

        ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(
            Errors.NONE,
            false,
            Collections.singletonMap(new TopicPartition("foo", 0), new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)));
        Assertions.assertEquals((Object)expectedResponse, (Object)electionResult.response());
    }

    /**
     * Builds an ElectLeaders request of the given election type.
     *
     * @param electionType election type whose {@code value} is stored in the request
     * @param partitions   partition indexes to elect, keyed by topic name; when {@code null} the
     *                     request's topic-partition collection is set to {@code null}
     * @return the populated request
     */
    private ElectLeadersRequestData buildElectLeadersRequest(ElectionType electionType, Map<String, List<Integer>> partitions) {
        ElectLeadersRequestData electRequest = new ElectLeadersRequestData().setElectionType(electionType.value);
        if (partitions == null) {
            electRequest.setTopicPartitions(null);
            return electRequest;
        }
        // One TopicPartitions element per map entry, in the map's iteration order.
        for (Map.Entry<String, List<Integer>> entry : partitions.entrySet()) {
            ElectLeadersRequestData.TopicPartitionsCollection target = electRequest.topicPartitions();
            target.add((ImplicitLinkedHashCollection.Element)new ElectLeadersRequestData.TopicPartitions()
                .setTopic(entry.getKey())
                .setPartitions(entry.getValue()));
        }
        return electRequest;
    }

    /**
     * Fences brokers 2 and 3 in a single call and checks the replicas, ISR and leader of all
     * three partitions of "foo" afterwards.
     */
    @Test
    public void testFenceMultipleBrokers() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue(context.clusterControl.fencedBrokerIds().isEmpty());

        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{2, 3}));

        PartitionRegistration first = manager.getPartition(topicId, 0);
        PartitionRegistration second = manager.getPartition(topicId, 1);
        PartitionRegistration third = manager.getPartition(topicId, 2);

        // Partition 0: replicas {1, 2, 3}; only broker 1 is expected to remain in the ISR.
        Assertions.assertArrayEquals(new int[]{1, 2, 3}, first.replicas);
        Assertions.assertArrayEquals(new int[]{1}, first.isr);
        Assertions.assertEquals(1, first.leader);

        // Partition 1: replicas {2, 3, 4}; only broker 4 is expected to remain in the ISR.
        Assertions.assertArrayEquals(new int[]{2, 3, 4}, second.replicas);
        Assertions.assertArrayEquals(new int[]{4}, second.isr);
        Assertions.assertEquals(4, second.leader);

        // Partition 2: replicas {0, 2, 1}; the leader only has to be something other than 2.
        Assertions.assertArrayEquals(new int[]{0, 2, 1}, third.replicas);
        Assertions.assertArrayEquals(new int[]{0, 1}, third.isr);
        Assertions.assertNotEquals(2, third.leader);
    }

    /**
     * Marks brokers 0 and 1 as degraded via addDegradationsToBrokers, fences them, and checks the
     * replicas, ISR and leader of all three partitions of "foo" afterwards.
     */
    @Test
    public void testFenceBrokerWithDegradations() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue(context.clusterControl.fencedBrokerIds().isEmpty());

        context.addDegradationsToBrokers(Utils.mkSet((Object[])new Integer[]{0, 1}));
        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{0, 1}));

        PartitionRegistration first = manager.getPartition(topicId, 0);
        PartitionRegistration second = manager.getPartition(topicId, 1);
        PartitionRegistration third = manager.getPartition(topicId, 2);

        // Partition 0 loses broker 1 from its ISR; broker 2 is expected to lead.
        Assertions.assertArrayEquals(new int[]{1, 2, 3}, first.replicas);
        Assertions.assertArrayEquals(new int[]{2, 3}, first.isr);
        Assertions.assertEquals(2, first.leader);

        // Partition 1 has no fenced replica, so its ISR is expected to be unchanged.
        Assertions.assertArrayEquals(new int[]{2, 3, 4}, second.replicas);
        Assertions.assertArrayEquals(new int[]{2, 3, 4}, second.isr);
        Assertions.assertEquals(2, second.leader);

        // Partition 2 loses both brokers 0 and 1; only broker 2 is expected to remain.
        Assertions.assertArrayEquals(new int[]{0, 2, 1}, third.replicas);
        Assertions.assertArrayEquals(new int[]{2}, third.isr);
        Assertions.assertEquals(2, third.leader);
    }

    /**
     * Fences brokers 3 and 4 so that partition 1 of "foo" is leaderless with ISR {4}, marks
     * broker 4 as degraded, then replays the records from handleBrokerUnfenced(4, 104L, ...).
     * Broker 4 is expected to become the leader of partition 1, with partition 0 unchanged.
     */
    @Test
    public void testUnfenceBrokerWithDegradations() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4, 5);
        context.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{0, 1, 2}, {4, 5, 3}}).topicId();

        context.fenceBrokers(Utils.mkSet((Object[])new Integer[]{3, 4}));

        PartitionRegistration firstBefore = manager.getPartition(topicId, 0);
        PartitionRegistration secondBefore = manager.getPartition(topicId, 1);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, firstBefore.replicas);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, firstBefore.isr);
        Assertions.assertEquals(0, firstBefore.leader);
        Assertions.assertArrayEquals(new int[]{4, 5, 3}, secondBefore.replicas);
        Assertions.assertArrayEquals(new int[]{4}, secondBefore.isr);
        Assertions.assertEquals(-1, secondBefore.leader);

        context.addDegradationsToBrokers(Utils.mkSet((Object[])new Integer[]{4}));
        // Collect the records generated by unfencing broker 4, then apply them.
        ArrayList<ApiMessageAndVersion> unfenceRecords = new ArrayList<>();
        manager.handleBrokerUnfenced(4, 104L, unfenceRecords);
        context.replay(unfenceRecords);

        PartitionRegistration firstAfter = manager.getPartition(topicId, 0);
        PartitionRegistration secondAfter = manager.getPartition(topicId, 1);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, firstAfter.replicas);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, firstAfter.isr);
        Assertions.assertEquals(0, firstAfter.leader);
        Assertions.assertArrayEquals(new int[]{4, 5, 3}, secondAfter.replicas);
        Assertions.assertArrayEquals(new int[]{4}, secondAfter.isr);
        Assertions.assertEquals(4, secondAfter.leader);
    }

    /**
     * Runs the same preferred-election request twice against topic "foo" (plus a non-existent
     * topic "bar").
     *
     * The first run happens while broker 0 is still fenced and broker 1 is in controlled shutdown;
     * it is expected to emit no records. After both brokers are brought back and the ISRs of
     * partitions 0 and 2 are expanded via alterPartition, the second run is expected to emit
     * PartitionChangeRecords moving the leaders of partitions 0 and 2 to brokers 1 and 0.
     */
    @Test
    public void testElectPreferredLeaders() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(1, 2, 3, 4);
        context.inControlledShutdownBrokers(1);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();

        ElectLeadersRequestData.TopicPartitions fooPartitions = new ElectLeadersRequestData.TopicPartitions()
            .setTopic("foo")
            .setPartitions(Arrays.asList(0, 1, 2));
        ElectLeadersRequestData.TopicPartitions barPartitions = new ElectLeadersRequestData.TopicPartitions()
            .setTopic("bar")
            .setPartitions(Arrays.asList(0, 1));
        ElectLeadersRequestData electRequest = new ElectLeadersRequestData()
            .setElectionType(ElectionType.PREFERRED.value)
            .setTopicPartitions(new ElectLeadersRequestData.TopicPartitionsCollection(Arrays.asList(fooPartitions, barPartitions).iterator()));

        // First election: no records are expected.
        ControllerResult firstElection = manager.electLeaders(electRequest);
        ElectLeadersResponseData expectedFirstResponse = buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)),
            Utils.mkEntry((Object)new TopicPartition("bar", 0), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")),
            Utils.mkEntry((Object)new TopicPartition("bar", 1), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar"))
        }));
        assertElectLeadersResponse(expectedFirstResponse, (ElectLeadersResponseData)firstElection.response());
        Assertions.assertEquals(Collections.emptyList(), (Object)firstElection.records());

        // Re-register broker 1, unfence brokers 0 and 1, and expand the ISRs of partitions 0 and 2.
        context.registerBrokers(1);
        context.unfenceBrokers(0, 1);
        ControllerRequestContext alterContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        AlterPartitionRequestData alterRequest = new AlterPartitionRequestData()
            .setBrokerId(2)
            .setBrokerEpoch(102L)
            .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(
                    new AlterPartitionRequestData.PartitionData()
                        .setPartitionIndex(0)
                        .setPartitionEpoch(0)
                        .setLeaderEpoch(0)
                        .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3)),
                    new AlterPartitionRequestData.PartitionData()
                        .setPartitionIndex(2)
                        .setPartitionEpoch(0)
                        .setLeaderEpoch(0)
                        .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 2, 1))))));
        ControllerResult alterResult = manager.alterPartition(alterContext, alterRequest);

        AlterPartitionResponseData expectedAlterResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(
                    new AlterPartitionResponseData.PartitionData()
                        .setPartitionIndex(0)
                        .setLeaderId(2)
                        .setLeaderEpoch(0)
                        .setIsr(Arrays.asList(1, 2, 3))
                        .setPartitionEpoch(1)
                        .setErrorCode(Errors.NONE.code()),
                    new AlterPartitionResponseData.PartitionData()
                        .setPartitionIndex(2)
                        .setLeaderId(2)
                        .setLeaderEpoch(0)
                        .setIsr(Arrays.asList(0, 2, 1))
                        .setPartitionEpoch(1)
                        .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals((Object)expectedAlterResponse, (Object)alterResult.response());

        ElectLeadersResponseData expectedSecondResponse = buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)ApiError.NONE),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)ApiError.NONE),
            Utils.mkEntry((Object)new TopicPartition("bar", 0), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")),
            Utils.mkEntry((Object)new TopicPartition("bar", 1), (Object)new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar"))
        }));
        context.replay(alterResult.records());

        // Second election with the very same request object.
        ControllerResult secondElection = manager.electLeaders(electRequest);
        assertElectLeadersResponse(expectedSecondResponse, (ElectLeadersResponseData)secondElection.response());
        List<ApiMessageAndVersion> expectedRecords = Arrays.asList(
            new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(0).setTopicId(topicId).setLeader(1), MetadataVersion.latestTesting().partitionChangeRecordVersion()),
            new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(2).setTopicId(topicId).setLeader(0), MetadataVersion.latestTesting().partitionChangeRecordVersion()));
        Assertions.assertEquals(expectedRecords, (Object)secondElection.records());
    }

    /**
     * Marks broker 0 as degraded before creating "foo", then runs a PREFERRED election followed
     * (after enabling unclean.leader.election.enable) by an UNCLEAN election over partitions
     * 0, 1 and 2. Neither election is expected to emit records.
     */
    @Test
    public void testElectWithDegradations() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);
        context.addDegradationsToBrokers(Utils.mkSet((Object[])new Integer[]{0}));
        context.createTestTopic("foo", new int[][]{{0, 1, 2}, {1, 0, 2}, {2, 1, 0}}).topicId();

        // Preferred election: partition 0 prefers the degraded broker 0.
        ElectLeadersRequestData.TopicPartitions preferredTargets = new ElectLeadersRequestData.TopicPartitions()
            .setTopic("foo")
            .setPartitions(Arrays.asList(0, 1, 2));
        ElectLeadersRequestData preferredRequest = new ElectLeadersRequestData()
            .setElectionType(ElectionType.PREFERRED.value)
            .setTopicPartitions(new ElectLeadersRequestData.TopicPartitionsCollection(Arrays.asList(preferredTargets).iterator()));
        ControllerResult preferredResult = manager.electLeaders(preferredRequest);
        ElectLeadersResponseData expectedPreferredResponse = buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE)),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))
        }));
        assertElectLeadersResponse(expectedPreferredResponse, (ElectLeadersResponseData)preferredResult.response());
        Assertions.assertEquals(Collections.emptyList(), (Object)preferredResult.records());

        context.alterTopicConfig("foo", "unclean.leader.election.enable", "true");

        // Unclean election over the same partitions.
        ElectLeadersRequestData.TopicPartitions uncleanTargets = new ElectLeadersRequestData.TopicPartitions()
            .setTopic("foo")
            .setPartitions(Arrays.asList(0, 1, 2));
        ElectLeadersRequestData uncleanRequest = new ElectLeadersRequestData()
            .setElectionType(ElectionType.UNCLEAN.value)
            .setTopicPartitions(new ElectLeadersRequestData.TopicPartitionsCollection(Arrays.asList(uncleanTargets).iterator()));
        ControllerResult uncleanResult = manager.electLeaders(uncleanRequest);
        ElectLeadersResponseData expectedUncleanResponse = buildElectLeadersResponse(Errors.NONE, false, Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)new TopicPartition("foo", 0), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 1), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED)),
            Utils.mkEntry((Object)new TopicPartition("foo", 2), (Object)new ApiError(Errors.ELECTION_NOT_NEEDED))
        }));
        assertElectLeadersResponse(expectedUncleanResponse, (ElectLeadersResponseData)uncleanResult.response());
        Assertions.assertEquals(Collections.emptyList(), (Object)uncleanResult.records());
    }

    /**
     * Creates a mirror topic "bar" on cluster link "foo_link", sets a cluster link state on its
     * partition 0, and fences that partition's leader. Leaders are then expected to be reported
     * as imbalanced, while maybeBalancePartitionLeaders is expected to return false with no
     * records.
     */
    @Test
    public void testBalanceLinkedTopicPartitionLeaders() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2);
        context.unfenceBrokers(0, 1, 2);

        context.createClusterLink("foo_link");
        CreateTopicsResponseData.CreatableTopicResult mirrorTopic = context.createLinkTopic("foo_link", "bar", "source_bar", Uuid.randomUuid());
        Uuid mirrorTopicId = mirrorTopic.topicId();
        PartitionRegistration mirrorPartition = manager.getPartition(mirrorTopicId, 0);
        Assertions.assertTrue(mirrorPartition.isMirror());

        AlterPartitionRequestData.ClusterLinkState linkState = new AlterPartitionRequestData.ClusterLinkState()
            .setLinkedLeaderEpoch(25)
            .setLinkFailed(false);
        context.setClusterLinkState(new TopicIdPartition(mirrorTopicId, 0), linkState);

        // Fence whichever broker was leading the mirror partition when it was first read.
        int originalLeader = mirrorPartition.leader;
        context.fenceBrokers(Collections.singleton(originalLeader));
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());

        ControllerResult balanceResult = manager.maybeBalancePartitionLeaders();
        Boolean balanceResponse = (Boolean)balanceResult.response();
        Assertions.assertFalse((boolean)balanceResponse);
        Assertions.assertEquals(Collections.emptyList(), (Object)balanceResult.records());
    }

    /**
     * Balances the leaders of "foo" in two phases.
     *
     * Phase one unfences broker 1 and restores partition 0's ISR; the balance call is expected to
     * move partition 0's leader to broker 1 while leaders stay imbalanced. Phase two does the same
     * for broker 0 and partition 2, after which leaders are expected to be balanced. Both balance
     * calls are expected to return false.
     */
    @Test
    public void testBalancePartitionLeaders() {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(2, 3, 4);
        Uuid topicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());

        // Phase one: broker 1 returns and rejoins partition 0's ISR.
        context.unfenceBrokers(1);
        ControllerRequestContext firstAlterContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        AlterPartitionRequestData firstAlterRequest = new AlterPartitionRequestData()
            .setBrokerId(2)
            .setBrokerEpoch(102L)
            .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                    .setPartitionIndex(0)
                    .setPartitionEpoch(0)
                    .setLeaderEpoch(0)
                    .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3))))));
        ControllerResult firstAlterResult = manager.alterPartition(firstAlterContext, firstAlterRequest);
        AlterPartitionResponseData expectedFirstAlterResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setLeaderId(2)
                    .setLeaderEpoch(0)
                    .setIsr(Arrays.asList(1, 2, 3))
                    .setPartitionEpoch(1)
                    .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals((Object)expectedFirstAlterResponse, (Object)firstAlterResult.response());
        context.replay(firstAlterResult.records());

        ControllerResult firstBalance = manager.maybeBalancePartitionLeaders();
        context.replay(firstBalance.records());
        PartitionChangeRecord expectedFirstChange = new PartitionChangeRecord().setPartitionId(0).setTopicId(topicId).setLeader(1);
        Assertions.assertEquals(Arrays.asList(new ApiMessageAndVersion((ApiMessage)expectedFirstChange, MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)firstBalance.records());
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());
        Assertions.assertFalse((boolean)((Boolean)firstBalance.response()));

        // Phase two: broker 0 returns and rejoins partition 2's ISR.
        context.unfenceBrokers(0);
        ControllerRequestContext secondAlterContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        AlterPartitionRequestData secondAlterRequest = new AlterPartitionRequestData()
            .setBrokerId(2)
            .setBrokerEpoch(102L)
            .setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                    .setPartitionIndex(2)
                    .setPartitionEpoch(0)
                    .setLeaderEpoch(0)
                    .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 2, 1))))));
        ControllerResult secondAlterResult = manager.alterPartition(secondAlterContext, secondAlterRequest);
        AlterPartitionResponseData expectedSecondAlterResponse = new AlterPartitionResponseData()
            .setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData()
                .setTopicId(topicId)
                .setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData()
                    .setPartitionIndex(2)
                    .setLeaderId(2)
                    .setLeaderEpoch(0)
                    .setIsr(Arrays.asList(0, 2, 1))
                    .setPartitionEpoch(1)
                    .setErrorCode(Errors.NONE.code())))));
        Assertions.assertEquals((Object)expectedSecondAlterResponse, (Object)secondAlterResult.response());
        context.replay(secondAlterResult.records());

        ControllerResult secondBalance = manager.maybeBalancePartitionLeaders();
        context.replay(secondBalance.records());
        PartitionChangeRecord expectedSecondChange = new PartitionChangeRecord().setPartitionId(2).setTopicId(topicId).setLeader(0);
        Assertions.assertEquals(Arrays.asList(new ApiMessageAndVersion((ApiMessage)expectedSecondChange, MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)secondBalance.records());
        Assertions.assertFalse(manager.arePartitionLeadersImbalanced());
        Assertions.assertFalse((boolean)((Boolean)secondBalance.response()));
    }

    /**
     * Checks the order in which leader balancing handles three identically assigned topics:
     * "foo", "_confluent-tier-state" and "__consumer_offsets".
     *
     * The first balance call is expected to emit only the change for "_confluent-tier-state"
     * partition 0. The next 11 calls, each followed by ctx.time.sleep(1000L), are expected to
     * emit nothing while still returning true. The final call is expected to emit the changes for
     * "__consumer_offsets" and then "foo", and to return false.
     */
    @Test
    public void testBalancePartitionLeadersForInternalTopicFirst() throws Exception {
        ReplicationControlTestContext context = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager manager = context.replicationControl;
        context.registerBrokers(0, 1, 2, 3, 4);
        context.unfenceBrokers(0, 2, 3, 4);
        Uuid fooTopicId = context.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Uuid tierStateTopicId = context.createTestTopic("_confluent-tier-state", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Uuid consumerOffsetsTopicId = context.createTestTopic("__consumer_offsets", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());

        // Broker 1 returns and rejoins the ISR of partition 0 in all three topics.
        context.unfenceBrokers(1);
        ControllerRequestContext alterContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
        AlterPartitionRequestData.TopicData fooUpdate = new AlterPartitionRequestData.TopicData()
            .setTopicId(fooTopicId)
            .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(0)
                .setPartitionEpoch(0)
                .setLeaderEpoch(0)
                .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3))));
        AlterPartitionRequestData.TopicData tierStateUpdate = new AlterPartitionRequestData.TopicData()
            .setTopicId(tierStateTopicId)
            .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(0)
                .setPartitionEpoch(0)
                .setLeaderEpoch(0)
                .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3))));
        AlterPartitionRequestData.TopicData consumerOffsetsUpdate = new AlterPartitionRequestData.TopicData()
            .setTopicId(consumerOffsetsTopicId)
            .setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(0)
                .setPartitionEpoch(0)
                .setLeaderEpoch(0)
                .setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(1, 2, 3))));
        AlterPartitionRequestData alterRequest = new AlterPartitionRequestData()
            .setBrokerId(2)
            .setBrokerEpoch(102L)
            .setTopics(Arrays.asList(fooUpdate, tierStateUpdate, consumerOffsetsUpdate));
        ControllerResult alterResult = manager.alterPartition(alterContext, alterRequest);
        context.replay(alterResult.records());

        // First round: only the "_confluent-tier-state" change is expected.
        ControllerResult balanceResult = manager.maybeBalancePartitionLeaders();
        context.replay(balanceResult.records());
        PartitionChangeRecord expectedTierStateChange = new PartitionChangeRecord().setPartitionId(0).setTopicId(tierStateTopicId).setLeader(1);
        Assertions.assertEquals(Arrays.asList(new ApiMessageAndVersion((ApiMessage)expectedTierStateChange, MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)balanceResult.records());
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());
        Assertions.assertTrue((boolean)((Boolean)balanceResult.response()));

        // Next 11 rounds, advancing ctx.time by 1000 after each call: nothing is expected.
        int quietRound = 0;
        while (quietRound < 11) {
            balanceResult = manager.maybeBalancePartitionLeaders();
            context.time.sleep(1000L);
            Assertions.assertTrue(manager.arePartitionLeadersImbalanced());
            Assertions.assertTrue((boolean)((Boolean)balanceResult.response()));
            Assertions.assertEquals(0, balanceResult.records().size());
            quietRound++;
        }

        // Final round: "__consumer_offsets" is expected ahead of "foo".
        balanceResult = manager.maybeBalancePartitionLeaders();
        PartitionChangeRecord expectedFooChange = new PartitionChangeRecord().setPartitionId(0).setTopicId(fooTopicId).setLeader(1);
        PartitionChangeRecord expectedConsumerOffsetsChange = new PartitionChangeRecord().setPartitionId(0).setTopicId(consumerOffsetsTopicId).setLeader(1);
        Assertions.assertEquals(Arrays.asList(
            new ApiMessageAndVersion((ApiMessage)expectedConsumerOffsetsChange, MetadataVersion.latestTesting().partitionChangeRecordVersion()),
            new ApiMessageAndVersion((ApiMessage)expectedFooChange, MetadataVersion.latestTesting().partitionChangeRecordVersion())), (Object)balanceResult.records());
        Assertions.assertTrue(manager.arePartitionLeadersImbalanced());
        Assertions.assertFalse((boolean)((Boolean)balanceResult.response()));
    }

    /**
     * Verifies that no leader imbalance is reported when all five brokers are unfenced before
     * the topics are created.
     */
    @Test
    public void testNoPartitionLeadersToBalance() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        // Only the side effect of creating the topics matters here; the topic IDs are not used.
        ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}});
        ctx.createTestTopic("_confluent-tier-state", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}});
        Assertions.assertFalse(replication.arePartitionLeadersImbalanced());
    }

    /**
     * Checks leader balancing around a broker that has a degradation attached.
     *
     * <p>While broker 2 carries a degradation, a balancing pass is expected to emit no records
     * and the imbalance is expected to remain. After the degradation is removed, the next pass
     * is expected to move partition 4 of {@code foo} to broker 2 and clear the imbalance.
     */
    @Test
    public void testBalancePartitionLeadersWithDegradation() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ctx.addDegradationsToBrokers(Utils.mkSet((Object[])new Integer[]{2}));
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}, {0, 1, 2}, {1, 0, 2}, {1, 0, 2}, {2, 1, 0}}).topicId();
        Assertions.assertTrue(replication.arePartitionLeadersImbalanced());

        // Pass while broker 2 is still degraded: nothing may be moved.
        ControllerResult whileDegraded = replication.maybeBalancePartitionLeaders();
        ctx.replay(whileDegraded.records());
        Assertions.assertEquals(Collections.emptyList(), whileDegraded.records());
        Assertions.assertTrue(replication.arePartitionLeadersImbalanced());

        // Pass after the degradation is lifted: partition 4 goes to broker 2.
        ctx.removeDegradationsFromBrokers(Utils.mkSet((Object[])new Integer[]{2}));
        ControllerResult afterRecovery = replication.maybeBalancePartitionLeaders();
        ctx.replay(afterRecovery.records());
        PartitionChangeRecord moveToBrokerTwo = new PartitionChangeRecord()
            .setPartitionId(4)
            .setTopicId(fooId)
            .setLeader(2);
        Assertions.assertEquals(Arrays.asList(new ApiMessageAndVersion(moveToBrokerTwo, 2)), afterRecovery.records());
        Assertions.assertFalse(replication.arePartitionLeadersImbalanced());
    }

    /**
     * Asserts that two ElectLeaders responses agree on the top-level error and on the
     * per-partition results, ignoring the order in which topics and partitions are listed.
     *
     * @param expected the response the test expects
     * @param actual the response produced by the code under test
     */
    private void assertElectLeadersResponse(ElectLeadersResponseData expected, ElectLeadersResponseData actual) {
        Errors expectedTopLevel = Errors.forCode(expected.errorCode());
        Errors actualTopLevel = Errors.forCode(actual.errorCode());
        Assertions.assertEquals(expectedTopLevel, actualTopLevel);

        // Compare as maps keyed by topic-partition so that ordering does not matter.
        Map<TopicPartition, ElectLeadersResponseData.PartitionResult> expectedByPartition = this.collectElectLeadersErrors(expected);
        Map<TopicPartition, ElectLeadersResponseData.PartitionResult> actualByPartition = this.collectElectLeadersErrors(actual);
        Assertions.assertEquals(expectedByPartition, actualByPartition);
    }

    /**
     * Flattens an ElectLeaders response into a map from topic-partition to its partition result.
     * If the response lists the same topic-partition more than once, the last entry wins.
     *
     * @param response the response to flatten
     * @return a new mutable map holding one entry per topic-partition in the response
     */
    private Map<TopicPartition, ElectLeadersResponseData.PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
        Map<TopicPartition, ElectLeadersResponseData.PartitionResult> byPartition = new HashMap<>();
        for (ElectLeadersResponseData.ReplicaElectionResult topicResult : response.replicaElectionResults()) {
            String topicName = topicResult.topic();
            for (ElectLeadersResponseData.PartitionResult partitionResult : topicResult.partitionResult()) {
                byPartition.put(new TopicPartition(topicName, partitionResult.partitionId()), partitionResult);
            }
        }
        return byPartition;
    }

    /**
     * Builds the ElectLeaders response a test expects from a map of per-partition errors.
     *
     * <p>Errors are grouped by topic name. When {@code electAllPartitions} is {@code true},
     * partitions whose error is {@link Errors#ELECTION_NOT_NEEDED} are left out of the response.
     *
     * @param topLevelError the error whose code is set at the top level of the response
     * @param electAllPartitions whether {@code ELECTION_NOT_NEEDED} partitions are filtered out
     * @param errors the error for each topic-partition
     * @return the assembled response; topics appear in the iteration order of the grouping map
     */
    private ElectLeadersResponseData buildElectLeadersResponse(Errors topLevelError, boolean electAllPartitions, Map<TopicPartition, ApiError> errors) {
        // Fully parameterised entry type: a raw Map.Entry here is not assignable from the
        // groupingBy result and forces a downcast at every use.
        Map<String, List<Map.Entry<TopicPartition, ApiError>>> errorsByTopic = errors.entrySet().stream()
            .collect(Collectors.groupingBy(entry -> entry.getKey().topic()));
        ElectLeadersResponseData response = new ElectLeadersResponseData().setErrorCode(topLevelError.code());
        errorsByTopic.forEach((topic, partitionErrors) -> {
            List<ElectLeadersResponseData.PartitionResult> partitionResults = partitionErrors.stream()
                .filter(entry -> !electAllPartitions || entry.getValue().error() != Errors.ELECTION_NOT_NEEDED)
                .map(entry -> {
                    ApiError error = entry.getValue();
                    return new ElectLeadersResponseData.PartitionResult()
                        .setPartitionId(entry.getKey().partition())
                        .setErrorCode(error.error().code())
                        .setErrorMessage(error.message());
                })
                .collect(Collectors.toList());
            ElectLeadersResponseData.ReplicaElectionResult electionResult = new ElectLeadersResponseData.ReplicaElectionResult()
                .setTopic(topic)
                .setPartitionResult(partitionResults);
            response.replicaElectionResults().add(electionResult);
        });
        return response;
    }

    /**
     * Checks what the describer obtained via {@code getKRaftCellDescriber(-1)} reports for
     * topic names, replica assignments, usable brokers and default directories.
     *
     * <p>Brokers 0, 1 and 4 are registered with no directories, broker 2 with one and broker 3
     * with two; only brokers 2, 3 and 4 are unfenced. The expected default directory is
     * {@code DirectoryId.MIGRATING} for broker 1, broker 2's single directory for broker 2, and
     * {@code DirectoryId.UNASSIGNED} for broker 3.
     */
    @Test
    public void testKRaftCellDescriber() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokersWithDirs(0, Collections.emptyList(), 1, Collections.emptyList(), 2, Arrays.asList(Uuid.fromString("ozwqsVMFSNiYQUPSJA3j0w")), 3, Arrays.asList(Uuid.fromString("SSDgCZ4BTyec5QojGT65qg"), Uuid.fromString("K8KwMrviRcOUvgI8FPOJWg")), 4, Collections.emptyList());
        ctx.unfenceBrokers(2, 3, 4);
        ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}});
        ctx.createTestTopic("bar", new int[][]{{2, 3, 4}, {3, 4, 2}});
        ReplicationControlManager.KRaftCellDescriber describer = replication.getKRaftCellDescriber(-1);

        // Topic names, collected into a typed set rather than a raw HashSet.
        Set<String> topicNames = new HashSet<>();
        describer.topicNames().forEachRemaining(topicNames::add);
        Assertions.assertEquals(new HashSet<>(Arrays.asList("foo", "bar")), topicNames);

        // Replica assignments; an unknown topic yields an empty list.
        Assertions.assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(2, 3, 4), Arrays.asList(0, 2, 1)), describer.replicasForTopicName("foo"));
        Assertions.assertEquals(Arrays.asList(Arrays.asList(2, 3, 4), Arrays.asList(3, 4, 2)), describer.replicasForTopicName("bar"));
        Assertions.assertEquals(Collections.emptyList(), describer.replicasForTopicName("baz"));

        // Usable brokers. The third constructor argument is true exactly for the brokers that
        // were never unfenced (0 and 1) - presumably the "fenced" flag; confirm against UsableBroker.
        Set<UsableBroker> brokers = new HashSet<>();
        describer.usableBrokers().forEachRemaining(brokers::add);
        Set<UsableBroker> expectedBrokers = new HashSet<>(Arrays.asList(
            new UsableBroker(0, Optional.empty(), true),
            new UsableBroker(1, Optional.empty(), true),
            new UsableBroker(2, Optional.empty(), false),
            new UsableBroker(3, Optional.empty(), false),
            new UsableBroker(4, Optional.empty(), false)));
        Assertions.assertEquals(expectedBrokers, brokers);

        // Default directories for a broker with zero, one and two registered directories.
        Assertions.assertEquals(DirectoryId.MIGRATING, describer.defaultDir(1));
        Assertions.assertEquals(Uuid.fromString("ozwqsVMFSNiYQUPSJA3j0w"), describer.defaultDir(2));
        Assertions.assertEquals(DirectoryId.UNASSIGNED, describer.defaultDir(3));
    }

    /**
     * Checks that the describer obtained via {@code getKRaftCellDescriber(0)} reports only the
     * brokers listed in cell 0.
     *
     * <p>Two READY cells are replayed: cell 0 with brokers 0, 1, 2 and cell 1 with brokers
     * 3, 4, 5. Brokers 0-4 are registered and only 2, 3, 4 are unfenced. The expected usable
     * brokers are 0, 1 and 2, each carrying {@code 0} as the fourth constructor argument.
     */
    @Test
    public void testKRaftCellDescriberCellFilter() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(2, 3, 4);
        ctx.cellControl.replay(new CellRecord().setCellId(0).setMinSize((short)3).setState(CellState.READY.code()).setBrokers(Arrays.asList(0, 1, 2)));
        ctx.cellControl.replay(new CellRecord().setCellId(1).setMinSize((short)3).setState(CellState.READY.code()).setBrokers(Arrays.asList(3, 4, 5)));
        ReplicationControlManager.KRaftCellDescriber describer = replication.getKRaftCellDescriber(0);

        // Collected into a typed set rather than a raw HashSet.
        Set<UsableBroker> brokers = new HashSet<>();
        describer.usableBrokers().forEachRemaining(brokers::add);
        Set<UsableBroker> expectedBrokers = new HashSet<>(Arrays.asList(
            new UsableBroker(0, Optional.empty(), true, 0),
            new UsableBroker(1, Optional.empty(), true, 0),
            new UsableBroker(2, Optional.empty(), false, 0)));
        Assertions.assertEquals(expectedBrokers, brokers);
    }

    /**
     * Checks the records produced when broker 0 sends a heartbeat with
     * {@code wantShutDown = true}.
     *
     * <p>When the metadata version reports
     * {@code isInControlledShutdownStateSupported()}, a single
     * {@code BrokerRegistrationChangeRecord} marking the broker as in controlled shutdown is
     * expected; otherwise no records are expected.
     *
     * @param metadataVersion the metadata version the test context is built with
     */
    @ParameterizedTest
    @EnumSource(value=MetadataVersion.class, names={"IBP_3_3_IV2", "IBP_3_3_IV3"})
    public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(metadataVersion).build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        // The topic only has to exist; its ID is not needed by the assertions below.
        ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setCurrentMetadataOffset(0L)
            .setWantShutDown(true);
        ControllerResult result = ctx.replicationControl.processBrokerHeartbeat(heartbeatRequest, 0L);

        List<ApiMessageAndVersion> expectedRecords = new ArrayList<>();
        if (metadataVersion.isInControlledShutdownStateSupported()) {
            expectedRecords.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord()
                .setBrokerEpoch(100L)
                .setBrokerId(0)
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()), 1));
        }
        Assertions.assertEquals(expectedRecords, result.records());
    }

    /**
     * Walks broker 0 through a controlled shutdown when the test context is built with
     * {@code setMaxPartitionChangesPerSlice(2)}.
     *
     * <p>The sequence checked is:
     * <ol>
     *   <li>a first heartbeat with {@code wantShutDown = true}, after which
     *       {@code partitionChangesBecauseOfControlledShutdown()} returns {@code -1} and no
     *       records;</li>
     *   <li>after {@code updateControlledShutdownOffset(0, 123L)}, two successive calls that
     *       each return {@code 0} and two partition change records (partitions 0 and 1, then
     *       partitions 2 and 3);</li>
     *   <li>a second and a third heartbeat whose replies differ once broker 2's heartbeat
     *       state is touched with offset 123.</li>
     * </ol>
     *
     * @param metadataVersion the metadata version the test context is built with
     */
    @ParameterizedTest
    @EnumSource(value=MetadataVersion.class, names={"IBP_3_3_IV2", "IBP_3_3_IV3"})
    public void testProcessBrokerHeartbeatInControlledShutdownWithPartitionSlicing(MetadataVersion metadataVersion) throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMetadataVersion(metadataVersion).setMaxPartitionChangesPerSlice(2).build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        // Brokers 0 and 1 are touched with 123, broker 2 with 122 (one behind).
        ctx.clusterControl.heartbeatManager().touch(0, false, 123L);
        ctx.clusterControl.heartbeatManager().touch(1, false, 123L);
        ctx.clusterControl.heartbeatManager().touch(2, false, 122L);
        // Four partitions; broker 0 is the first replica of partitions 0-2 and the second of partition 3.
        Uuid topicId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}, {0, 1, 2}, {0, 1, 2}, {1, 0, 2}}).topicId();
        BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData().setBrokerId(0).setBrokerEpoch(100L).setCurrentMetadataOffset(0L).setWantShutDown(true);
        ControllerResult resultFirstHeartBeat = ctx.replicationControl.processBrokerHeartbeat(heartbeatRequest, 0L);
        // Before the controlled shutdown offset is recorded, the slice call yields -1 and nothing to write.
        ControllerResult partitionChangeResult0 = ctx.replicationControl.partitionChangesBecauseOfControlledShutdown();
        Assertions.assertEquals((int)-1, (Integer)((Integer)partitionChangeResult0.response()));
        Assertions.assertEquals(Collections.emptyList(), (Object)partitionChangeResult0.records());
        Assertions.assertTrue((boolean)((BrokerHeartbeatReply)resultFirstHeartBeat.response()).inControlledShutdown());
        Assertions.assertFalse((boolean)((BrokerHeartbeatReply)resultFirstHeartBeat.response()).shouldShutDown());
        ctx.clusterControl.heartbeatManager().updateControlledShutdownOffset(0, 123L);
        // First slice: partitions 0 and 1 drop broker 0 from the ISR and move to leader 1.
        ControllerResult partitionChangeResult1 = ctx.replicationControl.partitionChangesBecauseOfControlledShutdown();
        Assertions.assertEquals((int)0, (Integer)((Integer)partitionChangeResult1.response()));
        ArrayList<ApiMessageAndVersion> expectedRecordsFirstPartitionChange = new ArrayList<ApiMessageAndVersion>();
        expectedRecordsFirstPartitionChange.add(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(0).setTopicId(topicId).setIsr(Arrays.asList(1, 2)).setLeader(1), 0));
        expectedRecordsFirstPartitionChange.add(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(1).setTopicId(topicId).setIsr(Arrays.asList(1, 2)).setLeader(1), 0));
        ctx.replay(partitionChangeResult1.records());
        Assertions.assertEquals(expectedRecordsFirstPartitionChange, (Object)partitionChangeResult1.records());
        // Second slice: partitions 2 and 3.
        ControllerResult partitionChangeResult2 = ctx.replicationControl.partitionChangesBecauseOfControlledShutdown();
        Assertions.assertEquals((int)0, (Integer)((Integer)partitionChangeResult2.response()));
        ArrayList<ApiMessageAndVersion> expectedRecordsSecondPartitionChange = new ArrayList<ApiMessageAndVersion>();
        expectedRecordsSecondPartitionChange.add(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(2).setTopicId(topicId).setIsr(Arrays.asList(1, 2)).setLeader(1), 0));
        expectedRecordsSecondPartitionChange.add(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(3).setTopicId(topicId).setIsr(Arrays.asList(1, 2)).setLeader(1), 0));
        ctx.replay(partitionChangeResult2.records());
        Assertions.assertEquals(expectedRecordsSecondPartitionChange, (Object)partitionChangeResult2.records());
        // Second heartbeat, sent while broker 2 is still at offset 122.
        // NOTE(review): the four BrokerHeartbeatReply arguments are positional booleans whose
        // meaning is not visible here - confirm against BrokerHeartbeatReply before editing.
        BrokerHeartbeatRequestData heartbeatRequestSecondTime = new BrokerHeartbeatRequestData().setBrokerId(0).setBrokerEpoch(100L).setCurrentMetadataOffset(0L).setWantShutDown(true);
        ControllerResult resultSecondHeartBeat = ctx.replicationControl.processBrokerHeartbeat(heartbeatRequestSecondTime, 0L);
        ctx.replay(resultSecondHeartBeat.records());
        Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, true, false), (Object)resultSecondHeartBeat.response());
        // Third heartbeat, sent after broker 2 is touched with offset 123; the same request object is reused.
        ctx.clusterControl.heartbeatManager().touch(2, false, 123L);
        ControllerResult resultThirdHeartbeat = ctx.replicationControl.processBrokerHeartbeat(heartbeatRequestSecondTime, 0L);
        ctx.replay(resultThirdHeartbeat.records());
        Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, true, false, true), (Object)resultThirdHeartbeat.response());
    }

    /**
     * Copies a primitive {@code int} array into a list of boxed integers, keeping the order.
     *
     * @param array the values to copy; must not be {@code null}
     * @return a new mutable list with one element per array entry
     */
    private static List<Integer> arrayToList(int[] array) {
        List<Integer> boxed = new ArrayList<>(array.length);
        for (int value : array) {
            boxed.add(value);
        }
        return boxed;
    }

    /**
     * Checks that a placement constraint supplied on the create-topic request is the one that
     * {@code computeTopicPlacementConfigChanges} records, both when no default placement is
     * passed and when a different default placement is passed.
     */
    @Test
    public void testComputeTopicPlacementConfigsHandlesUserProvidedPlacementEmptyDefault() {
        String placementKey = "confluent.placement.constraints";
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName("topic");
        String userPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        TopicPlacement userPlacement = (TopicPlacement)TopicPlacement.parse(userPlacementJson).get();
        topic.configs().add(new CreateTopicsRequestData.CreateableTopicConfig().setName(placementKey).setValue(userPlacementJson));

        // Round 1: no default placement is supplied.
        HashMap errorsNoDefault = new HashMap();
        HashMap placementsNoDefault = new HashMap();
        HashMap configsNoDefault = new HashMap();
        ReplicationControlManager.computeTopicPlacementConfigChanges(errorsNoDefault, placementsNoDefault, topic, true, Optional.empty(), configsNoDefault);
        Assertions.assertTrue(errorsNoDefault.isEmpty());
        Assertions.assertTrue(configsNoDefault.containsKey(placementKey));
        Assertions.assertEquals(userPlacementJson, ((Map.Entry)configsNoDefault.get(placementKey)).getValue());
        Assertions.assertEquals(userPlacement, placementsNoDefault.get(topic.name()));

        // Round 2: a different default placement is supplied; the user's placement must still win.
        String defaultPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack2\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack3\"}}]}";
        Optional defaultPlacement = TopicPlacement.parse(defaultPlacementJson);
        Assertions.assertTrue(defaultPlacement.isPresent());
        HashMap errorsWithDefault = new HashMap();
        HashMap placementsWithDefault = new HashMap();
        HashMap configsWithDefault = new HashMap();
        ReplicationControlManager.computeTopicPlacementConfigChanges(errorsWithDefault, placementsWithDefault, topic, true, defaultPlacement, configsWithDefault);
        Assertions.assertTrue(errorsWithDefault.isEmpty());
        Assertions.assertTrue(configsWithDefault.containsKey(placementKey));
        Assertions.assertEquals(userPlacementJson, ((Map.Entry)configsWithDefault.get(placementKey)).getValue());
        Assertions.assertEquals(userPlacement, placementsWithDefault.get(topic.name()));
    }

    /**
     * Checks that, for a topic with replication factor {@code -1} and no placement config of
     * its own, {@code computeTopicPlacementConfigChanges} records the supplied default
     * placement both as the topic config value and as the topic's placement.
     */
    @Test
    public void testComputeTopicPlacementConfigsHandlesEmptyAndNonEmptyDefaultPlacement() {
        String placementKey = "confluent.placement.constraints";
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic()
            .setName("topic")
            .setReplicationFactor((short)-1);
        String defaultPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack2\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack3\"}}]}";
        Optional defaultPlacement = TopicPlacement.parse(defaultPlacementJson);
        Assertions.assertTrue(defaultPlacement.isPresent());

        HashMap errors = new HashMap();
        HashMap placements = new HashMap();
        HashMap configs = new HashMap();
        ReplicationControlManager.computeTopicPlacementConfigChanges(errors, placements, topic, true, defaultPlacement, configs);

        Assertions.assertTrue(errors.isEmpty());
        Assertions.assertTrue(configs.containsKey(placementKey));
        Assertions.assertEquals(defaultPlacementJson, ((Map.Entry)configs.get(placementKey)).getValue());
        Assertions.assertEquals(defaultPlacement.get(), placements.get(topic.name()));
    }

    /**
     * Checks that a request carrying a placement constraint is rejected with
     * {@code INVALID_REQUEST} when {@code computeTopicPlacementConfigChanges} is called with
     * its boolean argument set to {@code false}, and that no placement config is recorded.
     */
    @Test
    public void testComputeTopicPlacementConfigsHandlesUnsupportedTopicPlacement() {
        String topicName = "topic";
        String placementKey = "confluent.placement.constraints";
        String placementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName(topicName);
        topic.configs().add(new CreateTopicsRequestData.CreateableTopicConfig().setName(placementKey).setValue(placementJson));

        HashMap errors = new HashMap();
        HashMap placements = new HashMap();
        HashMap configs = new HashMap();
        ReplicationControlManager.computeTopicPlacementConfigChanges(errors, placements, topic, false, Optional.empty(), configs);

        Assertions.assertFalse(errors.isEmpty());
        Assertions.assertFalse(configs.containsKey(placementKey));
        ApiError expectedError = new ApiError(Errors.INVALID_REQUEST, "Topic placement is not supported.");
        Assertions.assertEquals(expectedError, errors.get(topicName));
    }

    /**
     * Checks that nothing is recorded - no error, no placement config and no placement - when
     * neither the request nor the caller supplies a placement.
     */
    @Test
    public void testComputeTopicPlacementConfigsHandlesNoPlacement() {
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic()
            .setName("topic")
            .setReplicationFactor((short)3);

        HashMap errors = new HashMap();
        HashMap placements = new HashMap();
        HashMap configs = new HashMap();
        ReplicationControlManager.computeTopicPlacementConfigChanges(errors, placements, topic, true, Optional.empty(), configs);

        Assertions.assertTrue(errors.isEmpty());
        Assertions.assertFalse(configs.containsKey("confluent.placement.constraints"));
        Assertions.assertFalse(placements.containsKey(topic.name()));
    }

    /**
     * Checks that {@code getTopicPlacementFromRequest} throws
     * {@link InvalidRequestException} when called with {@code false} for a request that
     * carries a non-empty placement constraint.
     */
    @Test
    public void testGetTopicPlacementFromRequestHandlesTopicPlacementNotSupported() {
        String placementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack2\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack3\"}}]}";
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName("topic");
        topic.configs().add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(placementJson));

        Assertions.assertThrows(
            InvalidRequestException.class,
            () -> ReplicationControlManager.getTopicPlacementFromRequest(false, topic.configs()));
    }

    /**
     * Checks that an empty placement constraint string yields {@code Optional.empty()} from
     * {@code getTopicPlacementFromRequest}, whichever boolean is passed as the first argument.
     */
    @Test
    public void testGetTopicPlacementFromRequestHandlesEmptyString() {
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName("topic");
        topic.configs().add(new CreateTopicsRequestData.CreateableTopicConfig()
            .setName("confluent.placement.constraints")
            .setValue(""));

        Optional<?> withFalse = ReplicationControlManager.getTopicPlacementFromRequest(false, topic.configs());
        Assertions.assertEquals(Optional.empty(), withFalse);
        Optional<?> withTrue = ReplicationControlManager.getTopicPlacementFromRequest(true, topic.configs());
        Assertions.assertEquals(Optional.empty(), withTrue);
    }

    /**
     * Checks that {@code getTopicPlacementFromRequest} returns an empty result for a topic
     * whose request carries no configs at all.
     */
    @Test
    public void testGetTopicPlacementFromRequestHandlesNull() {
        CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName("topic");
        Optional<?> placement = ReplicationControlManager.getTopicPlacementFromRequest(true, topic.configs());
        Assertions.assertFalse(placement.isPresent());
    }

    /**
     * Checks that both {@code alterPartitionReassignments} and {@code createPartitions} fail
     * with {@code INVALID_REQUEST} for a topic that carries a
     * {@code confluent.placement.constraints} config once a {@code FeatureLevelRecord} setting
     * {@code metadata.version} to {@code IBP_3_4_IV0} has been replayed.
     *
     * <p>The replica placer is mocked so that topic creation always yields the fixed
     * assignment built at the top of the test.
     */
    @Test
    public void testCreatePartitionsAndAlterPartitionReassignmentsReturnExceptionWhenTopicConfigExistsButisNottSupported() throws Exception {
        // Mocked placer: every placement request returns the same single-partition assignment.
        ReplicaPlacer replicaPlacer = (ReplicaPlacer)Mockito.mock(ReplicaPlacer.class);
        PartitionAssignment partitionAssignment = new PartitionAssignment(Arrays.asList(0, 2, 1, 3), Arrays.asList(1, 3), __ -> DirectoryId.MIGRATING);
        List<PartitionAssignment> partitionAssignments = Arrays.asList(partitionAssignment);
        TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
        Mockito.when((Object)replicaPlacer.place((PlacementSpec)ArgumentMatchers.any(), (ClusterDescriber)ArgumentMatchers.any())).thenReturn((Object)topicAssignment);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setReplicaPlacer(replicaPlacer).build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        // Create the topic with a placement constraint attached.
        String topicPlacementJson = "{\"version\":1,\"replicas\":[{\"count\":1,\"constraints\":{\"rack\":\"rack0\"}}],\"observers\":[{\"count\":1,\"constraints\":{\"rack\":\"rack1\"}}]}";
        CreateTopicsRequestData.CreateableTopicConfigCollection configs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        configs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreateableTopicConfig().setName("confluent.placement.constraints").setValue(topicPlacementJson));
        String topicName = "placement-topic";
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(topicName).setNumPartitions(1).setConfigs(configs).setReplicationFactor((short)-1));
        // NOTE(review): createTopics is invoked with a CREATE_PARTITIONS request context - confirm this is intended.
        ControllerResult result = replicationControl.createTopics(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_PARTITIONS), request, Collections.singleton(topicName));
        Uuid topicId = ((CreateTopicsResponseData)result.response()).topics().find(topicName).topicId();
        log.info("Created topic with ID {}.", (Object)topicId);
        // NOTE(review): expectedResponse is built but never compared against result.response().
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName(topicName).setNumPartitions(1).setErrorMessage(null).setErrorCode((short)0).setTopicId(topicId).setReplicationFactor((short)partitionAssignment.replicas().size()));
        ctx.replay(result.records());
        // Replay a metadata.version feature level of IBP_3_4_IV0 after the topic exists.
        FeatureControlManager featureControl = ctx.featureControl;
        FeatureLevelRecord featureLevelRecord = new FeatureLevelRecord();
        featureLevelRecord.setName("metadata.version");
        featureLevelRecord.setFeatureLevel(MetadataVersion.IBP_3_4_IV0.apacheFeatureLevel());
        featureControl.replay(featureLevelRecord);
        // A reassignment of the topic must now be rejected with INVALID_REQUEST.
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));
        ControllerResult alterResult = replicationControl.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName(topicName).setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 3)).setObservers(Arrays.asList(3)))))), (KafkaPrincipal)multiTenantPrincipal);
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topicName).setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.INVALID_REQUEST.code()).setErrorMessage("Topic placement configuration exists but is not supported in the cluster. Please delete the confluent.placement.constraints configuration for placement-topic, or upgrade the cluster metadata to at least IBP_3_5_IV0"))))), (Object)alterResult.response());
        // Growing the topic to two partitions (no explicit assignments) must be rejected the same way.
        CreatePartitionsRequestData.CreatePartitionsTopic topic = new CreatePartitionsRequestData.CreatePartitionsTopic().setName(topicName).setCount(2);
        topic.setAssignments(null);
        ControllerResult createPartitionsResult = replicationControl.createPartitions(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_PARTITIONS), Collections.singletonList(topic));
        Assertions.assertEquals((int)1, (int)((List)createPartitionsResult.response()).size());
        CreatePartitionsResponseData.CreatePartitionsTopicResult topicResult = (CreatePartitionsResponseData.CreatePartitionsTopicResult)((List)createPartitionsResult.response()).get(0);
        Assertions.assertEquals((Object)topicName, (Object)topicResult.name());
        Assertions.assertEquals((short)Errors.INVALID_REQUEST.code(), (short)topicResult.errorCode());
        Assertions.assertEquals((Object)"Topic placement configuration exists but is not supported in the cluster. Please delete the confluent.placement.constraints configuration for placement-topic, or upgrade the cluster metadata to at least IBP_3_5_IV0", (Object)topicResult.errorMessage());
    }

    /**
     * Checks that {@code processExpiredBrokerHeartbeat} updates broker 0's heartbeat state.
     *
     * <p>The mock clock starts at zero and is advanced by {@code sleep(100L)} before the call;
     * afterwards the state is expected to show {@code lastContactNs == 100000000} and the
     * metadata offset carried by the request (123).
     */
    @Test
    public void testProcessExpiredBrokerHeartbeat() {
        MockTime mockTime = new MockTime(0L, 0L, 0L);
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().setMockTime(mockTime).build();
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100L)
            .setCurrentMetadataOffset(123L)
            .setWantShutDown(false);
        mockTime.sleep(100L);
        ctx.replicationControl.processExpiredBrokerHeartbeat(heartbeatRequest);

        Optional<BrokerHeartbeatManager.BrokerHeartbeatState> state = ctx.clusterControl.heartbeatManager().brokers().stream()
            .filter(broker -> broker.id() == 0)
            .findFirst();
        Assertions.assertTrue(state.isPresent());
        BrokerHeartbeatManager.BrokerHeartbeatState brokerZero = state.get();
        Assertions.assertEquals(0, brokerZero.id());
        // 100_000_000 ns; presumably the 100 units slept above are milliseconds.
        Assertions.assertEquals(100_000_000L, brokerZero.lastContactNs);
        Assertions.assertEquals(123L, brokerZero.metadataOffset);
    }

    /**
     * Starts a reassignment of a single partition from replicas [0, 1] to [2, 3], and then, while that
     * reassignment is still listed as ongoing, submits a second reassignment to [4, 5]. Verifies the
     * listed adding/removing replicas after each step, the leader after each step, and that no
     * reassignment is listed once the ISR contains all six brokers.
     */
    @Test
    public void testReassignPartitionsHandlesNewReassignmentThatRemovesPreviouslyAddingReplicas() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2, 3, 4, 5);
        String topic = "topic-1";
        // One partition, initially placed on brokers 0 and 1.
        Uuid topicId = ctx.createTestTopic(topic, new int[][]{{0, 1}}).topicId();
        log.debug("Created topic with ID {}", (Object)topicId);
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        MultiTenantPrincipal multiTenantPrincipal = new MultiTenantPrincipal("lkc-abcd", new TenantMetadata("lkc-abcd", "lkc-abcd"));
        // First reassignment: target replicas [2, 3].
        ControllerResult alterResultOne = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName(topic).setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(2, 3)))))), (KafkaPrincipal)multiTenantPrincipal);
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topic).setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResultOne.response());
        ctx.replay(alterResultOne.records());
        // The ongoing reassignment is expected to list removing [0, 1], adding [2, 3] and replicas [2, 3, 0, 1].
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic).setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(Arrays.asList(0, 1)).setAddingReplicas(Arrays.asList(2, 3)).setReplicas(Arrays.asList(2, 3, 0, 1))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        PartitionRegistration partition = replication.getPartition(topicId, 0);
        // The current leader reports an ISR of [0, 1, 2]; the response is expected to carry the partition epoch bumped by one.
        AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().setBrokerId(partition.leader).setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(partition.partitionEpoch).setLeaderEpoch(partition.leaderEpoch).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2))))));
        ControllerResult alterPartitionResult = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), ((AlterPartitionRequest)new AlterPartitionRequest.Builder(alterPartitionRequestData, true).build()).data());
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setIsr(Arrays.asList(0, 1, 2)).setPartitionEpoch(partition.partitionEpoch + 1).setErrorCode(Errors.NONE.code()))))), (Object)alterPartitionResult.response());
        ctx.replay(alterPartitionResult.records());
        // A preferred leader election is expected to succeed and make broker 2 the leader.
        ElectLeadersRequestData request = this.buildElectLeadersRequest(ElectionType.PREFERRED, Collections.singletonMap(topic, Collections.singletonList(0)));
        ControllerResult electLeaderTwoResult = replication.electLeaders(request);
        ElectLeadersResponseData.ReplicaElectionResult replicaElectionResult = new ElectLeadersResponseData.ReplicaElectionResult().setTopic(topic);
        replicaElectionResult.setPartitionResult(Arrays.asList(new ElectLeadersResponseData.PartitionResult().setPartitionId(0).setErrorCode(Errors.NONE.code()).setErrorMessage(null)));
        Assertions.assertEquals((Object)new ElectLeadersResponseData().setErrorCode(Errors.NONE.code()).setReplicaElectionResults(Arrays.asList(replicaElectionResult)), (Object)electLeaderTwoResult.response());
        ctx.replay(electLeaderTwoResult.records());
        partition = replication.getPartition(topicId, 0);
        Assertions.assertEquals((int)2, (int)partition.leader);
        // Second reassignment, submitted while the first is still ongoing: target replicas [4, 5].
        ControllerResult alterResultTwo = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName(topic).setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(4, 5)))))), (KafkaPrincipal)multiTenantPrincipal);
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName(topic).setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResultTwo.response());
        ctx.replay(alterResultTwo.records());
        // The previously-adding replicas 2 and 3 are now expected among the removing replicas.
        currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName(topic).setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(Arrays.asList(0, 1, 2, 3)).setAddingReplicas(Arrays.asList(4, 5)).setReplicas(Arrays.asList(4, 5, 0, 1, 2, 3))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
        partition = replication.getPartition(topicId, 0);
        // Broker 2 must still be the leader and still be part of the replica set.
        Assertions.assertEquals((int)2, (int)partition.leader);
        Assertions.assertTrue((boolean)Replicas.toSet((int[])partition.replicas).contains(partition.leader));
        // The leader reports an ISR containing all six brokers; the expected response is NEW_LEADER_ELECTED.
        AlterPartitionRequestData alterPartitionRequestDataTwo = new AlterPartitionRequestData().setBrokerId(partition.leader).setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).setTopics(Arrays.asList(new AlterPartitionRequestData.TopicData().setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionRequestData.PartitionData().setPartitionIndex(0).setPartitionEpoch(partition.partitionEpoch).setLeaderEpoch(partition.leaderEpoch).setNewIsrWithEpochs(ReplicationControlManagerTest.isrWithDefaultEpoch(0, 1, 2, 3, 4, 5))))));
        ControllerResult alterPartitionResultTwo = replication.alterPartition(ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION), ((AlterPartitionRequest)new AlterPartitionRequest.Builder(alterPartitionRequestDataTwo, true).build()).data());
        Assertions.assertEquals((Object)new AlterPartitionResponseData().setTopics(Arrays.asList(new AlterPartitionResponseData.TopicData().setTopicId(topicId).setPartitions(Arrays.asList(new AlterPartitionResponseData.PartitionData().setPartitionIndex(0).setErrorCode(Errors.NEW_LEADER_ELECTED.code()))))), (Object)alterPartitionResultTwo.response());
        ctx.replay(alterPartitionResultTwo.records());
        partition = replication.getPartition(topicId, 0);
        // After replaying, broker 4 is expected to lead and no reassignment should remain listed.
        Assertions.assertEquals((int)4, (int)partition.leader);
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null, Long.MAX_VALUE));
    }

    /**
     * Builds an AlterPartition broker state entry for the given broker id and broker epoch.
     *
     * @param brokerId    id stored in the returned state
     * @param brokerEpoch epoch stored in the returned state; must not be null
     * @return the populated broker state
     */
    private static AlterPartitionRequestData.BrokerState brokerState(int brokerId, Long brokerEpoch) {
        long epoch = brokerEpoch.longValue();
        return new AlterPartitionRequestData.BrokerState()
            .setBrokerId(brokerId)
            .setBrokerEpoch(epoch);
    }

    /**
     * Returns the broker epoch these tests use by default for a broker: the broker id plus 100.
     *
     * @param brokerId the broker id
     * @return {@code brokerId + 100} as a boxed long
     */
    private static Long defaultBrokerEpoch(int brokerId) {
        return Long.valueOf(100L + brokerId);
    }

    /**
     * Converts broker ids into broker states, pairing each id with its default broker epoch.
     *
     * @param isr broker ids, in the order they should appear in the result
     * @return one broker state per id, in the same order
     */
    private static List<AlterPartitionRequestData.BrokerState> isrWithDefaultEpoch(Integer ... isr) {
        List<AlterPartitionRequestData.BrokerState> states = new ArrayList<>();
        for (Integer brokerId : isr) {
            states.add(ReplicationControlManagerTest.brokerState(brokerId, ReplicationControlManagerTest.defaultBrokerEpoch(brokerId)));
        }
        return states;
    }

    /**
     * Replaying a second TopicRecord for an existing topic name must fail, with a message that
     * depends on whether the topic id matches the one replayed first.
     */
    @Test
    public void testDuplicateTopicIdReplay() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        Uuid firstId = Uuid.fromString("Ktv3YkMQRe-MId4VkkrMyw");
        Uuid otherId = Uuid.fromString("8auUWq8zQqe_99H_m2LAmw");
        replicationControl.replay(new TopicRecord().setName("foo").setTopicId(firstId));

        // Same name, same id.
        RuntimeException sameIdFailure = Assertions.assertThrows(RuntimeException.class,
            () -> replicationControl.replay(new TopicRecord().setName("foo").setTopicId(firstId)));
        Assertions.assertEquals((Object)"Found duplicate TopicRecord for foo with topic ID Ktv3YkMQRe-MId4VkkrMyw", (Object)sameIdFailure.getMessage());

        // Same name, different id.
        RuntimeException differentIdFailure = Assertions.assertThrows(RuntimeException.class,
            () -> replicationControl.replay(new TopicRecord().setName("foo").setTopicId(otherId)));
        Assertions.assertEquals((Object)"Found duplicate TopicRecord for foo with a different ID than before. Previous ID was Ktv3YkMQRe-MId4VkkrMyw and new ID is 8auUWq8zQqe_99H_m2LAmw", (Object)differentIdFailure.getMessage());
    }

    /**
     * With the context built at MetadataVersion.IBP_3_7_IV1, handling an AssignReplicasToDirs
     * request is expected to throw UnsupportedVersionException.
     */
    @Test
    void testHandleAssignReplicasToDirsFailsOnOlderMv() {
        ReplicationControlTestContext.Builder builder = new ReplicationControlTestContext.Builder();
        ReplicationControlTestContext ctx = builder.setMetadataVersion(MetadataVersion.IBP_3_7_IV1).build();
        AssignReplicasToDirsRequestData emptyRequest = new AssignReplicasToDirsRequestData();
        Assertions.assertThrows(UnsupportedVersionException.class,
            () -> ctx.replicationControl.handleAssignReplicasToDirs(emptyRequest));
    }

    /**
     * Submits a batch of replica-to-directory assignments on behalf of broker 1 that mixes valid
     * entries with several problem cases, then checks the per-directory response errors, the
     * generated PartitionChangeRecords, and the resulting broker-to-partition view after replay.
     */
    @Test
    void testHandleAssignReplicasToDirs() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        final Uuid dir1b1 = Uuid.fromString((String)"hO2YI5bgRUmByNPHiHxjNQ");
        final Uuid dir2b1 = Uuid.fromString((String)"R3Gb1HLoTzuKMgAkH5Vtpw");
        Uuid dir1b2 = Uuid.fromString((String)"TBGa8UayQi6KguqF5nC0sw");
        // offlineDir is not passed to registerBrokersWithDirs below, so neither broker registers it.
        final Uuid offlineDir = Uuid.fromString((String)"zvAf9BKZRyyrEWz4FX2nLA");
        // Broker 1 registers dir1b1 and dir2b1; broker 2 registers dir1b2.
        ctx.registerBrokersWithDirs(1, Arrays.asList(dir1b1, dir2b1), 2, Collections.singletonList(dir1b2));
        ctx.unfenceBrokers(1, 2);
        // Topics a and b are replicated on brokers 1 and 2; topic c only on broker 2.
        final Uuid topicA = ctx.createTestTopic("a", new int[][]{{1, 2}, {1, 2}, {1, 2}}).topicId();
        final Uuid topicB = ctx.createTestTopic("b", new int[][]{{1, 2}, {1, 2}}).topicId();
        final Uuid topicC = ctx.createTestTopic("c", new int[][]{{2}}).topicId();
        ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, (Map<TopicIdPartition, Uuid>)new HashMap<TopicIdPartition, Uuid>(){
            {
                this.put(new TopicIdPartition(topicA, 0), dir1b1);
                this.put(new TopicIdPartition(topicA, 1), dir2b1);
                // Assignment to a directory broker 1 did not register.
                this.put(new TopicIdPartition(topicA, 2), offlineDir);
                this.put(new TopicIdPartition(topicB, 0), dir1b1);
                // Assignment to the DirectoryId.LOST sentinel.
                this.put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
                // Topic id that was never created in this test.
                this.put(new TopicIdPartition(Uuid.fromString((String)"nLU9hKNXSZuMe5PO2A4dVQ"), 1), dir2b1);
                // Topic a only has partitions 0..2.
                this.put(new TopicIdPartition(topicA, 137), dir1b1);
                // Broker 1 is not a replica of topic c.
                this.put(new TopicIdPartition(topicC, 0), dir1b1);
            }
        });
        // Expected response, grouped by directory id, with one error code per partition.
        Assertions.assertEquals((Object)AssignmentsHelper.normalize((AssignReplicasToDirsResponseData)AssignmentsHelper.buildResponseData((short)0, (int)0, (Map)new HashMap<Uuid, Map<TopicIdPartition, Errors>>(){
            {
                this.put(dir1b1, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicA, 0), Errors.NONE);
                        this.put(new TopicIdPartition(topicA, 137), Errors.UNKNOWN_TOPIC_OR_PARTITION);
                        this.put(new TopicIdPartition(topicB, 0), Errors.NONE);
                        this.put(new TopicIdPartition(topicC, 0), Errors.NOT_LEADER_OR_FOLLOWER);
                    }
                });
                this.put(dir2b1, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicA, 1), Errors.NONE);
                        this.put(new TopicIdPartition(Uuid.fromString((String)"nLU9hKNXSZuMe5PO2A4dVQ"), 1), Errors.UNKNOWN_TOPIC_ID);
                    }
                });
                this.put(offlineDir, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicA, 2), Errors.NONE);
                    }
                });
                this.put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>(){
                    {
                        this.put(new TopicIdPartition(topicB, 1), Errors.NONE);
                    }
                });
            }
        })), (Object)AssignmentsHelper.normalize((AssignReplicasToDirsResponseData)((AssignReplicasToDirsResponseData)controllerResult.response())));
        short recordVersion = ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
        // Expected records: a directory change for each of the five accepted assignments, plus a
        // leader/ISR change to broker 2 for topicA-2 and topicB-1 (the offlineDir and LOST assignments).
        // Both sides are sorted so the comparison does not depend on record order.
        Assertions.assertEquals(ReplicationControlManagerTest.sortPartitionChangeRecords(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0).setDirectories(Arrays.asList(dir1b1, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1).setDirectories(Arrays.asList(dir2b1, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).setDirectories(Arrays.asList(offlineDir, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).setDirectories(Arrays.asList(dir1b1, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).setDirectories(Arrays.asList(DirectoryId.LOST, dir1b2)), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2).setIsr(Collections.singletonList(2)).setLeader(2), recordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).setIsr(Collections.singletonList(2)).setLeader(2), recordVersion))), ReplicationControlManagerTest.sortPartitionChangeRecords(controllerResult.records()));
        ctx.replay(controllerResult.records());
        // Partitions expected from brokersToIsrs().iterator(1, true) after replay.
        Assertions.assertEquals((Object)new HashSet<TopicIdPartition>(){
            {
                this.add(new TopicIdPartition(topicA, 0));
                this.add(new TopicIdPartition(topicA, 1));
                this.add(new TopicIdPartition(topicB, 0));
            }
        }, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
        // Partitions expected from brokersToIsrs().iterator(2, true) after replay.
        Assertions.assertEquals((Object)new HashSet<TopicIdPartition>(){
            {
                this.add(new TopicIdPartition(topicA, 2));
                this.add(new TopicIdPartition(topicB, 1));
                this.add(new TopicIdPartition(topicC, 0));
            }
        }, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(2, true)));
    }

    /**
     * Assigns partition 0 of topics a and b to each broker's first directory and partition 1 to each
     * broker's second directory, then reports directories offline for broker b1 and checks the
     * generated registration-change and partition-change records.
     */
    @Test
    void testHandleDirectoriesOffline() {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
        int b1 = 101;
        int b2 = 102;
        final Uuid dir1b1 = Uuid.fromString((String)"suitdzfTTdqoWcy8VqmkUg");
        final Uuid dir2b1 = Uuid.fromString((String)"yh3acnzGSeurSTj8aIhOjw");
        final Uuid dir1b2 = Uuid.fromString((String)"OmpmJ8RjQliQlEFht56DwQ");
        final Uuid dir2b2 = Uuid.fromString((String)"w05baLpsT5Oz0LvKTKXoDw");
        // b1 registers dir1b1 and dir2b1; b2 registers dir1b2 and dir2b2.
        ctx.registerBrokersWithDirs(b1, Arrays.asList(dir1b1, dir2b1), b2, Arrays.asList(dir1b2, dir2b2));
        ctx.unfenceBrokers(b1, b2);
        final Uuid topicA = ctx.createTestTopic("a", new int[][]{{b1, b2}, {b1, b2}}).topicId();
        final Uuid topicB = ctx.createTestTopic("b", new int[][]{{b1, b2}, {b1, b2}}).topicId();
        ctx.assignReplicasToDirs(b1, (Map<TopicIdPartition, Uuid>)new HashMap<TopicIdPartition, Uuid>(){
            {
                this.put(new TopicIdPartition(topicA, 0), dir1b1);
                this.put(new TopicIdPartition(topicA, 1), dir2b1);
                this.put(new TopicIdPartition(topicB, 0), dir1b1);
                this.put(new TopicIdPartition(topicB, 1), dir2b1);
            }
        });
        ctx.assignReplicasToDirs(b2, (Map<TopicIdPartition, Uuid>)new HashMap<TopicIdPartition, Uuid>(){
            {
                this.put(new TopicIdPartition(topicA, 0), dir1b2);
                this.put(new TopicIdPartition(topicA, 1), dir2b2);
                this.put(new TopicIdPartition(topicB, 0), dir1b2);
                this.put(new TopicIdPartition(topicB, 1), dir2b2);
            }
        });
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        // The offline list for b1 deliberately includes dir1b2, which is registered to b2 rather than b1;
        // the assertions below expect only b1's own directory list to change.
        ctx.replicationControl.handleDirectoriesOffline(b1, ReplicationControlManagerTest.defaultBrokerEpoch(b1).longValue(), Arrays.asList(dir1b1, dir1b2), records);
        // Exactly one registration change is expected, leaving dir2b1 as b1's only log directory.
        Assertions.assertEquals(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new BrokerRegistrationChangeRecord().setBrokerId(b1).setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(b1).longValue()).setLogDirs(Collections.singletonList(dir2b1)), 2)), ReplicationControlManagerTest.filter(records, BrokerRegistrationChangeRecord.class));
        short partitionChangeRecordVersion = ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
        // Partition 0 of each topic (the ones b1 kept in dir1b1) is expected to move leader and ISR to b2.
        Assertions.assertEquals(ReplicationControlManagerTest.sortPartitionChangeRecords(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0).setLeader(b2).setIsr(Collections.singletonList(b2)), partitionChangeRecordVersion), new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).setLeader(b2).setIsr(Collections.singletonList(b2)), partitionChangeRecordVersion))), ReplicationControlManagerTest.sortPartitionChangeRecords(ReplicationControlManagerTest.filter(records, PartitionChangeRecord.class)));
        // One registration change plus two partition changes, and nothing else.
        Assertions.assertEquals((int)3, (int)records.size());
        ctx.replay(records);
        Assertions.assertEquals(Collections.singletonList(dir2b1), (Object)ctx.clusterControl.registration(b1).directories());
    }

    /**
     * Returns a sorted copy of the given records so that two record lists can be compared without
     * depending on their order. The input list is left untouched.
     *
     * @param records records whose messages are all PartitionChangeRecord instances
     * @return a new list ordered by the string {@code "<topicId>-<partitionId>"}
     */
    private static List<ApiMessageAndVersion> sortPartitionChangeRecords(List<ApiMessageAndVersion> records) {
        Comparator<ApiMessageAndVersion> byTopicAndPartition = Comparator.comparing(entry -> {
            PartitionChangeRecord change = (PartitionChangeRecord)entry.message();
            return change.topicId() + "-" + change.partitionId();
        });
        List<ApiMessageAndVersion> sorted = new ArrayList<>(records);
        sorted.sort(byTopicAndPartition);
        return sorted;
    }

    /**
     * Keeps only the records whose message class is exactly {@code clazz} (subclasses do not match).
     *
     * @param records records to scan
     * @param clazz   message class to keep
     * @return a new list with the matching records in their original order
     */
    private static List<ApiMessageAndVersion> filter(List<ApiMessageAndVersion> records, Class<? extends ApiMessage> clazz) {
        List<ApiMessageAndVersion> matching = new ArrayList<>();
        for (ApiMessageAndVersion candidate : records) {
            if (clazz.equals(candidate.message().getClass())) {
                matching.add(candidate);
            }
        }
        return matching;
    }

    /**
     * Create-topic policy used by the tests. It rejects a request when more validations have been
     * requested than there are expected entries, when the tenant partition count would reach 20,
     * when an explicit replication factor other than 3 is given, or when the cluster topic count
     * has reached 3. The counters are fed through {@link #partialUpdate}.
     */
    private static class MockCreateTopicPolicy
    implements CreateTopicPolicy,
    ConfluentPartitionsPerTopicListener {
        // Only the size of this list is consulted: it caps the number of validate() calls.
        private final List<CreateTopicPolicy.RequestMetadata> expecteds;
        // Number of validate() calls seen so far.
        private final AtomicLong index = new AtomicLong(0L);
        private int numTenantPartitions = 0;
        private int numClusterTopics = 0;
        private int numPendingClusterTopics = 0;
        private int numPendingTenantPartitions = 0;

        MockCreateTopicPolicy(List<CreateTopicPolicy.RequestMetadata> expecteds) {
            this.expecteds = expecteds;
        }

        /**
         * Applies the checks described on the class, in that order.
         *
         * @param actual the metadata of the topic being created
         * @throws PolicyViolationException if any check fails
         */
        public void validate(CreateTopicPolicy.RequestMetadata actual) throws PolicyViolationException {
            long callNumber = this.index.getAndIncrement();
            if (callNumber >= (long)this.expecteds.size()) {
                throw new PolicyViolationException("Unexpected topic creation: index out of range at " + callNumber);
            }

            // Fall back to the number of manual assignments when no partition count was supplied.
            int partitionsPassed;
            if (actual.numPartitions() != null) {
                partitionsPassed = actual.numPartitions().intValue();
            } else {
                partitionsPassed = actual.replicasAssignments().size();
            }
            if (partitionsPassed + this.numTenantPartitions >= 20) {
                throw new PolicyViolationException("Exceeds tenant partitions");
            }

            Short repFactorPassed = actual.replicationFactor();
            boolean explicitReplicationFactor = repFactorPassed != null;
            if (explicitReplicationFactor && repFactorPassed.shortValue() != 3) {
                throw new PolicyViolationException("Topic replication factor must be 3");
            }

            if (this.numClusterTopics >= 3) {
                throw new PolicyViolationException("Exceeds cluster topics");
            }
        }

        public void close() {
            // Nothing to release.
        }

        public void configure(Map<String, ?> configs) {
            // No configuration is read.
        }

        public void fullUpdate(Iterator<Map.Entry<String, Integer>> iterator, Map<Integer, Integer> brokersToNumReplicas) {
            // Full updates are ignored by this mock.
        }

        /**
         * Adds the topic and partition deltas to the pending counters when {@code pending} is true,
         * and to the committed counters otherwise. The remaining arguments are ignored.
         */
        public void partialUpdate(String topicName, int numPartitionsAddedOrDeleted, int numTopicsAddedOrDeleted, Map<Integer, List<Integer>> partitionIdToReplicasAdded, Map<Integer, List<Integer>> partitionIdToReplicasDeleted, boolean pending) {
            if (!pending) {
                this.numClusterTopics += numTopicsAddedOrDeleted;
                this.numTenantPartitions += numPartitionsAddedOrDeleted;
                return;
            }
            this.numPendingClusterTopics += numTopicsAddedOrDeleted;
            this.numPendingTenantPartitions += numPartitionsAddedOrDeleted;
        }

        /** Resets both pending counters to zero. */
        public void clearPending() {
            this.numPendingTenantPartitions = 0;
            this.numPendingClusterTopics = 0;
        }
    }

    private static class ReplicationControlTestContext {
        // Collaborators of the ReplicationControlManager under test. All of them are assigned in the
        // constructor, except metadataDelta which is initialised inline.
        final SnapshotRegistry snapshotRegistry;
        final LogContext logContext;
        final MockTime time;
        final FeatureControlManager featureControl;
        final CellControlManager cellControl;
        final ClusterControlManager clusterControl;
        final ConfigurationControlManager configurationControl;
        final MirrorTopicControlManager mirrorTopicControl;
        final AclControlManager aclControlManager;
        final ReplicationControlManager replicationControl;
        final ClusterLinkControlManager clusterLinkControl;
        // Starts from the empty image; replay(...) feeds every replayed record into it.
        final MetadataDelta metadataDelta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build();
        final TenantControlManager tenantControl;
        /**
         * Maps a name to its tenant prefix: for names starting with {@code "lkc-"} that contain an
         * underscore, everything before the first underscore; {@code null} for any other name.
         */
        public static final Function<String, String> NAME_TO_TENANT_CALLBACK = name -> {
            int delimIndex = name.startsWith("lkc-") ? name.indexOf('_') : -1;
            if (delimIndex < 0) {
                return null;
            }
            return name.substring(0, delimIndex);
        };

        /**
         * Replays the records into each control manager in turn and then, one by one, into the
         * metadata delta.
         *
         * @param records records to apply, in order
         */
        void replay(List<ApiMessageAndVersion> records) {
            RecordTestUtils.replayAll(clusterControl, records);
            RecordTestUtils.replayAll(configurationControl, records);
            RecordTestUtils.replayAll(replicationControl, records);
            RecordTestUtils.replayAll(clusterLinkControl, records);
            RecordTestUtils.replayAll(mirrorTopicControl, records);
            RecordTestUtils.replayAll(cellControl, records);
            for (ApiMessageAndVersion messageAndVersion : records) {
                metadataDelta.replay(messageAndVersion.message(), messageAndVersion.version());
            }
        }

        /**
         * Builds every control manager the tests need and wires them to each other, finishing by
         * activating the cluster control manager. Several managers receive method references to
         * this context (for example {@code this::isValidClusterLink}); those are only invoked
         * later, so they may refer to fields that are assigned further down in this constructor.
         */
        private ReplicationControlTestContext(MetadataVersion metadataVersion, Optional<CreateTopicPolicy> createTopicPolicy, MockTime time, PartitionPlacementStrategy partitionPlacementStrategy, ReplicaPlacer replicaPlacer, int maxPartitionChangesPerSlice, Boolean isElrEnabled, LogContext logContext, SnapshotRegistry snapshotRegistry, ConfigurationControlManager configurationControl, boolean virtualTopicsEnabledInConfig) {
            this.logContext = logContext;
            this.snapshotRegistry = snapshotRegistry;
            this.aclControlManager = new AclControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(snapshotRegistry).setValidLinkIdChecker(this::isValidClusterLink).build();
            this.time = time;
            this.featureControl = new FeatureControlManager.Builder().setSnapshotRegistry(snapshotRegistry).setQuorumFeatures(new QuorumFeatures(0, QuorumFeatures.defaultFeatureMap((boolean)true), Collections.singletonList(0))).setMetadataVersion(metadataVersion).build();
            this.mirrorTopicControl = new MirrorTopicControlManager(snapshotRegistry, logContext, (Time)time, this::resolveTopicId, this::resolveClusterLinkId, this::resolveClusterLink, __ -> Optional.empty(), this.featureControl);
            this.cellControl = new CellControlManager(logContext, (Time)time, snapshotRegistry, this.featureControl, new Random(0L), 15, 6, 15, true);
            this.tenantControl = new TenantControlManager(logContext, this.featureControl, this.cellControl, partitionPlacementStrategy, 3);
            // NOTE(review): TimeUnit.MILLISECONDS.convert(1000L, TimeUnit.NANOSECONDS) converts 1000 nanoseconds
            // to milliseconds, which truncates to 0, so the session timeout passed here is 0 ns.
            // Confirm whether that is intended before changing it; other tests may rely on it.
            this.clusterControl = new ClusterControlManager.Builder().setLogContext(logContext).setTime((Time)time).setSnapshotRegistry(snapshotRegistry).setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(1000L, TimeUnit.NANOSECONDS)).setReplicaPlacer(replicaPlacer).setFeatureControlManager(this.featureControl).setCellControlManager(this.cellControl).setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).build();
            this.configurationControl = configurationControl;
            this.replicationControl = new ReplicationControlManager.Builder().setSnapshotRegistry(snapshotRegistry).setLogContext(logContext).setMaxElectionsPerImbalance(Integer.MAX_VALUE).setConfigurationControl(configurationControl).setClusterControl(this.clusterControl).setCreateTopicPolicy(createTopicPolicy).setMirrorTopicControl(this.mirrorTopicControl).setFeatureControl(this.featureControl).setTenantControl(this.tenantControl).setNameToTenantCallback(NAME_TO_TENANT_CALLBACK).setMaxPartitionChangesPerSlice(maxPartitionChangesPerSlice).setEligibleLeaderReplicasEnabled(isElrEnabled.booleanValue()).setTime((Time)time).setVirtualTopicsEnabledInConfig(virtualTopicsEnabledInConfig).build();
            this.clusterLinkControl = new ClusterLinkControlManager(snapshotRegistry, logContext, configurationControl, this.mirrorTopicControl, this.featureControl, arg_0 -> ((ReplicationControlManager)this.replicationControl).unlinkMirrorTopic(arg_0), arg_0 -> ((AclControlManager)this.aclControlManager).unlinkAcls(arg_0), "clusterId", Optional.empty());
            this.clusterControl.activate();
        }

        /**
         * Asks the cluster link control manager whether the given link id is valid. The constructor
         * hands this method to the ACL control manager as its valid-link-id checker.
         */
        Boolean isValidClusterLink(Uuid linkId) {
            ClusterLinkControlManager linkManager = clusterLinkControl;
            return linkManager.isValidLinkId(linkId);
        }

        /**
         * Looks up the id of a topic by name.
         *
         * @return the topic id, or empty when the replication control manager returns null for the name
         */
        Optional<Uuid> resolveTopicId(String topicName) {
            Uuid topicId = replicationControl.getTopicId(topicName);
            return Optional.ofNullable(topicId);
        }

        /** Delegates the link-name to link-id lookup to the cluster link control manager. */
        Optional<Uuid> resolveClusterLinkId(String linkName) {
            ClusterLinkControlManager linkManager = clusterLinkControl;
            return linkManager.getClusterLinkId(linkName);
        }

        /** Looks up a cluster link using the string form of the given link id. */
        Optional<ClusterLink> resolveClusterLink(Uuid linkId) {
            String linkIdText = linkId.toString();
            return clusterLinkControl.getClusterLink(linkIdText);
        }

        /**
         * Forwards an unclean broker shutdown to the replication control manager, which receives the
         * given record list. The constructor registers this method as the cluster control manager's
         * unclean-shutdown handler.
         */
        void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
            ReplicationControlManager replication = replicationControl;
            replication.handleBrokerUncleanShutdown(brokerId, records);
        }

        /** Creates a topic by partition count and replication factor, without customising the request. */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode) {
            Consumer<CreateTopicsRequestData.CreatableTopic> noOpHook = unused -> {};
            return createTestTopic(name, numPartitions, replicationFactor, expectedErrorCode, noOpHook);
        }

        /**
         * Creates a topic by partition count and replication factor and asserts the resulting error.
         * The generated records are replayed only when the expected error is NONE.
         *
         * @param preCreationHook invoked with the creatable topic before the request is submitted
         * @return the per-topic result from the create-topics response
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int numPartitions, short replicationFactor, short expectedErrorCode, Consumer<CreateTopicsRequestData.CreatableTopic> preCreationHook) {
            CreateTopicsRequestData.CreatableTopic creatable = new CreateTopicsRequestData.CreatableTopic()
                .setName(name)
                .setNumPartitions(numPartitions)
                .setReplicationFactor(replicationFactor);
            preCreationHook.accept(creatable);

            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            createRequest.topics().add(creatable);

            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
            ControllerResult<CreateTopicsResponseData> outcome = replicationControl.createTopics(requestContext, createRequest, Collections.singleton(name));
            CreateTopicsResponseData.CreatableTopicResult topicResult = outcome.response().topics().find(name);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals((Object)Errors.forCode(expectedErrorCode), (Object)Errors.forCode((short)topicResult.errorCode()));
            if (expectedErrorCode != Errors.NONE.code()) {
                return topicResult;
            }
            replay(outcome.records());
            return topicResult;
        }

        /** Creates a topic from manual assignments with no configs, expecting error code 0. */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas) {
            Map<String, String> noConfigs = Collections.emptyMap();
            short expectedErrorCode = 0;
            return createTestTopic(name, replicas, noConfigs, expectedErrorCode);
        }

        /** Creates a topic from manual assignments with no configs, expecting the given error code. */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, short expectedErrorCode) {
            Map<String, String> noConfigs = Collections.emptyMap();
            return createTestTopic(name, replicas, noConfigs, expectedErrorCode);
        }

        /**
         * Creates a topic with no replica assignments whose {@code confluent.topic.type} config is
         * set to the virtual topic type, expecting the given error code.
         */
        CreateTopicsResponseData.CreatableTopicResult createVirtualTopic(String name, short expectedErrorCode) {
            int[][] noAssignments = new int[0][];
            Map<String, String> virtualTypeConfig = Collections.singletonMap("confluent.topic.type", TopicType.VIRTUAL.logConfigValue());
            return createTestTopic(name, noAssignments, virtualTypeConfig, expectedErrorCode);
        }

        /** Creates a virtual topic, expecting error code 0. */
        CreateTopicsResponseData.CreatableTopicResult createVirtualTopic(String name) {
            short expectedErrorCode = 0;
            return createVirtualTopic(name, expectedErrorCode);
        }

        /**
         * Creates a mirror topic over the named cluster link, asserts that creation succeeded and
         * replays the generated records.
         *
         * @param linkName        name of the cluster link
         * @param topicName       name of the topic to create
         * @param mirrorTopicName value for the request's mirror-topic field
         * @param mirrorTopicId   value for the request's source-topic-id field
         * @return the per-topic result from the create-topics response
         */
        CreateTopicsResponseData.CreatableTopicResult createLinkTopic(String linkName, String topicName, String mirrorTopicName, Uuid mirrorTopicId) throws Exception {
            CreateTopicsRequestData.CreatableTopic mirror = new CreateTopicsRequestData.CreatableTopic()
                .setName(topicName)
                .setMirrorTopic(mirrorTopicName)
                .setSourceTopicId(mirrorTopicId)
                .setLinkName(linkName)
                .setNumPartitions(-1)
                .setReplicationFactor((short)-1);
            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            createRequest.topics().add(mirror);

            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
            ControllerResult<CreateTopicsResponseData> outcome = replicationControl.createTopics(requestContext, createRequest, Collections.singleton(topicName));
            CreateTopicsResponseData.CreatableTopicResult topicResult = outcome.response().topics().find(topicName);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)topicResult.errorCode()));
            replay(outcome.records());
            return topicResult;
        }

        /** Creates a topic by partition count with configs, passing -1 as the replication factor. */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int partitions, Map<String, String> configs, short expectedErrorCode) {
            short unsetReplicationFactor = -1;
            return createTestTopic(name, partitions, unsetReplicationFactor, configs, expectedErrorCode);
        }

        /**
         * Creates a topic by partition count and replication factor with the given configs and
         * asserts the resulting error code. On success it also asserts the reported partition
         * count and replays the generated records.
         *
         * @return the per-topic result from the create-topics response
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int partitions, short replicationFactor, Map<String, String> configs, short expectedErrorCode) {
            CreateTopicsRequestData.CreatableTopic creatable = new CreateTopicsRequestData.CreatableTopic()
                .setName(name)
                .setNumPartitions(partitions)
                .setReplicationFactor(replicationFactor);
            for (Map.Entry<String, String> config : configs.entrySet()) {
                creatable.configs().add(new CreateTopicsRequestData.CreateableTopicConfig().setName(config.getKey()).setValue(config.getValue()));
            }
            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            createRequest.topics().add(creatable);

            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
            ControllerResult<CreateTopicsResponseData> outcome = replicationControl.createTopics(requestContext, createRequest, Collections.singleton(name));
            CreateTopicsResponseData.CreatableTopicResult topicResult = outcome.response().topics().find(name);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals(expectedErrorCode, (short)topicResult.errorCode());
            if (expectedErrorCode != Errors.NONE.code()) {
                return topicResult;
            }
            Assertions.assertEquals(partitions, (int)topicResult.numPartitions());
            replay(outcome.records());
            return topicResult;
        }

        /**
         * Creates a topic from manual replica assignments (one inner array of broker ids per
         * partition) with the given configs and asserts the resulting error code. Unless the configs
         * mark the topic as virtual, at least one partition must be supplied, and on success the
         * reported partition count and replication factor are asserted. Records are replayed only
         * when the expected error is NONE.
         *
         * @return the per-topic result from the create-topics response
         */
        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, Map<String, String> configs, short expectedErrorCode) {
            boolean virtual = configs.containsKey("confluent.topic.type") && configs.get("confluent.topic.type").equals(TopicType.VIRTUAL.logConfigValue());
            if (!virtual) {
                Assertions.assertNotEquals((int)0, (int)replicas.length);
            }

            CreateTopicsRequestData.CreatableTopic creatable = new CreateTopicsRequestData.CreatableTopic()
                .setName(name)
                .setNumPartitions(-1)
                .setReplicationFactor((short)-1);
            int partitionIndex = 0;
            for (int[] brokerIds : replicas) {
                creatable.assignments().add(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(partitionIndex).setBrokerIds(Replicas.toList(brokerIds)));
                partitionIndex++;
            }
            for (Map.Entry<String, String> config : configs.entrySet()) {
                creatable.configs().add(new CreateTopicsRequestData.CreateableTopicConfig().setName(config.getKey()).setValue(config.getValue()));
            }
            CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
            createRequest.topics().add(creatable);

            ControllerRequestContext requestContext = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_TOPICS);
            ControllerResult<CreateTopicsResponseData> outcome = replicationControl.createTopics(requestContext, createRequest, Collections.singleton(name));
            CreateTopicsResponseData.CreatableTopicResult topicResult = outcome.response().topics().find(name);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals(expectedErrorCode, (short)topicResult.errorCode());
            if (expectedErrorCode != Errors.NONE.code()) {
                return topicResult;
            }
            if (!virtual) {
                Assertions.assertEquals((int)replicas.length, (int)topicResult.numPartitions());
                Assertions.assertEquals((int)replicas[0].length, (int)topicResult.replicationFactor());
            }
            replay(outcome.records());
            return topicResult;
        }

        /**
         * Deletes one topic by id through {@code replicationControl} and replays the result.
         *
         * <p>Asserts that the response is keyed by exactly {@code topicId}, that its error is
         * {@code Errors.NONE}, and that exactly one record was produced: a
         * {@code RemoveTopicRecord} carrying {@code topicId}.
         *
         * @param context request context passed to {@code deleteTopics}
         * @param topicId id of the topic to delete
         */
        void deleteTopic(ControllerRequestContext context, Uuid topicId) {
            ControllerResult deletion = this.replicationControl.deleteTopics(context, Collections.singleton(topicId));
            Map errorsByTopicId = (Map)deletion.response();
            Assertions.assertEquals(Collections.singleton(topicId), errorsByTopicId.keySet());
            Assertions.assertEquals(Errors.NONE, ((ApiError)errorsByTopicId.get(topicId)).error());

            List records = deletion.records();
            Assertions.assertEquals(1, records.size());
            ApiMessageAndVersion onlyRecord = (ApiMessageAndVersion)records.get(0);
            RemoveTopicRecord removeRecord = Assertions.assertInstanceOf(RemoveTopicRecord.class, onlyRecord.message());
            Assertions.assertEquals(topicId, removeRecord.topicId());
            this.replay(records);
        }

        /**
         * Sends a CREATE_PARTITIONS request for one topic and replays whatever records result.
         *
         * <p>Asserts that the response holds exactly one topic result, that it is for
         * {@code name}, and that its error code equals {@code expectedErrorCode}. The records
         * are replayed regardless of the error code.
         *
         * @param count value passed to {@code setCount} on the request topic
         * @param name topic name
         * @param replicas manual assignment, one broker-id array per added partition, or
         *                 {@code null} to send a null assignment list; must be non-empty if given
         * @param expectedErrorCode error code the topic result is asserted to carry
         */
        void createPartitions(int count, String name, int[][] replicas, short expectedErrorCode) {
            boolean manualAssignment = replicas != null;
            if (manualAssignment) {
                Assertions.assertNotEquals(0, replicas.length);
            }
            CreatePartitionsRequestData.CreatePartitionsTopic topic = new CreatePartitionsRequestData.CreatePartitionsTopic().setName(name).setCount(count);
            if (manualAssignment) {
                for (int[] brokerIds : replicas) {
                    topic.assignments().add(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Replicas.toList(brokerIds)));
                }
            } else {
                topic.setAssignments(null);
            }

            ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
            ControllerResult result = this.replicationControl.createPartitions(context, Collections.singletonList(topic));
            List topicResults = (List)result.response();
            Assertions.assertEquals(1, topicResults.size());
            CreatePartitionsResponseData.CreatePartitionsTopicResult topicResult = (CreatePartitionsResponseData.CreatePartitionsTopicResult)topicResults.get(0);
            Assertions.assertEquals(name, topicResult.name());
            Assertions.assertEquals(expectedErrorCode, (short)topicResult.errorCode());
            this.replay(result.records());
        }

        /**
         * Registers each broker with a single synthetic log directory, delegating to
         * {@link #registerBrokersWithDirs(Object...)}.
         *
         * <p>The directory id is parsed from {@code "TESTBROKER" + suffix + "DIRAAAA"}, where
         * {@code suffix} is the decimal form of {@code 100000 + brokerId} with its first
         * character removed.
         *
         * @param brokerIds ids of the brokers to register
         */
        void registerBrokers(Integer ... brokerIds) {
            List<Object> brokersAndDirs = new ArrayList<Object>(brokerIds.length * 2);
            for (Integer brokerId : brokerIds) {
                String suffix = Integer.toString(100000 + brokerId).substring(1);
                brokersAndDirs.add(brokerId);
                brokersAndDirs.add(Collections.singletonList(Uuid.fromString("TESTBROKER" + suffix + "DIRAAAA")));
            }
            this.registerBrokersWithDirs(brokersAndDirs.toArray());
        }

        /**
         * Replays one {@code RegisterBrokerRecord} (record version 3) per broker.
         *
         * <p>Arguments are flattened pairs: an {@code Integer} broker id followed by the
         * {@code List} of log directory ids for that broker. Each record uses
         * {@code defaultBrokerEpoch(brokerId)} as epoch, a null rack, and one PLAINTEXT endpoint
         * on {@code localhost} at port {@code 9092 + brokerId}.
         *
         * @param brokerIdsAndDirs alternating broker ids and log-dir lists
         * @throws IllegalArgumentException if the argument count is odd
         */
        void registerBrokersWithDirs(Object ... brokerIdsAndDirs) {
            if (brokerIdsAndDirs.length % 2 != 0) {
                throw new IllegalArgumentException("uneven number of arguments");
            }
            // Walk the array two slots at a time: [id, dirs, id, dirs, ...].
            for (int slot = 0; slot + 1 < brokerIdsAndDirs.length; slot += 2) {
                int brokerId = (Integer)brokerIdsAndDirs[slot];
                List logDirs = (List)brokerIdsAndDirs[slot + 1];
                RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord()
                    .setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue())
                    .setBrokerId(brokerId)
                    .setRack(null)
                    .setLogDirs(logDirs);
                RegisterBrokerRecord.BrokerEndpoint endpoint = new RegisterBrokerRecord.BrokerEndpoint()
                    .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
                    .setPort(9092 + brokerId)
                    .setName("PLAINTEXT")
                    .setHost("localhost");
                brokerRecord.endPoints().add(endpoint);
                this.replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short)3)));
            }
        }

        /**
         * Replays one {@code RegisterBrokerRecord} (record version 0) per map entry.
         *
         * <p>Each record uses {@code brokerId + 100} as epoch, the mapped rack, and one
         * PLAINTEXT endpoint on {@code localhost} at port {@code 9092 + brokerId}. No log
         * directories are set by this overload.
         *
         * @param idsToRack broker id to rack name
         */
        void registerBrokers(Map<Integer, String> idsToRack) throws Exception {
            for (Map.Entry<Integer, String> brokerAndRack : idsToRack.entrySet()) {
                int id = brokerAndRack.getKey();
                RegisterBrokerRecord.BrokerEndpoint endpoint = new RegisterBrokerRecord.BrokerEndpoint()
                    .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
                    .setPort(9092 + id)
                    .setName("PLAINTEXT")
                    .setHost("localhost");
                RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord()
                    .setBrokerEpoch((long)(id + 100))
                    .setBrokerId(id)
                    .setRack(brokerAndRack.getValue());
                brokerRecord.endPoints().add(endpoint);
                this.replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short)0)));
            }
        }

        /**
         * Issues an AlterPartition request from the partition's current leader that keeps the
         * current ISR, partition epoch, leader epoch and leader recovery state, and sets the
         * given cluster link state.
         *
         * <p>Asserts that the partition exists and that its leader has a registration which is
         * not fenced. Each ISR member is sent with {@code defaultBrokerEpoch(brokerId)}; the
         * request itself carries the leader registration's epoch.
         *
         * @param topicIdPartition partition to alter
         * @param clusterLinkState cluster link state to place in the request
         */
        void setClusterLinkState(TopicIdPartition topicIdPartition, AlterPartitionRequestData.ClusterLinkState clusterLinkState) throws Exception {
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            Assertions.assertNotNull(partition);
            BrokerRegistration registration = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(partition.leader);
            // Fail with a readable message rather than an NPE when the leader has no registration.
            Assertions.assertNotNull(registration, "No current registration for leader " + partition.leader + " of " + topicIdPartition);
            Assertions.assertFalse(registration.fenced());
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(topicIdPartition.partitionId())
                .setPartitionEpoch(partition.partitionEpoch)
                .setLeaderEpoch(partition.leaderEpoch)
                .setLeaderRecoveryState(partition.leaderRecoveryState.value())
                .setNewIsrWithEpochs(Arrays.stream(partition.isr).mapToObj(brokerId -> ReplicationControlManagerTest.brokerState(brokerId, ReplicationControlManagerTest.defaultBrokerEpoch(brokerId))).collect(Collectors.toList()))
                .setClusterLinkState(clusterLinkState);
            this.alterPartition(topicIdPartition, partitionData, partition.leader, registration.epoch());
        }

        /**
         * Wraps {@code partitionData} in a single-topic AlterPartition request, submits it to
         * {@code replicationControl} as broker {@code leaderId}, and replays the resulting
         * records. The response itself is not inspected.
         *
         * @param topicIdPartition partition being altered; supplies the topic id, and the topic
         *                         name is looked up from it
         * @param partitionData partition payload of the request
         * @param leaderId broker id placed in the request
         * @param brokerEpoch broker epoch placed in the request
         */
        private void alterPartition(TopicIdPartition topicIdPartition, AlterPartitionRequestData.PartitionData partitionData, int leaderId, long brokerEpoch) throws Exception {
            String topicName = this.replicationControl.getTopic(topicIdPartition.topicId()).name();
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData()
                .setTopicName(topicName)
                .setTopicId(topicIdPartition.topicId())
                .setPartitions(Collections.singletonList(partitionData));
            AlterPartitionRequestData request = new AlterPartitionRequestData()
                .setBrokerId(leaderId)
                .setBrokerEpoch(brokerEpoch)
                .setTopics(Collections.singletonList(topicData));
            ControllerRequestContext context = ControllerRequestContextUtil.anonymousContextFor(ApiKeys.ALTER_PARTITION);
            ControllerResult outcome = this.replicationControl.alterPartition(context, request);
            this.replay(outcome.records());
        }

        /**
         * Calls {@code replicationControl.handleBrokerUncleanShutdown} for every given broker,
         * accumulating the records into one list that is replayed once at the end.
         *
         * @param brokerIds brokers to treat as uncleanly shut down
         */
        void handleBrokersUncleanShutdown(Integer ... brokerIds) {
            List<ApiMessageAndVersion> collected = new ArrayList<>();
            for (int brokerId : brokerIds) {
                this.replicationControl.handleBrokerUncleanShutdown(brokerId, collected);
            }
            this.replay(collected);
        }

        /**
         * Issues an AlterPartition request from {@code leaderId} that proposes a new ISR and
         * leader recovery state, reusing the partition's current partition and leader epochs.
         *
         * <p>Asserts that {@code leaderId} has a registration which is not fenced, that the
         * partition exists, and that {@code leaderId} is its current leader. The request
         * carries the registration's epoch as broker epoch.
         *
         * @param topicIdPartition partition to alter
         * @param leaderId broker expected to be the current leader
         * @param isrWithEpoch proposed ISR members with their broker epochs
         * @param leaderRecoveryState recovery state to place in the request
         */
        void alterPartition(TopicIdPartition topicIdPartition, int leaderId, List<AlterPartitionRequestData.BrokerState> isrWithEpoch, LeaderRecoveryState leaderRecoveryState) throws Exception {
            BrokerRegistration registration = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(leaderId);
            // Fail with a readable message rather than an NPE when the broker was never registered.
            Assertions.assertNotNull(registration, "No current registration for broker " + leaderId);
            Assertions.assertFalse(registration.fenced());
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            Assertions.assertNotNull(partition);
            Assertions.assertEquals(leaderId, (int)partition.leader);
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData()
                .setPartitionIndex(topicIdPartition.partitionId())
                .setPartitionEpoch(partition.partitionEpoch)
                .setLeaderEpoch(partition.leaderEpoch)
                .setLeaderRecoveryState(leaderRecoveryState.value())
                .setNewIsrWithEpochs(isrWithEpoch);
            this.alterPartition(topicIdPartition, partitionData, leaderId, registration.epoch());
        }

        /**
         * Varargs convenience for {@link #unfenceBrokers(Set)}: collects the ids into a set via
         * {@code Utils.mkSet} and delegates.
         *
         * @param brokerIds brokers to unfence
         */
        void unfenceBrokers(Integer ... brokerIds) {
            Set<Integer> idSet = Utils.mkSet(brokerIds);
            this.unfenceBrokers(idSet);
        }

        /**
         * Sends one heartbeat per broker with {@code wantFence=false} and
         * {@code wantShutDown=false}, using {@code defaultBrokerEpoch(brokerId)} and metadata
         * offset 1, then replays the resulting records.
         *
         * <p>Every reply is asserted equal to {@code new BrokerHeartbeatReply(true, false, false, false)}.
         *
         * @param brokerIds brokers to unfence
         */
        void unfenceBrokers(Set<Integer> brokerIds) {
            for (int brokerId : brokerIds) {
                BrokerHeartbeatRequestData heartbeat = new BrokerHeartbeatRequestData()
                    .setBrokerId(brokerId)
                    .setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue())
                    .setCurrentMetadataOffset(1L)
                    .setWantFence(false)
                    .setWantShutDown(false);
                ControllerResult heartbeatResult = this.replicationControl.processBrokerHeartbeat(heartbeat, 0L);
                BrokerHeartbeatReply expectedReply = new BrokerHeartbeatReply(true, false, false, false);
                Assertions.assertEquals(expectedReply, heartbeatResult.response());
                this.replay(heartbeatResult.records());
            }
        }

        /**
         * Varargs convenience for {@link #inControlledShutdownBrokers(Set)}: collects the ids
         * into a set via {@code Utils.mkSet} and delegates.
         *
         * @param brokerIds brokers to mark as in controlled shutdown
         */
        void inControlledShutdownBrokers(Integer ... brokerIds) {
            Set<Integer> idSet = Utils.mkSet(brokerIds);
            this.inControlledShutdownBrokers(idSet);
        }

        /**
         * Replays, for each broker, a {@code BrokerRegistrationChangeRecord} (record version 1)
         * that sets the in-controlled-shutdown field to {@code IN_CONTROLLED_SHUTDOWN}, using
         * {@code defaultBrokerEpoch(brokerId)} as the broker epoch.
         *
         * @param brokerIds brokers to mark as in controlled shutdown
         */
        void inControlledShutdownBrokers(Set<Integer> brokerIds) {
            byte shutdownFlag = BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value();
            for (int brokerId : brokerIds) {
                BrokerRegistrationChangeRecord change = new BrokerRegistrationChangeRecord()
                    .setBrokerId(brokerId)
                    .setBrokerEpoch(ReplicationControlManagerTest.defaultBrokerEpoch(brokerId).longValue())
                    .setInControlledShutdown(shutdownFlag);
                this.replay(Collections.singletonList(new ApiMessageAndVersion(change, (short)1)));
            }
        }

        /**
         * Replays, for each broker, a {@code BrokerRegistrationChangeRecord} (record version 1)
         * listing a single degraded component: code {@code BrokerComponent.UNSPECIFIED.id()}
         * with reason {@code "rcca-123"}. The broker epoch used is {@code brokerId + 100}.
         *
         * @param brokerIds brokers to mark as degraded
         */
        void addDegradationsToBrokers(Set<Integer> brokerIds) throws Exception {
            for (int brokerId : brokerIds) {
                // A fresh component per record so no instance is shared between replays.
                BrokerRegistrationChangeRecord.DegradedComponent degraded = new BrokerRegistrationChangeRecord.DegradedComponent()
                    .setReason("rcca-123")
                    .setComponentCode(BrokerComponent.UNSPECIFIED.id());
                BrokerRegistrationChangeRecord change = new BrokerRegistrationChangeRecord()
                    .setBrokerId(brokerId)
                    .setBrokerEpoch((long)(brokerId + 100))
                    .setDegradedComponents(Collections.singletonList(degraded));
                this.replay(Collections.singletonList(new ApiMessageAndVersion(change, (short)1)));
            }
        }

        /**
         * Replays, for each broker, a {@code BrokerRegistrationChangeRecord} (record version 1)
         * whose degraded-component list is empty. The broker epoch used is
         * {@code brokerId + 100}.
         *
         * @param brokerIds brokers whose degradations are cleared
         */
        void removeDegradationsFromBrokers(Set<Integer> brokerIds) throws Exception {
            for (int brokerId : brokerIds) {
                BrokerRegistrationChangeRecord change = new BrokerRegistrationChangeRecord()
                    .setBrokerId(brokerId)
                    .setBrokerEpoch((long)(brokerId + 100))
                    .setDegradedComponents(Collections.emptyList());
                this.replay(Collections.singletonList(new ApiMessageAndVersion(change, (short)1)));
            }
        }

        /**
         * Replays a single topic-level {@code ConfigRecord} (record version 0) that sets
         * {@code configKey} to {@code configValue} for the named topic. The record is replayed
         * directly; no controller request is issued.
         *
         * @param topic topic name used as the resource name
         * @param configKey config name
         * @param configValue config value
         */
        void alterTopicConfig(String topic, String configKey, String configValue) {
            ConfigRecord topicConfig = new ConfigRecord()
                .setResourceType(ConfigResource.Type.TOPIC.id())
                .setResourceName(topic)
                .setName(configKey)
                .setValue(configValue);
            ApiMessageAndVersion versioned = new ApiMessageAndVersion(topicConfig, (short)0);
            this.replay(Collections.singletonList(versioned));
        }

        /**
         * Drives the cluster into a state where exactly {@code brokerIds} are fenced.
         *
         * <p>Steps: call {@code time.sleep(1000L)}, heartbeat every registered broker that is
         * NOT in {@code brokerIds} (via {@code unfenceBrokers}), then keep fencing stale
         * brokers one at a time, replaying each result, until the heartbeat manager reports
         * none. Finally asserts that the fenced broker ids equal {@code brokerIds}.
         *
         * @param brokerIds brokers that must end up fenced
         */
        void fenceBrokers(Set<Integer> brokerIds) {
            this.time.sleep(1000L);
            Set<Integer> brokersToKeepAlive = this.clusterControl.brokerRegistrations().keySet().stream()
                .filter(id -> !brokerIds.contains(id))
                .collect(Collectors.toSet());
            this.unfenceBrokers(brokersToKeepAlive.toArray(new Integer[0]));
            // Each pass fences at most one stale broker, so loop until none remain.
            while (this.clusterControl.heartbeatManager().findOneStaleBroker().isPresent()) {
                ControllerResult fenced = this.replicationControl.maybeFenceOneStaleBroker();
                this.replay(fenced.records());
            }
            Assertions.assertEquals(brokerIds, (Object)this.clusterControl.fencedBrokerIds());
        }

        /**
         * Returns the epoch of the broker's current registration in {@code clusterControl}.
         * Fails the test with a descriptive message if the broker has no registration.
         *
         * @param brokerId broker to look up
         * @return the registration's epoch
         */
        long currentBrokerEpoch(int brokerId) {
            BrokerRegistration current = (BrokerRegistration)this.clusterControl.brokerRegistrations().get(brokerId);
            Assertions.assertNotNull(current, "No current registration for broker " + brokerId);
            return current.epoch();
        }

        /**
         * Returns the current leader of a partition as recorded in {@code replicationControl}.
         * Fails the test with a descriptive message if the partition is unknown.
         *
         * @param topicIdPartition partition to look up
         * @return the leader's broker id, or empty when the stored leader is negative
         */
        OptionalInt currentLeader(TopicIdPartition topicIdPartition) {
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            // Same existence check the alterPartition helpers make, instead of an opaque NPE below.
            Assertions.assertNotNull(partition, "No partition registration for " + topicIdPartition);
            return partition.leader < 0 ? OptionalInt.empty() : OptionalInt.of(partition.leader);
        }

        /**
         * Creates a cluster link named {@code linkName} with cluster id
         * {@code "remote-cluster"} through {@code clusterLinkControl}, replays the records it
         * emitted, and asserts that the call produced a result rather than an error.
         *
         * @param linkName name of the link to create
         * @return the {@code Uuid} carried by the successful result
         */
        Uuid createClusterLink(String linkName) {
            List<ApiMessageAndVersion> linkRecords = new ArrayList<>();
            CreateClusterLinksRequestData.EntryData entry = new CreateClusterLinksRequestData.EntryData()
                .setLinkName(linkName)
                .setClusterId("remote-cluster");
            ResultOrError outcome = this.clusterLinkControl.createClusterLink(entry, linkRecords::add, null);
            // Records are replayed before the outcome is checked, matching the call order callers rely on.
            this.replay(linkRecords);
            Assertions.assertTrue(outcome.isResult());
            return (Uuid)outcome.result();
        }

        /**
         * Submits an AssignReplicasToDirs request for {@code brokerId}, built by
         * {@code AssignmentsHelper.buildRequestData} with {@code defaultBrokerEpoch(brokerId)},
         * asserts a non-null response with error code {@code Errors.NONE}, and replays the
         * resulting records.
         *
         * @param brokerId broker issuing the assignment
         * @param assignment partition to directory-id mapping
         * @return the controller result, after its records have been replayed
         */
        ControllerResult<AssignReplicasToDirsResponseData> assignReplicasToDirs(int brokerId, Map<TopicIdPartition, Uuid> assignment) {
            AssignReplicasToDirsRequestData request = AssignmentsHelper.buildRequestData(brokerId, (long)ReplicationControlManagerTest.defaultBrokerEpoch(brokerId), assignment);
            ControllerResult assigned = this.replicationControl.handleAssignReplicasToDirs(request);
            Object response = assigned.response();
            Assertions.assertNotNull(response);
            Assertions.assertEquals(Errors.NONE.code(), (short)((AssignReplicasToDirsResponseData)response).errorCode());
            this.replay(assigned.records());
            return assigned;
        }

        /**
         * Fluent builder for {@code ReplicationControlTestContext}.
         *
         * <p>Every setter stores its argument and returns {@code this}. Unset options keep the
         * field initializers below; the replica placer is the exception and is defaulted
         * lazily when one of the {@code build} methods runs.
         */
        private static class Builder {
            private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
            private MetadataVersion metadataVersion = MetadataVersion.latestTesting();
            private MockTime mockTime = new MockTime();
            private PartitionPlacementStrategy partitionPlacementStrategy = PartitionPlacementStrategy.CLUSTER_WIDE;
            // Stays null until build time; see replicaPlacerOrDefault().
            private ReplicaPlacer replicaPlacer;
            private int maxPartitionChangesPerSlice;
            private boolean isElrEnabled;
            private LogContext logContext = new LogContext();
            // Declared after logContext because this initializer reads it. setLogContext only
            // replaces the field; this default registry is not recreated.
            private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
            private boolean virtualTopicsEnabledInConfig;
            // Factory rather than instance: it is applied to the final snapshot registry at build time.
            private Function<SnapshotRegistry, ConfigurationControlManager> configurationControl = registry -> new ConfigurationControlManager.Builder().setSnapshotRegistry(registry).setKafkaConfigSchema(QuorumControllerTestEnv.DEFAULT_SCHEMA).build();

            private Builder() {
            }

            /** Sets the create-topic policy; the argument is wrapped with {@code Optional.of}. */
            Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
                this.createTopicPolicy = Optional.of(createTopicPolicy);
                return this;
            }

            /** Sets the metadata version used by the no-argument {@link #build()}. */
            Builder setMetadataVersion(MetadataVersion metadataVersion) {
                this.metadataVersion = metadataVersion;
                return this;
            }

            /** Sets the ELR-enabled flag; the boxed argument is unboxed on assignment. */
            Builder setIsElrEnabled(Boolean isElrEnabled) {
                this.isElrEnabled = isElrEnabled.booleanValue();
                return this;
            }

            /** Replaces the default {@code MockTime}. */
            Builder setMockTime(MockTime mockTime) {
                this.mockTime = mockTime;
                return this;
            }

            /** Replaces the default {@code CLUSTER_WIDE} placement strategy. */
            Builder setPartitionPlacementStrategy(PartitionPlacementStrategy partitionPlacementStrategy) {
                this.partitionPlacementStrategy = partitionPlacementStrategy;
                return this;
            }

            /** Sets an explicit replica placer, suppressing the build-time default. */
            Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
                this.replicaPlacer = replicaPlacer;
                return this;
            }

            /** Sets the max-partition-changes-per-slice value passed to the context. */
            Builder setMaxPartitionChangesPerSlice(int maxPartitionChangesPerSlice) {
                this.maxPartitionChangesPerSlice = maxPartitionChangesPerSlice;
                return this;
            }

            /** Replaces the log context passed to the context. */
            Builder setLogContext(LogContext logContext) {
                this.logContext = logContext;
                return this;
            }

            /** Replaces the snapshot registry passed to the context and to the configuration-control factory. */
            Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
                this.snapshotRegistry = snapshotRegistry;
                return this;
            }

            /** Replaces the factory that creates the {@code ConfigurationControlManager} from the snapshot registry. */
            Builder setConfigurationControl(Function<SnapshotRegistry, ConfigurationControlManager> configurationControl) {
                this.configurationControl = configurationControl;
                return this;
            }

            /** Sets the virtual-topics-enabled flag passed to the context. */
            Builder setVirtualTopicsEnabledInConfig(boolean virtualTopicsEnabledInConfig) {
                this.virtualTopicsEnabledInConfig = virtualTopicsEnabledInConfig;
                return this;
            }

            /** Builds a context using the metadata version stored in this builder. */
            ReplicationControlTestContext build() {
                return this.build(this.metadataVersion);
            }

            /**
             * Builds a context using the given metadata version in place of the stored one;
             * the stored value is left untouched.
             */
            ReplicationControlTestContext build(MetadataVersion metadataVersion) {
                ReplicaPlacer placer = this.replicaPlacerOrDefault();
                return new ReplicationControlTestContext(metadataVersion, this.createTopicPolicy, this.mockTime, this.partitionPlacementStrategy, placer, this.maxPartitionChangesPerSlice, this.isElrEnabled, this.logContext, this.snapshotRegistry, this.configurationControl.apply(this.snapshotRegistry), this.virtualTopicsEnabledInConfig);
            }

            /**
             * Returns the configured replica placer, first storing a {@code StripedReplicaPlacer}
             * backed by {@code MockRandom} in the field if none was set.
             */
            private ReplicaPlacer replicaPlacerOrDefault() {
                if (this.replicaPlacer == null) {
                    this.replicaPlacer = new StripedReplicaPlacer((Random)new MockRandom());
                }
                return this.replicaPlacer;
            }
        }
    }
}

