package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.integration.DataBalancerIntegrationTestUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ExecutorInterBrokerReplicaMovementTest.class */
/**
 * Unit tests for {@link ExecutorInterBrokerReplicaMovement}.
 *
 * <p>Every collaborator (admin client, admin utils, task manager, throttle helper, load monitor and
 * task waiter) is a Mockito mock. The fixture consists of four inter-broker replica movement tasks,
 * one per partition of {@code ExecutorTestUtils.TOPIC}:
 * <ul>
 *   <li>{@code tp1} / {@code tp2} - partitions whose per-partition reassignment future completes normally;</li>
 *   <li>{@code invalidAssignmentTp} / {@code invalidAssignmentTp1} - partitions whose per-partition
 *       reassignment future fails with an {@link InvalidReplicaAssignmentException}.</li>
 * </ul>
 *
 * <p>NOTE(review): raw-type casts are intentionally kept on {@code ArgumentMatchers.any()} /
 * {@code ArgumentMatchers.eq()} arguments and on the {@code null} constructor arguments. The signatures
 * of the stubbed methods are not visible from this file, and the casts may be what selects the intended
 * overload - confirm before tightening them.
 */
public class ExecutorInterBrokerReplicaMovementTest {
    // Collaborator mocks; re-created before every test in setUp().
    private AlterPartitionReassignmentsResult resultsMock;
    private ExecutionTaskManager executionTaskManagerMock;
    private ReplicationThrottleHelper throttleHelperMock;
    private LoadMonitor loadMonitorMock;
    private SbkAdminUtils adminUtilsMock;
    private ConfluentAdmin adminClientMock;
    private MockTime time;
    // Always null in this test; it is only passed through to the class under test and to verifications.
    private Cluster clusterMock;
    private Executor.ExecutionTaskWaiter taskWaiterMock;
    private ExecutorInterBrokerReplicaMovement interBroker;

    private final Set<Integer> removedBrokers = new HashSet<>();

    // Future whose get() throws ExecutionException(InvalidReplicaAssignmentException); stubbed in setUp().
    private final KafkaFuture<Void> throwingFuture = Mockito.mock(KafkaFuture.class);
    // Future whose get() returns null; stubbed in setUp().
    private final KafkaFuture<Void> normalFuture = Mockito.mock(KafkaFuture.class);

    private final TopicPartition tp1 = new TopicPartition(ExecutorTestUtils.TOPIC, 1);
    private final TopicPartition tp2 = new TopicPartition(ExecutorTestUtils.TOPIC, 2);
    private final TopicPartition invalidAssignmentTp = new TopicPartition(ExecutorTestUtils.TOPIC, 3);
    private final TopicPartition invalidAssignmentTp1 = new TopicPartition(ExecutorTestUtils.TOPIC, 4);
    private final List<TopicPartition> partitions = Arrays.asList(tp1, tp2, invalidAssignmentTp, invalidAssignmentTp1);

    // The placements must be initialized before the proposals below: proposal(...) reads them
    // while the instance is still being constructed.
    private final ReplicaPlacementInfo b0Placement = new ReplicaPlacementInfo(0);
    private final ReplicaPlacementInfo b1Placement = new ReplicaPlacementInfo(1);
    private final ReplicaPlacementInfo b2Placement = new ReplicaPlacementInfo(2);
    private final ReplicaPlacementInfo b3Placement = new ReplicaPlacementInfo(3);
    private final ReplicaPlacementInfo b4Placement = new ReplicaPlacementInfo(4);

    private final ExecutionProposal tp1Proposal = proposal(tp1);
    private final ExecutionProposal tp2Proposal = proposal(tp2);
    private final ExecutionProposal invalidAssignmentTpProposal = proposal(invalidAssignmentTp);
    private final ExecutionProposal invalidAssignmentTp1Proposal = proposal(invalidAssignmentTp1);

