/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.reassign;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.TerseException;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.reassign.ActiveMoveState;
import org.apache.kafka.tools.reassign.CompletedMoveState;
import org.apache.kafka.tools.reassign.PartitionReassignmentState;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
import org.apache.kafka.tools.reassign.VerifyAssignmentResult;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(brokers=5, disksPerBroker=3, serverProperties={@ClusterConfigProperty(key="replica.fetch.backoff.ms", value="100"), @ClusterConfigProperty(key="auto.leader.rebalance.enable", value="false"), @ClusterConfigProperty(key="replica.lag.time.max.ms", value="1000"), @ClusterConfigProperty(id=0, key="broker.rack", value="rack0"), @ClusterConfigProperty(id=1, key="broker.rack", value="rack0"), @ClusterConfigProperty(id=2, key="broker.rack", value="rack1"), @ClusterConfigProperty(id=3, key="broker.rack", value="rack1"), @ClusterConfigProperty(id=4, key="broker.rack", value="rack1")})
public class ReassignPartitionsCommandTest {
    /** Cluster handle supplied to the constructor; used to obtain bootstrap servers and admin clients. */
    private final ClusterInstance clusterInstance;

    /**
     * Expected broker-level throttle values when nothing is throttled: for each broker id in
     * {@code [0, 4)}, every name in {@code ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES}
     * maps to {@code -1}.
     *
     * NOTE(review): the class annotation configures 5 brokers but only ids 0..3 are covered
     * here -- confirm that leaving out broker 4 is intentional.
     */
    private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs = IntStream.range(0, 4)
        .boxed()
        .collect(Collectors.toMap(
            Function.identity(),
            brokerId -> ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES.stream()
                .collect(Collectors.toMap(Function.identity(), throttleName -> -1L))));

    /**
     * Creates the test instance.
     *
     * @param clusterInstance the cluster the tests in this class run against
     */
    ReassignPartitionsCommandTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    /** Creates the test topics and then runs the execute-and-verify reassignment scenario. */
    @ClusterTest
    public void testReassignment() throws Exception {
        createTopics();
        executeAndVerifyReassignment();
    }

    /**
     * Produces 100 records to foo-0, executes a reassignment of that partition to replicas
     * [3, 1, 2], waits for verification to report the expected final state, and then waits until
     * the latest offset reported for the partition equals 100.
     */
    @ClusterTest
    public void testHighWaterMarkAfterPartitionReassignment() throws Exception {
        createTopics();
        TopicPartition partition = new TopicPartition("foo", 0);
        produceMessages(partition.topic(), partition.partition(), 100);

        String reassignmentJson = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
            + "]}";
        runExecuteAssignment(false, reassignmentJson, -1L, -1L);

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            PartitionReassignmentState completedState =
                new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true);
            Map<TopicPartition, PartitionReassignmentState> expectedStates =
                Collections.singletonMap(partition, completedState);
            waitForVerifyAssignment(admin, reassignmentJson, false, new VerifyAssignmentResult(expectedStates));

