/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.MockControllerMetrics;
import org.apache.kafka.controller.MockRandom;
import org.apache.kafka.controller.ReplicaPlacer;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.StripedReplicaPlacer;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Timeout(value=40L)
public class ReplicationControlManagerTest {
    private static final Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
    private static final int BROKER_SESSION_TIMEOUT_MS = 1000;
    private static final ListPartitionReassignmentsResponseData NONE_REASSIGNING = new ListPartitionReassignmentsResponseData().setErrorMessage(null);

    @Test
    public void testCreateTopics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1));
        ControllerResult result = replicationControl.createTopics(request);
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced."));
        Assertions.assertEquals((Object)expectedResponse, (Object)result.response());
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ControllerResult result2 = replicationControl.createTopics(request);
        CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
        expectedResponse2.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setNumPartitions(1).setReplicationFactor((short)3).setErrorMessage(null).setErrorCode((short)0).setTopicId(((CreateTopicsResponseData)result2.response()).topics().find("foo").topicId()));
        Assertions.assertEquals((Object)expectedResponse2, (Object)result2.response());
        ctx.replay(result2.records());
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 0}, new int[]{1, 2, 0}, Replicas.NONE, Replicas.NONE, 1, 0, 0), (Object)replicationControl.getPartition(((TopicRecord)((ApiMessageAndVersion)result2.records().get(0)).message()).topicId(), 0));
        ControllerResult result3 = replicationControl.createTopics(request);
        CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
        expectedResponse3.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
        Assertions.assertEquals((Object)expectedResponse3, (Object)result3.response());
        Uuid fooId = ((CreateTopicsResponseData)result2.response()).topics().find("foo").topicId();
        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(0).setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setTopicId(fooId).setName("foo"), 0))), (Iterator<List<ApiMessageAndVersion>>)ctx.replicationControl.iterator(Long.MAX_VALUE));
    }

    @Test
    public void testGlobalTopicAndPartitionMetrics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)-1));
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        ArrayList<Uuid> topicsToDelete = new ArrayList<Uuid>();
        ControllerResult result = replicationControl.createTopics(request);
        topicsToDelete.add(((CreateTopicsResponseData)result.response()).topics().find("foo").topicId());
        RecordTestUtils.replayAll(replicationControl, result.records());
        Assertions.assertEquals((int)1, (int)ctx.metrics.globalTopicsCount());
        request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(1).setReplicationFactor((short)-1));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor((short)-1));
        result = replicationControl.createTopics(request);
        RecordTestUtils.replayAll(replicationControl, result.records());
        Assertions.assertEquals((int)3, (int)ctx.metrics.globalTopicsCount());
        Assertions.assertEquals((int)4, (int)ctx.metrics.globalPartitionCount());
        topicsToDelete.add(((CreateTopicsResponseData)result.response()).topics().find("baz").topicId());
        ControllerResult deleteResult = replicationControl.deleteTopics(topicsToDelete);
        RecordTestUtils.replayAll(replicationControl, deleteResult.records());
        Assertions.assertEquals((int)1, (int)ctx.metrics.globalTopicsCount());
        Assertions.assertEquals((int)1, (int)ctx.metrics.globalPartitionCount());
        Uuid topicToDelete = ((CreateTopicsResponseData)result.response()).topics().find("bar").topicId();
        deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete));
        RecordTestUtils.replayAll(replicationControl, deleteResult.records());
        Assertions.assertEquals((int)0, (int)ctx.metrics.globalTopicsCount());
        Assertions.assertEquals((int)0, (int)ctx.metrics.globalPartitionCount());
    }

    @Test
    public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][]{{0, 2}, {0, 1}});
        CreateTopicsResponseData.CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][]{{0, 1, 2}, {1, 2, 3}, {1, 2, 0}});
        ControllerResult result = replicationControl.unregisterBroker(0);
        ctx.replay(result.records());
        Assertions.assertEquals((int)0, (int)ctx.metrics.offlinePartitionCount());
        Assertions.assertEquals((int)3, (int)ctx.metrics.preferredReplicaImbalanceCount());
        result = replicationControl.unregisterBroker(1);
        ctx.replay(result.records());
        Assertions.assertEquals((int)1, (int)ctx.metrics.offlinePartitionCount());
        Assertions.assertEquals((int)5, (int)ctx.metrics.preferredReplicaImbalanceCount());
        result = replicationControl.unregisterBroker(2);
        ctx.replay(result.records());
        Assertions.assertEquals((int)4, (int)ctx.metrics.offlinePartitionCount());
        result = replicationControl.unregisterBroker(3);
        ctx.replay(result.records());
        Assertions.assertEquals((int)5, (int)ctx.metrics.offlinePartitionCount());
        ArrayList<Object> records = new ArrayList<ApiMessageAndVersion>();
        replicationControl.deleteTopic(foo.topicId(), records);
        ctx.replay(records);
        Assertions.assertEquals((int)3, (int)ctx.metrics.offlinePartitionCount());
        records = new ArrayList();
        replicationControl.deleteTopic(zar.topicId(), records);
        ctx.replay(records);
        Assertions.assertEquals((int)0, (int)ctx.metrics.offlinePartitionCount());
    }

    @Test
    public void testValidateNewTopicNames() {
        HashMap topicErrors = new HashMap();
        CreateTopicsRequestData.CreatableTopicCollection topics = new CreateTopicsRequestData.CreatableTopicCollection();
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName(""));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("woo"));
        topics.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("."));
        ReplicationControlManager.validateNewTopicNames(topicErrors, (CreateTopicsRequestData.CreatableTopicCollection)topics);
        HashMap<String, ApiError> expectedTopicErrors = new HashMap<String, ApiError>();
        expectedTopicErrors.put("", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name is illegal, it can't be empty"));
        expectedTopicErrors.put(".", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "Topic name cannot be \".\" or \"..\""));
        Assertions.assertEquals(expectedTopicErrors, topicErrors);
    }

    @Test
    public void testRemoveLeaderships() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        CreateTopicsResponseData.CreatableTopicResult result = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}, {1, 2, 3}, {2, 3, 0}, {0, 2, 1}});
        HashSet<BrokersToIsrs.TopicIdPartition> expectedPartitions = new HashSet<BrokersToIsrs.TopicIdPartition>();
        expectedPartitions.add(new BrokersToIsrs.TopicIdPartition(result.topicId(), 0));
        expectedPartitions.add(new BrokersToIsrs.TopicIdPartition(result.topicId(), 3));
        Assertions.assertEquals(expectedPartitions, RecordTestUtils.iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        replicationControl.handleBrokerFenced(0, records);
        ctx.replay(records);
        Assertions.assertEquals(Collections.emptySet(), RecordTestUtils.iteratorToSet(replicationControl.brokersToIsrs().iterator(0, true)));
    }

    @Test
    public void testShrinkAndExpandIsr() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        BrokersToIsrs.TopicIdPartition topicIdPartition = new BrokersToIsrs.TopicIdPartition(createTopicResult.topicId(), 0);
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);
        AlterIsrRequestData.PartitionData shrinkIsrRequest = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1));
        ControllerResult<AlterIsrResponseData> shrinkIsrResult = this.sendAlterIsr(replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
        AlterIsrResponseData.PartitionData shrinkIsrResponse = this.assertAlterIsrResponse(shrinkIsrResult, topicPartition, Errors.NONE);
        this.assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
        AlterIsrRequestData.PartitionData expandIsrRequest = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
        ControllerResult<AlterIsrResponseData> expandIsrResult = this.sendAlterIsr(replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
        AlterIsrResponseData.PartitionData expandIsrResponse = this.assertAlterIsrResponse(expandIsrResult, topicPartition, Errors.NONE);
        this.assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, expandIsrResponse);
    }

    @Test
    public void testInvalidAlterIsrRequests() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2);
        ctx.unfenceBrokers(0, 1, 2);
        CreateTopicsResponseData.CreatableTopicResult createTopicResult = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}});
        BrokersToIsrs.TopicIdPartition topicIdPartition = new BrokersToIsrs.TopicIdPartition(createTopicResult.topicId(), 0);
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)ctx.currentLeader(topicIdPartition));
        long brokerEpoch = ctx.currentBrokerEpoch(0);
        AlterIsrRequestData.PartitionData invalidLeaderRequest = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1));
        ControllerResult<AlterIsrResponseData> invalidLeaderResult = this.sendAlterIsr(replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidLeaderRequest);
        this.assertAlterIsrResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
        AlterIsrRequestData.PartitionData invalidBrokerEpochRequest = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1));
        Assertions.assertThrows(StaleBrokerEpochException.class, () -> this.sendAlterIsr(replicationControl, 0, brokerEpoch - 1L, "foo", invalidBrokerEpochRequest));
        AlterIsrRequestData.PartitionData invalidLeaderEpochRequest = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1));
        invalidLeaderEpochRequest.setLeaderEpoch(500);
        ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = this.sendAlterIsr(replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidLeaderEpochRequest);
        this.assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, Errors.FENCED_LEADER_EPOCH);
        AlterIsrRequestData.PartitionData invalidIsrRequest1 = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1));
        invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
        ControllerResult<AlterIsrResponseData> invalidIsrResult1 = this.sendAlterIsr(replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidIsrRequest1);
        this.assertAlterIsrResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
        AlterIsrRequestData.PartitionData invalidIsrRequest2 = this.newAlterIsrPartition(replicationControl, topicIdPartition, Arrays.asList(0, 1));
        invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
        ControllerResult<AlterIsrResponseData> invalidIsrResult2 = this.sendAlterIsr(replicationControl, 1, ctx.currentBrokerEpoch(1), "foo", invalidIsrRequest2);
        this.assertAlterIsrResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
    }

    private AlterIsrRequestData.PartitionData newAlterIsrPartition(ReplicationControlManager replicationControl, BrokersToIsrs.TopicIdPartition topicIdPartition, List<Integer> newIsr) {
        PartitionRegistration partitionControl = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        return new AlterIsrRequestData.PartitionData().setPartitionIndex(0).setLeaderEpoch(partitionControl.leaderEpoch).setCurrentIsrVersion(partitionControl.partitionEpoch).setNewIsr(newIsr);
    }

    private ControllerResult<AlterIsrResponseData> sendAlterIsr(ReplicationControlManager replicationControl, int brokerId, long brokerEpoch, String topic, AlterIsrRequestData.PartitionData partitionData) throws Exception {
        AlterIsrRequestData request = new AlterIsrRequestData().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
        AlterIsrRequestData.TopicData topicData = new AlterIsrRequestData.TopicData().setName(topic);
        request.topics().add(topicData);
        topicData.partitions().add(partitionData);
        ControllerResult result = replicationControl.alterIsr(request);
        RecordTestUtils.replayAll(replicationControl, result.records());
        return result;
    }

    private AlterIsrResponseData.PartitionData assertAlterIsrResponse(ControllerResult<AlterIsrResponseData> alterIsrResult, TopicPartition topicPartition, Errors expectedError) {
        AlterIsrResponseData response = (AlterIsrResponseData)alterIsrResult.response();
        Assertions.assertEquals((int)1, (int)response.topics().size());
        AlterIsrResponseData.TopicData topicData = (AlterIsrResponseData.TopicData)response.topics().get(0);
        Assertions.assertEquals((Object)topicPartition.topic(), (Object)topicData.name());
        Assertions.assertEquals((int)1, (int)topicData.partitions().size());
        AlterIsrResponseData.PartitionData partitionData = (AlterIsrResponseData.PartitionData)topicData.partitions().get(0);
        Assertions.assertEquals((int)topicPartition.partition(), (int)partitionData.partitionIndex());
        Assertions.assertEquals((Object)expectedError, (Object)Errors.forCode((short)partitionData.errorCode()));
        return partitionData;
    }

    private void assertConsistentAlterIsrResponse(ReplicationControlManager replicationControl, BrokersToIsrs.TopicIdPartition topicIdPartition, AlterIsrResponseData.PartitionData partitionData) {
        PartitionRegistration partitionControl = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        Assertions.assertEquals((int)partitionControl.leader, (int)partitionData.leaderId());
        Assertions.assertEquals((int)partitionControl.leaderEpoch, (int)partitionData.leaderEpoch());
        Assertions.assertEquals((int)partitionControl.partitionEpoch, (int)partitionData.currentIsrVersion());
        List expectedIsr = IntStream.of(partitionControl.isr).boxed().collect(Collectors.toList());
        Assertions.assertEquals(expectedIsr, (Object)partitionData.isr());
    }

    private void assertCreatedTopicConfigs(ReplicationControlTestContext ctx, String topic, CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs) {
        Map configs = ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Assertions.assertEquals((int)requestConfigs.size(), (int)configs.size());
        for (CreateTopicsRequestData.CreateableTopicConfig requestConfig : requestConfigs) {
            String value = (String)configs.get(requestConfig.name());
            Assertions.assertEquals((Object)requestConfig.value(), (Object)value);
        }
    }

    private void assertEmptyTopicConfigs(ReplicationControlTestContext ctx, String topic) {
        Map configs = ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic));
        Assertions.assertEquals(Collections.emptyMap(), (Object)configs);
    }

    @Test
    public void testDeleteTopics() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs = new CreateTopicsRequestData.CreateableTopicConfigCollection();
        requestConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreateableTopicConfig().setName("cleanup.policy").setValue("compact"));
        requestConfigs.add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreateableTopicConfig().setName("min.cleanable.dirty.ratio").setValue("0.1"));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2).setConfigs(requestConfigs));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerResult createResult = replicationControl.createTopics(request);
        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        Uuid topicId = ((CreateTopicsResponseData)createResult.response()).topics().find("foo").topicId();
        expectedResponse.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsResponseData.CreatableTopicResult().setName("foo").setNumPartitions(3).setReplicationFactor((short)2).setErrorMessage(null).setErrorCode((short)0).setTopicId(topicId));
        Assertions.assertEquals((Object)expectedResponse, (Object)createResult.response());
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 0));
        this.assertEmptyTopicConfigs(ctx, "foo");
        ctx.replay(createResult.records());
        Assertions.assertNotNull((Object)replicationControl.getPartition(topicId, 0));
        Assertions.assertNotNull((Object)replicationControl.getPartition(topicId, 1));
        Assertions.assertNotNull((Object)replicationControl.getPartition(topicId, 2));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 3));
        this.assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
        Assertions.assertEquals(Collections.singletonMap(topicId, new ResultOrError((Object)"foo")), (Object)replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
        Assertions.assertEquals(Collections.singletonMap("foo", new ResultOrError((Object)topicId)), (Object)replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
        Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1L, topicId.getLeastSignificantBits());
        Assertions.assertEquals(Collections.singletonMap(invalidId, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID))), (Object)replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
        Assertions.assertEquals(Collections.singletonMap("bar", new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))), (Object)replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
        ControllerResult invalidDeleteResult = replicationControl.deleteTopics(Collections.singletonList(invalidId));
        Assertions.assertEquals((int)0, (int)invalidDeleteResult.records().size());
        Assertions.assertEquals(Collections.singletonMap(invalidId, new ApiError(Errors.UNKNOWN_TOPIC_ID, null)), (Object)invalidDeleteResult.response());
        ControllerResult deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicId));
        Assertions.assertTrue((boolean)deleteResult.isAtomic());
        Assertions.assertEquals(Collections.singletonMap(topicId, new ApiError(Errors.NONE, null)), (Object)deleteResult.response());
        Assertions.assertEquals((int)1, (int)deleteResult.records().size());
        ctx.replay(deleteResult.records());
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 0));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 1));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 2));
        Assertions.assertNull((Object)replicationControl.getPartition(topicId, 3));
        Assertions.assertEquals(Collections.singletonMap(topicId, new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_ID))), (Object)replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
        Assertions.assertEquals(Collections.singletonMap("foo", new ResultOrError(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION))), (Object)replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
        this.assertEmptyTopicConfigs(ctx, "foo");
    }

    @Test
    public void testCreatePartitions() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replicationControl = ctx.replicationControl;
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(3).setReplicationFactor((short)2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("bar").setNumPartitions(4).setReplicationFactor((short)2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor((short)2));
        request.topics().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableTopic().setName("foo2").setNumPartitions(2).setReplicationFactor((short)2));
        ctx.registerBrokers(0, 1);
        ctx.unfenceBrokers(0, 1);
        ControllerResult createTopicResult = replicationControl.createTopics(request);
        ctx.replay(createTopicResult.records());
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(5).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("bar").setCount(3).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("baz").setCount(3).setAssignments(null));
        topics.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("quux").setCount(2).setAssignments(null));
        ControllerResult createPartitionsResult = replicationControl.createPartitions(topics);
        Assertions.assertEquals(Arrays.asList(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.NONE.code()).setErrorMessage(null), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("bar").setErrorCode(Errors.INVALID_PARTITIONS.code()).setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("baz").setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage(null), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("quux").setErrorCode(Errors.INVALID_PARTITIONS.code()).setErrorMessage("Topic already has 2 partition(s).")), (Object)createPartitionsResult.response());
        ctx.replay(createPartitionsResult.records());
        ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic> topics2 = new ArrayList<CreatePartitionsRequestData.CreatePartitionsTopic>();
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo").setCount(6).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("bar").setCount(5).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("quux").setCount(4).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
        topics2.add(new CreatePartitionsRequestData.CreatePartitionsTopic().setName("foo2").setCount(3).setAssignments(Arrays.asList(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
        ControllerResult createPartitionsResult2 = replicationControl.createPartitions(topics2);
        Assertions.assertEquals(Arrays.asList(new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo").setErrorCode(Errors.NONE.code()).setErrorMessage(null), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("bar").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes a partition with 1 replica(s), but this is not consistent with previous partitions, which have 2 replica(s)."), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("quux").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."), new CreatePartitionsResponseData.CreatePartitionsTopicResult().setName("foo2").setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 2, but no such broker is registered.")), (Object)createPartitionsResult2.response());
        ctx.replay(createPartitionsResult2.records());
    }

    @Test
    public void testValidateGoodManualPartitionAssignments() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(1, 2, 3);
        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1), OptionalInt.of(1));
        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1), OptionalInt.empty());
        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), OptionalInt.of(3));
        ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), OptionalInt.empty());
    }

    @Test
    public void testValidateBadManualPartitionAssignments() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(1, 2);
        Assertions.assertEquals((Object)"The manual partition assignment includes an empty replica list.", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(new Integer[0]), OptionalInt.empty()))).getMessage());
        Assertions.assertEquals((Object)"The manual partition assignment includes broker 3, but no such broker is registered.", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3), OptionalInt.empty()))).getMessage());
        Assertions.assertEquals((Object)"The manual partition assignment includes the broker 2 more than once.", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2), OptionalInt.empty()))).getMessage());
        Assertions.assertEquals((Object)"The manual partition assignment includes a partition with 2 replica(s), but this is not consistent with previous partitions, which have 3 replica(s).", (Object)((InvalidReplicaAssignmentException)Assertions.assertThrows(InvalidReplicaAssignmentException.class, () -> ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2), OptionalInt.of(3)))).getMessage());
    }

    @Test
    public void testReassignPartitions() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.unfenceBrokers(0, 1, 2, 3);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {3, 2, 1}}).topicId();
        ctx.createTestTopic("bar", new int[][]{{1, 2, 3}}).topicId();
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(3, 2, 1)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(Arrays.asList(0, 2, 1)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(Arrays.asList(0, 2, 1)))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar"))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("Unable to find partition foo:2."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar"))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("foo").setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(1).setRemovingReplicas(Arrays.asList(3)).setAddingReplicas(Arrays.asList(0)).setReplicas(Arrays.asList(0, 2, 1, 3))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(null), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))));
        Assertions.assertEquals((Object)ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(fooId).setPartitionId(1).setReplicas(Arrays.asList(2, 1, 3)).setLeader(3).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()), 0)), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorCode(Errors.NONE.code()).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("Unable to find partition foo:2."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null)))))), (Object)cancelResult);
        log.info("running final alterIsr...");
        ControllerResult alterIsrResult = replication.alterIsr(new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103L).setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("foo").setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
        Assertions.assertEquals((Object)new AlterIsrResponseData().setTopics(Arrays.asList(new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(new AlterIsrResponseData.PartitionData().setPartitionIndex(1).setErrorCode(Errors.FENCED_LEADER_EPOCH.code()))))), (Object)alterIsrResult.response());
        ctx.replay(alterIsrResult.records());
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
    }

    @Test
    public void testCancelReassignPartitions() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3, 4}, {0, 1, 2, 3}, {4, 3, 1, 0}, {2, 3, 4, 1}}).topicId();
        Uuid barId = ctx.createTestTopic("bar", new int[][]{{4, 3, 2}}).topicId();
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
        ArrayList<ApiMessageAndVersion> fenceRecords = new ArrayList<ApiMessageAndVersion>();
        replication.handleBrokerFenced(3, fenceRecords);
        ctx.replay(fenceRecords);
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 4}, new int[]{1, 2, 4}, new int[0], new int[0], 1, 1, 1), (Object)replication.getPartition(fooId, 0));
        ControllerResult alterResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 2, 3)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(1).setReplicas(Arrays.asList(1, 2, 3, 0)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(2).setReplicas(Arrays.asList(5, 6, 7)), new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(3).setReplicas(Arrays.asList(new Integer[0])))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
        Assertions.assertEquals((Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes broker 5, but no such broker is registered."), new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(Errors.INVALID_REPLICA_ASSIGNMENT.code()).setErrorMessage("The manual partition assignment includes an empty replica list."))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), (Object)alterResult.response());
        ctx.replay(alterResult.records());
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3}, new int[]{1, 2}, new int[0], new int[0], 1, 2, 2), (Object)replication.getPartition(fooId, 0));
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 0}, new int[]{0, 1, 2}, new int[0], new int[0], 0, 1, 2), (Object)replication.getPartition(fooId, 1));
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{1, 2, 3, 4, 0}, new int[]{4, 2}, new int[0], new int[]{0, 1}, 4, 1, 2), (Object)replication.getPartition(barId, 0));
        ListPartitionReassignmentsResponseData currentReassigning = new ListPartitionReassignmentsResponseData().setErrorMessage(null).setTopics(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingTopicReassignment().setName("bar").setPartitions(Arrays.asList(new ListPartitionReassignmentsResponseData.OngoingPartitionReassignment().setPartitionIndex(0).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Arrays.asList(0, 1)).setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(null));
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("foo").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        Assertions.assertEquals((Object)currentReassigning, (Object)replication.listPartitionReassignments(Arrays.asList(new ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics().setName("bar").setPartitionIndexes(Arrays.asList(0, 1, 2)))));
        ControllerResult alterIsrResult = replication.alterIsr(new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104L).setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("bar").setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
        Assertions.assertEquals((Object)new AlterIsrResponseData().setTopics(Arrays.asList(new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(new AlterIsrResponseData.PartitionData().setPartitionIndex(0).setLeaderId(4).setLeaderEpoch(1).setIsr(Arrays.asList(4, 1, 2, 3, 0)).setCurrentIsrVersion(3).setErrorCode(Errors.NONE.code()))))), (Object)alterIsrResult.response());
        ControllerResult cancelResult = replication.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))), new AlterPartitionReassignmentsRequestData.ReassignableTopic().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsRequestData.ReassignablePartition().setPartitionIndex(0).setReplicas(null))))));
        Assertions.assertEquals((Object)ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setTopicId(barId).setPartitionId(0).setLeader(4).setReplicas(Arrays.asList(2, 3, 4)).setRemovingReplicas(null).setAddingReplicas(Collections.emptyList()), 0)), (Object)new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorCode(Errors.NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))), new AlterPartitionReassignmentsResponseData.ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(new AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null)))))), (Object)cancelResult);
        ctx.replay(cancelResult.records());
        Assertions.assertEquals((Object)NONE_REASSIGNING, (Object)replication.listPartitionReassignments(null));
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{2, 3, 4}, new int[]{4, 2}, new int[0], new int[0], 4, 2, 3), (Object)replication.getPartition(barId, 0));
    }

    @Test
    public void testManualPartitionAssignmentOnAllFencedBrokers() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2, 3);
        ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}, Errors.INVALID_REPLICA_ASSIGNMENT.code());
    }

    @Test
    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
        ctx.unfenceBrokers(0, 1, 2);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{0, 1, 2}}).topicId();
        ctx.createPartitions(2, "foo", new int[][]{{3, 4, 5}}, Errors.INVALID_REPLICA_ASSIGNMENT.code());
        ctx.createPartitions(2, "foo", new int[][]{{2, 4, 5}}, Errors.NONE.code());
        Assertions.assertEquals((Object)new PartitionRegistration(new int[]{2, 4, 5}, new int[]{2}, Replicas.NONE, Replicas.NONE, 2, 0, 0), (Object)ctx.replicationControl.getPartition(fooId, 1));
    }

    @Test
    public void testFenceMultipleBrokers() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(0, 1, 2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        Assertions.assertTrue((boolean)ctx.clusterControl.fencedBrokerIds().isEmpty());
        ctx.fenceBrokers(Utils.mkSet((Object[])new Integer[]{2, 3}));
        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
        PartitionRegistration partition1 = replication.getPartition(fooId, 1);
        PartitionRegistration partition2 = replication.getPartition(fooId, 2);
        Assertions.assertArrayEquals((int[])new int[]{1, 2, 3}, (int[])partition0.replicas);
        Assertions.assertArrayEquals((int[])new int[]{1}, (int[])partition0.isr);
        Assertions.assertEquals((int)1, (int)partition0.leader);
        Assertions.assertArrayEquals((int[])new int[]{2, 3, 4}, (int[])partition1.replicas);
        Assertions.assertArrayEquals((int[])new int[]{4}, (int[])partition1.isr);
        Assertions.assertEquals((int)4, (int)partition1.leader);
        Assertions.assertArrayEquals((int[])new int[]{0, 2, 1}, (int[])partition2.replicas);
        Assertions.assertArrayEquals((int[])new int[]{0, 1}, (int[])partition2.isr);
        Assertions.assertNotEquals((int)2, (int)partition2.leader);
    }

    @Test
    public void testElectLeaders() throws Exception {
        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
        ReplicationControlManager replication = ctx.replicationControl;
        ctx.registerBrokers(0, 1, 2, 3, 4);
        ctx.unfenceBrokers(2, 3, 4);
        Uuid fooId = ctx.createTestTopic("foo", new int[][]{{1, 2, 3}, {2, 3, 4}, {0, 2, 1}}).topicId();
        ElectLeadersRequestData request1 = new ElectLeadersRequestData().setElectionType((byte)0).setTopicPartitions(new ElectLeadersRequestData.TopicPartitionsCollection(Arrays.asList(new ElectLeadersRequestData.TopicPartitions().setTopic("foo").setPartitions(Arrays.asList(0, 1)), new ElectLeadersRequestData.TopicPartitions().setTopic("bar").setPartitions(Arrays.asList(0, 1))).iterator()));
        ControllerResult election1Result = replication.electLeaders(request1);
        ElectLeadersResponseData expectedResponse1 = new ElectLeadersResponseData().setErrorCode((short)0).setReplicaElectionResults(Arrays.asList(new ElectLeadersResponseData.ReplicaElectionResult().setTopic("foo").setPartitionResult(Arrays.asList(new ElectLeadersResponseData.PartitionResult().setPartitionId(0).setErrorCode(Errors.NONE.code()).setErrorMessage(null), new ElectLeadersResponseData.PartitionResult().setPartitionId(1).setErrorCode(Errors.NONE.code()).setErrorMessage(null))), new ElectLeadersResponseData.ReplicaElectionResult().setTopic("bar").setPartitionResult(Arrays.asList(new ElectLeadersResponseData.PartitionResult().setPartitionId(0).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("No such topic as bar"), new ElectLeadersResponseData.PartitionResult().setPartitionId(1).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("No such topic as bar")))));
        Assertions.assertEquals((Object)expectedResponse1, (Object)election1Result.response());
        Assertions.assertEquals(Collections.emptyList(), (Object)election1Result.records());
        ctx.unfenceBrokers(0, 1);
        ControllerResult alterIsrResult = replication.alterIsr(new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102L).setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("foo").setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().setPartitionIndex(0).setCurrentIsrVersion(0).setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2, 3)))))));
        Assertions.assertEquals((Object)new AlterIsrResponseData().setTopics(Arrays.asList(new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(new AlterIsrResponseData.PartitionData().setPartitionIndex(0).setLeaderId(2).setLeaderEpoch(0).setIsr(Arrays.asList(1, 2, 3)).setCurrentIsrVersion(1).setErrorCode(Errors.NONE.code()))))), (Object)alterIsrResult.response());
        ctx.replay(alterIsrResult.records());
        ControllerResult election2Result = replication.electLeaders(request1);
        Assertions.assertEquals((Object)expectedResponse1, (Object)election2Result.response());
        Assertions.assertEquals(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionChangeRecord().setPartitionId(0).setTopicId(fooId).setLeader(1), 0)), (Object)election2Result.records());
    }

    private static class ReplicationControlTestContext {
        final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
        final LogContext logContext = new LogContext();
        final MockTime time = new MockTime();
        final MockRandom random = new MockRandom();
        final ClusterControlManager clusterControl = new ClusterControlManager(this.logContext, (Time)this.time, this.snapshotRegistry, 1000L, (ReplicaPlacer)new StripedReplicaPlacer((Random)this.random));
        final ControllerMetrics metrics = new MockControllerMetrics();
        final ConfigurationControlManager configurationControl = new ConfigurationControlManager(new LogContext(), this.snapshotRegistry, Collections.emptyMap());
        final ReplicationControlManager replicationControl = new ReplicationControlManager(this.snapshotRegistry, new LogContext(), 3, 1, this.configurationControl, this.clusterControl, this.metrics);

        void replay(List<ApiMessageAndVersion> records) throws Exception {
            RecordTestUtils.replayAll(this.clusterControl, records);
            RecordTestUtils.replayAll(this.configurationControl, records);
            RecordTestUtils.replayAll(this.replicationControl, records);
        }

        ReplicationControlTestContext() {
            this.clusterControl.activate();
        }

        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas) throws Exception {
            return this.createTestTopic(name, replicas, (short)0);
        }

        CreateTopicsResponseData.CreatableTopicResult createTestTopic(String name, int[][] replicas, short expectedErrorCode) throws Exception {
            Assertions.assertFalse((replicas.length == 0 ? 1 : 0) != 0);
            CreateTopicsRequestData request = new CreateTopicsRequestData();
            CreateTopicsRequestData.CreatableTopic topic = new CreateTopicsRequestData.CreatableTopic().setName(name);
            topic.setNumPartitions(-1).setReplicationFactor((short)-1);
            for (int i = 0; i < replicas.length; ++i) {
                topic.assignments().add((ImplicitLinkedHashCollection.Element)new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(i).setBrokerIds(Replicas.toList((int[])replicas[i])));
            }
            request.topics().add((ImplicitLinkedHashCollection.Element)topic);
            ControllerResult result = this.replicationControl.createTopics(request);
            CreateTopicsResponseData.CreatableTopicResult topicResult = ((CreateTopicsResponseData)result.response()).topics().find(name);
            Assertions.assertNotNull((Object)topicResult);
            Assertions.assertEquals((short)expectedErrorCode, (short)topicResult.errorCode());
            if (expectedErrorCode == Errors.NONE.code()) {
                Assertions.assertEquals((int)replicas.length, (int)topicResult.numPartitions());
                Assertions.assertEquals((int)replicas[0].length, (int)topicResult.replicationFactor());
                this.replay(result.records());
            }
            return topicResult;
        }

        void createPartitions(int count, String name, int[][] replicas, short expectedErrorCode) throws Exception {
            Assertions.assertFalse((replicas.length == 0 ? 1 : 0) != 0);
            CreatePartitionsRequestData.CreatePartitionsTopic topic = new CreatePartitionsRequestData.CreatePartitionsTopic().setName(name).setCount(count);
            for (int i = 0; i < replicas.length; ++i) {
                topic.assignments().add(new CreatePartitionsRequestData.CreatePartitionsAssignment().setBrokerIds(Replicas.toList((int[])replicas[i])));
            }
            ControllerResult result = this.replicationControl.createPartitions(Collections.singletonList(topic));
            Assertions.assertEquals((int)1, (int)((List)result.response()).size());
            CreatePartitionsResponseData.CreatePartitionsTopicResult topicResult = (CreatePartitionsResponseData.CreatePartitionsTopicResult)((List)result.response()).get(0);
            Assertions.assertEquals((Object)name, (Object)topicResult.name());
            Assertions.assertEquals((short)expectedErrorCode, (short)topicResult.errorCode());
            this.replay(result.records());
        }

        void registerBrokers(Integer ... brokerIds) throws Exception {
            Integer[] integerArray = brokerIds;
            int n = integerArray.length;
            for (int i = 0; i < n; ++i) {
                int brokerId = integerArray[i];
                RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
                brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort(9092 + brokerId).setName("PLAINTEXT").setHost("localhost"));
                this.replay(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)brokerRecord, 0)));
            }
        }

        void unfenceBrokers(Integer ... brokerIds) throws Exception {
            Integer[] integerArray = brokerIds;
            int n = integerArray.length;
            for (int i = 0; i < n; ++i) {
                int brokerId = integerArray[i];
                ControllerResult result = this.replicationControl.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setBrokerId(brokerId).setBrokerEpoch((long)(brokerId + 100)).setCurrentMetadataOffset(1L).setWantFence(false).setWantShutDown(false), 0L);
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), (Object)result.response());
                this.replay(result.records());
            }
        }

        void fenceBrokers(Set<Integer> brokerIds) throws Exception {
            this.time.sleep(1000L);
            Set<Integer> unfencedBrokerIds = this.clusterControl.brokerRegistrations().keySet().stream().filter(brokerId -> !brokerIds.contains(brokerId)).collect(Collectors.toSet());
            this.unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0]));
            Optional staleBroker = this.clusterControl.heartbeatManager().findOneStaleBroker();
            while (staleBroker.isPresent()) {
                ControllerResult fenceResult = this.replicationControl.maybeFenceOneStaleBroker();
                this.replay(fenceResult.records());
                staleBroker = this.clusterControl.heartbeatManager().findOneStaleBroker();
            }
            Assertions.assertEquals(brokerIds, (Object)this.clusterControl.fencedBrokerIds());
        }

        long currentBrokerEpoch(int brokerId) {
            Map registrations = this.clusterControl.brokerRegistrations();
            BrokerRegistration registration = (BrokerRegistration)registrations.get(brokerId);
            Assertions.assertNotNull((Object)registration, (String)("No current registration for broker " + brokerId));
            return registration.epoch();
        }

        OptionalInt currentLeader(BrokersToIsrs.TopicIdPartition topicIdPartition) {
            PartitionRegistration partition = this.replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
            return partition.leader < 0 ? OptionalInt.empty() : OptionalInt.of(partition.leader);
        }
    }
}