    private final ExecutionTask executionTask1 =
        new ExecutionTask(1, tp1Proposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
    private final ExecutionTask executionTask2 =
        new ExecutionTask(2, tp2Proposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
    private final ExecutionTask invalidAssignmentExecutionTask3 =
        new ExecutionTask(3, invalidAssignmentTpProposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
    private final ExecutionTask invalidAssignmentExecutionTask4 =
        new ExecutionTask(4, invalidAssignmentTp1Proposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
    private final List<ExecutionTask> allTasks =
        Arrays.asList(executionTask1, executionTask2, invalidAssignmentExecutionTask3, invalidAssignmentExecutionTask4);

    // Default describe result: every partition is successfully described with replicas [1, 2, 3].
    private final Map<TopicPartition, SbkAdminUtils.ReplicaDescription> mockReplicaDescriptionResult =
        partitions.stream().collect(Collectors.toMap(
            partition -> partition,
            partition -> SbkAdminUtils.ReplicaDescription.ofSuccessfulDescription(Arrays.asList(1, 2, 3))));

    /**
     * Creates fresh mocks, installs the default stubbings shared by all tests and builds the
     * {@link ExecutorInterBrokerReplicaMovement} under test.
     */
    @BeforeEach
    public void setUp() throws ExecutionException, InterruptedException {
        adminUtilsMock = Mockito.mock(SbkAdminUtils.class);
        adminClientMock = Mockito.mock(ConfluentAdmin.class);
        resultsMock = Mockito.mock(AlterPartitionReassignmentsResult.class);
        executionTaskManagerMock = Mockito.mock(ExecutionTaskManager.class);
        throttleHelperMock = Mockito.mock(ReplicationThrottleHelper.class);
        loadMonitorMock = Mockito.mock(LoadMonitor.class);
        taskWaiterMock = Mockito.mock(Executor.ExecutionTaskWaiter.class);
        clusterMock = null;
        time = new MockTime();

        Mockito.when(throwingFuture.get())
            .thenThrow(new ExecutionException("", new InvalidReplicaAssignmentException("")));
        Mockito.when(normalFuture.get()).then(invocation -> null);

        Mockito.when(adminUtilsMock.listTargetReplicasBeingReassigned((Optional) ArgumentMatchers.any()))
            .thenReturn(Collections.emptyMap());
        Mockito.when(adminUtilsMock.getReplicasForPartitions((Collection) ArgumentMatchers.any()))
            .thenReturn(mockReplicaDescriptionResult);
        Mockito.when(adminClientMock.alterPartitionReassignments((Map) ArgumentMatchers.any()))
            .thenReturn(resultsMock);

        // A former stubbing of numPendingInterBrokerPartitionMovements() to 0 was dropped here:
        // it was immediately overridden by the stubbing below and therefore never took effect.
        Mockito.when(executionTaskManagerMock.remainingInterBrokerDataToMoveInMB()).thenReturn(0L);
        Mockito.when(executionTaskManagerMock.numPendingInterBrokerPartitionMovements()).thenReturn(allTasks.size());

        interBroker = new ExecutorInterBrokerReplicaMovement(
            (String) null,
            executionTaskManagerMock,
            (Set) null,
            (Set) null,
            throttleHelperMock,
            adminClientMock,
            adminUtilsMock,
            new AtomicBoolean(false),
            removedBrokers,
            loadMonitorMock,
            clusterMock,
            time,
            100L);
    }

    /**
     * With retries enabled, the two tasks whose reassignment futures fail with an invalid replica
     * assignment must be returned by {@code executeReplicaReassignmentTasks}.
     */
    @Test
    public void testExecuteReplicaReassignmentTasksCatchesInvalidAssignmentExceptionAndReturnsThoseTasks() throws InterruptedException {
        Mockito.when(resultsMock.all()).thenReturn(throwingFuture);
        Mockito.when(resultsMock.values()).thenReturn(reassignmentResults());

        List<ExecutionTask> retriableTasks = interBroker.executeReplicaReassignmentTasks(allTasks, true);

        Assertions.assertEquals(2, retriableTasks.size());
        Assertions.assertTrue(retriableTasks.contains(invalidAssignmentExecutionTask3), "Expected the returned retriable task to contain the execution task with an invalid assignment");
        Assertions.assertTrue(retriableTasks.contains(invalidAssignmentExecutionTask4), "Expected the returned retriable task to contain the execution task with an invalid assignment");
    }

    /**
     * A partition whose replica description failed ({@code tp2}) must be returned as retriable
     * together with the two partitions whose reassignment was rejected as invalid.
     */
    @Test
    public void testMultipleDescribeExceptionTypesMarkedRetriable() throws InterruptedException {
        SbkAdminUtils.ReplicaDescription successfulDescription =
            SbkAdminUtils.ReplicaDescription.ofSuccessfulDescription(Arrays.asList(1, 2, 3));
        Map<TopicPartition, SbkAdminUtils.ReplicaDescription> descriptions = new HashMap<>();
        descriptions.put(tp1, successfulDescription);
        descriptions.put(invalidAssignmentTp, successfulDescription);
        descriptions.put(invalidAssignmentTp1, successfulDescription);
        descriptions.put(tp2, SbkAdminUtils.ReplicaDescription.ofFailedDescription(new UnknownTopicOrPartitionException("unknown!")));
        Mockito.when(adminUtilsMock.getReplicasForPartitions((Collection) ArgumentMatchers.any())).thenReturn(descriptions);
        Mockito.when(resultsMock.all()).thenReturn(throwingFuture);
        Mockito.when(resultsMock.values()).thenReturn(reassignmentResults());

        List<ExecutionTask> retriableTasks = interBroker.executeReplicaReassignmentTasks(allTasks, true);

        Assertions.assertEquals(3, retriableTasks.size());
        Assertions.assertTrue(retriableTasks.contains(invalidAssignmentExecutionTask3), "Expected the returned retriable task to contain the execution tasks with an invalid assignment and those with an invalid description");
        Assertions.assertTrue(retriableTasks.contains(invalidAssignmentExecutionTask4), "Expected the returned retriable task to contain the execution task with an invalid assignment and those with an invalid description");
        Assertions.assertTrue(retriableTasks.contains(executionTask2), "Expected the returned retriable task to contain the execution task with an invalid assignment and those with an invalid description");
    }

    /**
     * First reassignment attempt fails for the two invalid-assignment partitions, the second attempt
     * succeeds; {@code move} is expected to complete and follow the interaction sequence checked by
     * {@link #testBaseRetryCase(boolean)}.
     */
    @Test
    public void testMoveRetriesTasksWhichThrowInvalidAssignmentExceptionAndSucceed() throws InterruptedException {
        Mockito.when(resultsMock.all()).thenReturn(throwingFuture, normalFuture);
        Mockito.when(resultsMock.values()).thenReturn(reassignmentResults(), Collections.emptyMap());
        Mockito.when(taskWaiterMock.waitForAnyTaskToFinish((AbstractExecutorReplicaMovement) ArgumentMatchers.any()))
            .thenReturn(
                Arrays.asList(executionTask1, executionTask2),
                Arrays.asList(invalidAssignmentExecutionTask3, invalidAssignmentExecutionTask4));

        testBaseRetryCase(true);
    }

    /**
     * First describe call reports the two invalid-assignment partitions as failed descriptions, the
     * second describe call succeeds for every partition; {@code move} is expected to complete.
     */
    @Test
    public void testMoveRetriesTasksWhichFailDescriptionAndSucceedsAfterwards() throws InterruptedException {
        // Drops every stubbing installed on adminUtilsMock by setUp(); only the describe call is re-stubbed.
        Mockito.reset(adminUtilsMock);

        SbkAdminUtils.ReplicaDescription successfulDescription =
            SbkAdminUtils.ReplicaDescription.ofSuccessfulDescription(Arrays.asList(1, 2, 3));
        SbkAdminUtils.ReplicaDescription failedDescription =
            SbkAdminUtils.ReplicaDescription.ofFailedDescription(new UnknownTopicOrPartitionException("unknown!"));
        Map<TopicPartition, SbkAdminUtils.ReplicaDescription> firstDescriptions = new HashMap<>();
        firstDescriptions.put(tp1, successfulDescription);
        firstDescriptions.put(tp2, successfulDescription);
        firstDescriptions.put(invalidAssignmentTp, failedDescription);
        firstDescriptions.put(invalidAssignmentTp1, failedDescription);

        Mockito.when(adminUtilsMock.getReplicasForPartitions((Collection) ArgumentMatchers.any()))
            .thenReturn(firstDescriptions, mockReplicaDescriptionResult);
        Mockito.when(resultsMock.all()).thenReturn(normalFuture);
        Mockito.when(taskWaiterMock.waitForAnyTaskToFinish((AbstractExecutorReplicaMovement) ArgumentMatchers.any()))
            .thenReturn(
                Arrays.asList(executionTask1, executionTask2),
                Arrays.asList(invalidAssignmentExecutionTask3, invalidAssignmentExecutionTask4));

        testBaseRetryCase(true);
    }

    /**
     * A reassignment failure caused by an {@link UnknownServerException} must surface from
     * {@code move} instead of being retried.
     *
     * <p>The whole base scenario runs inside the throwing assertion, so its post-{@code move}
     * verifications are not reached when {@code move} throws.
     */
    @Test
    public void testMoveThrowsOnNonRetriableExceptions() throws InterruptedException, ExecutionException {
        KafkaFuture<Void> unknownServerErrorFuture = Mockito.mock(KafkaFuture.class);
        Mockito.when(unknownServerErrorFuture.get()).thenAnswer(invocation -> {
            throw new ExecutionException("Exception!", new UnknownServerException("unknown"));
        });
        Mockito.when(resultsMock.all()).thenReturn(unknownServerErrorFuture);

        DataBalancerIntegrationTestUtils.assertAdminApiThrows(UnknownServerException.class, () -> {
            testBaseRetryCase(true);
        });
    }

    /**
     * Both the first attempt and the retry fail with an invalid replica assignment; {@code move} is
     * expected to throw {@link InvalidReplicaAssignmentException}.
     */
    @Test
    public void testMoveRetriesTasksWhichThrowInvalidAssignmentExceptionAndContinueToThrow() throws InterruptedException {
        Mockito.when(resultsMock.all()).thenReturn(throwingFuture, throwingFuture);
        Map<TopicPartition, KafkaFuture<Void>> retryResults = new HashMap<>();
        retryResults.put(invalidAssignmentTp, throwingFuture);
        retryResults.put(invalidAssignmentTp1, throwingFuture);
        Mockito.when(resultsMock.values()).thenReturn(reassignmentResults(), retryResults);
        Mockito.when(taskWaiterMock.waitForAnyTaskToFinish((AbstractExecutorReplicaMovement) ArgumentMatchers.any()))
            .thenReturn(Arrays.asList(executionTask1, executionTask2), Collections.emptyList());

        testBaseRetryCase(false);
    }

    /**
     * Shared retry scenario: runs {@code move} with all four tasks drained from the task manager and
     * two tasks reported as pending retry, then verifies the order of interactions with the mocks.
     *
     * <p>The first round (throttle, mark in progress, reassign, mark for retry, wait, clear throttles)
     * and the start of the second round (time update, reload, throttle, reassign) are verified in both
     * modes. The tail of the second round is verified only when the retry is expected to succeed.
     *
     * @param expectRetryToSucceed {@code true} if {@code move} must return normally; {@code false} if
     *                             it must throw {@link InvalidReplicaAssignmentException}
     */
    private void testBaseRetryCase(boolean expectRetryToSucceed) throws InterruptedException {
        Mockito.when(executionTaskManagerMock.drainInterBrokerReplicaMovementTasks()).thenReturn(allTasks);
        Mockito.when(executionTaskManagerMock.numPendingInterBrokerPartitionMovements()).thenReturn(0);
        Mockito.when(executionTaskManagerMock.inExecutionTasks()).thenReturn(Collections.emptySet());
        Mockito.when(executionTaskManagerMock.numInterBrokerPartitionMovementsToBeRetried()).thenReturn(2);
        // The listener lets the in-order verification observe that mock time was updated between
        // the two rounds (presumably the retry backoff - confirm against the class under test).
        MockTime.Listener timeListener = Mockito.mock(MockTime.Listener.class);
        time.addListener(timeListener);

        if (expectRetryToSucceed) {
            interBroker.move(taskWaiterMock);
        } else {
            DataBalancerIntegrationTestUtils.assertAdminApiThrows(InvalidReplicaAssignmentException.class, () -> {
                interBroker.move(taskWaiterMock);
            });
        }

        // Built from real ExecutionTask instances, so computing it touches none of the mocks.
        List<ExecutionProposal> expectedProposals =
            allTasks.stream().map(ExecutionTask::proposal).collect(Collectors.toList());
        InOrder inOrder = Mockito.inOrder(
            taskWaiterMock, throttleHelperMock, resultsMock, adminUtilsMock, adminClientMock, executionTaskManagerMock, timeListener);

        // First round: everything is submitted, the invalid assignments are queued for retry.
        inOrder.verify(throttleHelperMock).setThrottles(expectedProposals, loadMonitorMock, removedBrokers);
        inOrder.verify(executionTaskManagerMock).markTasksInProgress(allTasks);
        inOrder.verify(adminUtilsMock).listTargetReplicasBeingReassigned((Optional) ArgumentMatchers.any());
        inOrder.verify(adminClientMock).alterPartitionReassignments((Map) ArgumentMatchers.any());
        inOrder.verify(resultsMock).all();
        inOrder.verify(executionTaskManagerMock)
            .markTasksToBeRetried(executionTaskListArgThatContains(invalidAssignmentExecutionTask3, invalidAssignmentExecutionTask4));
        inOrder.verify(taskWaiterMock, Mockito.times(1))
            .waitForAnyTaskToFinish((AbstractExecutorReplicaMovement) ArgumentMatchers.any());
        inOrder.verify(throttleHelperMock).clearThrottles(
            executionTaskListArgThatContains(allTasks.toArray(new ExecutionTask[0])),
            (List) ArgumentMatchers.eq(Collections.emptyList()),
            (Set) ArgumentMatchers.eq(removedBrokers));

        // Second round: retried tasks are reloaded and re-submitted.
        inOrder.verify(executionTaskManagerMock).numInterBrokerPartitionMovementsToBeRetried();
        inOrder.verify(timeListener, Mockito.atLeastOnce()).onTimeUpdated();
        inOrder.verify(executionTaskManagerMock).reloadInterBrokerTasksToBeRetried(clusterMock);
        inOrder.verify(throttleHelperMock).setThrottles(expectedProposals, loadMonitorMock, removedBrokers);
        inOrder.verify(adminClientMock).alterPartitionReassignments((Map) ArgumentMatchers.any());
        inOrder.verify(resultsMock).all();

        if (expectRetryToSucceed) {
            inOrder.verify(taskWaiterMock, Mockito.times(1))
                .waitForAnyTaskToFinish((AbstractExecutorReplicaMovement) ArgumentMatchers.any());
            inOrder.verify(throttleHelperMock).clearThrottles(
                executionTaskListArgThatContains(invalidAssignmentExecutionTask3, invalidAssignmentExecutionTask4),
                (List) ArgumentMatchers.eq(Collections.emptyList()),
                (Set) ArgumentMatchers.eq(removedBrokers));
            inOrder.verify(executionTaskManagerMock).inExecutionTasks();
            // No further task may be queued for retry once the second round went through.
            inOrder.verify(executionTaskManagerMock, Mockito.never()).markTasksToBeRetried((List) ArgumentMatchers.any());
            inOrder.verifyNoMoreInteractions();
        }
    }

    /**
     * With retries disabled, an invalid replica assignment must be thrown by
     * {@code executeReplicaReassignmentTasks} instead of being returned as a retriable task.
     */
    @Test
    public void testExecuteReplicaReassignmentTasksThrowsInvalidAssignmentExceptionWhenNotMarkedToRetry() {
        Mockito.when(resultsMock.all()).thenReturn(throwingFuture);
        Mockito.when(resultsMock.values()).thenReturn(reassignmentResults());

        DataBalancerIntegrationTestUtils.assertAdminApiThrows(InvalidReplicaAssignmentException.class, () -> {
            interBroker.executeReplicaReassignmentTasks(allTasks, false);
        });
    }

    /**
     * Registers a Mockito argument matcher that accepts any list containing every one of the given tasks.
     * Additional elements in the matched list are allowed.
     *
     * @param executionTaskArr the tasks the matched list must contain
     * @return the placeholder value returned by {@code Mockito.argThat}; only meaningful as an
     *         argument inside a stubbing or verification call
     */
    public static List<ExecutionTask> executionTaskListArgThatContains(ExecutionTask... executionTaskArr) {
        return Mockito.argThat(actualTasks ->
            Arrays.stream(executionTaskArr).allMatch(expected -> actualTasks.contains(expected)));
    }

    /**
     * When the overall reassignment future completes normally, no task is reported as retriable.
     */
    @Test
    public void testExecuteReplicaReassignmentTasksReturnsEmptyListWhenNoInvalidAssignments() throws InterruptedException {
        Mockito.when(resultsMock.all()).thenReturn(normalFuture);

        Assertions.assertEquals(0, interBroker.executeReplicaReassignmentTasks(allTasks, true).size());
    }

    /**
     * Builds the proposal shared by all fixture tasks; only the partition differs. Replica lists are
     * brokers 0, 1, 2 and brokers 3, 4 (as passed to the {@code ExecutionProposal} constructor below).
     */
    private ExecutionProposal proposal(TopicPartition topicPartition) {
        return new ExecutionProposal(
            topicPartition,
            100L,
            new ReplicaPlacementInfo(1),
            Arrays.asList(b0Placement, b1Placement, b2Placement),
            Arrays.asList(b3Placement, b4Placement),
            Collections.emptyList(),
            Collections.emptyList());
    }

    /**
     * Per-partition reassignment results for the first attempt: {@code tp1} and {@code tp2} map to the
     * normally-completing future, the two invalid-assignment partitions map to the throwing future.
     */
    private Map<TopicPartition, KafkaFuture<Void>> reassignmentResults() {
        Map<TopicPartition, KafkaFuture<Void>> results = new HashMap<>();
        results.put(tp1, normalFuture);
        results.put(tp2, normalFuture);
        results.put(invalidAssignmentTp, throwingFuture);
        results.put(invalidAssignmentTp1, throwingFuture);
        return results;
    }
}
