package org.apache.kafka.tools.reassign;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.Closeable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.server.HostedPartition$None$;
import kafka.server.IsrChangePropagationConfig;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.QuorumTestHarness;
import kafka.server.ZkAlterPartitionManager;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.tools.TerseException;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Option;
import scala.Some$;
import scala.collection.JavaConverters;
import scala.collection.Seq;

@Timeout(300)
/* loaded from: input_file:org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.class */
public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
    /**
     * Cluster used by the currently running test. Each test method assigns a fresh instance
     * before calling {@code setup()}, and {@link #tearDown()} closes it.
     */
    ReassignPartitionsTestCluster cluster;
    /**
     * Baseline broker-level throttle configs keyed by broker ID. The constructor adds one entry
     * for each of the broker IDs 0 through 3; tests compare this map against the result of
     * {@code describeBrokerLevelThrottles} to check that no throttle is in effect.
     */
    private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest$BrokerDirs.class */
    /**
     * Snapshot of the log directories of a single broker, extracted from a
     * {@link DescribeLogDirsResult}.
     */
    public static class BrokerDirs {
        /** The raw describe-log-dirs result this snapshot was built from. */
        final DescribeLogDirsResult result;
        /** ID of the broker whose directories are captured. */
        final int brokerId;
        /** Every log directory reported for the broker. */
        final Set<String> logDirs = new HashSet<>();
        /** Directory holding each non-future replica. */
        final Map<TopicPartition, String> curLogDirs = new HashMap<>();
        /** Directory holding each replica flagged as "future". */
        final Map<TopicPartition, String> futureLogDirs = new HashMap<>();

        /**
         * Blocks on the description future of broker {@code i} and indexes its replicas by
         * directory.
         *
         * @param describeLogDirsResult result of an admin describeLogDirs call covering broker {@code i}
         * @param i                     ID of the broker to extract
         * @throws ExecutionException   if the description future completed exceptionally
         * @throws InterruptedException if interrupted while waiting for the future
         */
        public BrokerDirs(DescribeLogDirsResult describeLogDirsResult, int i) throws ExecutionException, InterruptedException {
            result = describeLogDirsResult;
            brokerId = i;
            describeLogDirsResult.descriptions().get(i).get().forEach((dir, description) -> {
                logDirs.add(dir);
                // Each replica lands in exactly one of the two maps, chosen by its future flag.
                description.replicaInfos().forEach((partition, info) ->
                    (info.isFuture() ? futureLogDirs : curLogDirs).put(partition, dir));
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest$LogDirReassignment.class */
    /**
     * Value holder describing a log-directory move: the reassignment JSON plus the directory
     * the replica currently lives in and the directory it should move to.
     */
    public static class LogDirReassignment {
        /** Reassignment JSON text. */
        final String json;
        /** Directory the replica is currently in. */
        final String currentDir;
        /** Directory the replica is being moved to. */
        final String targetDir;

        /**
         * @param str  reassignment JSON
         * @param str2 current log directory
         * @param str3 target log directory
         */
        public LogDirReassignment(String str, String str2, String str3) {
            json = str;
            currentDir = str2;
            targetDir = str3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest$ReassignPartitionsTestCluster.class */
    /**
     * A five-broker test cluster (IDs 0-4, spread over racks "rack0" and "rack1") with three
     * pre-defined topics whose replica assignments are fixed. Brokers are started through the
     * enclosing test harness and shut down by {@link #close()}.
     */
    public class ReassignPartitionsTestCluster implements Closeable {
        private final List<KafkaConfig> brokerConfigs = new ArrayList<>();
        private final Map<Integer, String> brokers = new HashMap<>();
        private final Map<String, List<List<Integer>>> topics = new HashMap<>();
        private final List<KafkaBroker> servers = new ArrayList<>();
        private String brokerList;
        private Admin adminClient;

        /**
         * Builds the broker configurations; nothing is started until {@link #setup()}.
         *
         * @param map  properties applied to every broker, on top of the defaults set here
         * @param map2 per-broker properties keyed by broker ID, applied last
         */
        public ReassignPartitionsTestCluster(Map<String, String> map, Map<Integer, Map<String, String>> map2) {
            // Broker ID -> rack.
            brokers.put(0, "rack0");
            brokers.put(1, "rack0");
            brokers.put(2, "rack1");
            brokers.put(3, "rack1");
            brokers.put(4, "rack1");

            // Topic -> replica list per partition (list index is the partition number).
            topics.put(ConsumerGroupCommandTest.TOPIC, Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3)));
            topics.put("bar", Arrays.asList(Arrays.asList(3, 2, 1)));
            topics.put("baz", Arrays.asList(Arrays.asList(1, 0, 2), Arrays.asList(2, 0, 1), Arrays.asList(0, 2, 1)));

            for (Map.Entry<Integer, String> broker : brokers.entrySet()) {
                Integer brokerId = broker.getKey();
                String rack = broker.getValue();
                Properties props = TestUtils.createBrokerConfig(brokerId.intValue(), zkConnectOrNull(), false, true, TestUtils.RandomPort(), Option.empty(), Option.empty(), Option.empty(), true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Some$.MODULE$.apply(rack), 3, false, 1, (short) 1, false);
                props.setProperty("replica.fetch.backoff.ms", "100");
                props.setProperty("auto.leader.rebalance.enable", "false");
                props.setProperty("replica.lag.time.max.ms", "1000");
                // Cluster-wide overrides first, then broker-specific ones so the latter win.
                map.forEach(props::setProperty);
                Map<String, String> brokerOverrides = map2.getOrDefault(brokerId, Collections.emptyMap());
                brokerOverrides.forEach(props::setProperty);
                brokerConfigs.add(new KafkaConfig(props));
            }
        }

        /**
         * Starts the brokers and then creates the topics.
         *
         * @throws ExecutionException   if topic creation fails
         * @throws InterruptedException if interrupted while waiting for topic creation
         */
        public void setup() throws ExecutionException, InterruptedException {
            createServers();
            createTopics();
        }

        /** Starts one broker per configured broker ID, looking its config up by that ID. */
        public void createServers() {
            for (Integer brokerId : brokers.keySet()) {
                KafkaConfig config = brokerConfigs.get(brokerId.intValue());
                servers.add(createBroker(config, Time.SYSTEM, true, Option.empty()));
            }
        }

        /**
         * Creates the admin client, creates every topic with its fixed replica assignment and
         * waits for the partition metadata to reach all brokers.
         *
         * @throws ExecutionException   if topic creation fails
         * @throws InterruptedException if interrupted while waiting for topic creation
         */
        public void createTopics() throws ExecutionException, InterruptedException {
            TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), 15000L);
            brokerList = TestUtils.plaintextBootstrapServers(seq(servers));
            adminClient = Admin.create(Collections.singletonMap("bootstrap.servers", brokerList));

            List<NewTopic> newTopics = topics.entrySet().stream().map(topic -> {
                Map<Integer, List<Integer>> assignment = new HashMap<>();
                int partition = 0;
                for (List<Integer> replicas : topic.getValue()) {
                    assignment.put(partition, replicas);
                    partition++;
                }
                return new NewTopic(topic.getKey(), assignment);
            }).collect(Collectors.toList());
            adminClient.createTopics(newTopics).all().get();

            topics.forEach((topic, partitions) ->
                TestUtils.waitForAllPartitionsMetadata(seq(servers), topic, partitions.size()));

            if (isKRaftTest()) {
                TestUtils.ensureConsistentKRaftMetadata(seq(ReassignPartitionsIntegrationTest.this.cluster.servers), controllerServer(), "Timeout waiting for controller metadata propagating to brokers");
            }
        }

        /**
         * Produces {@code i2} records with a null key and a 10000-byte value to one partition.
         *
         * @param str topic name
         * @param i   partition number
         * @param i2  number of records to produce
         */
        public void produceMessages(String str, int i, int i2) {
            List<ProducerRecord<byte[], byte[]>> records = IntStream.range(0, i2)
                .mapToObj(ignored -> new ProducerRecord<byte[], byte[]>(str, i, null, new byte[10000]))
                .collect(Collectors.toList());
            TestUtils.produceMessages(seq(servers), seq(records), -1);
        }

        /** Closes the admin client and shuts the brokers down; the server list is always cleared. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            brokerList = null;
            Utils.closeQuietly(adminClient, "adminClient");
            adminClient = null;
            try {
                TestUtils.shutdownServers(seq(servers), true);
            } finally {
                servers.clear();
            }
        }
    }

    /**
     * Populates {@code unthrottledBrokerConfigs} with the expected "no throttle" state for
     * broker IDs 0 through 3: every name in {@code BROKER_LEVEL_THROTTLES} mapped to -1.
     */
    public ReassignPartitionsIntegrationTest() {
        IntStream.range(0, 4).forEach(brokerId -> {
            Map<String, Long> brokerConfig = new HashMap<>();
            // -1 is what describeBrokerLevelThrottles reports for a throttle that is not set,
            // so the baseline must carry the same value for every throttle name to compare equal.
            ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES.forEach(throttle -> brokerConfig.put(throttle, -1L));
            this.unthrottledBrokerConfigs.put(brokerId, brokerConfig);
        });
    }

    /**
     * Closes the test cluster (quietly, so a close failure does not mask the test result) and
     * then runs the harness tear-down.
     */
    @AfterEach
    @Override
    public void tearDown() {
        Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster");
        super.tearDown();
    }

    /** Runs the basic reassignment scenario on a cluster with default broker settings. */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testReassignment(String str) throws Exception {
        ReassignPartitionsTestCluster testCluster =
            new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        // Publish the cluster before setup so tearDown can close it even if setup fails.
        cluster = testCluster;
        testCluster.setup();
        executeAndVerifyReassignment();
    }

    /**
     * Runs the basic reassignment scenario with {@code inter.broker.protocol.version} set to
     * {@code IBP_2_7_IV1} on every broker.
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest
    public void testReassignmentWithAlterPartitionDisabled(String str) throws Exception {
        Map<String, String> clusterWideConfig =
            Collections.singletonMap("inter.broker.protocol.version", MetadataVersion.IBP_2_7_IV1.version());
        cluster = new ReassignPartitionsTestCluster(clusterWideConfig, Collections.emptyMap());
        cluster.setup();
        executeAndVerifyReassignment();
    }

    /**
     * Runs the basic reassignment scenario on a mixed cluster: brokers 1, 2 and 3 are pinned to
     * {@code IBP_2_7_IV1} while the remaining brokers use the default protocol version.
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest
    public void testReassignmentCompletionDuringPartialUpgrade(String str) throws Exception {
        ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500L, 100L, 500L));

        Map<String, String> oldProtocol =
            Collections.singletonMap("inter.broker.protocol.version", MetadataVersion.IBP_2_7_IV1.version());
        Map<Integer, Map<String, String>> perBrokerConfig = new HashMap<>();
        for (int brokerId = 1; brokerId <= 3; brokerId++) {
            perBrokerConfig.put(brokerId, oldProtocol);
        }

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), perBrokerConfig);
        cluster.setup();
        executeAndVerifyReassignment();
    }

    /**
     * Moves foo-0 from [0,1,2] to [0,1,3] and bar-0 from [3,2,1] to [3,2,0] without any
     * throttle, then checks that the move completes, that no broker-level throttle was left
     * behind and that the replicas dropped from each assignment are deleted.
     *
     * @throws ExecutionException   if describing broker configs fails
     * @throws InterruptedException if interrupted while describing broker configs
     */
    private void executeAndVerifyReassignment() throws ExecutionException, InterruptedException {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        TopicPartition foo0 = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);
        TopicPartition bar0 = new TopicPartition("bar", 0);

        // Before executing: current replicas are still the originals.
        Map<TopicPartition, PartitionReassignmentState> initialStates = new HashMap<>();
        initialStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true));
        initialStates.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true));
        waitForVerifyAssignment(cluster.adminClient, assignment, false, new VerifyAssignmentResult(initialStates));

        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
        Assertions.assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        // After executing: current replicas match the targets.
        Map<TopicPartition, PartitionReassignmentState> finalStates = new HashMap<>();
        finalStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true));
        finalStates.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true));

        VerifyAssignmentResult verifyResult = runVerifyAssignment(cluster.adminClient, assignment, false);
        Assertions.assertFalse(verifyResult.movesOngoing);
        waitForVerifyAssignment(cluster.adminClient, assignment, false, new VerifyAssignmentResult(finalStates));
        Assertions.assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        verifyReplicaDeleted(foo0, 2);
        verifyReplicaDeleted(bar0, 1);
    }

    /**
     * Truncates foo-0 on broker 0 so its log starts at offset 123, reassigns the partition to
     * [3,1,2] and checks that broker 3, once it leads the partition, reports a high watermark
     * of 123.
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testHighWaterMarkAfterPartitionReassignment(String str) throws Exception {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        this.cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        this.cluster.setup();
        TopicPartition foo0 = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);

        this.cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(foo0, 123L, false, Option.empty());

        runExecuteAssignment(this.cluster.adminClient, false, assignment, -1L, -1L);
        Map<TopicPartition, PartitionReassignmentState> finalStates = Collections.singletonMap(
            foo0, new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true));
        waitForVerifyAssignment(this.cluster.adminClient, assignment, false, new VerifyAssignmentResult(finalStates));

        // flatMap, not map: map would yield Some(None) as soon as the partition is merely online
        // on broker 3, making the wait succeed before broker 3 actually holds the leader log.
        TestUtils.waitUntilTrue(
            () -> this.cluster.servers.get(3).replicaManager().onlinePartition(foo0)
                .flatMap(partition -> partition.leaderLogIfLocal()).isDefined(),
            () -> "broker 3 should be the new leader",
            15000L,
            10L);

        Assertions.assertEquals(123L, this.cluster.servers.get(3).replicaManager().localLogOrException(foo0).highWatermark(), "Expected broker 3 to have the correct high water mark for the partition.");
    }

    /**
     * Starts a reassignment with an inter-broker throttle of 1, raises the throttle to 300000
     * by re-running the command in "additional" mode, and checks that the reassignment then
     * completes and the throttles are removed.
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testAlterReassignmentThrottle(String str) throws Exception {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(ConsumerGroupCommandTest.TOPIC, 0, 50);
        cluster.produceMessages("baz", 2, 60);

        // Begin with a throttle of 1.
        runExecuteAssignment(cluster.adminClient, false, assignment, 1L, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 1L);

        // Raise the throttle on the in-flight reassignment.
        runExecuteAssignment(cluster.adminClient, true, assignment, 300000L, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 300000L);

        Map<TopicPartition, PartitionReassignmentState> finalStates = new HashMap<>();
        finalStates.put(new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0), new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
        finalStates.put(new TopicPartition("baz", 2), new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

        // Verifying without preserving throttles should leave the brokers unthrottled.
        waitForVerifyAssignment(cluster.adminClient, assignment, false, new VerifyAssignmentResult(finalStates));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
    }

    /**
     * Executes a reassignment with an inter-broker throttle of 300000 and checks that the
     * throttle stays in place while verification preserves throttles, and is removed once
     * verification is run without preserving them.
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testThrottledReassignment(String str) throws Exception {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]},{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        long interBrokerThrottle = 300000L;
        TopicPartition foo0 = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);
        TopicPartition baz2 = new TopicPartition("baz", 2);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(ConsumerGroupCommandTest.TOPIC, 0, 50);
        cluster.produceMessages("baz", 2, 60);

        // Nothing has been executed yet: original replicas, no throttles.
        Map<TopicPartition, PartitionReassignmentState> initialStates = new HashMap<>();
        initialStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true));
        initialStates.put(baz2, new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true));
        Assertions.assertEquals(new VerifyAssignmentResult(initialStates), runVerifyAssignment(cluster.adminClient, assignment, false));
        Assertions.assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

        Map<TopicPartition, PartitionReassignmentState> finalStates = new HashMap<>();
        finalStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
        finalStates.put(baz2, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

        // Poll, preserving throttles, until no partition is being reassigned any more.
        TestUtils.waitUntilTrue(() -> {
            VerifyAssignmentResult current = runVerifyAssignment(cluster.adminClient, assignment, true);
            if (current.partsOngoing) {
                // Still in progress: the result must say so, targets must match, throttle must hold.
                Assertions.assertFalse(current.partStates.values().stream().allMatch(state -> state.done), "Expected at least one partition reassignment to be ongoing when result = " + current);
                Assertions.assertEquals(Arrays.asList(0, 3, 2), current.partStates.get(new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0)).targetReplicas);
                Assertions.assertEquals(Arrays.asList(3, 2, 1), current.partStates.get(new TopicPartition("baz", 2)).targetReplicas);
                System.out.println("Current result: " + current);
                waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
                return false;
            }
            return true;
        }, () -> "Expected reassignment to complete.", 15000L, 100L);

        // Completed with throttles preserved: the throttle is still there.
        waitForVerifyAssignment(cluster.adminClient, assignment, true, new VerifyAssignmentResult(finalStates));
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

        // Verifying without preserving throttles removes them.
        waitForVerifyAssignment(cluster.adminClient, assignment, false, new VerifyAssignmentResult(finalStates));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
    }

    /**
     * Starts a throttled reassignment of baz-2, then checks that records can still be produced
     * to and consumed from the partition, and finally that the reassignment completes once the
     * replication throttle is removed.
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testProduceAndConsumeWithReassignmentInProgress(String str) throws Exception {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        this.cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        this.cluster.setup();
        this.cluster.produceMessages("baz", 2, 60);
        runExecuteAssignment(this.cluster.adminClient, false, assignment, 300L, -1L);
        this.cluster.produceMessages("baz", 2, 100);

        Consumer<byte[], byte[]> consumer = TestUtils.createConsumer(this.cluster.brokerList, "group", "earliest", true, false, 500, SecurityProtocol.PLAINTEXT, Option.empty(), Option.empty(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
        TopicPartition baz2 = new TopicPartition("baz", 2);
        // The consumer is closed exactly once, whether or not polling succeeds; the
        // verification steps below run outside the guarded region so their failures are
        // reported without touching the already-closed consumer again.
        try {
            consumer.assign(Collections.singleton(baz2));
            TestUtils.pollUntilAtLeastNumRecords(consumer, 100, 15000L);
        } finally {
            consumer.close();
        }

        removeReplicationThrottleForPartitions(this.cluster.adminClient, baz2);
        waitForVerifyAssignment(this.cluster.adminClient, assignment, false, new VerifyAssignmentResult(Collections.singletonMap(baz2, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true))));
    }

    /**
     * Starts a reassignment of foo-0 and baz-1 with an inter-broker throttle of 1, cancels it
     * while it is in progress, and checks that throttles are kept or removed according to the
     * preserve-throttles flag and that the newly added replica on broker 3 is deleted.
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCancellation(String str) throws Exception {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        TopicPartition foo0 = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);
        TopicPartition baz1 = new TopicPartition("baz", 1);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);
        cluster.produceMessages(baz1.topic(), baz1.partition(), 200);
        Assertions.assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        runExecuteAssignment(cluster.adminClient, false, assignment, 1L, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 1L);

        // While in progress, the current replica list contains both old and new replicas.
        Map<TopicPartition, PartitionReassignmentState> inProgressStates = new HashMap<>();
        inProgressStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3, 2), Arrays.asList(0, 1, 3), false));
        inProgressStates.put(baz1, new PartitionReassignmentState(Arrays.asList(0, 2, 3, 1), Arrays.asList(0, 2, 3), false));
        waitForVerifyAssignment(cluster.adminClient, assignment, true, new VerifyAssignmentResult(inProgressStates, true, Collections.emptyMap(), false));

        // First cancel (preserving throttles) cancels both partitions and keeps the throttle.
        Assertions.assertEquals(
            new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(Arrays.asList(foo0, baz1)), Collections.emptySet()),
            runCancelAssignment(cluster.adminClient, assignment, true));
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), 1L);

        // Second cancel finds nothing left to cancel and, not preserving throttles, clears them.
        Assertions.assertEquals(
            new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()),
            runCancelAssignment(cluster.adminClient, assignment, false));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);

        Assertions.assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing);
        verifyReplicaDeleted(foo0, 3);
        verifyReplicaDeleted(baz1, 3);
    }

    /**
     * Reassigns foo-0 to [0,3,4] under a replication throttle, waits until the ISR is
     * {0,1,2,3}, cancels the reassignment and checks that the replicas on brokers 3 and 4 are
     * deleted.
     */
    @ValueSource(strings = {"zk", "kraft"})
    @ParameterizedTest
    public void testCancellationWithAddingReplicaInIsr(String str) throws Exception {
        String assignment = "{\"version\":1,\"partitions\":[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}";
        TopicPartition foo0 = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);

        setReplicationThrottleForPartitions(cluster.adminClient, foo0);
        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);

        TestUtils.waitUntilTrue(
            () -> Objects.equals(TestUtils.currentIsr(cluster.adminClient, foo0), mutableSet(0, 1, 2, 3).toSet()),
            () -> "Timed out while waiting for replica 3 to join the ISR",
            15000L,
            100L);

        Assertions.assertEquals(
            new AbstractMap.SimpleImmutableEntry<>(Collections.singleton(foo0), Collections.emptySet()),
            runCancelAssignment(cluster.adminClient, assignment, true));
        verifyReplicaDeleted(foo0, 3);
        verifyReplicaDeleted(foo0, 4);
    }

    /**
     * Waits (up to 15 seconds) until the given broker neither hosts the partition nor has a
     * log for it.
     *
     * @param topicPartition partition whose replica should disappear
     * @param num            ID of the broker expected to drop the replica; also used as the
     *                       index into the cluster's server list
     */
    private void verifyReplicaDeleted(TopicPartition topicPartition, Integer num) {
        TestUtils.waitUntilTrue(() -> {
            KafkaBroker broker = cluster.servers.get(num.intValue());
            boolean notHosted = broker.replicaManager().getPartition(topicPartition) == HostedPartition$None$.MODULE$;
            return notHosted && broker.logManager().getLog(topicPartition, false).isEmpty();
        }, () -> "Timed out waiting for replica " + num + " of " + topicPartition + " to be deleted", 15000L, 100L);
    }

    /**
     * Waits until the given brokers report only a log-dir throttle: the alter-log-dirs rate
     * equals {@code l} and both replication rates are -1.
     *
     * @param set IDs of the brokers expected to be throttled
     * @param l   expected alter-log-dirs throttle value
     */
    private void waitForLogDirThrottle(Set<Integer> set, Long l) {
        Map<String, Long> expectedThrottles = new HashMap<>();
        expectedThrottles.put("leader.replication.throttled.rate", -1L);
        expectedThrottles.put("follower.replication.throttled.rate", -1L);
        expectedThrottles.put("replica.alter.log.dirs.io.max.bytes.per.second", l);
        waitForBrokerThrottles(set, expectedThrottles);
    }

    /**
     * Waits until the given brokers report only an inter-broker throttle: both replication
     * rates equal {@code l} and the alter-log-dirs rate is -1.
     *
     * @param list IDs of the brokers expected to be throttled
     * @param l    expected leader and follower replication throttle value
     */
    private void waitForInterBrokerThrottle(List<Integer> list, Long l) {
        Map<String, Long> expectedThrottles = new HashMap<>();
        expectedThrottles.put("leader.replication.throttled.rate", l);
        expectedThrottles.put("follower.replication.throttled.rate", l);
        expectedThrottles.put("replica.alter.log.dirs.io.max.bytes.per.second", -1L);
        waitForBrokerThrottles(list, expectedThrottles);
    }

    /**
     * Waits until every broker listed in {@code unthrottledBrokerConfigs} reports the expected
     * throttles: {@code map} for brokers contained in {@code collection}, the unthrottled
     * baseline for all others.
     *
     * @param collection IDs of the brokers expected to be throttled
     * @param map        throttle values expected on those brokers
     */
    private void waitForBrokerThrottles(Collection<Integer> collection, Map<String, Long> map) {
        Map<Integer, Map<String, Long>> expectedByBroker = new HashMap<>();
        for (Map.Entry<Integer, Map<String, Long>> baseline : unthrottledBrokerConfigs.entrySet()) {
            Integer brokerId = baseline.getKey();
            if (collection.contains(brokerId)) {
                expectedByBroker.put(brokerId, map);
            } else {
                expectedByBroker.put(brokerId, baseline.getValue());
            }
        }
        waitForBrokerLevelThrottles(expectedByBroker);
    }

    /**
     * Polls the broker-level throttle configs (every 25 ms, up to 15 seconds) until they equal
     * {@code map}. The most recently observed throttles are included in the timeout message.
     *
     * @param map expected throttle values keyed by broker ID
     */
    private void waitForBrokerLevelThrottles(Map<Integer, Map<String, Long>> map) {
        AtomicReference<Map<Integer, Map<String, Long>>> latest = new AtomicReference<>(new HashMap<>());
        TestUtils.waitUntilTrue(() -> {
            Map<Integer, Map<String, Long>> observed;
            try {
                observed = describeBrokerLevelThrottles(map.keySet());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
            // Remember what was seen so the failure message can report it.
            latest.set(observed);
            return map.equals(latest.get());
        }, () -> "timed out waiting for broker throttle to become " + map + ".  Latest throttles were " + latest.get(), 15000L, 25L);
    }

    /**
     * Reads the broker-level throttle configs of the given brokers through the admin client.
     *
     * @param collection IDs of the brokers to describe
     * @return for each broker ID, a map from every name in {@code BROKER_LEVEL_THROTTLES} to
     *         its configured value, or -1 when the broker has no entry for that name
     * @throws ExecutionException   if a describeConfigs future completed exceptionally
     * @throws InterruptedException if interrupted while waiting for a describeConfigs future
     */
    private Map<Integer, Map<String, Long>> describeBrokerLevelThrottles(Collection<Integer> collection) throws ExecutionException, InterruptedException {
        Map<Integer, Map<String, Long>> throttlesByBroker = new HashMap<>();
        for (Integer brokerId : collection) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
            Config brokerConfig = cluster.adminClient.describeConfigs(Collections.singleton(resource)).values().get(resource).get();

            Map<String, Long> throttles = new HashMap<>();
            ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES.forEach(throttleName -> {
                // A missing config entry is reported as -1.
                String rawValue = Optional.ofNullable(brokerConfig.get(throttleName)).map(ConfigEntry::value).orElse("-1");
                throttles.put(throttleName, Long.parseLong(rawValue));
            });
            throttlesByBroker.put(brokerId, throttles);
        }
        return throttlesByBroker;
    }

    /**
     * Moves the foo-0 replica on broker 0 to another log directory with an alter-log-dirs
     * throttle of 1, checks that the move is reported as active under that throttle, removes
     * the throttle and checks that the move completes in the target directory.
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest
    public void testLogDirReassignment(String str) throws Exception {
        TopicPartition foo0 = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 700);

        LogDirReassignment reassignment = buildLogDirReassignment(foo0, 0, Arrays.asList(0, 1, 2));
        String targetDir = reassignment.targetDir;
        TopicPartitionReplica replicaOnBroker0 = new TopicPartitionReplica(foo0.topic(), foo0.partition(), 0);

        // Start the move with a log-dir throttle of 1 and expect it to be reported as active.
        runExecuteAssignment(cluster.adminClient, false, reassignment.json, -1L, 1L);
        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true, new VerifyAssignmentResult(
            Collections.singletonMap(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)),
            false,
            Collections.singletonMap(replicaOnBroker0, new ActiveMoveState(reassignment.currentDir, targetDir, targetDir)),
            true));
        waitForLogDirThrottle(Collections.singleton(0), 1L);

        // Remove the throttle on broker 0 so the move can finish.
        cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
            new ConfigResource(ConfigResource.Type.BROKER, "0"),
            Collections.singletonList(new AlterConfigOp(new ConfigEntry("replica.alter.log.dirs.io.max.bytes.per.second", ""), AlterConfigOp.OpType.DELETE)))).all().get();
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);

        // The move should now be reported as completed in the target directory.
        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true, new VerifyAssignmentResult(
            Collections.singletonMap(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)),
            false,
            Collections.singletonMap(replicaOnBroker0, new CompletedMoveState(targetDir)),
            false));

        List<Integer> brokerIds = IntStream.range(0, 4).boxed().collect(Collectors.toList());
        BrokerDirs broker0Dirs = new BrokerDirs(cluster.adminClient.describeLogDirs(brokerIds), 0);
        Assertions.assertEquals(targetDir, broker0Dirs.curLogDirs.getOrDefault(foo0, ""));
    }

    /**
     * Exercises changing the log-dir throttle of a reassignment that has already been started.
     *
     * <p>The same reassignment JSON is executed twice: first with a replica-alter-log-dirs throttle of
     * {@code 1}, then again with the "additional" flag set and a throttle of {@code 3000000}. After
     * each execution the test waits for the throttle on broker 0, and finally waits for verify to
     * report the move as completed.
     *
     * @param str the value supplied by {@code @ValueSource}; it is not read in the method body
     * @throws Exception if any step of the scenario fails
     */
    @ValueSource(strings = {"zk"})
    @ParameterizedTest
    public void testAlterLogDirReassignmentThrottle(String str) throws Exception {
        TopicPartition partition = new TopicPartition(ConsumerGroupCommandTest.TOPIC, 0);
        this.cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        this.cluster.setup();
        this.cluster.produceMessages(partition.topic(), partition.partition(), 700);

        LogDirReassignment reassignment = buildLogDirReassignment(partition, 0, Arrays.asList(0, 1, 2));

        // First execution: log-dir throttle of 1.
        runExecuteAssignment(this.cluster.adminClient, false, reassignment.json, -1L, 1L);
        waitForLogDirThrottle(new HashSet<>(Collections.singletonList(0)), 1L);

        // Second execution with additional=true: raise the log-dir throttle to 3000000.
        runExecuteAssignment(this.cluster.adminClient, true, reassignment.json, -1L, 3000000L);
        waitForLogDirThrottle(Collections.singleton(0), 3000000L);

        TopicPartitionReplica movedReplica = new TopicPartitionReplica(partition.topic(), partition.partition(), 0);
        PartitionReassignmentState partitionState =
            new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true);
        VerifyAssignmentResult expectedWhenDone = new VerifyAssignmentResult(
            Collections.singletonMap(partition, partitionState),
            false,
            Collections.singletonMap(movedReplica, new CompletedMoveState(reassignment.targetDir)),
            false);
        waitForVerifyAssignment(this.cluster.adminClient, reassignment.json, true, expectedWhenDone);
    }

    /**
     * Builds a reassignment JSON that moves one broker's replica of a partition to another log dir.
     *
     * <p>The target directory is the first of the broker's log dirs that differs from the directory
     * currently holding the partition. In the generated {@code log_dirs} array the entry for broker
     * {@code i} is the target directory and every other replica gets {@code "any"}.
     *
     * @param topicPartition partition whose replica is to be moved
     * @param i id of the broker whose log dirs are inspected and whose replica is moved
     * @param list replica broker ids written to the {@code replicas} array, in order
     * @return the JSON together with the current and the target directory
     * @throws ExecutionException if describing the log dirs fails
     * @throws InterruptedException if interrupted while describing the log dirs
     */
    private LogDirReassignment buildLogDirReassignment(TopicPartition topicPartition, int i, List<Integer> list) throws ExecutionException, InterruptedException {
        BrokerDirs dirsOnBroker = new BrokerDirs(
            this.cluster.adminClient.describeLogDirs(IntStream.range(0, 4).boxed().collect(Collectors.toList())),
            i);
        // A pending (future) log dir would mean a move is already under way.
        Assertions.assertTrue(dirsOnBroker.futureLogDirs.isEmpty());

        String currentDir = dirsOnBroker.curLogDirs.get(topicPartition);
        String targetDir = dirsOnBroker.logDirs.stream()
            .filter(candidate -> !candidate.equals(currentDir))
            .findFirst()
            .get();

        String replicaIds = list.stream()
            .map(Object::toString)
            .collect(Collectors.joining(","));
        List<String> logDirEntries = list.stream()
            .map(replicaId -> replicaId == i ? "\"" + targetDir + "\"" : "\"any\"")
            .collect(Collectors.toList());

        // The fragments below concatenate to exactly the single-line JSON the command expects here.
        String json =
            " { \"version\": 1,"
                + "  \"partitions\": ["
                + "    {"
                + "     \"topic\": \"" + topicPartition.topic() + "\","
                + "     \"partition\": " + topicPartition.partition() + ","
                + "     \"replicas\": [" + replicaIds + "],"
                + "     \"log_dirs\": [" + String.join(",", logDirEntries) + "]"
                + "    }"
                + "   ]"
                + "  }";
        return new LogDirReassignment(json, currentDir, targetDir);
    }

    /**
     * Logs the call to stdout and delegates to {@code ReassignPartitionsCommand.verifyAssignment},
     * converting its checked exceptions into a {@link RuntimeException}.
     *
     * @param admin admin client passed through to the command
     * @param str reassignment JSON to verify
     * @param bool passed through unchanged as the command's third argument
     * @return the result returned by the command
     */
    private VerifyAssignmentResult runVerifyAssignment(Admin admin, String str, Boolean bool) {
        System.out.println("==> verifyAssignment(adminClient, jsonString=" + str);
        final VerifyAssignmentResult outcome;
        try {
            outcome = ReassignPartitionsCommand.verifyAssignment(admin, str, bool);
        } catch (InterruptedException | ExecutionException | JsonProcessingException cause) {
            throw new RuntimeException(cause);
        }
        return outcome;
    }

    /**
     * Repeatedly runs {@code runVerifyAssignment} until its result equals the expected one.
     *
     * <p>Polling goes through {@code TestUtils.waitUntilTrue} with the arguments {@code 15000L} and
     * {@code 10L}; the failure message includes the most recent result observed.
     *
     * @param admin admin client used for each verification
     * @param str reassignment JSON to verify
     * @param bool passed through unchanged to {@code runVerifyAssignment}
     * @param verifyAssignmentResult the result to wait for
     */
    private void waitForVerifyAssignment(Admin admin, String str, Boolean bool, VerifyAssignmentResult verifyAssignmentResult) {
        // Holds the last observation so the timeout message can report it.
        AtomicReference<VerifyAssignmentResult> latest = new AtomicReference<>();
        TestUtils.waitUntilTrue(() -> {
            VerifyAssignmentResult observed = runVerifyAssignment(admin, str, bool);
            latest.set(observed);
            return verifyAssignmentResult.equals(observed);
        }, () -> "Timed out waiting for verifyAssignment result " + verifyAssignmentResult + ".  The latest result was " + latest.get(), 15000L, 10L);
    }

    /**
     * Logs the call to stdout and delegates to {@code ReassignPartitionsCommand.executeAssignment}
     * with the fixed trailing arguments {@code 10000L} and {@code Time.SYSTEM}, converting the
     * command's checked exceptions into a {@link RuntimeException}.
     *
     * @param admin admin client passed through to the command
     * @param bool logged as {@code additional}
     * @param str logged as {@code reassignmentJson}
     * @param l logged as {@code interBrokerThrottle}
     * @param l2 logged as {@code replicaAlterLogDirsThrottle}
     * @throws RuntimeException wrapping any checked exception raised by the command
     */
    private void runExecuteAssignment(Admin admin, Boolean bool, String str, Long l, Long l2) throws RuntimeException {
        String banner = "==> executeAssignment(adminClient, additional=" + bool + ", reassignmentJson=" + str + ", interBrokerThrottle=" + l + ", replicaAlterLogDirsThrottle=" + l2 + "))";
        System.out.println(banner);
        try {
            ReassignPartitionsCommand.executeAssignment(admin, bool, str, l, l2, 10000L, Time.SYSTEM);
        } catch (InterruptedException | ExecutionException | JsonProcessingException | TerseException cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Logs the call to stdout and delegates to {@code ReassignPartitionsCommand.cancelAssignment}
     * with the fixed trailing arguments {@code 10000L} and {@code Time.SYSTEM}, converting the
     * command's checked exceptions into a {@link RuntimeException}.
     *
     * @param admin admin client passed through to the command
     * @param str reassignment JSON whose moves are to be cancelled
     * @param bool passed through unchanged as the command's third argument
     * @return the pair of partition set and replica set returned by the command
     */
    private Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> runCancelAssignment(Admin admin, String str, Boolean bool) {
        System.out.println("==> cancelAssignment(adminClient, jsonString=" + str);
        final Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> cancelled;
        try {
            cancelled = ReassignPartitionsCommand.cancelAssignment(admin, str, bool, 10000L, Time.SYSTEM);
        } catch (InterruptedException | ExecutionException | JsonProcessingException | TerseException cause) {
            throw new RuntimeException(cause);
        }
        return cancelled;
    }

    /**
     * Copies the given elements into a new {@link HashSet} and wraps it as a Scala mutable set.
     *
     * <p>The varargs array is only read (via {@link Arrays#asList}) and copied, never stored or
     * written to, which is why {@code @SafeVarargs} is applicable.
     *
     * @param tArr elements of the resulting set; duplicates collapse as in any {@code HashSet}
     * @param <T> element type
     * @return a Scala mutable set view over the freshly created Java set
     */
    @SafeVarargs
    private static <T> scala.collection.mutable.Set<T> mutableSet(T... tArr) {
        return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(tArr)));
    }

    /**
     * Converts a Java collection into a Scala {@code Seq} by wrapping the collection's iterator and
     * materialising it with {@code toSeq()}.
     *
     * <p>NOTE(review): decompiler output records that this method's access was changed from private;
     * confirm whether the visibility can be narrowed again.
     *
     * @param collection elements to convert, taken in iteration order
     * @param <T> element type
     * @return a Scala sequence holding the collection's elements
     */
    public static <T> Seq<T> seq(Collection<T> collection) {
        scala.collection.Iterator<T> scalaIterator =
            JavaConverters.asScalaIteratorConverter(collection.iterator()).asScala();
        return scalaIterator.toSeq();
    }

    /**
     * Removes the per-topic throttled-replica configs for the given partition's topic, then calls
     * {@code ToolsTestUtils.throttleAllBrokersReplication} for brokers 0 through 3 with
     * {@link Integer#MAX_VALUE}.
     *
     * @param admin admin client used for both steps
     * @param topicPartition partition whose topic-level throttle configs are removed
     */
    private void removeReplicationThrottleForPartitions(Admin admin, TopicPartition topicPartition) {
        Set<TopicPartition> partitions = new HashSet<>(Collections.singleton(topicPartition));
        List<Integer> brokerIds = Arrays.asList(0, 1, 2, 3);
        try {
            removePartitionReplicaThrottles(admin, partitions);
            ToolsTestUtils.throttleAllBrokersReplication(admin, brokerIds, Integer.MAX_VALUE);
        } catch (InterruptedException | ExecutionException cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Deletes the {@code leader.replication.throttled.replicas} and
     * {@code follower.replication.throttled.replicas} configs of every topic that owns one of the
     * given partitions, and blocks until the alteration has completed.
     *
     * <p>The request is keyed by topic, so several partitions of the same topic produce a single
     * entry. Collecting per partition with {@code Collectors.toMap} instead would fail with an
     * {@link IllegalStateException} on the duplicate topic key.
     *
     * @param admin admin client used to alter the topic configs
     * @param set partitions whose topics should have their throttled-replica configs removed
     * @throws RuntimeException wrapping an {@link InterruptedException} or {@link ExecutionException}
     *         raised while waiting for the alteration
     */
    private void removePartitionReplicaThrottles(Admin admin, Set<TopicPartition> set) {
        Map<ConfigResource, Collection<AlterConfigOp>> deletionsByTopic = new HashMap<>();
        for (TopicPartition partition : set) {
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, partition.topic());
            // The ops are identical for every partition of a topic, so the first occurrence suffices.
            deletionsByTopic.computeIfAbsent(topicResource, ignored -> Arrays.asList(
                new AlterConfigOp(new ConfigEntry("leader.replication.throttled.replicas", ""), AlterConfigOp.OpType.DELETE),
                new AlterConfigOp(new ConfigEntry("follower.replication.throttled.replicas", ""), AlterConfigOp.OpType.DELETE)));
        }
        try {
            admin.incrementalAlterConfigs(deletionsByTopic).all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Calls {@code ToolsTestUtils.throttleAllBrokersReplication} for broker 4 with the value
     * {@code 1}, then {@code ToolsTestUtils.assignThrottledPartitionReplicas} mapping the given
     * partition to broker 4.
     *
     * @param admin admin client used for both calls
     * @param topicPartition partition to associate with the throttled broker
     */
    private void setReplicationThrottleForPartitions(Admin admin, TopicPartition topicPartition) {
        // Immutable singleton list, so one instance can serve both calls.
        List<Integer> throttledBroker = Collections.singletonList(4);
        try {
            ToolsTestUtils.throttleAllBrokersReplication(admin, throttledBroker, 1);
            ToolsTestUtils.assignThrottledPartitionReplicas(admin, Collections.singletonMap(topicPartition, throttledBroker));
        } catch (InterruptedException | ExecutionException cause) {
            throw new RuntimeException(cause);
        }
    }
}
