package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ElectionNotNeededException;

/**
 * Executes the pending leadership movement tasks of an execution by triggering preferred leader
 * elections in batches and waiting for each batch to leave the in-execution state.
 *
 * <p>This class is declared {@link NotThreadSafe}; the only cross-thread signal it reads is the
 * {@code stopRequested} flag.
 */
@NotThreadSafe
public class ExecutorLeadershipReplicaMovement extends AbstractExecutorReplicaMovement {

    /**
     * Creates the leadership movement phase. All arguments are forwarded unchanged to
     * {@link AbstractExecutorReplicaMovement}.
     *
     * <p>NOTE(review): the parameter names below are decompiler-generated. Their meaning is
     * defined by the superclass constructor, which is not visible here — confirm before renaming.
     *
     * @param str first superclass argument (presumably the execution UUID — TODO confirm)
     * @param executionTaskManager task manager that tracks the leadership movement tasks
     * @param set first broker id set passed to the superclass
     * @param set2 second broker id set passed to the superclass
     * @param replicationThrottleHelper throttle helper passed to the superclass
     * @param confluentAdmin admin client passed to the superclass
     * @param sbkAdminUtils admin utilities passed to the superclass
     * @param atomicBoolean flag passed to the superclass (presumably the stop-requested flag — TODO confirm)
     */
    public ExecutorLeadershipReplicaMovement(String str, ExecutionTaskManager executionTaskManager, Set<Integer> set, Set<Integer> set2, ReplicationThrottleHelper replicationThrottleHelper, ConfluentAdmin confluentAdmin, SbkAdminUtils sbkAdminUtils, AtomicBoolean atomicBoolean) {
        super(str, executionTaskManager, set, set2, replicationThrottleHelper, confluentAdmin, sbkAdminUtils, atomicBoolean);
    }

    /**
     * Runs leadership movement batches until no leadership movement is pending or a stop has been
     * requested, logging progress after every batch.
     *
     * <p>The progress counter is increased by the size of each fetched batch, regardless of
     * whether the batch succeeded, was aborted, or was skipped because of a stop request.
     *
     * @param executionTaskWaiter waiter used to block until in-execution tasks finish
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void move(Executor.ExecutionTaskWaiter executionTaskWaiter) throws InterruptedException {
        int totalMovements = this.executionTaskManager.numPendingLeadershipMovements();
        LOG.info("Starting {} leadership movements.", totalMovements);

        int processedMovements = 0;
        while (this.executionTaskManager.numPendingLeadershipMovements() != 0 && !this.stopRequested.get()) {
            processedMovements += moveLeadershipInBatch(executionTaskWaiter);
            LOG.info("{}/{} ({}%) leadership movements completed.",
                     processedMovements, totalMovements, percentage(processedMovements, totalMovements));
        }

        if (this.executionTaskManager.inExecutionTasks().isEmpty()) {
            LOG.info("Leadership movements finished.");
        } else if (this.stopRequested.get()) {
            LOG.info("Leadership movements stopped. {}.", this.executionTaskManager.getExecutionTasksSummary(Collections.emptySet()).summarize(ExecutionTask.TaskType.LEADER_ACTION));
        }
    }

    /**
     * Computes {@code part * 100 / total} as an integer percentage.
     *
     * <p>The arithmetic is done in {@code long} so a large {@code part} cannot overflow, and a
     * zero {@code total} yields 100 instead of throwing {@link ArithmeticException}.
     */
    private static int percentage(int part, int total) {
        if (total == 0) {
            return 100;
        }
        return (int) ((part * 100L) / total);
    }

