package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorNotification;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.NoopSampler;
import com.yammer.metrics.core.MetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.controller.ReplicaAssignment;
import kafka.server.ConfigType;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.class */
public class ExecutorTest extends CCKafkaClientsIntegrationTestHarness {
    /** Value applied to the broker replica-fetch-max-bytes property in {@link #overridingProps()}. */
    private static final int REPLICA_FETCH_MAX_BYTES = 10000;
    /** Fixed UUID string handed to {@code Executor#initProposalExecution} by the tests. */
    private static final String RANDOM_UUID = "random_uuid";
    // NOTE(review): not referenced in this part of the file; presumably a timeout (ms) fed into the executor properties - confirm.
    private static final String DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS = "10000";
    /** Metrics registry recreated in {@link #setUp()} and shut down in {@link #tearDown()}. */
    private MetricsRegistry metricsRegistry;
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTest.class);
    /** Partition index shared by every test topic-partition below. */
    private static final int PARTITION = 0;
    private static final TopicPartition TP0 = new TopicPartition(TestConstants.TOPIC0, PARTITION);
    private static final TopicPartition TP1 = new TopicPartition(TestConstants.TOPIC1, PARTITION);
    private static final TopicPartition TP2 = new TopicPartition(TestConstants.TOPIC2, PARTITION);
    private static final TopicPartition TP3 = new TopicPartition(TestConstants.TOPIC_WITH_DOT, PARTITION);
    /** Mockito mock whose {@code submit(Runnable)} is stubbed in {@link #setUp()} to run the task inline. */
    private final ExecutorService sameThreadExecutorService = (ExecutorService) Mockito.mock(ExecutorService.class);
    // NOTE(review): populated outside the visible part of the file; presumably topic name -> description of created topics - confirm.
    private Map<String, TopicDescription> topicsCreated = new HashMap<>();

    /**
     * Harness hook reporting the cluster size used by this test class.
     *
     * @return always {@code 2}
     */
    @Override
    public int clusterSize() {
        final int size = 2;
        return size;
    }

    /**
     * Harness hook returning the property overrides contributed by this test class:
     * topic deletion is enabled and the replica fetch max bytes is set to
     * {@link #REPLICA_FETCH_MAX_BYTES}.
     *
     * @return a new mutable map holding the two overrides
     */
    @Override
    protected Map<Object, Object> overridingProps() {
        Map<Object, Object> props = new HashMap<>();
        props.put(KafkaConfig.DeleteTopicEnableProp(), "true");
        props.put(KafkaConfig.ReplicaFetchMaxBytesProp(), String.valueOf(REPLICA_FETCH_MAX_BYTES));
        return props;
    }

    /**
     * Harness hook mapping a node index to its rack name.
     *
     * @param i the node index supplied by the harness
     * @return the decimal string form of {@code i}
     */
    @Override
    protected String rackForNode(int i) {
        return Integer.toString(i);
    }

    /**
     * Runs the harness set-up, creates a fresh {@link MetricsRegistry}, and stubs
     * {@link #sameThreadExecutorService} so that {@code submit(Runnable)} executes the task
     * synchronously on the calling thread and returns an already-completed {@link Future}.
     */
    @Override
    @Before
    public void setUp() {
        super.setUp();
        this.metricsRegistry = new MetricsRegistry();
        Mockito.doAnswer(invocation -> {
            // Argument 0 of submit(Runnable) is the task itself; run it inline.
            Runnable task = invocation.getArgument(0);
            task.run();
            // The work has already finished, so hand back a future that is done and yields null.
            return new Future<Object>() {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    return false;
                }

                @Override
                public boolean isCancelled() {
                    return false;
                }

                @Override
                public boolean isDone() {
                    return true;
                }

                @Override
                public Object get() {
                    return null;
                }

                @Override
                public Object get(long timeout, TimeUnit unit) {
                    return null;
                }
            };
        }).when(this.sameThreadExecutorService).submit(Mockito.any(Runnable.class));
    }

    /**
     * Runs the harness tear-down and then shuts down the metrics registry created in
     * {@link #setUp()}.
     *
     * <p>The registry is shut down in a {@code finally} block so it is released even when the
     * harness tear-down throws, and the null guard avoids masking an earlier {@code setUp}
     * failure (which would leave the field unassigned) with a {@link NullPointerException}.
     */
    @Override
    @After
    public void tearDown() {
        try {
            super.tearDown();
        } finally {
            if (this.metricsRegistry != null) {
                this.metricsRegistry.shutdown();
            }
        }
    }

    /**
     * Runs the basic proposals through {@code executeAndVerifyProposals}, expecting all of them
     * to be verified, then shuts down the returned executor. The ZooKeeper client is always
     * closed afterwards.
     */
    @Test
    public void testBasicBalanceMovement() throws InterruptedException {
        String zkConnect = zookeeper().connectionString();
        KafkaZkClient zkClient =
            KafkaCruiseControlUtils.createKafkaZkClient(zkConnect, "ExecutorTestMetricGroup", "BasicBalanceMovement", false);
        try {
            Collection<ExecutionProposal> proposals = getBasicProposals();
            // The same collection serves as both the proposals to execute and the ones to verify.
            executeAndVerifyProposals(zkClient, proposals, proposals).shutdown();
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Runs the topic-placement proposals through {@code executeAndVerifyProposals}, expecting
     * all of them to be verified, then shuts down the returned executor. The ZooKeeper client
     * is always closed afterwards.
     */
    @Test
    public void testRebalanceObserverMovement() throws InterruptedException {
        String zkConnect = zookeeper().connectionString();
        KafkaZkClient zkClient =
            KafkaCruiseControlUtils.createKafkaZkClient(zkConnect, "ExecutorTestMetricGroup", "BasicBalanceMovement", false);
        try {
            Collection<ExecutionProposal> proposals = getTopicPlacementProposals(zkClient);
            // The same collection serves as both the proposals to execute and the ones to verify.
            executeAndVerifyProposals(zkClient, proposals, proposals).shutdown();
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Starts a throttled move of TP0, waits until one reassignment is in progress, stops the
     * execution and asserts that the reassignment was cancelled and leadership returned to the
     * initial leader.
     */
    @Test
    public void testRebalanceCancellation() throws InterruptedException, ExecutionException {
        ConfluentAdmin adminClient =
            KafkaCruiseControlUtils.createAdmin(new KafkaCruiseControlConfig(getExecutorProperties()).originals());
        try {
            // Remember who led TP0 before the move so the revert can be checked afterwards.
            int initialLeader = createTopics().get(TP0.topic()).partitions().get(TP0.partition()).leader().id();
            Executor executor = fillUpAndMoveBasicTopicPartition0(1L);
            assertReassignmentsStarted(adminClient, 1);
            Assert.assertEquals(0L, executor.numCancelledReassignments());
            executor.stopExecution();
            assertReassignmentsCanceled(adminClient, initialLeader, executor);
            executor.shutdown();
        } finally {
            // Single cleanup point: runs exactly once on both the success and the failure path.
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    /**
     * Repeatedly checks, via {@code waitForAssert}, that the admin client lists exactly
     * {@code i} in-progress partition reassignments.
     *
     * @param confluentAdmin admin client used to list reassignments
     * @param i expected number of in-progress reassignments
     */
    private void assertReassignmentsStarted(ConfluentAdmin confluentAdmin, int i) throws InterruptedException {
        // 10000L is handed to waitForAssert unchanged - presumably its timeout in milliseconds.
        waitForAssert(() -> {
            int inProgress = confluentAdmin.listPartitionReassignments().reassignments().get().size();
            Assert.assertEquals(i, inProgress);
            return true;
        }, 10000L, "Should have started reassigning");
    }

    /**
     * Asserts that a previously started reassignment of TP0 has been cancelled: first that no
     * reassignment is in progress and TP0 is led by broker {@code i}, then that the executor
     * reports exactly one cancelled reassignment.
     *
     * @param confluentAdmin admin client used to list reassignments and describe TP0's topic
     * @param i broker id expected to lead TP0 once the reassignment is reverted
     * @param executor executor whose cancelled-reassignment count is checked
     */
    private void assertReassignmentsCanceled(ConfluentAdmin confluentAdmin, int i, Executor executor) throws InterruptedException {
        final int expectedCancelled = 1;
        final Set<String> topicNames = Collections.singleton(TP0.topic());
        waitForAssert(() -> {
            int inProgress = confluentAdmin.listPartitionReassignments().reassignments().get().size();
            Assert.assertEquals(0L, inProgress);
            TopicDescription description = confluentAdmin.describeTopics(topicNames).all().get().get(TP0.topic());
            int currentLeader = description.partitions().get(TP0.partition()).leader().id();
            Assert.assertEquals(i, currentLeader);
            return true;
        }, 5000L, "Should have reverted the reassignment");
        waitForAssert(() -> {
            Assert.assertEquals(expectedCancelled, executor.numCancelledReassignments());
            return true;
        }, 5000L, "Reassignment cancellation should be reflected in the metrics");
    }

    /**
     * Executes a single TP0 proposal to completion and verifies that the completion callback is
     * invoked with {@code true} and a null throwable.
     */
    @Test
    public void testExecutionProposalCompletionCbHandlesCompletion() throws InterruptedException, ExecutionException {
        ConfluentAdmin adminClient =
            KafkaCruiseControlUtils.createAdmin(new KafkaCruiseControlConfig(getExecutorProperties()).originals());
        try {
            BalanceOpExecutionCompletionCallback completionCb = Mockito.mock(BalanceOpExecutionCompletionCallback.class);
            ExecutionProposal proposal = getBasicTopicPartition0Proposal();
            Executor executor = executor(1L);
            executeProposals(executor, Collections.singletonList(proposal), completionCb);
            waitUntilExecutionFinishes(executor);
            Mockito.verify(completionCb).accept(Mockito.eq(true), (Throwable) Mockito.isNull());
            executor.shutdown();
        } finally {
            // Single cleanup point: runs exactly once on both the success and the failure path.
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    /**
     * Starts a single TP0 proposal, stops the execution once the reassignment is in progress and
     * verifies that the completion callback is invoked with {@code false} and a null throwable.
     */
    @Test
    public void testExecutionProposalCompletionCbHandlesStop() throws InterruptedException, ExecutionException {
        ConfluentAdmin adminClient =
            KafkaCruiseControlUtils.createAdmin(new KafkaCruiseControlConfig(getExecutorProperties()).originals());
        try {
            // Only the topic-creation side effect is needed; the returned descriptions are unused here.
            createTopics();
            BalanceOpExecutionCompletionCallback completionCb = Mockito.mock(BalanceOpExecutionCompletionCallback.class);
            ExecutionProposal proposal = getBasicTopicPartition0Proposal();
            produceData(TP0.topic(), 30000);
            Executor executor = executor(1L);
            executeProposals(executor, Collections.singletonList(proposal), completionCb);
            assertReassignmentsStarted(adminClient, 1);
            executor.stopExecution();
            waitUntilExecutionFinishes(executor);
            Mockito.verify(completionCb).accept(Mockito.eq(false), (Throwable) Mockito.isNull());
            executor.shutdown();
        } finally {
            // Single cleanup point: runs exactly once on both the success and the failure path.
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    /**
     * While the test thread holds the executor reservation, verifies that further reservation
     * attempts made from other threads fail with {@link IllegalStateException} and that the
     * reservation stays with the test thread throughout.
     */
    @Test
    public void testAbortAndAcquireThrowsIllegalStateExceptionWhenLockAcquired() throws Exception {
        Executor executor = executor();
        // try-with-resources releases the reservation on every exit path, including assertion failures.
        try (Executor.ReservationHandle reservation = executor.reserveAndAbortOngoingExecutions(Duration.ofSeconds(5L))) {
            Assert.assertFalse("Expected Executor to not be reserved for the current thread", executor.isReservedByOther());
            Assert.assertTrue("Expected the executor's reservation to be taken", executor._reservation.isReserved());
            assertIllegalStateExceptionThrownWhileReservationIsHeld(executor, () -> {
                try {
                    executor.reserveAndAbortOngoingExecutions(Duration.ofMillis(100L));
                } catch (TimeoutException e) {
                    throw new RuntimeException(e);
                }
            });
            assertAbortAndAcquireThrowsIllegalStateExceptionWhenReservationAcquired(executor);
            Assert.assertTrue("Expected the executor's reservation to be taken", executor._reservation.isReserved());
            Assert.assertFalse("Expected Executor to not be reserved for the current thread", executor.isReservedByOther());
        }
    }

    /**
     * From a freshly started thread, asserts that the executor is reserved by another thread and
     * that trying to reserve it again fails with {@link IllegalStateException}; afterwards checks
     * that the reservation is still taken.
     *
     * @param executor the executor whose reservation is expected to be held by the caller
     */
    private void assertAbortAndAcquireThrowsIllegalStateExceptionWhenReservationAcquired(Executor executor) throws InterruptedException {
        AtomicReference<Throwable> outcome = new AtomicReference<>();
        Runnable attempt = () -> {
            try {
                Assert.assertTrue("Expected the executor to be reserved", executor.isReservedByOther());
                executor.reserveAndAbortOngoingExecutions(Duration.ofMillis(100L));
                // Reaching this line means the reservation was wrongly granted; record a non-ISE marker.
                outcome.set(new Exception("Expected Executor reservation to be held by another thread and this thread to not be able to reserve it."));
            } catch (Throwable failure) {
                outcome.set(failure);
            }
        };
        Thread contender = new Thread(attempt);
        contender.start();
        contender.join();
        Throwable recorded = outcome.get();
        Assert.assertNotNull("Expected exception to be populated", recorded);
        Assert.assertTrue("Expected populated exception to be IllegalStateException", recorded instanceof IllegalStateException);
        Assert.assertTrue("Expected the executor's reservation to be taken", executor._reservation.isReserved());
    }

    /**
     * From a freshly started thread, asserts that the executor is reserved by another thread and
     * that running {@code runnable} fails with {@link IllegalStateException}; afterwards checks
     * that the reservation is still taken.
     *
     * @param executor the executor whose reservation is expected to be held by the caller
     * @param runnable the action expected to be rejected while the reservation is held
     */
    private void assertIllegalStateExceptionThrownWhileReservationIsHeld(Executor executor, Runnable runnable) throws InterruptedException {
        AtomicReference<Throwable> outcome = new AtomicReference<>();
        Runnable attempt = () -> {
            try {
                Assert.assertTrue("Expected the executor to be reserved", executor.isReservedByOther());
                runnable.run();
                // Reaching this line means the action was wrongly allowed; record a non-ISE marker.
                outcome.set(new Exception("Expected Executor reservation to be held by another thread and this thread to not be able to reserve it."));
            } catch (Throwable failure) {
                outcome.set(failure);
            }
        };
        Thread contender = new Thread(attempt);
        contender.start();
        contender.join();
        Throwable recorded = outcome.get();
        Assert.assertNotNull("Expected exception to be populated", recorded);
        Assert.assertTrue("Expected populated exception to be IllegalStateException", recorded instanceof IllegalStateException);
        Assert.assertTrue("Expected the executor's reservation to be taken", executor._reservation.isReserved());
    }

    /**
     * Starts a throttled move of TP0, then reserves the executor with a 10 s abort timeout and
     * asserts that the ongoing execution was cancelled, the reassignment reverted, and the
     * reservation is held by the test thread.
     */
    @Test
    public void testAbortAndAcquireCancelsCurrentRebalanceAndGetsReservation() throws InterruptedException, ExecutionException, TimeoutException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        Duration abortTimeout = Duration.ofMillis(10000L);
        ConfluentAdmin adminClient = KafkaCruiseControlUtils.createAdmin(config.originals());
        try {
            // Remember who led TP0 before the move so the revert can be checked afterwards.
            int initialLeader = createTopics().get(TP0.topic()).partitions().get(TP0.partition()).leader().id();
            Executor executor = fillUpAndMoveBasicTopicPartition0(1L);
            assertReassignmentsStarted(adminClient, 1);
            Assert.assertEquals(0L, executor.numCancelledReassignments());
            // try-with-resources releases the reservation even when one of the assertions fails.
            try (Executor.ReservationHandle reservation = executor.reserveAndAbortOngoingExecutions(abortTimeout)) {
                Assert.assertFalse("Expected ongoing execution to be canceled", executor.hasOngoingExecution());
                assertReassignmentsCanceled(adminClient, initialLeader, executor);
                Assert.assertTrue("Expected the executor's reservation to be taken", executor._reservation.isReserved());
                Assert.assertTrue("Expected reservation to be held by the test thread", executor._reservation.isReservedByMe());
                Assert.assertFalse("Expected the Executor to not be reserved for the test thread that holds the reservation", executor.isReservedByOther());
            }
            executor.shutdown();
        } finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    /**
     * Reserves the executor twice in a row from the same thread; the test passes only if an
     * {@link IllegalStateException} escapes the method.
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotAcquireReservationTwice() throws TimeoutException {
        final Duration abortTimeout = Duration.ofSeconds(1L);
        final Executor reservedExecutor = executor();
        reservedExecutor.reserveAndAbortOngoingExecutions(abortTimeout);
        // Second attempt while the first reservation has not been released.
        reservedExecutor.reserveAndAbortOngoingExecutions(abortTimeout);
    }

    /**
     * Starts a throttled move of TP0 and tries to reserve the executor with a 50 ms abort
     * timeout. If the attempt times out, asserts that the reservation was given up; in either
     * case asserts that the reassignment ends up cancelled.
     */
    @Test
    public void testAbortAndAcquireShortAbortTimeoutThrowsExceptionAndGivesUpReservation() throws ExecutionException, InterruptedException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        Duration abortTimeout = Duration.ofMillis(50L);
        ConfluentAdmin adminClient = KafkaCruiseControlUtils.createAdmin(config.originals());
        try {
            // Remember who led TP0 before the move so the revert can be checked afterwards.
            int initialLeader = createTopics().get(TP0.topic()).partitions().get(TP0.partition()).leader().id();
            Executor executor = fillUpAndMoveBasicTopicPartition0(1L);
            assertReassignmentsStarted(adminClient, 1);
            Assert.assertEquals(0L, executor.numCancelledReassignments());
            try {
                // NOTE(review): if the abort finishes within the timeout no exception is thrown and the
                // returned handle is never closed; the test does not fail in that case - confirm intent.
                executor.reserveAndAbortOngoingExecutions(abortTimeout);
            } catch (TimeoutException expected) {
                Assert.assertFalse("Expected the reservation to not be taken", executor._reservation.isReserved());
                Assert.assertFalse("Expected the Executor's reservation to not be held", executor.isReservedByOther());
            }
            assertReassignmentsCanceled(adminClient, initialLeader, executor);
            executor.shutdown();
        } finally {
            // Single cleanup point: runs exactly once on both the success and the failure path.
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    /**
     * While the test thread holds the executor reservation, verifies that executing a proposal
     * from another thread fails with {@link IllegalStateException}; after releasing the
     * reservation, asserts that no reassignment is in progress and none was cancelled.
     */
    @Test
    public void testProposalsCannotBeExecutedWhileReservationHeld() throws TimeoutException, InterruptedException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        Duration abortTimeout = Duration.ofMillis(50L);
        ConfluentAdmin adminClient = KafkaCruiseControlUtils.createAdmin(config.originals());
        try {
            Executor executor = executor();
            // try-with-resources releases the reservation even when one of the assertions fails.
            try (Executor.ReservationHandle reservation = executor.reserveAndAbortOngoingExecutions(abortTimeout)) {
                Assert.assertTrue("Expected the executor's reservation to be taken", executor._reservation.isReserved());
                Assert.assertTrue("Expected reservation to be held by the test thread", executor._reservation.isReservedByMe());
                Assert.assertFalse("Expected the Executor to not be reserved for the test thread that holds the reservation", executor.isReservedByOther());
                assertIllegalStateExceptionThrownWhileReservationIsHeld(executor, () -> {
                    try {
                        ExecutionProposal proposal = getBasicTopicPartition0Proposal();
                        executor.updateThrottle(1L);
                        executeProposals(executor, Collections.singletonList(proposal));
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            // The rejected proposal must not have started any reassignment.
            assertReassignmentsStarted(adminClient, 0);
            Assert.assertEquals(0L, executor.numCancelledReassignments());
            executor.shutdown();
        } finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(adminClient);
        }
    }

    /**
     * Submits four proposals (TP0 to TP3) but expects only the TP0 and TP1 proposals to be
     * verified; per the test name, the TP2 and TP3 partitions are presumably absent from the
     * cluster. Each proposal moves or reorders replicas between brokers 0 and 1.
     */
    @Test
    public void testMoveNonExistingPartition() throws InterruptedException {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(
            zookeeper().connectionString(), "ExecutorTestMetricGroup", "MoveNonExistingPartition", false);
        try {
            Map<String, TopicDescription> topicDescriptions = createTopics();
            int initialLeader0 = topicDescriptions.get(TestConstants.TOPIC0).partitions().get(PARTITION).leader().id();
            int initialLeader1 = topicDescriptions.get(TestConstants.TOPIC1).partitions().get(PARTITION).leader().id();
            // The only broker ids used are 0 and 1, so "the other broker" is whichever is not the leader.
            int otherBroker0 = initialLeader0 == 0 ? 1 : 0;
            int otherBroker1 = initialLeader1 == 0 ? 1 : 0;

            // Single-replica move: leader -> other broker.
            ExecutionProposal proposal0 = new ExecutionProposal(TP0, 0L,
                new ReplicaPlacementInfo(initialLeader0),
                Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)),
                Collections.singletonList(new ReplicaPlacementInfo(otherBroker0)),
                Collections.emptyList(), Collections.emptyList());
            // Two-replica reorder: (leader, other) -> (other, leader).
            ExecutionProposal proposal1 = new ExecutionProposal(TP1, 0L,
                new ReplicaPlacementInfo(initialLeader1),
                Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(otherBroker1)),
                Arrays.asList(new ReplicaPlacementInfo(otherBroker1), new ReplicaPlacementInfo(initialLeader1)),
                Collections.emptyList(), Collections.emptyList());
            // Same shapes as above, targeting TP2 and TP3.
            ExecutionProposal proposal2 = new ExecutionProposal(TP2, 0L,
                new ReplicaPlacementInfo(initialLeader0),
                Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)),
                Collections.singletonList(new ReplicaPlacementInfo(otherBroker0)),
                Collections.emptyList(), Collections.emptyList());
            ExecutionProposal proposal3 = new ExecutionProposal(TP3, 0L,
                new ReplicaPlacementInfo(initialLeader1),
                Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(otherBroker1)),
                Arrays.asList(new ReplicaPlacementInfo(otherBroker1), new ReplicaPlacementInfo(initialLeader1)),
                Collections.emptyList(), Collections.emptyList());

            Collection<ExecutionProposal> proposalsToExecute = Arrays.asList(proposal0, proposal1, proposal2, proposal3);
            Collection<ExecutionProposal> proposalsToCheck = Arrays.asList(proposal0, proposal1);
            executeAndVerifyProposals(zkClient, proposalsToExecute, proposalsToCheck).shutdown();
        } finally {
            // Single cleanup point: runs exactly once on both the success and the failure path.
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Initialises an execution with proposals for TP0 and TP1, deletes TOPIC0 before the
     * execution is started, and then expects only the TP1 proposal to be verified.
     */
    @Test
    public void testMoveDeletedTopic() throws InterruptedException, ExecutionException {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(
            zookeeper().connectionString(), "ExecutorTestMetricGroup", "MoveNonExistingTopic", false);
        try {
            Map<String, TopicDescription> topicDescriptions = createTopics();
            int initialLeader0 = topicDescriptions.get(TestConstants.TOPIC0).partitions().get(PARTITION).leader().id();
            int initialLeader1 = topicDescriptions.get(TestConstants.TOPIC1).partitions().get(PARTITION).leader().id();
            // The only broker ids used are 0 and 1, so "the other broker" is whichever is not the leader.
            int otherBroker0 = initialLeader0 == 0 ? 1 : 0;
            int otherBroker1 = initialLeader1 == 0 ? 1 : 0;

            // Single-replica move: leader -> other broker.
            ExecutionProposal proposal0 = new ExecutionProposal(TP0, 0L,
                new ReplicaPlacementInfo(initialLeader0),
                Collections.singletonList(new ReplicaPlacementInfo(initialLeader0)),
                Collections.singletonList(new ReplicaPlacementInfo(otherBroker0)),
                Collections.emptyList(), Collections.emptyList());
            // Two-replica reorder: (leader, other) -> (other, leader).
            ExecutionProposal proposal1 = new ExecutionProposal(TP1, 0L,
                new ReplicaPlacementInfo(initialLeader1),
                Arrays.asList(new ReplicaPlacementInfo(initialLeader1), new ReplicaPlacementInfo(otherBroker1)),
                Arrays.asList(new ReplicaPlacementInfo(otherBroker1), new ReplicaPlacementInfo(initialLeader1)),
                Collections.emptyList(), Collections.emptyList());
            List<ExecutionProposal> proposalsToExecute = Arrays.asList(proposal0, proposal1);
            List<ExecutionProposal> proposalsToCheck = Arrays.asList(proposal1);

            Executor executor = createExecutor();
            executor.updateThrottle(1024L);
            LoadMonitor loadMonitor = (LoadMonitor) EasyMock.mock(LoadMonitor.class);
            executor.initProposalExecution(proposalsToExecute, Collections.emptySet(), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID);
            // Delete TOPIC0 after the execution was initialised but before it starts.
            deleteTopics(Collections.singletonList(TestConstants.TOPIC0));
            // NOTE(review): the explicit executor argument looks like a decompiled inner-class
            // instantiation (executor.new ProposalExecutionRunnable(...)) - confirm against the Executor source.
            executor.startExecution(new Executor.ProposalExecutionRunnable(executor, loadMonitor, (Collection) null, (Collection) null, (BalanceOpExecutionCompletionCallback) null));
            waitAndVerifyProposals(zkClient, executor, proposalsToCheck);
            executor.shutdown();
        } finally {
            // Single cleanup point: runs exactly once on both the success and the failure path.
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Submits one proposal for {@code TP1} to an executor backed by a mocked {@link MetadataClient} whose
     * cluster view is fixed, waits until the executor reports {@code LEADER_MOVEMENT_TASK_IN_PROGRESS},
     * advances the mock clock by 200000 ms and then waits for the execution to finish.
     *
     * <p>The test contains no explicit assertion; it passes when the execution terminates.
     *
     * @throws InterruptedException if the thread is interrupted while polling the executor state
     */
    @Test
    public void testTimeoutLeaderActions() throws InterruptedException {
        createTopics();

        // Both replica lists handed to the proposal contain the same brokers in the same order.
        ExecutionProposal leaderProposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(1),
                Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1)),
                Collections.emptyList(),
                Collections.emptyList());
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        MockTime time = new MockTime();
        MetadataClient metadataClient = EasyMock.createMock(MetadataClient.class);

        // Static two-broker cluster in which the second node is the leader of TP1.
        Node firstBroker = new Node(PARTITION, "host0", 100);
        Node secondBroker = new Node(1, "host1", 100);
        Node[] bothBrokers = {firstBroker, secondBroker};
        MetadataClient.ClusterAndGeneration clusterAndGeneration = new MetadataClient.ClusterAndGeneration(
                new MetadataClient.ClusterAndPlacements(
                        new Cluster(
                                "id",
                                Arrays.asList(firstBroker, secondBroker),
                                Collections.singleton(new PartitionInfo(TP1.topic(), TP1.partition(), secondBroker, bothBrokers, bothBrokers)),
                                Collections.emptySet(),
                                Collections.emptySet()),
                        Collections.emptyMap()),
                PARTITION);

        // The mock always serves the same snapshot and tolerates a single close() call.
        EasyMock.expect(metadataClient.refreshMetadata()).andReturn(clusterAndGeneration).anyTimes();
        EasyMock.expect(metadataClient.cluster()).andReturn(clusterAndGeneration.cluster()).anyTimes();
        metadataClient.close();
        EasyMock.expectLastCall().andAnswer(() -> null);
        EasyMock.replay(metadataClient);

        List<ExecutionProposal> proposals = Collections.singletonList(leaderProposal);
        Executor executor = new Executor(config, Option.empty(), time, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), metadataClient, 86400000L, 43200000L, (ExecutorNotifier) null, getMockAnomalyDetector(RANDOM_UUID));
        executor.executeProposals(proposals, Collections.emptySet(), (Set) null, (LoadMonitor) EasyMock.mock(LoadMonitor.class), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID, (BalanceOpExecutionCompletionCallback) null);

        // Poll until the leadership phase has started before moving the mock clock forward.
        while (executor.state().state() != ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS) {
            Thread.sleep(10L);
        }
        time.sleep(200000L);
        waitUntilExecutionFinishes(executor);
        executor.shutdown();
    }

    /**
     * Runs one proposal for {@code TP1} with a spied admin client whose
     * {@code electLeaders(PREFERRED, ...)} call returns a result backed by an exceptionally completed future,
     * and checks that the execution is nevertheless reported as successful: the captured notification says
     * it succeeded and carries no exception, and the completion callback received no throwable.
     *
     * @throws ExecutionException if the {@code ElectLeadersResult} cannot be created reflectively
     * @throws InterruptedException if the thread is interrupted while polling the executor state
     */
    @Test
    public void testLeaderActionException() throws ExecutionException, InterruptedException {
        createTopics();
        ExecutionProposal leaderProposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(1),
                Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1)),
                Collections.emptyList(),
                Collections.emptyList());
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        MockTime time = new MockTime();

        // Metadata mock that always reports the same two-broker cluster.
        MetadataClient metadataClient = Mockito.mock(MetadataClient.class);
        Node firstBroker = new Node(PARTITION, "host0", 100);
        Node secondBroker = new Node(1, "host1", 100);
        Node[] bothBrokers = {firstBroker, secondBroker};
        MetadataClient.ClusterAndGeneration clusterAndGeneration = new MetadataClient.ClusterAndGeneration(
                new MetadataClient.ClusterAndPlacements(
                        new Cluster(
                                "id",
                                Arrays.asList(firstBroker, secondBroker),
                                Collections.singleton(new PartitionInfo(TP1.topic(), TP1.partition(), secondBroker, bothBrokers, bothBrokers)),
                                Collections.emptySet(),
                                Collections.emptySet()),
                        Collections.emptyMap()),
                PARTITION);
        Mockito.doReturn(clusterAndGeneration).when(metadataClient).refreshMetadata();
        Mockito.doReturn(clusterAndGeneration.cluster()).when(metadataClient).cluster();

        ConfluentAdmin confluentAdmin = (ConfluentAdmin) Mockito.spy(KafkaCruiseControlUtils.createAdmin(config.originals()));
        KafkaFutureImpl<?> failedFuture = new KafkaFutureImpl<>();
        failedFuture.completeExceptionally(new Exception("this is a test exception"));

        // The constructor of ElectLeadersResult is looked up and opened reflectively.
        final ElectLeadersResult failedElection;
        try {
            Constructor<ElectLeadersResult> constructor = ElectLeadersResult.class.getDeclaredConstructor(KafkaFutureImpl.class);
            constructor.setAccessible(true);
            failedElection = constructor.newInstance(failedFuture);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new ExecutionException(e);
        }
        Assert.assertNotNull(failedElection);
        Mockito.doReturn(failedElection).when(confluentAdmin).electLeaders(Mockito.eq(ElectionType.PREFERRED), Mockito.anySet());

        // Record the most recent notification and whatever throwable the completion callback receives.
        final AtomicReference<ExecutorNotification> lastNotification = new AtomicReference<>();
        ExecutorNotifier notifier = new ExecutorNotifier() {
            @Override
            public void sendNotification(ExecutorNotification executorNotification) {
                lastNotification.set(executorNotification);
            }

            @Override
            public void configure(Map<String, ?> map) {
            }
        };
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        List<ExecutionProposal> proposals = Collections.singletonList(leaderProposal);
        Executor executor = new Executor(config, Option.empty(), time, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), metadataClient, 86400000L, 43200000L, notifier, getMockAnomalyDetector(RANDOM_UUID), confluentAdmin, (ReplicationThrottleHelper) null);
        executor.executeProposals(proposals, Collections.emptySet(), (Set) null, (LoadMonitor) Mockito.mock(LoadMonitor.class), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID, (succeeded, error) -> callbackError.set(error));

        while (executor.state().state() != ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS) {
            Thread.sleep(10L);
        }
        waitUntilExecutionFinishes(executor);

        Assert.assertTrue("expected execution to succeed", lastNotification.get().executionSucceeded());
        Assert.assertNull("expected no exception in test notification", lastNotification.get().exception());
        Assert.assertNull("expected no Exception to be returned from execution", callbackError.get());
        executor.shutdown();
    }

    /**
     * Creates four topics with explicit replica assignments, submits one proposal per topic with the
     * third-from-last {@code executeProposals} argument set to {@code 2}, and checks that the execution
     * succeeds and that {@code electLeaders(PREFERRED, ...)} was invoked on the spied admin client at least
     * twice.
     *
     * @throws ExecutionException declared for parity with the helpers used by this test
     * @throws InterruptedException if the thread is interrupted while polling the executor state
     */
    @Test
    public void testMultipleLeaderActionBatches() throws ExecutionException, InterruptedException {
        // Two assignments for the single partition: replicas [1, PARTITION] and [PARTITION, 1].
        Map<Integer, List<Integer>> secondBrokerFirst = new HashMap<>();
        secondBrokerFirst.put(PARTITION, Arrays.asList(1, PARTITION));
        Map<Integer, List<Integer>> firstBrokerFirst = new HashMap<>();
        firstBrokerFirst.put(PARTITION, Arrays.asList(PARTITION, 1));
        List<NewTopic> newTopics = Arrays.asList(
                new NewTopic(TestConstants.TOPIC0, secondBrokerFirst),
                new NewTopic(TestConstants.TOPIC1, firstBrokerFirst),
                new NewTopic(TestConstants.TOPIC2, secondBrokerFirst),
                new NewTopic(TestConstants.TOPIC_WITH_DOT, firstBrokerFirst));
        LOG.info("Creating test topics {}", newTopics.stream().map(NewTopic::name).collect(Collectors.toList()));
        Map<String, TopicDescription> topics = createTopics(newTopics);

        int leader0 = topics.get(TestConstants.TOPIC0).partitions().get(PARTITION).leader().id();
        int leader1 = topics.get(TestConstants.TOPIC1).partitions().get(PARTITION).leader().id();
        int leader2 = topics.get(TestConstants.TOPIC2).partitions().get(PARTITION).leader().id();
        int leader3 = topics.get(TestConstants.TOPIC_WITH_DOT).partitions().get(PARTITION).leader().id();

        List<ReplicaPlacementInfo> firstBrokerLeads = Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1));
        List<ReplicaPlacementInfo> secondBrokerLeads = Arrays.asList(new ReplicaPlacementInfo(1), new ReplicaPlacementInfo(Integer.valueOf(PARTITION)));

        // For each topic the fifth argument puts the broker that is NOT the current leader first.
        List<ExecutionProposal> proposals = new ArrayList<>();
        proposals.add(new ExecutionProposal(TP0, 0L, new ReplicaPlacementInfo(Integer.valueOf(leader0)), Arrays.asList(new ReplicaPlacementInfo(1), new ReplicaPlacementInfo(Integer.valueOf(PARTITION))), leader0 == 0 ? secondBrokerLeads : firstBrokerLeads, Collections.emptyList(), Collections.emptyList()));
        proposals.add(new ExecutionProposal(TP1, 0L, new ReplicaPlacementInfo(Integer.valueOf(leader1)), Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1)), leader1 == 0 ? secondBrokerLeads : firstBrokerLeads, Collections.emptyList(), Collections.emptyList()));
        proposals.add(new ExecutionProposal(TP2, 0L, new ReplicaPlacementInfo(Integer.valueOf(leader2)), Arrays.asList(new ReplicaPlacementInfo(1), new ReplicaPlacementInfo(Integer.valueOf(PARTITION))), leader2 == 0 ? secondBrokerLeads : firstBrokerLeads, Collections.emptyList(), Collections.emptyList()));
        proposals.add(new ExecutionProposal(TP3, 0L, new ReplicaPlacementInfo(Integer.valueOf(leader3)), Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1)), leader3 == 0 ? secondBrokerLeads : firstBrokerLeads, Collections.emptyList(), Collections.emptyList()));
        LOG.info("Submitting proposals for {}", proposals);

        // Keep the properties in a local so the interval override is applied to the very object the config
        // is built from; this does not rely on getExecutorProperties() returning the same instance each call.
        Properties executorProperties = getExecutorProperties();
        executorProperties.put("execution.progress.check.interval.ms", 100L);
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(executorProperties);
        MockTime time = new MockTime();
        ConfluentAdmin confluentAdmin = (ConfluentAdmin) Mockito.spy(KafkaCruiseControlUtils.createAdmin(config.originals()));

        // Record the most recent notification and whatever throwable the completion callback receives.
        final AtomicReference<ExecutorNotification> lastNotification = new AtomicReference<>();
        ExecutorNotifier notifier = new ExecutorNotifier() {
            @Override
            public void sendNotification(ExecutorNotification executorNotification) {
                lastNotification.set(executorNotification);
            }

            @Override
            public void configure(Map<String, ?> map) {
            }
        };
        AtomicReference<Throwable> callbackError = new AtomicReference<>();

        Executor executor = new Executor(config, Option.empty(), time, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), (MetadataClient) null, 86400000L, 43200000L, notifier, getMockAnomalyDetector(RANDOM_UUID), confluentAdmin, (ReplicationThrottleHelper) null);
        executor.executeProposals(proposals, Collections.emptySet(), (Set) null, (LoadMonitor) Mockito.mock(LoadMonitor.class), (Integer) null, (Integer) null, 2, (ReplicaMovementStrategy) null, RANDOM_UUID, (succeeded, error) -> callbackError.set(error));

        while (executor.state().state() != ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS) {
            Thread.sleep(10L);
        }
        waitUntilExecutionFinishes(executor);

        Assert.assertTrue("expected execution to succeed", lastNotification.get().executionSucceeded());
        Assert.assertNull("expected no exception in test notification", lastNotification.get().exception());
        Assert.assertNull("expected no Exception to be returned from execution", callbackError.get());
        Mockito.verify(confluentAdmin, Mockito.atLeast(2)).electLeaders(Mockito.eq(ElectionType.PREFERRED), Mockito.anySet());
        executor.shutdown();
    }

    /**
     * Executes the basic proposals under the UUID {@code "user-task-uuid"} and expects the captured
     * notification to report {@code UNKNOWN} as the starting agent and {@code EXECUTION_COMPLETION} as the
     * ending agent.
     *
     * @throws InterruptedException propagated from {@code getBasicProposals()}
     */
    @Test
    public void testExecutorSendNotificationForUserTask() throws InterruptedException {
        executeAndVerifyNotification(getBasicProposals(), "user-task-uuid", ExecutorNotification.ActionAgent.UNKNOWN, ExecutorNotification.ActionAgent.EXECUTION_COMPLETION, true);
    }

    /**
     * Runs the replication-throttling scenario without a throttle override ({@code null} is passed, so
     * {@code updateThrottle} is never called on the executor before the proposal is submitted).
     */
    @Test
    public void testReplicationThrottling() throws InterruptedException, ExecutionException {
        executeProposalWithReplicationThrottling(null);
    }

    /**
     * Runs the replication-throttling scenario with an explicit throttle override of {@code 2000000}, which
     * is applied through {@code updateThrottle} before the proposal is submitted.
     */
    @Test
    public void testReplicationThrottlingOverride() throws InterruptedException, ExecutionException {
        executeProposalWithReplicationThrottling(2000000L);
    }

    /**
     * Executes a single-replica move of {@code TP0} and verifies the throttle configuration stored in
     * ZooKeeper both while the execution is running and after the executor has been shut down.
     *
     * <p>{@code TOPIC1} is given throttled-replica settings of {@code "0:0,0:1"} up front; the test expects
     * those settings to still be present at the end, while the broker-level and {@code TOPIC0} throttle
     * settings are expected to be absent after the execution.
     *
     * @param l throttle rate applied through {@code updateThrottle} before execution, or {@code null} to skip
     *          the override; when {@code null} the in-flight check expects no broker or {@code TOPIC0}
     *          throttle values at all
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ExecutionException declared for the helpers invoked by this scenario
     */
    private void executeProposalWithReplicationThrottling(Long l) throws InterruptedException, ExecutionException {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "LoadMonitorTaskRunnerGroup", "LoadMonitorTaskRunnerSetup", false);
        try {
            KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
            int leaderId = createTopics().get(TP0.topic()).partitions().get(PARTITION).leader().id();
            // Move the only replica of TP0 from its current broker to the other one.
            ExecutionProposal proposal = new ExecutionProposal(TP0, 0L, new ReplicaPlacementInfo(Integer.valueOf(leaderId)), Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leaderId))), Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leaderId == 0 ? 1 : PARTITION))), Collections.emptyList(), Collections.emptyList());
            MockTime time = new MockTime();
            MetadataClient metadataClient = new MetadataClient(config, -1L, time);

            // Pre-existing throttled-replica settings on TOPIC1, which is not part of the proposal.
            Properties topic1Configs = zkClient.getEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC1);
            topic1Configs.put(KafkaConfig.LeaderReplicationThrottledReplicasProp(), "0:0,0:1");
            topic1Configs.put(KafkaConfig.FollowerReplicationThrottledReplicasProp(), "0:0,0:1");
            zkClient.setOrCreateEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC1, topic1Configs);

            KafkaAdminClient adminClientMock = (KafkaAdminClient) EasyMock.mock(KafkaAdminClient.class);
            mockDescribeConfigs(adminClientMock, Arrays.asList(Integer.valueOf(PARTITION), 1));
            EasyMock.replay(adminClientMock);

            Executor executor = new Executor(config, Option.empty(), time, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), metadataClient, 86400000L, 43200000L, (ExecutorNotifier) null, getMockAnomalyDetector(RANDOM_UUID), KafkaCruiseControlUtils.createAdmin(config.originals()), new ReplicationThrottleHelper(KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "CruiseControlExecutor", "Executor", false), adminClientMock, config.getLong("throttle.bytes.per.second"), false));
            if (l != null) {
                executor.updateThrottle(l.longValue());
            }
            executor.executeProposals(Collections.singletonList(proposal), Collections.emptySet(), (Set) null, (LoadMonitor) EasyMock.mock(LoadMonitor.class), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID, (BalanceOpExecutionCompletionCallback) null);

            String expectedRate = l == null ? null : l.toString();
            String expectedTopic0Replicas = l == null ? null : "0:0,0:1";
            waitForAssert(() -> {
                for (Integer brokerId : this._brokers.keySet()) {
                    verifyThrottleInZk(zkClient, ConfigType.Broker(), String.valueOf(brokerId), expectedRate);
                }
                verifyThrottleInZk(zkClient, ConfigType.Topic(), TestConstants.TOPIC0, expectedTopic0Replicas, expectedTopic0Replicas);
                verifyThrottleInZk(zkClient, ConfigType.Topic(), TestConstants.TOPIC1, "0:0,0:1");
                return true;
            }, 5000L, "Should have properly throttled during the reassignment");

            waitUntilExecutionFinishes(executor);
            executor.shutdown();

            // After shutdown only the settings written to TOPIC1 by this test are expected to remain.
            for (Integer brokerId : this._brokers.keySet()) {
                verifyThrottleInZk(zkClient, ConfigType.Broker(), String.valueOf(brokerId), null);
            }
            verifyThrottleInZk(zkClient, ConfigType.Topic(), TestConstants.TOPIC0, null);
            verifyThrottleInZk(zkClient, ConfigType.Topic(), TestConstants.TOPIC1, "0:0,0:1");
        } finally {
            // Single cleanup point: the client is closed exactly once on both the success and failure paths.
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Runs the throttle-update scenario with an initial throttle of {@code 100} applied before the
     * proposals are submitted.
     */
    @Test
    public void testUpdateThrottleWithInitialThrottle() throws InterruptedException {
        updateThrottleForOngoingProposal(100L);
    }

    /**
     * Runs the throttle-update scenario without an initial throttle ({@code null} is passed).
     */
    @Test
    public void testUpdateThrottleWithoutInitialThrottle() throws InterruptedException {
        updateThrottleForOngoingProposal(null);
    }

    /**
     * Starts an execution that moves the single replica of {@code TP0} and of {@code TP1}, updates the
     * throttle to {@code 1000000} while the execution is ongoing, and verifies the broker throttle values in
     * ZooKeeper before, during and after the execution.
     *
     * @param l initial throttle applied (and verified) before the proposals are submitted, or {@code null}
     *          to start without one
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void updateThrottleForOngoingProposal(Long l) throws InterruptedException {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "LoadMonitorTaskRunnerGroup", "LoadMonitorTaskRunnerSetup", false);
        try {
            KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
            MockTime time = new MockTime();
            MetadataClient metadataClient = new MetadataClient(config, -1L, time);
            Map<String, TopicDescription> topics = createTopics();
            int leader0 = topics.get(TestConstants.TOPIC0).partitions().get(PARTITION).leader().id();
            int leader1 = topics.get(TestConstants.TOPIC1).partitions().get(PARTITION).leader().id();
            // Each proposal moves the only replica of its partition to the other broker.
            ExecutionProposal tp0Proposal = new ExecutionProposal(TP0, 0L, new ReplicaPlacementInfo(Integer.valueOf(leader0)), Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leader0))), Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leader0 == 0 ? 1 : PARTITION))), Collections.emptyList(), Collections.emptyList());
            ExecutionProposal tp1Proposal = new ExecutionProposal(TP1, 0L, new ReplicaPlacementInfo(Integer.valueOf(leader1)), Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leader1))), Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leader1 == 0 ? 1 : PARTITION))), Collections.emptyList(), Collections.emptyList());
            Executor executor = new Executor(config, Option.empty(), time, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), metadataClient, 86400000L, 43200000L, (ExecutorNotifier) null, getMockAnomalyDetector(RANDOM_UUID));

            if (l != null) {
                executor.updateThrottle(l.longValue());
                waitForAssert(() -> {
                    for (Integer brokerId : this._brokers.keySet()) {
                        verifyThrottleInZk(zkClient, ConfigType.Broker(), String.valueOf(brokerId), Long.toString(l.longValue()));
                    }
                    return true;
                }, 3000L, "Should have set the initial throttle");
            }

            executor.executeProposals(Arrays.asList(tp0Proposal, tp1Proposal), Collections.emptySet(), (Set) null, (LoadMonitor) EasyMock.mock(LoadMonitor.class), 1, 1, 1, (ReplicaMovementStrategy) null, RANDOM_UUID, (BalanceOpExecutionCompletionCallback) null);

            String initialRate = l == null ? null : Long.toString(l.longValue());
            String updatedRate = "1000000";
            waitForAssert(() -> {
                // First confirm the initial rate (or its absence), then apply the manual update.
                for (Integer brokerId : this._brokers.keySet()) {
                    verifyThrottleInZk(zkClient, ConfigType.Broker(), String.valueOf(brokerId), initialRate);
                }
                try {
                    waitForAssert(() -> {
                        Assert.assertTrue("Throttle rate update failed", executor.updateThrottle(Long.parseLong(updatedRate)));
                        for (Integer throttledBrokerId : this._brokers.keySet()) {
                            verifyThrottleInZk(zkClient, ConfigType.Broker(), String.valueOf(throttledBrokerId), updatedRate);
                        }
                        return true;
                    }, 5000L, "The manual throttle update should have applied during the reassignment");
                    return true;
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the enclosing wait instead of silently dropping it.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }, 10000L, "Should have properly throttled during the reassignment");

            waitUntilExecutionFinishes(executor);
            executor.shutdown();

            // No throttle settings are expected to remain once the executor has been shut down.
            for (Integer brokerId : this._brokers.keySet()) {
                verifyThrottleInZk(zkClient, ConfigType.Broker(), String.valueOf(brokerId), null);
            }
            verifyThrottleInZk(zkClient, ConfigType.Topic(), TestConstants.TOPIC0, null);
            verifyThrottleInZk(zkClient, ConfigType.Topic(), TestConstants.TOPIC1, null);
        } finally {
            // Single cleanup point: the client is closed exactly once on both the success and failure paths.
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Asserts the leader-side and follower-side throttle settings stored in ZooKeeper for one entity.
     *
     * <p>When {@code str} equals {@code ConfigType.Broker()} the broker's throttled-rate properties are
     * checked; for any other value the entity is read as a topic and its throttled-replicas properties are
     * checked.
     *
     * @param kafkaZkClient client used to read the entity configuration
     * @param str entity type
     * @param str2 entity name (broker id or topic name)
     * @param str3 expected leader-side value, {@code null} when the property must be absent
     * @param str4 expected follower-side value, {@code null} when the property must be absent
     * @throws AssertionError if either stored value differs from the expectation
     */
    private void verifyThrottleInZk(KafkaZkClient kafkaZkClient, String str, String str2, String str3, String str4) throws AssertionError {
        boolean brokerLevel = str.equals(ConfigType.Broker());

        // Select the entity type and the pair of property names once, then share the lookup and assertions.
        String entityType = brokerLevel ? ConfigType.Broker() : ConfigType.Topic();
        String leaderKey = brokerLevel
                ? KafkaConfig.LeaderReplicationThrottledRateProp()
                : KafkaConfig.LeaderReplicationThrottledReplicasProp();
        String followerKey = brokerLevel
                ? KafkaConfig.FollowerReplicationThrottledRateProp()
                : KafkaConfig.FollowerReplicationThrottledReplicasProp();

        Properties storedConfigs = kafkaZkClient.getEntityConfigs(entityType, str2);
        String actualLeaderValue = storedConfigs.getProperty(leaderKey);
        String actualFollowerValue = storedConfigs.getProperty(followerKey);
        Assert.assertEquals(str3, actualLeaderValue);
        Assert.assertEquals(str4, actualFollowerValue);
    }

    /**
     * Convenience overload for the case where the leader-side and follower-side settings are expected to
     * hold the same value: {@code str3} is passed on as both expected values.
     *
     * @param kafkaZkClient client used to read the entity configuration
     * @param str entity type
     * @param str2 entity name
     * @param str3 value expected for both the leader-side and the follower-side property
     */
    private void verifyThrottleInZk(KafkaZkClient kafkaZkClient, String str, String str2, String str3) {
        verifyThrottleInZk(kafkaZkClient, str, str2, str3, str3);
    }

    /**
     * Builds one proposal per partition of the created topics, each with a single-element replica list
     * containing broker {@code PARTITION} as its fifth argument, executes them, and verifies both the
     * resulting proposals and that the completion callback was invoked with {@code (true, null)}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test
    public void testRemoveBroker() throws InterruptedException {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ExecutorTestMetricGroup", "RemoveBroker", false);
        try {
            Map<String, TopicDescription> topics = createTopics();
            List<ExecutionProposal> proposals = new ArrayList<>();
            for (Map.Entry<String, TopicDescription> topic : topics.entrySet()) {
                for (TopicPartitionInfo partition : topic.getValue().partitions()) {
                    // Replica list taken from the partition as it was reported at topic creation.
                    List<ReplicaPlacementInfo> reportedReplicas = partition.replicas().stream()
                            .map(node -> new ReplicaPlacementInfo(Integer.valueOf(node.id())))
                            .collect(Collectors.toList());
                    proposals.add(new ExecutionProposal(
                            new TopicPartition(topic.getKey(), partition.partition()),
                            0L,
                            new ReplicaPlacementInfo(Integer.valueOf(partition.leader().id())),
                            reportedReplicas,
                            Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION))),
                            Collections.emptyList(),
                            Collections.emptyList()));
                }
            }

            BalanceOpExecutionCompletionCallback completionCallback = Mockito.mock(BalanceOpExecutionCompletionCallback.class);
            Executor executor = executor();
            executor.executeProposals(proposals, Collections.emptySet(), (Set) null, (LoadMonitor) EasyMock.mock(LoadMonitor.class), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID, completionCallback);
            waitAndVerifyProposals(zkClient, executor, proposals);
            Mockito.verify(completionCallback).accept(true, (Throwable) null);
        } finally {
            // Single cleanup point: the client is closed exactly once on both the success and failure paths.
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Makes {@code LoadMonitor.pauseMetricSampling} throw a {@link NullPointerException}, runs a
     * {@code ProposalExecutionRunnable} directly on the calling thread, and verifies that the completion
     * callback is invoked with {@code false} and a {@code NullPointerException}.
     */
    @Test
    public void testProposalExecutionRunnableCallsCallbackWithException() {
        AnomalyDetector detector = Mockito.mock(AnomalyDetector.class);
        LoadMonitor failingMonitor = Mockito.mock(LoadMonitor.class);
        Mockito.doAnswer(invocation -> {
            throw new NullPointerException("haha got you");
        }).when(failingMonitor).pauseMetricSampling(Mockito.anyString());

        Executor executor = createExecutor(detector, Optional.empty());
        executor.initProposalExecution(Collections.emptySet(), Collections.emptySet(), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID);

        BalanceOpExecutionCompletionCallback completionCallback = Mockito.mock(BalanceOpExecutionCompletionCallback.class);
        // Result is discarded; the call has no effect beyond dereferencing the executor.
        executor.getClass();
        Runnable execution = new Executor.ProposalExecutionRunnable(executor, failingMonitor, (Collection) null, (Collection) null, completionCallback);
        execution.run();

        Mockito.verify(completionCallback).accept(Mockito.eq(false), (Throwable) Mockito.any(NullPointerException.class));
    }

    /**
     * Makes both {@code LoadMonitor.pauseMetricSampling} and {@code AnomalyDetector.markSelfHealingFinished}
     * throw a {@link NullPointerException}, starts an execution on an executor created with
     * {@code sameThreadExecutorService}, and verifies the state afterwards: no ongoing execution, state
     * {@code NO_TASK_IN_PROGRESS}, callback invoked with {@code false} and a {@code NullPointerException},
     * metric sampling resumed and a forced cluster/generation refresh requested.
     */
    @Test
    public void testUnhandledExceptionsDuringProposalExecutionRunnableCleansUpState() {
        AnomalyDetector failingDetector = Mockito.mock(AnomalyDetector.class);
        LoadMonitor failingMonitor = Mockito.mock(LoadMonitor.class);
        Mockito.doAnswer(invocation -> {
            throw new NullPointerException("haha got you");
        }).when(failingMonitor).pauseMetricSampling(Mockito.anyString());
        Mockito.doAnswer(invocation -> {
            throw new NullPointerException("haha got you again");
        }).when(failingDetector).markSelfHealingFinished((String) Mockito.any());

        Executor executor = createExecutor(failingDetector, Optional.of(this.sameThreadExecutorService));
        executor.initProposalExecution(Collections.emptySet(), Collections.emptySet(), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID);

        BalanceOpExecutionCompletionCallback completionCallback = Mockito.mock(BalanceOpExecutionCompletionCallback.class);
        // Result is discarded; the call has no effect beyond dereferencing the executor.
        executor.getClass();
        executor.startExecution(new Executor.ProposalExecutionRunnable(executor, failingMonitor, (Collection) null, (Collection) null, completionCallback));

        // The assertions run immediately after startExecution returns.
        Assert.assertFalse("The Executor should have cleaned up its ongoing execution and should have none active", executor.hasOngoingExecution());
        Assert.assertEquals(ExecutorState.State.NO_TASK_IN_PROGRESS, executor.state().state());
        Mockito.verify(completionCallback).accept(Mockito.eq(false), (Throwable) Mockito.any(NullPointerException.class));
        Mockito.verify(failingMonitor).resumeMetricSampling((String) Mockito.any());
        Mockito.verify(failingMonitor).forceRefreshClusterAndGeneration();
    }

    /**
     * Executes the basic proposals under a UUID formed from {@code AnomalyType.GOAL_VIOLATION} plus the
     * suffix {@code "-uuid"} and expects the captured notification to report {@code CRUISE_CONTROL} as the
     * starting agent and {@code EXECUTION_COMPLETION} as the ending agent.
     *
     * @throws InterruptedException propagated from {@code getBasicProposals()}
     */
    @Test
    public void testExecutorSendNotificationForSelfHealing() throws InterruptedException {
        executeAndVerifyNotification(getBasicProposals(), AnomalyType.GOAL_VIOLATION.toString() + "-uuid", ExecutorNotification.ActionAgent.CRUISE_CONTROL, ExecutorNotification.ActionAgent.EXECUTION_COMPLETION, false);
    }

    /**
     * Executes the given proposals with a mocked {@link ExecutorNotifier}, captures the first notification
     * it receives and checks the notification's starting agent, ending agent and action UUID.
     *
     * @param collection proposals to execute
     * @param str UUID passed to the executor and expected back in the notification
     * @param actionAgent expected value of {@code startedBy()}
     * @param actionAgent2 expected value of {@code endedBy()}
     * @param z not read by this method
     */
    private void executeAndVerifyNotification(Collection<ExecutionProposal> collection, String str, ExecutorNotification.ActionAgent actionAgent, ExecutorNotification.ActionAgent actionAgent2, boolean z) {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());

        // Expect exactly one sendNotification call and keep its argument for inspection.
        ExecutorNotifier notifier = EasyMock.mock(ExecutorNotifier.class);
        Capture<ExecutorNotification> capturedNotification = Capture.newInstance(CaptureType.FIRST);
        notifier.sendNotification(EasyMock.capture(capturedNotification));
        EasyMock.expectLastCall();
        EasyMock.replay(notifier);

        Executor executor = new Executor(config, Option.empty(), new SystemTime(), KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), (MetadataClient) null, 86400000L, 43200000L, notifier, getMockAnomalyDetector(str));
        executor.executeProposals(collection, Collections.emptySet(), (Set) null, (LoadMonitor) EasyMock.mock(LoadMonitor.class), (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, str, (BalanceOpExecutionCompletionCallback) null);
        waitUntilExecutionFinishes(executor);
        executor.shutdown();

        // JUnit takes (expected, actual); this order keeps failure messages accurate.
        ExecutorNotification notification = capturedNotification.getValue();
        Assert.assertEquals(actionAgent, notification.startedBy());
        Assert.assertEquals(actionAgent2, notification.endedBy());
        Assert.assertEquals(str, notification.actionUuid());
    }

    /**
     * Builds the two proposals shared by the notification tests: the basic {@code TP0} proposal and a
     * {@code TP1} proposal whose two replica lists contain the same two brokers in opposite order.
     *
     * @return a fixed-size list holding the {@code TP0} proposal followed by the {@code TP1} proposal
     * @throws InterruptedException propagated from topic creation
     */
    private Collection<ExecutionProposal> getBasicProposals() throws InterruptedException {
        int tp1Leader = createTopics().get(TestConstants.TOPIC1).partitions().get(PARTITION).leader().id();
        ExecutionProposal tp0Proposal = getBasicTopicPartition0Proposal();

        // Fourth argument lists the current leader first, fifth argument lists it last.
        List<ReplicaPlacementInfo> leaderFirst = Arrays.asList(
                new ReplicaPlacementInfo(Integer.valueOf(tp1Leader)),
                tp1Leader == 0 ? new ReplicaPlacementInfo(1) : new ReplicaPlacementInfo(Integer.valueOf(PARTITION)));
        List<ReplicaPlacementInfo> leaderLast = Arrays.asList(
                tp1Leader == 0 ? new ReplicaPlacementInfo(1) : new ReplicaPlacementInfo(Integer.valueOf(PARTITION)),
                new ReplicaPlacementInfo(Integer.valueOf(tp1Leader)));

        ExecutionProposal tp1Proposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(Integer.valueOf(tp1Leader)),
                leaderFirst,
                leaderLast,
                Collections.emptyList(),
                Collections.emptyList());
        return Arrays.asList(tp0Proposal, tp1Proposal);
    }

    /**
     * Creates {@code TOPIC1} with one placement constraint, rewrites the topic's
     * {@code confluent.placement.constraints} config through the given ZK client with the racks of
     * replicas and observers exchanged, and returns a single TP1 proposal.
     *
     * @param kafkaZkClient client used to read and update the topic's entity configs
     * @return a one-element list holding the TP1 proposal
     * @throws InterruptedException propagated from topic creation
     */
    private Collection<ExecutionProposal> getTopicPlacementProposals(KafkaZkClient kafkaZkClient) throws InterruptedException {
        Map<String, TopicDescription> created = createTopicWithPlacement(
                TestConstants.TOPIC1,
                "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"0\"}}], \"observers\": [{\"count\": 1, \"constraints\":{\"rack\":\"1\"}}]}");
        TopicDescription description = created.get(TestConstants.TOPIC1);

        Properties topicConfigs = kafkaZkClient.getEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC1);
        topicConfigs.put(
                "confluent.placement.constraints",
                "{\"version\":1,\"replicas\":[{\"count\": 1, \"constraints\":{\"rack\":\"1\"}}], \"observers\": [{\"count\": 1, \"constraints\":{\"rack\":\"0\"}}]}");
        kafkaZkClient.setOrCreateEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC1, topicConfigs);

        int leaderId = description.partitions().get(TP1.partition()).leader().id();
        ReplicaPlacementInfo leader = new ReplicaPlacementInfo(Integer.valueOf(leaderId));
        List<ReplicaPlacementInfo> replicasBefore =
                Arrays.asList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)), new ReplicaPlacementInfo(1));
        List<ReplicaPlacementInfo> replicasAfter =
                Arrays.asList(new ReplicaPlacementInfo(1), new ReplicaPlacementInfo(Integer.valueOf(PARTITION)));
        // NOTE(review): the last two constructor arguments are presumably the old and new observer lists - confirm.
        List<ReplicaPlacementInfo> observersBefore = Collections.singletonList(new ReplicaPlacementInfo(1));
        List<ReplicaPlacementInfo> observersAfter =
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(PARTITION)));

        return Arrays.asList(
                new ExecutionProposal(TP1, 0L, leader, replicasBefore, replicasAfter, observersBefore, observersAfter));
    }

    /**
     * Creates an executor with the given throttle, produces data to TP0's topic and then starts
     * executing the basic TP0 proposal on that executor.
     *
     * @param l throttle value handed to {@link #executor(Long)}
     * @return the executor on which the proposal was submitted
     */
    private Executor fillUpAndMoveBasicTopicPartition0(Long l) throws InterruptedException, ExecutionException {
        Executor throttledExecutor = executor(l);
        ExecutionProposal tp0Proposal = getBasicTopicPartition0Proposal();
        // Put data on the partition before submitting the move.
        produceData(TP0.topic(), 30000);
        Collection<ExecutionProposal> proposals = Collections.singletonList(tp0Proposal);
        return executeProposals(throttledExecutor, proposals, null);
    }

    /**
     * Builds a proposal for TP0 that replaces its single replica (the current leader) with the other
     * broker: broker 1 when the leader is broker 0, otherwise broker {@code PARTITION}.
     *
     * @return the TP0 proposal
     * @throws InterruptedException propagated from topic creation
     */
    private ExecutionProposal getBasicTopicPartition0Proposal() throws InterruptedException {
        int leaderId = createTopics().get(TP0.topic()).partitions().get(TP0.partition()).leader().id();

        ReplicaPlacementInfo currentLeader = new ReplicaPlacementInfo(Integer.valueOf(leaderId));
        List<ReplicaPlacementInfo> currentReplicas =
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leaderId)));
        ReplicaPlacementInfo destination;
        if (leaderId == 0) {
            destination = new ReplicaPlacementInfo(1);
        } else {
            destination = new ReplicaPlacementInfo(Integer.valueOf(PARTITION));
        }
        List<ReplicaPlacementInfo> targetReplicas = Collections.singletonList(destination);

        return new ExecutionProposal(
                TP0, 0L, currentLeader, currentReplicas, targetReplicas, Collections.emptyList(), Collections.emptyList());
    }

    /**
     * Submits the same TP0/TP1 proposals twice on one executor: the first execution is awaited, the
     * second is submitted afterwards and the executor is shut down without waiting for it.
     */
    @Test
    public void testUserTriggeredExecutionDuringSelfHealingPause() throws Exception {
        Map<String, TopicDescription> topics = createTopics();
        int tp0Leader = topics.get(TestConstants.TOPIC0).partitions().get(PARTITION).leader().id();
        int tp1Leader = topics.get(TestConstants.TOPIC1).partitions().get(PARTITION).leader().id();
        int tp0Other = tp0Leader == 0 ? 1 : PARTITION;
        int tp1Other = tp1Leader == 0 ? 1 : PARTITION;

        // TP0: move the single replica from its leader to the other broker.
        ExecutionProposal tp0Proposal = new ExecutionProposal(
                TP0,
                0L,
                new ReplicaPlacementInfo(Integer.valueOf(tp0Leader)),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(tp0Leader))),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(tp0Other))),
                Collections.emptyList(),
                Collections.emptyList());

        // TP1: keep both replicas but reverse their order.
        ExecutionProposal tp1Proposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(Integer.valueOf(tp1Leader)),
                Arrays.asList(
                        new ReplicaPlacementInfo(Integer.valueOf(tp1Leader)),
                        new ReplicaPlacementInfo(Integer.valueOf(tp1Other))),
                Arrays.asList(
                        new ReplicaPlacementInfo(Integer.valueOf(tp1Other)),
                        new ReplicaPlacementInfo(Integer.valueOf(tp1Leader))),
                Collections.emptyList(),
                Collections.emptyList());

        List<ExecutionProposal> proposals = Arrays.asList(tp0Proposal, tp1Proposal);

        Executor executor = new Executor(
                new KafkaCruiseControlConfig(getExecutorProperties()),
                Option.empty(),
                new SystemTime(),
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                (MetadataClient) null,
                86400000L,
                43200000L,
                (ExecutorNotifier) null,
                getMockAnomalyDetector(RANDOM_UUID));

        // First run: wait until it completes.
        executor.executeProposals(proposals, Collections.emptySet(), (Set) null, EasyMock.mock(LoadMonitor.class),
                (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID,
                (BalanceOpExecutionCompletionCallback) null);
        waitUntilExecutionFinishes(executor);

        // Second run: submitted again with a fresh LoadMonitor mock, not awaited.
        executor.executeProposals(proposals, Collections.emptySet(), (Set) null, EasyMock.mock(LoadMonitor.class),
                (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID,
                (BalanceOpExecutionCompletionCallback) null);
        executor.shutdown();
    }

    /**
     * Writes replication-throttle configs for every broker and for {@code TOPIC0} directly through
     * the ZK client, starts an executor, and asserts that those throttle configs are no longer
     * present afterwards.
     */
    @Test
    public void testThrottleIsRemovedUponStartUpWhenNoReassignmentAreRunning() throws Exception {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(
                zookeeper().connectionString(), "ExecutorTestMetricGroup", "BrokerDiesWhenMovePartitions", false);
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        int leaderId = createTopics().get(TestConstants.TOPIC0).partitions().get(PARTITION).leader().id();
        int otherId = leaderId == 0 ? 1 : PARTITION;
        MockTime time = new MockTime();
        MetadataClient metadataClient = new MetadataClient(config, -1L, time);

        // Broker-level throttle rates, applied to every broker.
        Properties brokerThrottles = new Properties();
        brokerThrottles.put(KafkaConfig.LeaderReplicationThrottledRateProp(), "10000000");
        brokerThrottles.put(KafkaConfig.FollowerReplicationThrottledRateProp(), "10000000");
        for (Integer brokerId : this._brokers.keySet()) {
            zkClient.setOrCreateEntityConfigs(ConfigType.Broker(), String.valueOf(brokerId), brokerThrottles);
        }

        // Topic-level throttled replica lists for partition 0.
        Properties topicThrottles = new Properties();
        topicThrottles.put(KafkaConfig.LeaderReplicationThrottledReplicasProp(), String.format("0:%d", leaderId));
        topicThrottles.put(KafkaConfig.FollowerReplicationThrottledReplicasProp(), String.format("0:%d", otherId));
        zkClient.setOrCreateEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC0, topicThrottles);
        Thread.sleep(500L);

        Executor executor = new Executor(
                config,
                Option.empty(),
                time,
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                metadataClient,
                86400000L,
                43200000L,
                (ExecutorNotifier) null,
                getMockAnomalyDetector(RANDOM_UUID));
        executor.startUp();
        Thread.sleep(100L);

        for (Integer brokerId : this._brokers.keySet()) {
            Properties brokerConfigs = zkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(brokerId));
            Assert.assertNull(brokerConfigs.get(KafkaConfig.LeaderReplicationThrottledRateProp()));
            Assert.assertNull(brokerConfigs.get(KafkaConfig.FollowerReplicationThrottledRateProp()));
        }
        Properties topicConfigs = zkClient.getEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC0);
        Assert.assertNull(topicConfigs.get(KafkaConfig.LeaderReplicationThrottledReplicasProp()));
        Assert.assertNull(topicConfigs.get(KafkaConfig.FollowerReplicationThrottledReplicasProp()));

        executor.shutdown();
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
    }

    /**
     * Makes {@code LoadMonitor.computeThrottle()} throw and checks that the notification captured
     * from the executor reports a failed execution whose exception is a {@link RuntimeException}
     * caused by the original {@link IllegalStateException} and carrying its message.
     */
    @Test
    public void testThrottleComputationThrowsException() throws InterruptedException {
        LoadMonitor loadMonitor = EasyMock.mock(LoadMonitor.class);
        loadMonitor.pauseMetricSampling(EasyMock.anyString());
        EasyMock.expectLastCall();
        loadMonitor.resumeMetricSampling(EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.expect(loadMonitor.forceRefreshClusterAndGeneration()).andReturn(null);
        EasyMock.expect(loadMonitor.computeThrottle()).andThrow(new IllegalStateException("throttle computation failed"));
        EasyMock.replay(loadMonitor);

        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        int leaderId = createTopics().get(TP1.topic()).partitions().get(PARTITION).leader().id();
        int destinationId = leaderId == 0 ? 1 : PARTITION;
        ExecutionProposal proposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(Integer.valueOf(leaderId)),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leaderId))),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(destinationId))),
                Collections.emptyList(),
                Collections.emptyList());
        MockTime time = new MockTime();
        MetadataClient metadataClient = new MetadataClient(config, -1L, time);

        // Captures the last notification the executor sends.
        final AtomicReference<ExecutorNotification> captured = new AtomicReference<>();
        Executor executor = new Executor(
                config,
                Option.empty(),
                time,
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                metadataClient,
                86400000L,
                43200000L,
                new ExecutorNotifier() {
                    @Override
                    public void sendNotification(ExecutorNotification executorNotification) {
                        captured.set(executorNotification);
                    }

                    @Override
                    public void configure(Map<String, ?> map) {
                    }
                },
                getMockAnomalyDetector(RANDOM_UUID),
                KafkaCruiseControlUtils.createAdmin(config.originals()),
                new ReplicationThrottleHelper(
                        KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "CruiseControlExecutor", "Executor", false),
                        EasyMock.mock(KafkaAdminClient.class),
                        Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE),
                        true));

        executor.executeProposals(Collections.singletonList(proposal), Collections.emptySet(), (Set) null, loadMonitor,
                (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID,
                (BalanceOpExecutionCompletionCallback) null);
        waitUntilExecutionFinishes(executor);

        Assert.assertFalse(captured.get().executionSucceeded());
        Assert.assertTrue(captured.get().exception() instanceof RuntimeException);
        Assert.assertTrue(captured.get().exception().getCause() instanceof IllegalStateException);
        String failureDescription = String.format(
                "Did not get expected exception message. Expected to contain [%s], but instead it was [%s]",
                "throttle computation failed",
                captured.get().exception().getMessage());
        Assert.assertTrue(failureDescription, captured.get().exception().getMessage().contains("throttle computation failed"));
        EasyMock.verify(loadMonitor);
    }

    /**
     * Sets up the mocked admin client's describe-configs behaviour so that every broker in
     * {@code list} maps to an empty list, delegating to
     * {@code KafkaCruiseControlUnitTestUtils.mockDescribeConfigs}.
     *
     * @param kafkaAdminClient the mock to record expectations on
     * @param list broker ids to include
     */
    private void mockDescribeConfigs(KafkaAdminClient kafkaAdminClient, List<Integer> list) throws ExecutionException, InterruptedException {
        Collection<ConfigResource> brokerResources = KafkaCruiseControlUnitTestUtils.configResourcesForBrokers(list);
        // Left raw: the value type expected by the helper is declared outside this block.
        HashMap emptyConfigsByBroker = new HashMap();
        for (int brokerId : list) {
            emptyConfigsByBroker.put(Integer.toString(brokerId), Collections.emptyList());
        }
        KafkaCruiseControlUnitTestUtils.mockDescribeConfigs(kafkaAdminClient, brokerResources, emptyConfigsByBroker);
    }

    /**
     * Runs one TP1 proposal against a strict {@code LoadMonitor} mock that expects exactly the
     * sequence pause sampling, {@code computeThrottle()}, refresh cluster, resume sampling; then
     * checks the execution succeeded and the helper's throttle rate equals {@code AUTO_THROTTLE}.
     */
    @Test
    public void testThrottleIsComputedOncePerExecution() throws InterruptedException, ExecutionException {
        LoadMonitor loadMonitor = EasyMock.createStrictMock(LoadMonitor.class);
        loadMonitor.pauseMetricSampling(EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.expect(loadMonitor.computeThrottle()).andReturn(5000000L);
        EasyMock.expect(loadMonitor.forceRefreshClusterAndGeneration())
                .andReturn(EasyMock.mock(MetadataClient.ClusterAndGeneration.class));
        loadMonitor.resumeMetricSampling(EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.replay(loadMonitor);

        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        int leaderId = createTopics().get(TP1.topic()).partitions().get(PARTITION).leader().id();
        int destinationId = leaderId == 0 ? 1 : PARTITION;
        ExecutionProposal proposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(Integer.valueOf(leaderId)),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leaderId))),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(destinationId))),
                Collections.emptyList(),
                Collections.emptyList());
        MockTime time = new MockTime();
        MetadataClient metadataClient = new MetadataClient(config, -1L, time);

        // Captures the last notification the executor sends.
        final AtomicReference<ExecutorNotification> captured = new AtomicReference<>();
        ExecutorNotifier notifier = new ExecutorNotifier() {
            @Override
            public void sendNotification(ExecutorNotification executorNotification) {
                captured.set(executorNotification);
            }

            @Override
            public void configure(Map<String, ?> map) {
            }
        };

        KafkaAdminClient adminMock = EasyMock.mock(KafkaAdminClient.class);
        mockDescribeConfigs(adminMock, Arrays.asList(Integer.valueOf(PARTITION), 1));
        EasyMock.replay(adminMock);

        ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(
                KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "CruiseControlExecutor", "Executor", false),
                adminMock,
                Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE),
                true);
        Executor executor = new Executor(
                config,
                Option.empty(),
                time,
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                metadataClient,
                86400000L,
                43200000L,
                notifier,
                getMockAnomalyDetector(RANDOM_UUID),
                KafkaCruiseControlUtils.createAdmin(config.originals()),
                throttleHelper);

        executor.executeProposals(Collections.singletonList(proposal), Collections.emptySet(), (Set) null, loadMonitor,
                (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID,
                (BalanceOpExecutionCompletionCallback) null);
        waitUntilExecutionFinishes(executor);

        Assert.assertTrue(captured.get().executionSucceeded());
        Assert.assertEquals(KafkaCruiseControlConfig.AUTO_THROTTLE, throttleHelper.getThrottleRate().longValue());
        EasyMock.verify(loadMonitor);
    }

    /**
     * Moves TP1's single listed replica off its leader while passing that leader as the third
     * {@code executeProposals} argument. The admin mock only has describe-configs expectations for
     * the destination broker. Asserts the execution succeeded and the helper's throttle rate equals
     * {@code AUTO_THROTTLE}.
     */
    @Test
    public void testRemovedBrokersNotDescribed() throws ExecutionException, InterruptedException {
        LoadMonitor loadMonitor = EasyMock.createStrictMock(LoadMonitor.class);
        loadMonitor.pauseMetricSampling(EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.expect(loadMonitor.computeThrottle()).andReturn(5000000L);
        EasyMock.expect(loadMonitor.forceRefreshClusterAndGeneration()).andReturn(null);
        loadMonitor.resumeMetricSampling(EasyMock.anyString());
        EasyMock.expectLastCall();
        EasyMock.replay(loadMonitor);

        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        int leaderId = createTopics().get(TP1.topic()).partitions().get(PARTITION).leader().id();
        int destinationId = leaderId == 0 ? 1 : PARTITION;
        ExecutionProposal proposal = new ExecutionProposal(
                TP1,
                0L,
                new ReplicaPlacementInfo(Integer.valueOf(leaderId)),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(leaderId))),
                Collections.singletonList(new ReplicaPlacementInfo(Integer.valueOf(destinationId))),
                Collections.emptyList(),
                Collections.emptyList());
        MockTime time = new MockTime();
        MetadataClient metadataClient = new MetadataClient(config, -1L, time);

        // Captures the last notification the executor sends.
        final AtomicReference<ExecutorNotification> captured = new AtomicReference<>();
        ExecutorNotifier notifier = new ExecutorNotifier() {
            @Override
            public void sendNotification(ExecutorNotification executorNotification) {
                captured.set(executorNotification);
            }

            @Override
            public void configure(Map<String, ?> map) {
            }
        };

        // Only the destination broker is set up on the admin mock.
        KafkaAdminClient adminMock = EasyMock.mock(KafkaAdminClient.class);
        mockDescribeConfigs(adminMock, Collections.singletonList(Integer.valueOf(destinationId)));
        EasyMock.replay(adminMock);

        ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(
                KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "CruiseControlExecutor", "Executor", false),
                adminMock,
                Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE),
                true);
        Executor executor = new Executor(
                config,
                Option.empty(),
                time,
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                metadataClient,
                86400000L,
                43200000L,
                notifier,
                getMockAnomalyDetector(RANDOM_UUID),
                KafkaCruiseControlUtils.createAdmin(config.originals()),
                throttleHelper);

        executor.executeProposals(Collections.singletonList(proposal), Collections.emptySet(),
                Collections.singleton(Integer.valueOf(leaderId)), loadMonitor,
                (Integer) null, (Integer) null, (Integer) null, (ReplicaMovementStrategy) null, RANDOM_UUID,
                (BalanceOpExecutionCompletionCallback) null);
        waitUntilExecutionFinishes(executor);

        Assert.assertTrue(captured.get().executionSucceeded());
        Assert.assertEquals(KafkaCruiseControlConfig.AUTO_THROTTLE, throttleHelper.getThrottleRate().longValue());
        EasyMock.verify(loadMonitor);
    }

    /**
     * Creates a one-partition topic carrying the given {@code confluent.placement.constraints}
     * value, asserts that one partition was created, and waits until the topic's metadata is
     * visible through both brokers.
     *
     * @param str name of the topic to create
     * @param str2 placement-constraint JSON to set on the topic
     * @return the topic descriptions returned by {@link #waitForTopicMetadataPropagation(List)}
     * @throws InterruptedException if interrupted while waiting for creation or propagation
     * @throws RuntimeException if topic creation fails; the {@link ExecutionException} is kept as the cause
     */
    private Map<String, TopicDescription> createTopicWithPlacement(String str, String str2) throws InterruptedException {
        ConfluentAdmin admin = KafkaCruiseControlUtils.createAdmin(
                Collections.singletonMap("bootstrap.servers", broker(PARTITION).plaintextAddr()));
        try {
            NewTopic newTopic = new NewTopic(str, Optional.of(1), Optional.empty());
            newTopic.configs(Collections.singletonMap("confluent.placement.constraints", str2));
            // Unbox explicitly so assertEquals(long, long) is chosen, with the expected value first.
            int createdPartitions = admin.createTopics(Collections.singletonList(newTopic)).numPartitions(str).get().intValue();
            Assert.assertEquals(1, createdPartitions);
        } catch (ExecutionException e) {
            // Keep the original failure attached instead of only copying its message.
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // Close exactly once, whether creation succeeded or failed.
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(admin);
        }
        return waitForTopicMetadataPropagation(Collections.singletonList(str));
    }

    /**
     * Repeatedly describes the given topics through broker {@code PARTITION} and broker 1 until
     * both return at least {@code list.size()} descriptions, then stores the first broker's result
     * in {@code topicsCreated} and returns it.
     *
     * <p>A fresh pair of admin clients is opened for every attempt and always closed afterwards.
     * An {@link ExecutionException} from a describe call is treated as "not visible yet" and the
     * attempt is retried. There is no upper bound on the number of attempts.
     *
     * @param list names of the topics to wait for
     * @return the descriptions obtained through broker {@code PARTITION}
     * @throws InterruptedException if interrupted while describing topics or pausing between attempts
     */
    private Map<String, TopicDescription> waitForTopicMetadataPropagation(List<String> list) throws InterruptedException {
        Map<String, TopicDescription> viaFirstBroker = null;
        Map<String, TopicDescription> viaSecondBroker = null;
        while (true) {
            ConfluentAdmin firstAdmin = KafkaCruiseControlUtils.createAdmin(
                    Collections.singletonMap("bootstrap.servers", broker(PARTITION).plaintextAddr()));
            ConfluentAdmin secondAdmin = KafkaCruiseControlUtils.createAdmin(
                    Collections.singletonMap("bootstrap.servers", broker(1).plaintextAddr()));
            try {
                viaFirstBroker = firstAdmin.describeTopics(list).all().get();
                viaSecondBroker = secondAdmin.describeTopics(list).all().get();
                // Short pause after a successful round; an interrupt is propagated to the caller.
                Thread.sleep(100L);
            } catch (ExecutionException ignored) {
                // Metadata not available on one of the brokers yet; keep earlier results and retry.
            } finally {
                KafkaCruiseControlUtils.closeAdminClientWithTimeout(firstAdmin);
                KafkaCruiseControlUtils.closeAdminClientWithTimeout(secondAdmin);
            }
            boolean firstReady = viaFirstBroker != null && viaFirstBroker.size() >= list.size();
            boolean secondReady = viaSecondBroker != null && viaSecondBroker.size() >= list.size();
            if (firstReady && secondReady) {
                this.topicsCreated = viaFirstBroker;
                return this.topicsCreated;
            }
        }
    }

    /**
     * Creates the two default test topics: {@code TOPIC0} with one partition and replication
     * factor 1, and {@code TOPIC1} with one partition and replication factor 2.
     *
     * @return the topic descriptions returned by {@link #createTopics(List)}
     */
    private Map<String, TopicDescription> createTopics() throws InterruptedException {
        NewTopic topic0 = new NewTopic(TestConstants.TOPIC0, 1, (short) 1);
        NewTopic topic1 = new NewTopic(TestConstants.TOPIC1, 1, (short) 2);
        List<NewTopic> defaultTopics = Arrays.asList(topic0, topic1);
        return createTopics(defaultTopics);
    }

    /**
     * Creates the given topics unless topics were already created by this test instance, in which
     * case the cached {@code topicsCreated} map is returned unchanged.
     *
     * <p>The creation request is submitted without waiting for its result; completion is observed
     * by waiting for the topic metadata to become visible on both brokers.
     *
     * @param list topics to create
     * @return descriptions of the created (or previously created) topics
     * @throws InterruptedException if interrupted while waiting for metadata propagation
     */
    private Map<String, TopicDescription> createTopics(List<NewTopic> list) throws InterruptedException {
        if (!this.topicsCreated.isEmpty()) {
            return this.topicsCreated;
        }
        List<String> topicNames = list.stream().map(NewTopic::name).collect(Collectors.toList());
        LOG.info("Attempting to create topics {}", topicNames);
        ConfluentAdmin admin = KafkaCruiseControlUtils.createAdmin(
                Collections.singletonMap("bootstrap.servers", broker(PARTITION).plaintextAddr()));
        try {
            admin.createTopics(list);
        } finally {
            // Close exactly once; the propagation wait below opens its own clients.
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(admin);
        }
        return waitForTopicMetadataPropagation(topicNames);
    }

    /**
     * Deletes the given topics through broker {@code PARTITION} and blocks until the deletion
     * request completes. The admin client is closed whether or not the deletion succeeds.
     *
     * @param collection names of the topics to delete
     * @throws ExecutionException if the deletion fails
     * @throws InterruptedException if interrupted while waiting for the deletion
     */
    private void deleteTopics(Collection<String> collection) throws ExecutionException, InterruptedException {
        ConfluentAdmin admin = KafkaCruiseControlUtils.createAdmin(
                Collections.singletonMap("bootstrap.servers", broker(PARTITION).plaintextAddr()));
        try {
            admin.deleteTopics(collection).all().get();
        } finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(admin);
        }
    }

    /**
     * Creates an {@code AnomalyDetector} mock already switched to replay mode. It expects
     * {@code maybeClearOngoingAnomalyDetectionTimeMs()} one or two times and has a recorded call to
     * {@code markSelfHealingFinished(str)}.
     *
     * @param str UUID expected as the argument of {@code markSelfHealingFinished}
     * @return the replayed mock
     */
    private AnomalyDetector getMockAnomalyDetector(String str) {
        AnomalyDetector detector = EasyMock.mock(AnomalyDetector.class);

        detector.maybeClearOngoingAnomalyDetectionTimeMs();
        EasyMock.expectLastCall().times(1, 2);

        detector.markSelfHealingFinished(str);

        EasyMock.replay(detector);
        return detector;
    }

    /**
     * Executes {@code collection} on a freshly created default executor and verifies
     * {@code collection2} once the execution has finished.
     *
     * @param kafkaZkClient client used for verification
     * @param collection proposals to execute
     * @param collection2 proposals to verify
     * @return the executor that ran the proposals
     */
    private Executor executeAndVerifyProposals(KafkaZkClient kafkaZkClient, Collection<ExecutionProposal> collection, Collection<ExecutionProposal> collection2) {
        Executor defaultExecutor = executor();
        return executeAndVerifyProposals(kafkaZkClient, collection, collection2, defaultExecutor);
    }

    /**
     * Executes {@code collection} on the supplied executor, waits for the execution to finish and
     * verifies {@code collection2} against the cluster state.
     *
     * @param kafkaZkClient client used for verification
     * @param collection proposals to execute
     * @param collection2 proposals to verify
     * @param executor executor to run the proposals on
     * @return the same {@code executor} instance
     */
    private Executor executeAndVerifyProposals(KafkaZkClient kafkaZkClient, Collection<ExecutionProposal> collection, Collection<ExecutionProposal> collection2, Executor executor) {
        // Submit with no completion callback, then block until done and check the result.
        executeProposals(executor, collection, null);
        waitAndVerifyProposals(kafkaZkClient, executor, collection2);
        return executor;
    }

    /**
     * Waits for {@code testCondition} via {@code TestUtils.waitForCondition}, treating an
     * {@link AssertionError} thrown by the condition as "not met yet" rather than a failure.
     *
     * @param testCondition condition to poll
     * @param j value passed through as the second argument of {@code waitForCondition}
     * @param str value passed through as the third argument of {@code waitForCondition}
     */
    private void waitForAssert(TestCondition testCondition, long j, String str) throws InterruptedException {
        TestCondition assertionTolerant = () -> {
            boolean met;
            try {
                met = testCondition.conditionMet();
            } catch (AssertionError notYet) {
                met = false;
            }
            return met;
        };
        TestUtils.waitForCondition(assertionTolerant, j, str);
    }

    /**
     * Submits the proposals on the executor without a completion callback.
     *
     * @param executor executor to submit to
     * @param collection proposals to execute
     * @return the same {@code executor} instance
     */
    private Executor executeProposals(Executor executor, Collection<ExecutionProposal> collection) {
        BalanceOpExecutionCompletionCallback noCallback = null;
        return executeProposals(executor, collection, noCallback);
    }

    /**
     * Submits the proposals on the executor with a mocked {@code LoadMonitor}, {@code RANDOM_UUID}
     * as the UUID, and the optional arguments left as {@code null}.
     *
     * @param executor executor to submit to
     * @param collection proposals to execute
     * @param balanceOpExecutionCompletionCallback callback handed to the executor; may be {@code null}
     * @return the same {@code executor} instance
     */
    private Executor executeProposals(Executor executor, Collection<ExecutionProposal> collection, BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback) {
        LoadMonitor loadMonitorMock = EasyMock.mock(LoadMonitor.class);
        executor.executeProposals(
                collection,
                Collections.emptySet(),
                (Set) null,
                loadMonitorMock,
                (Integer) null,
                (Integer) null,
                (Integer) null,
                (ReplicaMovementStrategy) null,
                RANDOM_UUID,
                balanceOpExecutionCompletionCallback);
        return executor;
    }

    /**
     * Creates an executor whose {@code ReplicationThrottleHelper} is built with the given throttle.
     *
     * @param l throttle value passed to the throttle helper
     * @return a new executor
     */
    private Executor executor(Long l) {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(
                KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "CruiseControlExecutor", "Executor", false),
                KafkaCruiseControlUtils.createAdmin(config.originals()),
                l,
                false);
        return executor(throttleHelper, config);
    }

    /**
     * Creates an executor whose {@code ReplicationThrottleHelper} takes its throttle from the
     * {@code throttle.bytes.per.second} setting of the test config.
     *
     * @return a new executor
     */
    private Executor executor() {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(
                KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "CruiseControlExecutor", "Executor", false),
                KafkaCruiseControlUtils.createAdmin(config.originals()),
                config.getLong("throttle.bytes.per.second"),
                false);
        return executor(throttleHelper, config);
    }

    /**
     * Creates an executor from the given config and throttle helper, with no metadata client, no
     * notifier, a fresh mock anomaly detector and a new admin client.
     *
     * @param replicationThrottleHelper throttle helper the executor should use
     * @param kafkaCruiseControlConfig configuration for the executor and its admin client
     * @return a new executor
     */
    private Executor executor(ReplicationThrottleHelper replicationThrottleHelper, KafkaCruiseControlConfig kafkaCruiseControlConfig) {
        return new Executor(
                kafkaCruiseControlConfig,
                Option.empty(),
                new SystemTime(),
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                (MetadataClient) null,
                86400000L,
                43200000L,
                (ExecutorNotifier) null,
                getMockAnomalyDetector(RANDOM_UUID),
                KafkaCruiseControlUtils.createAdmin(kafkaCruiseControlConfig.originals()),
                replicationThrottleHelper);
    }

    /**
     * Waits for the executor to finish and then checks, for each proposal, the partition's state as
     * read through the ZK client: replica count, presence of every new replica (only when the
     * proposal has a replica action), presence of every new observer, and the leader.
     *
     * @param kafkaZkClient client used to read assignments and leaders
     * @param executor executor whose execution is awaited
     * @param collection proposals to verify
     */
    private void waitAndVerifyProposals(KafkaZkClient kafkaZkClient, Executor executor, Collection<ExecutionProposal> collection) {
        waitUntilExecutionFinishes(executor);
        for (ExecutionProposal proposal : collection) {
            // The assignment is re-read for every proposal.
            Map<TopicPartition, ReplicaAssignment> assignments = JavaConverters.mapAsJavaMap(
                    kafkaZkClient.getFullReplicaAssignmentForTopics(
                            JavaConverters.asScalaSet(Collections.singleton(proposal.topic())).toSet()));
            TopicPartition tp = new TopicPartition(proposal.topic(), proposal.partitionId());
            ReplicaAssignment assignment = assignments.get(tp);

            int expectedReplicationFactor = proposal.newReplicas().size();
            Assert.assertEquals(
                    "Replication factor for partition " + tp + " should be " + expectedReplicationFactor,
                    expectedReplicationFactor,
                    assignment.replicas().size());

            if (proposal.hasReplicaAction()) {
                for (ReplicaPlacementInfo replica : proposal.newReplicas()) {
                    Assert.assertTrue("The partition should have moved for " + tp, assignment.replicas().contains(replica.brokerId()));
                }
            }
            for (ReplicaPlacementInfo observer : proposal.newObservers()) {
                Assert.assertTrue("The observer should have been moved for " + tp, assignment.observers().contains(observer.brokerId()));
            }
            Assert.assertEquals(
                    "The leader should have moved for " + tp,
                    proposal.newLeader().brokerId(),
                    kafkaZkClient.getLeaderForPartition(tp).get());
        }
    }

    /**
     * Creates an executor with a fresh mock anomaly detector and no custom executor service.
     *
     * @return a new executor
     */
    private Executor createExecutor() {
        AnomalyDetector detector = getMockAnomalyDetector(RANDOM_UUID);
        Optional<ExecutorService> noExecutorService = Optional.empty();
        return createExecutor(detector, noExecutorService);
    }

    /**
     * Creates an executor with the given anomaly detector. When an executor service is supplied,
     * the constructor variant that also takes an admin client, a {@code null} throttle helper and
     * that service is used; otherwise the shorter variant is used.
     *
     * @param anomalyDetector anomaly detector handed to the executor
     * @param optional executor service to inject, if any
     * @return a new executor
     */
    private Executor createExecutor(AnomalyDetector anomalyDetector, Optional<ExecutorService> optional) {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
        if (optional.isPresent()) {
            return new Executor(
                    config,
                    Option.empty(),
                    new SystemTime(),
                    KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                    (MetadataClient) null,
                    86400000L,
                    43200000L,
                    (ExecutorNotifier) null,
                    anomalyDetector,
                    KafkaCruiseControlUtils.createAdmin(config.originals()),
                    (ReplicationThrottleHelper) null,
                    optional.get());
        }
        return new Executor(
                config,
                Option.empty(),
                new SystemTime(),
                KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry),
                (MetadataClient) null,
                86400000L,
                43200000L,
                (ExecutorNotifier) null,
                anomalyDetector);
    }

    /**
     * Polls the executor every 100 ms until it has no ongoing execution and reports
     * {@code NO_TASK_IN_PROGRESS}, or until 30 seconds have elapsed, and fails the test if the
     * executor is still busy afterwards.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored and the test is
     * failed immediately.
     *
     * @param executor executor whose execution is awaited
     */
    private void waitUntilExecutionFinishes(Executor executor) {
        long deadlineMs = System.currentTimeMillis() + 30000;
        // Loop on the condition itself so the wait ends on completion or at the deadline.
        while ((executor.hasOngoingExecution() || executor.state().state() != ExecutorState.State.NO_TASK_IN_PROGRESS)
                && System.currentTimeMillis() < deadlineMs) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("Interrupted while waiting for the execution to finish.");
            }
        }
        if (executor.state().state() != ExecutorState.State.NO_TASK_IN_PROGRESS) {
            Assert.fail("The execution did not finish in 30 seconds.");
        }
    }

    /**
     * Builds the configuration properties used to create the test's
     * {@code KafkaCruiseControlConfig}: capacity file, bootstrap servers, sampler class, ZooKeeper
     * connection string and executor tuning values.
     *
     * @return a new, populated {@link Properties} instance
     */
    private Properties getExecutorProperties() {
        String capacityConfigPath =
                getClass().getClassLoader().getResource(TestConstants.DEFAULT_BROKER_CAPACITY_CONFIG_FILE).getFile();

        Properties props = new Properties();
        // Connection and resource settings taken from the test harness.
        props.setProperty("capacity.config.file", capacityConfigPath);
        props.setProperty("bootstrap.servers", bootstrapServers());
        props.setProperty("metric.sampler.class", NoopSampler.class.getName());
        props.setProperty("zookeeper.connect", zookeeper().connectionString());
        // Fixed executor tuning values.
        props.setProperty("num.concurrent.partition.movements.per.broker", "10");
        props.setProperty("execution.progress.check.interval.ms", "1000");
        props.setProperty("describe.topics.response.timeout.ms", DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS);
        return props;
    }
}