            // Poll the latest offset of the partition until it matches the number of produced records.
            TestUtils.waitForCondition(() -> {
                ListOffsetsResult.ListOffsetsResultInfo latest = admin
                    .listOffsets(Collections.singletonMap(partition, new OffsetSpec.LatestSpec()))
                    .partitionResult(partition)
                    .get();
                return latest.offset() == 100L;
            }, "Timeout for waiting offset");
        }
    }

    /**
     * Calls {@code ReassignPartitionsCommand.generateAssignment} with the broker list
     * {@code "1,2,3"} and then waits until verification of the same JSON reports the state built
     * from [0, 1, 2] and [3, 1, 2] for foo-0.
     */
    @ClusterTest
    public void testGenerateAssignmentWithBootstrapServer() throws Exception {
        createTopics();
        TopicPartition partition = new TopicPartition("foo", 0);
        produceMessages(partition.topic(), partition.partition(), 100);

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            String reassignmentJson = "{\"version\":1,\"partitions\":"
                + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
                + "]}";
            ReassignPartitionsCommand.generateAssignment(admin, reassignmentJson, "1,2,3", false);

            PartitionReassignmentState expectedState =
                new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(3, 1, 2), true);
            Map<TopicPartition, PartitionReassignmentState> expectedStates =
                Collections.singletonMap(partition, expectedState);
            waitForVerifyAssignment(admin, reassignmentJson, false, new VerifyAssignmentResult(expectedStates));
        }
    }

    /**
     * Executes a reassignment of foo-0 and baz-2 with an inter-broker throttle of 1, re-executes
     * it in "additional" mode with a throttle of 300000, and checks after each step that brokers
     * 0..3 report the expected throttle. Finally waits for the reassignment to be verified as
     * complete and for the broker-level throttles to return to their unthrottled values.
     */
    @ClusterTest
    public void testAlterReassignmentThrottle() throws Exception {
        createTopics();
        produceMessages("foo", 0, 50);
        produceMessages("baz", 2, 60);

        String reassignmentJson = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
            + "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
            + "]}";

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            // Start with a very small throttle.
            long initialThrottle = 1L;
            runExecuteAssignment(false, reassignmentJson, initialThrottle, -1L);
            waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), initialThrottle);

            // Re-run with additional=true to raise the throttle.
            long updatedThrottle = 300000L;
            runExecuteAssignment(true, reassignmentJson, updatedThrottle, -1L);
            waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), updatedThrottle);

            Map<TopicPartition, PartitionReassignmentState> expectedFinalStates = new HashMap<>();
            expectedFinalStates.put(new TopicPartition("foo", 0),
                new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
            expectedFinalStates.put(new TopicPartition("baz", 2),
                new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

            // Verifying with preserveThrottles=false, then expecting the unthrottled values again.
            waitForVerifyAssignment(admin, reassignmentJson, false, new VerifyAssignmentResult(expectedFinalStates));
            waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
        }
    }

    /**
     * Runs a reassignment of foo-0 and baz-2 with an inter-broker throttle of 300000 and checks
     * the reported state before, during and after the move. While verification (with
     * preserveThrottles=true) still reports ongoing partitions, the target replicas and the
     * throttle are asserted; once complete, a verification with preserveThrottles=false is
     * followed by a wait for the unthrottled broker configuration.
     */
    @ClusterTest
    public void testThrottledReassignment() throws Exception {
        createTopics();
        produceMessages("foo", 0, 50);
        produceMessages("baz", 2, 60);

        String reassignmentJson = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
            + "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
            + "]}";

        Map<TopicPartition, PartitionReassignmentState> expectedInitialStates = new HashMap<>();
        expectedInitialStates.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true));
        expectedInitialStates.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true));

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            // Before executing: verification matches the initial expectation and nothing is throttled.
            Assertions.assertEquals(new VerifyAssignmentResult(expectedInitialStates),
                runVerifyAssignment(admin, reassignmentJson, false));
            Assertions.assertEquals(unthrottledBrokerConfigs,
                describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));

            long interBrokerThrottle = 300000L;
            runExecuteAssignment(false, reassignmentJson, interBrokerThrottle, -1L);
            waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

            Map<TopicPartition, PartitionReassignmentState> expectedFinalStates = new HashMap<>();
            expectedFinalStates.put(new TopicPartition("foo", 0),
                new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
            expectedFinalStates.put(new TopicPartition("baz", 2),
                new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

            TestUtils.waitForCondition(() -> {
                VerifyAssignmentResult current = runVerifyAssignment(admin, reassignmentJson, true);
                if (current.partsOngoing) {
                    // Still moving: at least one partition must be unfinished, the targets must
                    // match the request, and the throttle must still be in place.
                    boolean everyPartitionDone = current.partStates.values().stream().allMatch(state -> state.done);
                    Assertions.assertFalse(everyPartitionDone,
                        "Expected at least one partition reassignment to be ongoing when result = " + current);
                    Assertions.assertEquals(Arrays.asList(0, 3, 2),
                        current.partStates.get(new TopicPartition("foo", 0)).targetReplicas);
                    Assertions.assertEquals(Arrays.asList(3, 2, 1),
                        current.partStates.get(new TopicPartition("baz", 2)).targetReplicas);
                    waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
                    return false;
                }
                return true;
            }, "Expected reassignment to complete.");

            // With preserveThrottles=true the throttle is expected to remain after completion.
            waitForVerifyAssignment(admin, reassignmentJson, true, new VerifyAssignmentResult(expectedFinalStates));
            waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

            // With preserveThrottles=false the brokers are expected to end up unthrottled.
            waitForVerifyAssignment(admin, reassignmentJson, false, new VerifyAssignmentResult(expectedFinalStates));
            waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
        }
    }

    /**
     * Starts a reassignment of baz-2 to replicas [3, 2, 1] with an inter-broker throttle of 300,
     * produces more records while it is running, consumes from the partition, and finally removes
     * the replication throttle and waits for the reassignment to be verified as complete.
     */
    @ClusterTest
    public void testProduceAndConsumeWithReassignmentInProgress() throws Exception {
        this.createTopics();
        this.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        // Inter-broker throttle of 300; -1 is passed for the log-dir throttle.
        this.runExecuteAssignment(false, assignment, 300L, -1L);
        // Produce a second batch after the reassignment has been submitted.
        this.produceMessages("baz", 2, 100);
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", this.clusterInstance.bootstrapServers());
        consumerProps.put("group.id", "group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", "true");
        TopicPartition part = new TopicPartition("baz", 2);
        try (KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());){
            consumer.assign(Collections.singleton(part));
            ArrayList allRecords = new ArrayList();
            // NOTE(review): the wait condition is a compiler-generated lambda method whose body is
            // not visible in this section; presumably it polls the consumer and accumulates
            // records into allRecords until enough have arrived -- confirm against its definition.
            TestUtils.waitForCondition(() -> ReassignPartitionsCommandTest.lambda$testProduceAndConsumeWithReassignmentInProgress$5((Consumer)consumer, allRecords), (String)"Timeout for waiting enough records");
        }
        // Helper defined outside this section; by its name it lifts the throttle for the partition.
        this.removeReplicationThrottleForPartitions(part);
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            this.waitForVerifyAssignment(admin, assignment, false, new VerifyAssignmentResult(finalAssignment));
        }
    }

    /**
     * Runs the shared cancellation scenario with the flag set to {@code true}
     * (presumably selecting the bootstrap-server path, going by the test name).
     */
    @ClusterTest
    public void testCancellationWithBootstrapServer() throws Exception {
        testCancellationAction(true);
    }

    /**
     * Runs the shared cancellation scenario with the flag set to {@code false}
     * (presumably selecting the bootstrap-controller path, going by the test name).
     * Restricted to the KRAFT and CO_KRAFT cluster types.
     */
    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
    public void testCancellationWithBootstrapController() throws Exception {
        testCancellationAction(false);
    }

    /**
     * Starts a reassignment of foo-0 to replicas [0, 3, 4], waits until the ISR of the partition
     * contains brokers 0, 1, 2 and 3, cancels the reassignment, and then checks that brokers 3
     * and 4 are no longer listed as replicas of the partition.
     */
    @ClusterTest
    public void testCancellationWithAddingAndRemovingReplicaInIsr() throws Exception {
        createTopics();
        TopicPartition foo0 = new TopicPartition("foo", 0);
        produceMessages(foo0.topic(), foo0.partition(), 200);

        String reassignmentJson = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
            + "]}";
        setReplicationThrottleForPartitions(foo0);
        runExecuteAssignment(false, reassignmentJson, -1L, -1L);

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            TestUtils.waitForCondition(() -> {
                TopicDescription description = admin.describeTopics(Collections.singleton(foo0.topic()))
                    .allTopicNames()
                    .get()
                    .get(foo0.topic());
                Set<Integer> inSyncBrokerIds = description.partitions().stream()
                    .filter(info -> info.partition() == foo0.partition())
                    .flatMap(info -> info.isr().stream())
                    .map(Node::id)
                    .collect(Collectors.toSet());
                return inSyncBrokerIds.containsAll(Arrays.asList(0, 1, 2, 3));
            }, "Timed out while waiting for replica 3 to join the ISR");
        }

        // Cancelling is expected to report foo-0 as the only cancelled partition and no replica moves.
        Assertions.assertEquals(
            new AbstractMap.SimpleImmutableEntry<>(Collections.singleton(foo0), Collections.emptySet()),
            runCancelAssignment(reassignmentJson, true, true));
        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
    }

    /**
     * Starts a reassignment of foo-0 to replicas [0, 1, 2, 3, 4], waits until the ISR of the
     * partition contains brokers 0, 1, 2 and 3, cancels the reassignment, and then checks that
     * brokers 3 and 4 are no longer listed as replicas of the partition.
     */
    @ClusterTest
    public void testCancellationWithAddingReplicaInIsr() throws Exception {
        createTopics();
        TopicPartition foo0 = new TopicPartition("foo", 0);
        produceMessages(foo0.topic(), foo0.partition(), 200);

        String reassignmentJson = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}"
            + "]}";
        setReplicationThrottleForPartitions(foo0);
        runExecuteAssignment(false, reassignmentJson, -1L, -1L);

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            TestUtils.waitForCondition(() -> {
                TopicDescription description = admin.describeTopics(Collections.singleton(foo0.topic()))
                    .allTopicNames()
                    .get()
                    .get(foo0.topic());
                Set<Integer> inSyncBrokerIds = description.partitions().stream()
                    .filter(info -> info.partition() == foo0.partition())
                    .flatMap(info -> info.isr().stream())
                    .map(Node::id)
                    .collect(Collectors.toSet());
                return inSyncBrokerIds.containsAll(Arrays.asList(0, 1, 2, 3));
            }, "Timed out while waiting for replica 3 to join the ISR");
        }

        // Cancelling is expected to report foo-0 as the only cancelled partition and no replica moves.
        Assertions.assertEquals(
            new AbstractMap.SimpleImmutableEntry<>(Collections.singleton(foo0), Collections.emptySet()),
            runCancelAssignment(reassignmentJson, true, true));
        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
    }

    /**
     * Moves the foo-0 replica on broker 0 to a different log directory with a log-dir throttle
     * of 1. Checks that the move is reported as active and the throttle is set, deletes the
     * {@code replica.alter.log.dirs.io.max.bytes.per.second} broker config, and then waits for the
     * move to be reported as completed in the target directory.
     */
    @ClusterTest(types = {Type.KRAFT})
    public void testLogDirReassignment() throws Exception {
        createTopics();
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        produceMessages(topicPartition.topic(), topicPartition.partition(), 700);

        int targetBrokerId = 0;
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);

        long logDirThrottle = 1L;
        runExecuteAssignment(false, reassignment.json, -1L, logDirThrottle);

        TopicPartitionReplica movedReplica =
            new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0);
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            // While throttled, the replica move is expected to be reported as still active.
            waitForVerifyAssignment(admin, reassignment.json, true,
                new VerifyAssignmentResult(
                    Collections.singletonMap(topicPartition,
                        new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)),
                    false,
                    Collections.singletonMap(movedReplica,
                        new ActiveMoveState(reassignment.currentDir, reassignment.targetDir, reassignment.targetDir)),
                    true));
            waitForLogDirThrottle(admin, Collections.singleton(0), logDirThrottle);

            // Delete the log-dir throttle config on broker 0.
            ConfigResource brokerZero = new ConfigResource(ConfigResource.Type.BROKER, "0");
            AlterConfigOp deleteThrottle = new AlterConfigOp(
                new ConfigEntry("replica.alter.log.dirs.io.max.bytes.per.second", ""),
                AlterConfigOp.OpType.DELETE);
            admin.incrementalAlterConfigs(Collections.singletonMap(brokerZero, Collections.singletonList(deleteThrottle)))
                .all()
                .get();
            waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);

            // Now the move is expected to be reported as completed in the target directory.
            waitForVerifyAssignment(admin, reassignment.json, true,
                new VerifyAssignmentResult(
                    Collections.singletonMap(topicPartition,
                        new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)),
                    false,
                    Collections.singletonMap(movedReplica, new CompletedMoveState(reassignment.targetDir)),
                    false));

            BrokerDirs dirsAfterMove = new BrokerDirs(
                admin.describeLogDirs(IntStream.range(0, 4).boxed().collect(Collectors.toList())), 0);
            Assertions.assertEquals(reassignment.targetDir, dirsAfterMove.curLogDirs.getOrDefault(topicPartition, ""));
        }
    }

    /**
     * Starts a log-dir move of the foo-0 replica on broker 0 with a log-dir throttle of 1,
     * re-executes in "additional" mode with a throttle of 3000000, and waits for the move to be
     * reported as completed in the target directory.
     */
    @ClusterTest(types = {Type.KRAFT})
    public void testAlterLogDirReassignmentThrottle() throws Exception {
        createTopics();
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        produceMessages(topicPartition.topic(), topicPartition.partition(), 700);

        int targetBrokerId = 0;
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);

        long initialLogDirThrottle = 1L;
        runExecuteAssignment(false, reassignment.json, -1L, initialLogDirThrottle);

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            waitForLogDirThrottle(admin, new HashSet<>(Collections.singletonList(0)), initialLogDirThrottle);

            // Raise the throttle by re-running with additional=true.
            long updatedLogDirThrottle = 3000000L;
            runExecuteAssignment(true, reassignment.json, -1L, updatedLogDirThrottle);
            waitForLogDirThrottle(admin, Collections.singleton(0), updatedLogDirThrottle);

            TopicPartitionReplica movedReplica =
                new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), targetBrokerId);
            waitForVerifyAssignment(admin, reassignment.json, true,
                new VerifyAssignmentResult(
                    Collections.singletonMap(topicPartition,
                        new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)),
                    false,
                    Collections.singletonMap(movedReplica, new CompletedMoveState(reassignment.targetDir)),
                    false));
        }
    }

    /**
     * Verifies that executing a reassignment which would change the replication factor of a
     * partition is rejected with a {@link TerseException} listing every offending partition.
     */
    @ClusterTest
    public void testDisallowReplicationFactorChange() {
        createTopics();
        String assignment = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
            + "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[0,1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\"]},"
            + "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3],\"log_dirs\":[\"any\"]}"
            + "]}";
        try (Admin admin = clusterInstance.admin()) {
            // The throttle arguments are passed as plain literals and autoboxed. An explicit
            // reference-type cast in front of a negative literal, e.g. "(Long)-1L", is parsed as
            // the subtraction "Long - 1L" and does not compile.
            TerseException thrown = Assertions.assertThrows(TerseException.class,
                () -> ReassignPartitionsCommand.executeAssignment(
                    admin, false, assignment, -1L, -1L, 10000L, Time.SYSTEM, true));
            Assertions.assertEquals(
                "Error reassigning partition(s):\n"
                    + "bar-0: The replication factor is changed from 3 to 1\n"
                    + "foo-0: The replication factor is changed from 3 to 2\n"
                    + "foo-1: The replication factor is changed from 3 to 4",
                thrown.getMessage());
        }
    }

    /**
     * Creates the topics used by the tests with explicit replica assignments and waits for each
     * one before creating the next:
     * foo (partitions 0 and 1), bar (partition 0) and baz (partitions 0, 1 and 2).
     */
    private void createTopics() {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            Map<Integer, List<Integer>> fooAssignment = new HashMap<>();
            fooAssignment.put(0, Arrays.asList(0, 1, 2));
            fooAssignment.put(1, Arrays.asList(1, 2, 3));
            Assertions.assertDoesNotThrow(
                () -> admin.createTopics(Collections.singletonList(new NewTopic("foo", fooAssignment))).topicId("foo").get());
            Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic("foo", fooAssignment.size()));

            Map<Integer, List<Integer>> barAssignment = new HashMap<>();
            barAssignment.put(0, Arrays.asList(3, 2, 1));
            Assertions.assertDoesNotThrow(
                () -> admin.createTopics(Collections.singletonList(new NewTopic("bar", barAssignment))).topicId("bar").get());
            Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic("bar", barAssignment.size()));

            Map<Integer, List<Integer>> bazAssignment = new HashMap<>();
            bazAssignment.put(0, Arrays.asList(1, 0, 2));
            bazAssignment.put(1, Arrays.asList(2, 0, 1));
            bazAssignment.put(2, Arrays.asList(0, 2, 1));
            Assertions.assertDoesNotThrow(
                () -> admin.createTopics(Collections.singletonList(new NewTopic("baz", bazAssignment))).topicId("baz").get());
            Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic("baz", bazAssignment.size()));
        }
    }

    /**
     * Sends {@code numMessages} records to the given topic partition, one at a time, waiting for
     * each send to finish. Every record has a null key and a 10000-byte value.
     *
     * @param topic       topic to produce to
     * @param partition   partition to produce to
     * @param numMessages number of records to send
     */
    private void produceMessages(String topic, int partition, int numMessages) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", clusterInstance.bootstrapServers());
        try (KafkaProducer<byte[], byte[]> producer =
                 new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            for (int sent = 0; sent < numMessages; sent++) {
                ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>(topic, partition, null, new byte[10000]);
                // Block on each send so a failure surfaces as an assertion failure.
                Assertions.assertDoesNotThrow(() -> producer.send(record).get());
            }
        }
    }

    /**
     * Executes an unthrottled reassignment of foo-0 to [0, 1, 3] and bar-0 to [3, 2, 0].
     * Checks the verification result before and after execution, asserts that the broker-level
     * throttles stay at their unthrottled values, and finally checks that the replicas dropped
     * from the assignment (broker 2 for foo-0, broker 1 for bar-0) are removed.
     *
     * @throws InterruptedException if interrupted while waiting on a condition
     */
    private void executeAndVerifyReassignment() throws InterruptedException {
        String reassignmentJson = "{\"version\":1,\"partitions\":"
            + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
            + "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
            + "]}";
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition bar0 = new TopicPartition("bar", 0);

        Map<TopicPartition, PartitionReassignmentState> expectedInitialStates = new HashMap<>();
        expectedInitialStates.put(foo0,
            new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true));
        expectedInitialStates.put(bar0,
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true));

        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            waitForVerifyAssignment(admin, reassignmentJson, false, new VerifyAssignmentResult(expectedInitialStates));

            runExecuteAssignment(false, reassignmentJson, -1L, -1L);
            Assertions.assertEquals(unthrottledBrokerConfigs,
                describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));

            Map<TopicPartition, PartitionReassignmentState> expectedFinalStates = new HashMap<>();
            expectedFinalStates.put(foo0,
                new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true));
            expectedFinalStates.put(bar0,
                new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true));

            // No log-dir moves were requested, so none may be reported as ongoing.
            VerifyAssignmentResult afterExecute = runVerifyAssignment(admin, reassignmentJson, false);
            Assertions.assertFalse(afterExecute.movesOngoing);

            waitForVerifyAssignment(admin, reassignmentJson, false, new VerifyAssignmentResult(expectedFinalStates));
            Assertions.assertEquals(unthrottledBrokerConfigs,
                describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));
        }

        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 2));
        verifyReplicaDeleted(new TopicPartitionReplica(bar0.topic(), bar0.partition(), 1));
    }

    /**
     * Waits until the topic description no longer lists the given broker as a replica of the
     * given partition.
     *
     * @param topicPartitionReplica the topic, partition and broker id to check
     * @throws InterruptedException if interrupted while waiting
     */
    private void verifyReplicaDeleted(TopicPartitionReplica topicPartitionReplica) throws InterruptedException {
        String timeoutMessage = "Timed out waiting for replica " + topicPartitionReplica.brokerId()
            + " of " + topicPartitionReplica + " to be deleted";
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()))) {
            TestUtils.waitForCondition(() -> {
                TopicDescription description = Assertions.assertDoesNotThrow(
                    () -> admin.describeTopics(Collections.singleton(topicPartitionReplica.topic()))
                        .topicNameValues()
                        .get(topicPartitionReplica.topic())
                        .get());
                // Only the matching partition is considered; other partitions never count as a hit.
                return description.partitions().stream()
                    .filter(info -> info.partition() == topicPartitionReplica.partition())
                    .flatMap(info -> info.replicas().stream())
                    .noneMatch(node -> node.id() == topicPartitionReplica.brokerId());
            }, timeoutMessage);
        }
    }

    /**
     * Waits until the given brokers report {@code logDirThrottle} for the log-dir throttle
     * config and {@code -1} for both replication throttle rates.
     *
     * @param admin            admin client used to read the broker configs
     * @param throttledBrokers ids of the brokers expected to carry the throttle
     * @param logDirThrottle   expected value of the log-dir throttle config
     */
    private void waitForLogDirThrottle(Admin admin, Set<Integer> throttledBrokers, Long logDirThrottle) {
        Map<String, Long> expectedThrottles = new HashMap<>();
        expectedThrottles.put("leader.replication.throttled.rate", -1L);
        expectedThrottles.put("follower.replication.throttled.rate", -1L);
        expectedThrottles.put("replica.alter.log.dirs.io.max.bytes.per.second", logDirThrottle);
        waitForBrokerThrottles(admin, throttledBrokers, expectedThrottles);
    }

    /**
     * Waits until the given brokers report {@code interBrokerThrottle} for both the leader and
     * follower replication throttle rates and {@code -1} for the log-dir throttle config.
     *
     * @param admin               admin client used to read the broker configs
     * @param throttledBrokers    ids of the brokers expected to carry the throttle
     * @param interBrokerThrottle expected value of both replication throttle rates
     */
    private void waitForInterBrokerThrottle(Admin admin, List<Integer> throttledBrokers, Long interBrokerThrottle) {
        Map<String, Long> expectedThrottles = new HashMap<>();
        expectedThrottles.put("leader.replication.throttled.rate", interBrokerThrottle);
        expectedThrottles.put("follower.replication.throttled.rate", interBrokerThrottle);
        expectedThrottles.put("replica.alter.log.dirs.io.max.bytes.per.second", -1L);
        waitForBrokerThrottles(admin, throttledBrokers, expectedThrottles);
    }

    /**
     * Builds the expected per-broker throttle map and waits for the cluster to report it.
     * Brokers in {@code throttledBrokers} are expected to report {@code throttleConfig};
     * every other broker known to {@code unthrottledBrokerConfigs} keeps its unthrottled values.
     *
     * @param admin            admin client used to read the broker configs
     * @param throttledBrokers ids of the brokers expected to be throttled
     * @param throttleConfig   expected throttle values for those brokers
     */
    private void waitForBrokerThrottles(Admin admin, Collection<Integer> throttledBrokers, Map<String, Long> throttleConfig) {
        Map<Integer, Map<String, Long>> expectedConfigs = new HashMap<>();
        for (Map.Entry<Integer, Map<String, Long>> entry : unthrottledBrokerConfigs.entrySet()) {
            Integer brokerId = entry.getKey();
            if (throttledBrokers.contains(brokerId)) {
                expectedConfigs.put(brokerId, throttleConfig);
            } else {
                expectedConfigs.put(brokerId, entry.getValue());
            }
        }
        Assertions.assertDoesNotThrow(() -> waitForBrokerLevelThrottles(admin, expectedConfigs));
    }

    /**
     * Polls the broker-level throttle configs until they equal {@code targetThrottles}.
     *
     * @param admin           admin client used to read the broker configs
     * @param targetThrottles expected throttle values, keyed by broker id and then config name
     * @throws InterruptedException if interrupted while waiting
     */
    private void waitForBrokerLevelThrottles(Admin admin, Map<Integer, Map<String, Long>> targetThrottles) throws InterruptedException {
        AtomicReference<Map<Integer, Map<String, Long>>> curThrottles = new AtomicReference<>(new HashMap<>());
        TestUtils.waitForCondition(() -> {
            Assertions.assertDoesNotThrow(
                () -> curThrottles.set(describeBrokerLevelThrottles(admin, targetThrottles.keySet())));
            return targetThrottles.equals(curThrottles.get());
        },
            // The message is supplied lazily so that it reports the throttles observed by the last
            // poll; an eagerly built string would always show the initial empty map.
            () -> "timed out waiting for broker throttle to become " + targetThrottles
                + ".  Latest throttles were " + curThrottles.get());
    }

    /**
     * Reads the broker-level throttle configs of the given brokers.
     *
     * @param admin     admin client used to describe the broker configs
     * @param brokerIds ids of the brokers to describe
     * @return for each broker id, a map from every name in
     *         {@code ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES} to its parsed value,
     *         or {@code -1} when the config has no entry for that name
     */
    private Map<Integer, Map<String, Long>> describeBrokerLevelThrottles(Admin admin, Collection<Integer> brokerIds) {
        return brokerIds.stream().collect(Collectors.toMap(Function.identity(), brokerId -> {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
            Config described = Assertions.assertDoesNotThrow(
                () -> admin.describeConfigs(Collections.singleton(resource)).values().get(resource).get());
            return ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES.stream()
                .collect(Collectors.toMap(Function.identity(), throttleName -> {
                    ConfigEntry entry = described.get(throttleName);
                    return entry == null ? -1L : Long.parseLong(entry.value());
                }));
        }));
    }

    /**
     * Builds a reassignment JSON that keeps the given replica list but moves the replica hosted
     * on {@code brokerId} to a log directory other than the one it currently uses. All other
     * replicas use the log dir {@code "any"}.
     *
     * @param topicPartition partition whose replica should change log directory
     * @param brokerId       broker on which the log directory move takes place
     * @param replicas       replica list to write into the reassignment
     * @return the reassignment JSON together with the current and the chosen target directory
     * @throws ExecutionException   declared for failures while describing the log directories
     * @throws InterruptedException declared for interruption while describing the log directories
     */
    private LogDirReassignment buildLogDirReassignment(TopicPartition topicPartition, int brokerId, List<Integer> replicas) throws ExecutionException, InterruptedException {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()))) {
            DescribeLogDirsResult describeLogDirsResult =
                admin.describeLogDirs(IntStream.range(0, 4).boxed().collect(Collectors.toList()));
            BrokerDirs logDirInfo = new BrokerDirs(describeLogDirsResult, brokerId);
            Assertions.assertTrue(logDirInfo.futureLogDirs.isEmpty());

            // Fail with a clear message if the broker does not host the partition, instead of
            // silently building a reassignment around a null current directory.
            String currentDir = logDirInfo.curLogDirs.get(topicPartition);
            Assertions.assertNotNull(currentDir,
                "No current log dir found for " + topicPartition + " on broker " + brokerId);

            // Pick any directory other than the current one; fail explicitly if there is none.
            String newDir = logDirInfo.logDirs.stream()
                .filter(dir -> !dir.equals(currentDir))
                .findFirst()
                .orElseThrow(() -> new AssertionError(
                    "Broker " + brokerId + " has no log dir other than " + currentDir));

            List<String> logDirs = replicas.stream()
                .map(replicaId -> replicaId == brokerId ? "\"" + newDir + "\"" : "\"any\"")
                .collect(Collectors.toList());
            String reassignmentJson = " { \"version\": 1,  \"partitions\": [    {     \"topic\": \"" + topicPartition.topic() + "\",     \"partition\": " + topicPartition.partition() + ",     \"replicas\": [" + replicas.stream().map(Object::toString).collect(Collectors.joining(",")) + "],     \"log_dirs\": [" + String.join(",", logDirs) + "]    }   ]  }";
            return new LogDirReassignment(reassignmentJson, currentDir, newDir);
        }
    }

    /**
     * Calls {@code ReassignPartitionsCommand.verifyAssignment} and rethrows any of its checked
     * exceptions wrapped in a {@link RuntimeException}, so callers need no {@code throws} clause.
     *
     * @param admin             admin client used for the verification
     * @param jsonString        reassignment plan to verify
     * @param preserveThrottles forwarded unchanged to the command
     * @return the verification result produced by the command
     */
    private VerifyAssignmentResult runVerifyAssignment(Admin admin, String jsonString, Boolean preserveThrottles) {
        final VerifyAssignmentResult verification;
        try {
            verification = ReassignPartitionsCommand.verifyAssignment(admin, jsonString, preserveThrottles);
        }
        catch (JsonProcessingException | InterruptedException | ExecutionException checked) {
            throw new RuntimeException(checked);
        }
        return verification;
    }

    /**
     * Repeatedly runs {@link #runVerifyAssignment} until its result equals {@code expectedResult},
     * delegating the retry/timeout policy to {@code TestUtils.waitForCondition}.
     *
     * @param admin             admin client used for each verification attempt
     * @param jsonString        reassignment plan to verify
     * @param preserveThrottles forwarded unchanged to each verification attempt
     * @param expectedResult    result the verification must eventually equal
     * @throws InterruptedException if interrupted while waiting
     */
    private void waitForVerifyAssignment(Admin admin, String jsonString, Boolean preserveThrottles, VerifyAssignmentResult expectedResult) throws InterruptedException {
        // The previous single-element holder array was written on every attempt but never read.
        TestUtils.waitForCondition(
            () -> expectedResult.equals(this.runVerifyAssignment(admin, jsonString, preserveThrottles)),
            "Timed out waiting for verifyAssignment result " + expectedResult);
    }

    /**
     * Executes a reassignment plan through {@code ReassignPartitionsCommand.executeAssignment}
     * using a short-lived admin client connected to the cluster's bootstrap servers.
     *
     * <p>The command is always invoked with {@code 10000L}, {@code Time.SYSTEM} and {@code false}
     * as its last three arguments. Checked exceptions are rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param additional                  forwarded unchanged to the command
     * @param reassignmentJson            reassignment plan to execute
     * @param interBrokerThrottle         forwarded unchanged to the command
     * @param replicaAlterLogDirsThrottle forwarded unchanged to the command
     */
    private void runExecuteAssignment(Boolean additional, String reassignmentJson, Long interBrokerThrottle, Long replicaAlterLogDirsThrottle) throws RuntimeException {
        Map<String, Object> clientConfig = Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers());
        try (Admin admin = Admin.create(clientConfig)) {
            ReassignPartitionsCommand.executeAssignment(
                admin,
                additional,
                reassignmentJson,
                interBrokerThrottle,
                replicaAlterLogDirsThrottle,
                10000L,
                Time.SYSTEM,
                false);
        }
        catch (JsonProcessingException | InterruptedException | ExecutionException | TerseException failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Cancels the reassignments described by {@code jsonString} through
     * {@code ReassignPartitionsCommand.cancelAssignment} and returns what the command reports.
     *
     * <p>The admin client connects via {@code bootstrap.servers} when {@code useBootstrapServer}
     * is true and via {@code bootstrap.controllers} otherwise. It is closed before this method
     * returns, on both the success and the failure path. The command is always invoked with
     * {@code 10000L} and {@code Time.SYSTEM} as its last two arguments. Checked exceptions are
     * rethrown wrapped in a {@link RuntimeException}.
     *
     * @param jsonString         reassignment plan whose moves should be cancelled
     * @param preserveThrottles  forwarded unchanged to the command
     * @param useBootstrapServer selects the broker endpoint (true) or the controller endpoint (false)
     * @return the pair of partition and replica sets returned by the command
     */
    private Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> runCancelAssignment(String jsonString, Boolean preserveThrottles, Boolean useBootstrapServer) {
        // Admin.create accepts Map<String, Object>, so the config must not be typed Map<String, String>.
        Map<String, Object> config = useBootstrapServer
            ? Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers())
            : Collections.singletonMap("bootstrap.controllers", this.clusterInstance.bootstrapControllers());
        try (Admin admin = Admin.create(config)) {
            return ReassignPartitionsCommand.cancelAssignment(admin, jsonString, preserveThrottles, 10000L, Time.SYSTEM);
        }
        catch (JsonProcessingException | InterruptedException | ExecutionException | TerseException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Scenario: start a throttled reassignment of foo-0 and baz-1, cancel it twice (first keeping
     * throttles, then dropping them) and check the reported results and broker throttles after
     * each step.
     *
     * @param useBootstrapServer forwarded to {@code runCancelAssignment}; selects whether the
     *                           cancel goes through {@code bootstrap.servers} or
     *                           {@code bootstrap.controllers}
     * @throws InterruptedException if interrupted while waiting on one of the conditions
     */
    private void testCancellationAction(boolean useBootstrapServer) throws InterruptedException {
        this.createTopics();
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition baz1 = new TopicPartition("baz", 1);
        // Put data on both partitions before moving them (200 is presumably the message count; confirm in produceMessages).
        this.produceMessages(foo0.topic(), foo0.partition(), 200);
        this.produceMessages(baz1.topic(), baz1.partition(), 200);
        // Target plan: foo-0 -> replicas [0,1,3], baz-1 -> replicas [0,2,3], log dirs left as "any".
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            // Precondition: broker-level throttles match the recorded unthrottled configuration.
            Assertions.assertEquals(this.unthrottledBrokerConfigs, this.describeBrokerLevelThrottles(admin, this.unthrottledBrokerConfigs.keySet()));
            // NOTE(review): a throttle of 1 presumably keeps the move from completing before the cancel; confirm intent.
            long interBrokerThrottle = 1L;
            this.runExecuteAssignment(false, assignment, interBrokerThrottle, -1L);
            this.waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
            // Expected per-partition state while the move is pending. The first list contains one broker more
            // than the second (the plan's target); presumably it is the in-flight replica set - confirm
            // against PartitionReassignmentState.
            HashMap<TopicPartition, PartitionReassignmentState> partStates = new HashMap<TopicPartition, PartitionReassignmentState>();
            partStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3, 2), Arrays.asList(0, 1, 3), false));
            partStates.put(baz1, new PartitionReassignmentState(Arrays.asList(0, 2, 3, 1), Arrays.asList(0, 2, 3), false));
            this.waitForVerifyAssignment(admin, assignment, true, new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
            // First cancel (preserveThrottles = true): both partitions are reported, no replicas.
            Assertions.assertEquals(new AbstractMap.SimpleImmutableEntry(new HashSet<TopicPartition>(Arrays.asList(foo0, baz1)), Collections.emptySet()), this.runCancelAssignment(assignment, true, useBootstrapServer));
            // The inter-broker throttle is expected to still be in place after the first cancel.
            this.waitForInterBrokerThrottle(admin, Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
            // Second cancel (preserveThrottles = false): nothing is reported any more.
            Assertions.assertEquals(new AbstractMap.SimpleImmutableEntry(Collections.emptySet(), Collections.emptySet()), this.runCancelAssignment(assignment, false, useBootstrapServer));
            // After the second cancel the broker-level throttles must return to the unthrottled configuration.
            this.waitForBrokerLevelThrottles(admin, this.unthrottledBrokerConfigs);
            Assertions.assertFalse((boolean)this.runVerifyAssignment((Admin)admin, (String)assignment, (Boolean)Boolean.valueOf((boolean)false)).partsOngoing);
        }
        // Broker 3 was the newly added replica in both plans; its replicas must be removed again.
        this.verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
        this.verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3));
    }

    /**
     * Removes the topic-level replica throttles for {@code part} and then sets the replication
     * throttle of brokers 0..3 to {@link Integer#MAX_VALUE}.
     *
     * @param part partition whose topic-level throttle entries are removed
     */
    private void removeReplicationThrottleForPartitions(TopicPartition part) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()))) {
            Set<TopicPartition> throttledPartitions = new HashSet<>(Collections.singleton(part));
            this.removePartitionReplicaThrottles(admin, throttledPartitions);

            List<Integer> allBrokers = Arrays.asList(0, 1, 2, 3);
            Assertions.assertDoesNotThrow(() -> ToolsTestUtils.throttleAllBrokersReplication(admin, allBrokers, Integer.MAX_VALUE));
        }
    }

    /**
     * Deletes the {@code leader.replication.throttled.replicas} and
     * {@code follower.replication.throttled.replicas} topic configs for every topic that owns one
     * of the given partitions, and asserts that the alter-configs call completes without throwing.
     *
     * <p>Several partitions of the same topic map to a single topic resource. The previous
     * two-argument {@code Collectors.toMap} would throw {@link IllegalStateException} on such
     * duplicate keys.
     *
     * @param adminClient admin client used to issue the incremental alter-configs request
     * @param partitions  partitions whose topics should lose their replica throttle entries
     */
    private void removePartitionReplicaThrottles(Admin adminClient, Set<TopicPartition> partitions) {
        Map<ConfigResource, Collection<AlterConfigOp>> throttles = new HashMap<>();
        for (TopicPartition tp : partitions) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic());
            // One entry per topic: the delete operations are identical for all of its partitions.
            throttles.computeIfAbsent(resource, ignored -> Arrays.asList(
                new AlterConfigOp(new ConfigEntry("leader.replication.throttled.replicas", ""), AlterConfigOp.OpType.DELETE),
                new AlterConfigOp(new ConfigEntry("follower.replication.throttled.replicas", ""), AlterConfigOp.OpType.DELETE)));
        }
        Assertions.assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(throttles).all().get());
    }

    /**
     * Sets a replication throttle of 1 for broker id 4 and registers that broker as the throttled
     * replica of {@code topicPartition}; both calls are asserted not to throw.
     *
     * @param topicPartition partition whose replication should be throttled
     */
    private void setReplicationThrottleForPartitions(TopicPartition topicPartition) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()))) {
            List<Integer> throttledBroker = Collections.singletonList(4);
            Assertions.assertDoesNotThrow(() -> ToolsTestUtils.throttleAllBrokersReplication(admin, throttledBroker, 1));
            Assertions.assertDoesNotThrow(() -> ToolsTestUtils.assignThrottledPartitionReplicas(admin, Collections.singletonMap(topicPartition, throttledBroker)));
        }
    }

    /**
     * Decompiler-emitted lambda body: polls the consumer once (100 ms), appends every record of
     * topic {@code baz} from that poll to {@code allRecords}, and reports whether at least 100
     * records have been collected so far.
     */
    private static /* synthetic */ boolean lambda$testProduceAndConsumeWithReassignmentInProgress$5(Consumer consumer, List allRecords) throws Exception {
        ConsumerRecords polled = consumer.poll(Duration.ofMillis(100L));
        for (Object rec : polled.records("baz")) {
            allRecords.add(rec);
        }
        return allRecords.size() >= 100;
    }

    /**
     * Immutable holder for a log-directory reassignment: the plan as JSON text plus the directory
     * the replica is in now and the directory it should move to.
     */
    static class LogDirReassignment {
        /** Reassignment plan, as JSON text. */
        final String json;
        /** Directory the replica currently resides in. */
        final String currentDir;
        /** Directory the replica is to be moved to. */
        final String targetDir;

        /**
         * @param json       reassignment plan as JSON text
         * @param currentDir directory the replica currently resides in
         * @param targetDir  directory the replica is to be moved to
         */
        public LogDirReassignment(String json, String currentDir, String targetDir) {
            this.targetDir = targetDir;
            this.currentDir = currentDir;
            this.json = json;
        }
    }

    /**
     * Per-broker view of a {@link DescribeLogDirsResult}: every log directory the broker reports,
     * plus the directory of each replica, split by whether the replica is flagged as a future
     * replica ({@code isFuture()}) or not.
     */
    private static class BrokerDirs {
        final DescribeLogDirsResult result;
        final int brokerId;
        /** Names of all log directories reported for the broker. */
        final Set<String> logDirs = new HashSet<>();
        /** Directory of each replica that is not flagged as future. */
        final Map<TopicPartition, String> curLogDirs = new HashMap<>();
        /** Directory of each replica that is flagged as future. */
        final Map<TopicPartition, String> futureLogDirs = new HashMap<>();

        /**
         * Waits for the broker's log-dir description and indexes it.
         *
         * @param result   describe-log-dirs result that contains an entry for {@code brokerId}
         * @param brokerId broker whose directories are indexed
         * @throws ExecutionException   if the description future completed exceptionally
         * @throws InterruptedException if interrupted while waiting for the description
         */
        public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws ExecutionException, InterruptedException {
            this.result = result;
            this.brokerId = brokerId;
            // No raw casts here: the lambda parameter types must be inferred from the generic
            // result, otherwise replicaInfos() and isFuture() cannot be resolved.
            result.descriptions().get(brokerId).get().forEach((logDirName, logDirInfo) -> {
                this.logDirs.add(logDirName);
                logDirInfo.replicaInfos().forEach((part, info) -> {
                    Map<TopicPartition, String> target = info.isFuture() ? this.futureLogDirs : this.curLogDirs;
                    target.put(part, logDirName);
                });
            });
        }
    }
}