    /**
     * Fetches the next batch of leadership movement tasks, submits a preferred leader election for
     * them and waits until no task is in execution or a stop is requested.
     *
     * <p>If the election request fails or is interrupted, the whole batch is marked as aborting;
     * the method still waits for the in-execution tasks to drain afterwards.
     *
     * @param executionTaskWaiter waiter used to block until in-execution tasks finish
     * @return the number of tasks in the fetched batch, even when the batch was not executed
     */
    private int moveLeadershipInBatch(Executor.ExecutionTaskWaiter executionTaskWaiter) {
        List<ExecutionTask> batch = this.executionTaskManager.getLeadershipMovementTasks();
        int batchSize = batch.size();
        LOG.debug("Executing {} leadership movements in a batch: {}", batchSize, batch);

        if (batch.isEmpty() || this.stopRequested.get()) {
            return batchSize;
        }

        this.executionTaskManager.markTasksInProgress(batch);
        try {
            executePreferredLeaderElection(batch);
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Leadership tasks failed to execute, aborting: {}", batch, e);
            this.executionTaskManager.markTasksAborting(batch);
        }

        LOG.trace("Waiting for leadership movement batch to finish.");
        while (!this.executionTaskManager.inExecutionTasks().isEmpty() && !this.stopRequested.get()) {
            executionTaskWaiter.waitForAnyTaskToFinish(this);
        }
        return batchSize;
    }

    /**
     * Triggers a preferred leader election for the partitions of the given tasks and blocks until
     * the admin client returns the per-partition results.
     *
     * <p>A partition whose result is an {@link ElectionNotNeededException} is not treated as a
     * failure.
     *
     * @param list tasks whose proposals identify the partitions to elect leaders for
     * @throws ExecutionException if the admin call fails, or wrapping the first per-partition
     *         error that is not an {@link ElectionNotNeededException}
     * @throws InterruptedException if interrupted while waiting for the election result
     */
    private void executePreferredLeaderElection(List<ExecutionTask> list) throws ExecutionException, InterruptedException {
        Set<TopicPartition> partitions = list.stream()
            .map(task -> new TopicPartition(task.proposal().topic(), task.proposal().partitionId()))
            .collect(Collectors.toSet());

        Map<TopicPartition, Optional<Throwable>> electionResults =
            this.adminClient.electLeaders(ElectionType.PREFERRED, partitions).partitions().get();

        Optional<Throwable> firstFailure = electionResults.values().stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .filter(error -> !(error instanceof ElectionNotNeededException))
            .findFirst();

        if (firstFailure.isPresent()) {
            throw new ExecutionException(firstFailure.get());
        }
    }

    /**
     * Returns the task type handled by this phase.
     *
     * <p>NOTE(review): the decompiler reports the original access as package-private; it is kept
     * {@code public} here to match the rest of the decompiled sources.
     *
     * @return {@link ExecutionTask.TaskType#LEADER_ACTION}
     */
    @Override
    public ExecutionTask.TaskType taskType() {
        return ExecutionTask.TaskType.LEADER_ACTION;
    }

    /**
     * Returns the executor state that corresponds to this phase.
     *
     * <p>NOTE(review): the decompiler reports the original access as package-private.
     *
     * @return {@link ExecutorState.State#LEADER_MOVEMENT_TASK_IN_PROGRESS}
     */
    @Override
    public ExecutorState.State state() {
        return ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS;
    }

    /**
     * Builds an in-progress {@link ExecutorState} for this phase from the summary of
     * {@code LEADER_ACTION} tasks, the task manager's three concurrency settings, and the UUID and
     * recently demoted/removed broker sets held by this instance.
     *
     * <p>NOTE(review): the decompiler reports the original access as package-private.
     *
     * @return the executor state describing the ongoing leadership movement
     */
    @Override
    public ExecutorState executorState() {
        return ExecutorState.operationInProgress(
            ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS,
            this.executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)),
            this.executionTaskManager.interBrokerPartitionMovementConcurrency(),
            this.executionTaskManager.intraBrokerPartitionMovementConcurrency(),
            this.executionTaskManager.leadershipMovementConcurrency(),
            this.uuid,
            this.recentlyDemotedBrokers,
            this.recentlyRemovedBrokers);
    }
}
