package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ElectionNotNeededException;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor.class */
public class Executor {
    // General-purpose logger for this class.
    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
    // Separate logger (named by KafkaCruiseControlUtils.OPERATION_LOGGER) that records the start and finish of each task.
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger(KafkaCruiseControlUtils.OPERATION_LOGGER);
    // NOTE(review): presumably the period / initial delay of the ExecutionHistoryScanner schedule; the scheduling call is not visible here.
    private static final long EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS = 5;
    private static final long EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS = 0;
    // NOTE(review): presumably the timeout applied to leadership movement tasks (180 s) — confirm against its usage.
    private static final long LEADER_ACTION_TIMEOUT_MS = 180000;
    private static final String ZK_EXECUTOR_METRIC_GROUP = "CruiseControlExecutor";
    private static final String ZK_EXECUTOR_METRIC_TYPE = "Executor";
    // Tracks pending / in-progress / finished execution tasks and the configured movement concurrencies.
    private final ExecutionTaskManager _executionTaskManager;
    private final MetadataClient _metadataClient;
    // Sleep interval used while waiting for the load monitor to accept a sampling pause before an execution starts.
    private final long _statusCheckingIntervalMs;
    private final ExecutorService _proposalExecutor;
    private final KafkaZkClient _kafkaZkClient;
    private final ConfluentAdmin _adminClient;
    // Used to submit intra-broker replica movements.
    private final SbkAdminUtils adminUtils;
    // Polled by the movement loops to stop early; reset to false when an execution closes.
    private final AtomicBoolean _stopRequested;
    // Clock used for execution start / end timestamps.
    private final Time _time;
    // Cleared when an execution closes. NOTE(review): presumably set to true when an execution is submitted — not visible here.
    private volatile boolean _hasOngoingExecution;
    // Lock-backed reservation of this executor (see ExecutorReservation).
    final ExecutorReservation _reservation;
    // Latest published state snapshot; replaced wholesale on every state transition of the running execution.
    private volatile ExecutorState _executorState;
    // Identifier of the operation being executed; set back to null when the execution closes.
    private volatile String _uuid;
    // Receives an ExecutorNotification when an execution finishes.
    private final ExecutorNotifier _executorNotifier;
    // NOTE(review): the counters below are not final although nothing visible reassigns them — confirm against the constructor.
    private AtomicInteger _numExecutionStopped;
    private AtomicInteger _numExecutionStoppedByUser;
    // Reported in the completion notification; reset to false when an execution closes.
    private AtomicBoolean _executionStoppedByUser;
    private AtomicInteger _numCancelledReassignments;
    private AtomicInteger _numFailedReassignmentCancellations;
    // NOTE(review): EXECUTION_STOPPED and GAUGE_EXECUTION_STOPPED hold the same literal — confirm whether both are needed.
    private static final String EXECUTION_STOPPED = "execution-stopped";
    private static final String GAUGE_EXECUTION_STOPPED = "execution-stopped";
    private static final String GAUGE_CANCELLED_REASSIGNMENTS = "cancelled-reassignments";
    private static final String GAUGE_FAILED_REASSIGNMENT_CANCELLATIONS = "failed-reassignment-cancellations";
    // NOTE(review): presumably how long demotion / removal history entries are kept before the scanner expires them — confirm.
    private final long _demotionHistoryRetentionTimeMs;
    private final long _removalHistoryRetentionTimeMs;
    // NOTE(review): presumably broker id -> start time (ms) of the latest demotion / removal involving that broker — confirm.
    private final ConcurrentMap<Integer, Long> _latestDemoteStartTimeMsByBrokerId;
    private final ConcurrentMap<Integer, Long> _latestRemoveStartTimeMsByBrokerId;
    private final ScheduledExecutorService _executionHistoryScannerExecutor;
    // Required by ProposalExecutionRunnable; notified when a self-healing execution finishes.
    private final AnomalyDetector _anomalyDetector;
    // Sets and clears replication throttles; also used as the monitor guarding those calls.
    private final ReplicationThrottleHelper _throttleHelper;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ExecutionHistoryScanner.class */
    /**
     * Periodic task that expires old entries from the executor's demotion and removal history.
     * Any failure is logged and swallowed so that a single bad run does not propagate out of {@link #run()}.
     */
    private class ExecutionHistoryScanner implements Runnable {
        private ExecutionHistoryScanner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                // Both calls resolve to the enclosing Executor instance.
                removeExpiredDemotionHistory();
                removeExpiredRemovalHistory();
            } catch (Throwable failure) {
                LOG.warn("Received exception when trying to expire execution history.", failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ExecutorReservation.class */
    /**
     * A non-reentrant reservation of the executor, backed by a {@link ReentrantLock}.
     *
     * <p>The reservation is owned by the thread that acquired it: only that thread may cancel it.
     */
    public static final class ExecutorReservation {
        // final: the lock is shared between threads and must never be swapped out after construction.
        private final ReentrantLock _lock = new ReentrantLock();

        /**
         * Tries to reserve the executor for the calling thread without blocking.
         *
         * @return {@code true} if the reservation was acquired, {@code false} if another thread holds it.
         * @throws IllegalStateException if the calling thread already holds the reservation.
         */
        boolean attemptReservation() {
            if (this._lock.isHeldByCurrentThread()) {
                throw new IllegalStateException("Cannot reserve the Executor again while already holding a reservation.");
            }
            return this._lock.tryLock();
        }

        /**
         * Releases the reservation held by the calling thread.
         *
         * @throws IllegalMonitorStateException if the calling thread does not hold the reservation.
         */
        void cancelReservation() {
            this._lock.unlock();
        }

        /**
         * @return {@code true} if the calling thread currently holds the reservation.
         */
        boolean isReservedByMe() {
            return this._lock.isHeldByCurrentThread();
        }

        /**
         * @return {@code true} if any thread currently holds the reservation.
         */
        boolean isReserved() {
            return this._lock.isLocked();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$PartitionReplicas.class */
    /**
     * Pair of broker-id lists describing one partition: its replicas and its observers.
     *
     * <p>The lists handed to the constructor are kept by reference; the accessors expose them
     * through read-only views, so later changes to the backing lists remain visible.
     */
    public static class PartitionReplicas {
        private final List<Integer> replicas;
        private final List<Integer> observers;

        /**
         * @param replicaIds  broker ids of the partition's replicas
         * @param observerIds broker ids of the partition's observers
         */
        public PartitionReplicas(List<Integer> replicaIds, List<Integer> observerIds) {
            this.replicas = replicaIds;
            this.observers = observerIds;
        }

        /** @return a read-only view of the replica broker ids. */
        public List<Integer> replicas() {
            List<Integer> readOnlyReplicas = Collections.unmodifiableList(this.replicas);
            return readOnlyReplicas;
        }

        /** @return a read-only view of the observer broker ids. */
        public List<Integer> observers() {
            List<Integer> readOnlyObservers = Collections.unmodifiableList(this.observers);
            return readOnlyObservers;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ProposalExecutionRunnable.class */
    public class ProposalExecutionRunnable implements Runnable {
        // Metric sampling is paused on this monitor before the execution and resumed afterwards.
        private final LoadMonitor _loadMonitor;
        // Optional (may be null) callback invoked with the success flag and the recorded exception once the execution finishes.
        private final BalanceOpExecutionCompletionCallback _completionCallback;
        // Snapshots taken at construction time and attached to every ExecutorState published by this runnable.
        private Set<Integer> _recentlyDemotedBrokers;
        private Set<Integer> _recentlyRemovedBrokers;
        // Brokers being removed by this execution (empty if none); passed to the throttle helper when throttles are set.
        private Set<Integer> _removedBrokers;
        // Timestamp (from the executor's clock) at which this runnable was created; reported in the completion notification.
        private final long _executionStartMs;
        // Phase of this runnable; drives which movement phase runs next.
        private ExecutorState.State _state = ExecutorState.State.NO_TASK_IN_PROGRESS;
        // First failure recorded during the execution, or null if none; a non-null value marks the execution as unsuccessful.
        protected Throwable _executionException = null;

        /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ProposalExecutionRunnable$ProposalExecutionAutoCloseable.class */
        /**
         * Scope guard for a single execution: constructing it publishes the "execution started" state,
         * closing it clears the task manager and puts the executor back into its idle state.
         */
        private class ProposalExecutionAutoCloseable implements AutoCloseable {
            /**
             * @param uuid                   identifier of the operation being started
             * @param recentlyDemotedBrokers brokers to report as recently demoted in the published state
             * @param recentlyRemovedBrokers brokers to report as recently removed in the published state
             */
            public ProposalExecutionAutoCloseable(String uuid, Set<Integer> recentlyDemotedBrokers, Set<Integer> recentlyRemovedBrokers) {
                _state = ExecutorState.State.STARTING_EXECUTION;
                _executorState = ExecutorState.executionStarted(uuid, recentlyDemotedBrokers, recentlyRemovedBrokers);
                OPERATION_LOG.info("Task [{}] execution starts.", uuid);
            }

            /**
             * Resets all per-execution state. The order of the writes below is kept exactly as it was,
             * since other threads observe these volatile / atomic fields.
             */
            @Override // java.lang.AutoCloseable
            public void close() {
                _executionTaskManager.clear();
                _uuid = null;
                _state = ExecutorState.State.NO_TASK_IN_PROGRESS;
                _executorState = ExecutorState.noTaskInProgress(_recentlyDemotedBrokers, _recentlyRemovedBrokers);
                _hasOngoingExecution = false;
                _stopRequested.set(false);
                _executionStoppedByUser.set(false);
            }
        }

        /**
         * Creates a runnable that executes the proposals currently registered with the executor.
         *
         * <p>The start time of the latest demotion / removal is recorded for every broker passed in,
         * before the "recently demoted / removed" snapshots are taken. The two {@code forEach} bodies
         * were empty in this source, which left those maps unpopulated from here; the restored bodies
         * follow the map names and the snapshot calls that come right after.
         *
         * @param loadMonitor        load monitor whose metric sampling is paused during the execution
         * @param demotedBrokers     brokers demoted by this execution, or {@code null} if none
         * @param removedBrokers     brokers removed by this execution, or {@code null} if none
         * @param completionCallback callback invoked when the execution finishes, or {@code null}
         * @throws IllegalStateException if the executor has no anomaly detector
         */
        ProposalExecutionRunnable(LoadMonitor loadMonitor, Collection<Integer> demotedBrokers, Collection<Integer> removedBrokers, BalanceOpExecutionCompletionCallback completionCallback) {
            this._loadMonitor = loadMonitor;
            this._executionStartMs = Executor.this._time.milliseconds();
            this._completionCallback = completionCallback;
            if (Executor.this._anomalyDetector == null) {
                throw new IllegalStateException("AnomalyDetector is not specified in Executor.");
            }
            if (demotedBrokers != null) {
                // Add / overwrite the latest demotion start time of each demoted broker.
                demotedBrokers.forEach(brokerId ->
                        Executor.this._latestDemoteStartTimeMsByBrokerId.put(brokerId, Executor.this._time.milliseconds()));
            }
            if (removedBrokers != null) {
                // Add / overwrite the latest removal start time of each removed broker.
                removedBrokers.forEach(brokerId ->
                        Executor.this._latestRemoveStartTimeMsByBrokerId.put(brokerId, Executor.this._time.milliseconds()));
                this._removedBrokers = new HashSet<>(removedBrokers);
            } else {
                this._removedBrokers = Collections.emptySet();
            }
            // Snapshots are taken after the history maps were updated so they include this execution's brokers.
            this._recentlyDemotedBrokers = Executor.this.recentlyDemotedBrokers();
            this._recentlyRemovedBrokers = Executor.this.recentlyRemovedBrokers();
        }

        /**
         * Executes the proposals inside a {@code ProposalExecutionAutoCloseable} scope.
         *
         * <p>The try-with-resources guarantees that the executor state is reset (task manager cleared,
         * uuid cleared, stop flags reset) even when {@link #execute()} throws. Previously the scope was
         * only closed on the success path, leaving the executor marked as busy after a failure.
         */
        @Override // java.lang.Runnable
        public void run() {
            try (ProposalExecutionAutoCloseable ignored = new ProposalExecutionAutoCloseable(Executor.this._uuid, this._recentlyDemotedBrokers, this._recentlyRemovedBrokers)) {
                Executor.LOG.info("Starting execution of balancing proposals.");
                execute();
                Executor.LOG.info("Execution finished.");
            } catch (Exception e) {
                // Boundary of the executor thread: log instead of letting the exception kill the worker silently.
                Executor.LOG.error("Exception during execution of ProposalExecutionRunnable", e);
            }
        }

        /**
         * Runs the execution end to end:
         * <ol>
         *   <li>pauses metric sampling, retrying every status-checking interval until the load monitor accepts it;</li>
         *   <li>runs the inter-broker, intra-broker and leadership movement phases in that order,
         *       each one only if the previous phase left the state untouched (i.e. no stop was requested);</li>
         *   <li>always cleans up: refreshes cluster metadata, marks self-healing finished when the operation
         *       was triggered by an anomaly, resumes metric sampling, sends the notification and invokes
         *       the completion callback.</li>
         * </ol>
         *
         * <p>Any failure in steps 1-2 (including an interrupt while waiting) is logged, recorded in
         * {@code _executionException} and reported as an unsuccessful execution; it is not rethrown.
         * The cleanup lives in a single {@code finally} block so that it runs exactly once on every path.
         */
        protected void execute() {
            final boolean triggeredBySelfHealing = AnomalyType.cachedValues().stream().anyMatch(anomalyType -> {
                return Executor.this._uuid.startsWith(anomalyType.toString());
            });
            try {
                // Pause the metric sampling; the load monitor signals "not ready yet" with IllegalStateException.
                while (true) {
                    try {
                        this._loadMonitor.pauseMetricSampling(String.format("Paused-By-Cruise-Control-Before-Starting-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()));
                        break;
                    } catch (IllegalStateException e) {
                        Thread.sleep(Executor.this._statusCheckingIntervalMs);
                        Executor.LOG.debug("Waiting for the load monitor to be ready to initialize the execution.", e);
                    }
                }

                // 1. Inter-broker replica movements.
                if (this._state == ExecutorState.State.STARTING_EXECUTION) {
                    this._state = ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS;
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, this._recentlyDemotedBrokers, this._recentlyRemovedBrokers);
                    interBrokerMoveReplicas();
                    updateOngoingExecutionState();
                }

                // 2. Intra-broker replica movements (skipped if a stop request changed the state above).
                if (this._state == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS) {
                    this._state = ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS;
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, this._recentlyDemotedBrokers, this._recentlyRemovedBrokers);
                    intraBrokerMoveReplicas();
                    updateOngoingExecutionState();
                }

                // 3. Leadership movements.
                if (this._state == ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS) {
                    this._state = ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS;
                    Executor.this._executorState = ExecutorState.operationInProgress(ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS, Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)), Executor.this._executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this._executionTaskManager.leadershipMovementConcurrency(), Executor.this._uuid, this._recentlyDemotedBrokers, this._recentlyRemovedBrokers);
                    moveLeaderships();
                    updateOngoingExecutionState();
                }
            } catch (Throwable t) {
                Executor.LOG.error("Executor got exception during execution", t);
                this._executionException = t;
            } finally {
                // Each cleanup step is isolated by tryRun so one failing step cannot prevent the following ones.
                final String operationLabel = String.format("for operation %s", Executor.this._uuid);
                Executor.LOG.info("Cleaning up execution {}", operationLabel);
                tryRun(() -> {
                    this._loadMonitor.forceRefreshClusterAndGeneration();
                }, "Unable to refresh cluster metadata after execution of " + operationLabel);
                if (triggeredBySelfHealing) {
                    tryRun(() -> {
                        Executor.this._anomalyDetector.markSelfHealingFinished(Executor.this._uuid);
                    }, "Caught exception while marking self healing finished " + operationLabel);
                }
                tryRun(() -> {
                    this._loadMonitor.resumeMetricSampling(String.format("Resumed-By-Cruise-Control-After-Completed-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()));
                }, "Caught exception while resuming metric sampling " + operationLabel);

                // Success means: not stopped half-way and no exception recorded.
                final boolean succeeded = Executor.this._executorState.state() != ExecutorState.State.STOPPING_EXECUTION && this._executionException == null;
                Executor.LOG.info("ProposalExecutionRunnable finishes with success state {} {} and exception: ", new Object[]{Boolean.valueOf(succeeded), operationLabel, this._executionException});
                final ExecutorNotification notification = new ExecutorNotification(this._executionStartMs, Executor.this._time.milliseconds(), Executor.this._uuid, Executor.this._stopRequested.get(), Executor.this._executionStoppedByUser.get(), this._executionException, succeeded);
                tryRun(() -> {
                    Executor.this._executorNotifier.sendNotification(notification);
                }, String.format("Caught exception while sending a notification (notification: %s) ", notification) + operationLabel);
                Executor.OPERATION_LOG.info("Task [{}] execution finishes.", Executor.this._uuid);
                if (this._completionCallback != null) {
                    Executor.LOG.info("executionRunnable invoking completion callback");
                    tryRun(() -> {
                        this._completionCallback.accept(succeeded, this._executionException);
                    }, String.format("Caught exception while invoking completion callback (succeeded: %s, exception: %s) ", Boolean.valueOf(succeeded), this._executionException) + operationLabel);
                }
            }
        }

        /**
         * Best-effort execution of one cleanup step: runs {@code runnable} and, if it throws an
         * {@link Exception}, logs it under the given message instead of propagating it.
         *
         * @param runnable the step to run
         * @param str      error message to log if the step fails
         */
        private void tryRun(Runnable runnable, String str) {
            try {
                runnable.run();
            } catch (Exception stepFailure) {
                Executor.LOG.error(str, stepFailure);
            }
        }

        /**
         * Republishes the executor state after a movement phase.
         *
         * <p>If a stop was requested, the runnable moves to {@code STOPPING_EXECUTION} and the published
         * summary covers every task type. Otherwise the current phase is republished with a summary
         * restricted to the task type of that phase.
         *
         * @throws IllegalStateException if no stop was requested and the current state is not one of the
         *                               three movement phases
         */
        private void updateOngoingExecutionState() {
            final ExecutionTaskManager taskManager = Executor.this._executionTaskManager;

            if (Executor.this._stopRequested.get()) {
                this._state = ExecutorState.State.STOPPING_EXECUTION;
                Executor.this._executorState = ExecutorState.operationInProgress(
                        ExecutorState.State.STOPPING_EXECUTION,
                        taskManager.getExecutionTasksSummary(new HashSet<>(ExecutionTask.TaskType.cachedValues())),
                        taskManager.interBrokerPartitionMovementConcurrency(),
                        taskManager.intraBrokerPartitionMovementConcurrency(),
                        taskManager.leadershipMovementConcurrency(),
                        Executor.this._uuid,
                        this._recentlyDemotedBrokers,
                        this._recentlyRemovedBrokers);
                return;
            }

            // Map the current phase to the single task type whose summary should be published.
            final ExecutorState.State currentPhase = this._state;
            final ExecutionTask.TaskType phaseTaskType;
            switch (currentPhase) {
                case LEADER_MOVEMENT_TASK_IN_PROGRESS:
                    phaseTaskType = ExecutionTask.TaskType.LEADER_ACTION;
                    break;
                case INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS:
                    phaseTaskType = ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
                    break;
                case INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS:
                    phaseTaskType = ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION;
                    break;
                default:
                    throw new IllegalStateException("Unexpected ongoing execution state " + this._state);
            }

            Executor.this._executorState = ExecutorState.operationInProgress(
                    currentPhase,
                    taskManager.getExecutionTasksSummary(Collections.singleton(phaseTaskType)),
                    taskManager.interBrokerPartitionMovementConcurrency(),
                    taskManager.intraBrokerPartitionMovementConcurrency(),
                    taskManager.leadershipMovementConcurrency(),
                    Executor.this._uuid,
                    this._recentlyDemotedBrokers,
                    this._recentlyRemovedBrokers);
        }

        /**
         * Executes all inter-broker replica movements.
         *
         * <p>While there are movements left (pending or in execution) and no stop was requested, the next
         * batch of tasks is fetched, throttles are set for it, the reassignments are submitted and the
         * method waits for tasks to finish, clearing the throttles of finished tasks afterwards. Once that
         * loop ends, any task still in execution is awaited, and the outcome is logged.
         *
         * <p>The submission loop is bounded by its condition; the earlier {@code while (true)} form had
         * no exit and made the drain and summary code below unreachable.
         *
         * @throws InterruptedException if interrupted while waiting for tasks to finish
         */
        private void interBrokerMoveReplicas() throws InterruptedException {
            final int totalPartitionMovements = Executor.this._executionTaskManager.numRemainingInterBrokerPartitionMovements();
            final long totalDataToMoveInMB = Executor.this._executionTaskManager.remainingInterBrokerDataToMoveInMB();
            Executor.LOG.info("Starting {} inter-broker partition movements.", Integer.valueOf(totalPartitionMovements));

            int partitionsToMove = totalPartitionMovements;
            while ((partitionsToMove > 0 || !Executor.this._executionTaskManager.inExecutionTasks().isEmpty()) && !Executor.this._stopRequested.get()) {
                List<ExecutionTask> tasksToExecute = Executor.this._executionTaskManager.getInterBrokerReplicaMovementTasks();
                Executor.LOG.info("Executor will execute {} task(s)", Integer.valueOf(tasksToExecute.size()));
                if (!tasksToExecute.isEmpty()) {
                    // Throttles must be in place before the reassignments are submitted.
                    synchronized (Executor.this._throttleHelper) {
                        Executor.this._throttleHelper.setThrottles(
                                tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
                                this._loadMonitor,
                                this._removedBrokers);
                    }
                    Executor.this._executionTaskManager.markTasksInProgress(tasksToExecute);
                    Executor.this.executeReplicaReassignmentTasks(tasksToExecute);
                }

                List<ExecutionTask> completedTasks = waitForExecutionTaskToFinish();
                partitionsToMove = Executor.this._executionTaskManager.numRemainingInterBrokerPartitionMovements();
                int numFinishedPartitionMovements = Executor.this._executionTaskManager.numFinishedInterBrokerPartitionMovements();
                long finishedDataMovementInMB = Executor.this._executionTaskManager.finishedInterBrokerDataMovementInMB();
                // Nothing to move counts as 100% done; this also avoids formatting NaN for the data percentage.
                Object dataMovedPercent = totalDataToMoveInMB == 0L
                        ? (Object) Integer.valueOf(100)
                        : String.format(Locale.US, "%.2f", Double.valueOf((finishedDataMovementInMB * 100.0d) / totalDataToMoveInMB));
                Executor.LOG.info("{}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", new Object[]{
                        Integer.valueOf(numFinishedPartitionMovements),
                        Integer.valueOf(totalPartitionMovements),
                        String.format(Locale.US, "%.2f", Double.valueOf((numFinishedPartitionMovements * 100.0d) / totalPartitionMovements)),
                        Long.valueOf(finishedDataMovementInMB),
                        Long.valueOf(totalDataToMoveInMB),
                        dataMovedPercent});
                synchronized (Executor.this._throttleHelper) {
                    Executor.this._throttleHelper.clearThrottles(completedTasks, new ArrayList<>(Executor.this._executionTaskManager.inExecutionTasks()));
                }
            }

            // Drain: wait for whatever is still in execution (e.g. after a stop request) before reporting.
            Set<ExecutionTask> inExecutionTasks = Executor.this._executionTaskManager.inExecutionTasks();
            while (!inExecutionTasks.isEmpty()) {
                Executor.LOG.info("Waiting for {} tasks moving {} MB to finish.", Integer.valueOf(inExecutionTasks.size()), Long.valueOf(Executor.this._executionTaskManager.inExecutionInterBrokerDataToMoveInMB()));
                List<ExecutionTask> completedTasks = waitForExecutionTaskToFinish();
                inExecutionTasks = Executor.this._executionTaskManager.inExecutionTasks();
                synchronized (Executor.this._throttleHelper) {
                    Executor.this._throttleHelper.clearThrottles(completedTasks, new ArrayList<>(inExecutionTasks));
                }
            }

            if (Executor.this._executionTaskManager.inExecutionTasks().isEmpty()) {
                Executor.LOG.info("Inter-broker partition movements finished.");
            } else if (Executor.this._stopRequested.get()) {
                ExecutionTaskTracker.ExecutionTasksSummary executionTasksSummary = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
                Map<ExecutionTask.State, Integer> interBrokerStats = executionTasksSummary.taskStat().get(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
                Executor.LOG.info("Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker partition movement {} tasks cancelled; for leadership movements {} task cancelled.", new Object[]{interBrokerStats.get(ExecutionTask.State.PENDING), interBrokerStats.get(ExecutionTask.State.IN_PROGRESS), interBrokerStats.get(ExecutionTask.State.ABORTING), interBrokerStats.get(ExecutionTask.State.ABORTED), interBrokerStats.get(ExecutionTask.State.DEAD), interBrokerStats.get(ExecutionTask.State.COMPLETED), Long.valueOf(executionTasksSummary.remainingInterBrokerDataToMoveInMB()), executionTasksSummary.taskStat().get(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION).get(ExecutionTask.State.PENDING), executionTasksSummary.taskStat().get(ExecutionTask.TaskType.LEADER_ACTION).get(ExecutionTask.State.PENDING)});
            }
        }

        /**
         * Executes the pending intra-broker replica movements batch by batch, then waits for the
         * tasks that are still in execution to finish, and finally logs the outcome.
         *
         * <p>Batches keep being started while there are remaining movements or in-execution tasks and
         * no stop has been requested. Progress is logged after each batch.
         */
        private void intraBrokerMoveReplicas() {
            int totalMovements = Executor.this._executionTaskManager.numRemainingIntraBrokerPartitionMovements();
            long totalDataToMoveInMB = Executor.this._executionTaskManager.remainingIntraBrokerDataToMoveInMB();
            Executor.LOG.info("Starting {} intra-broker partition movements.", totalMovements);
            int remainingMovements = totalMovements;
            // The continuation condition has to be the loop guard itself: wrapping it in an `if` inside
            // `while (true)` never terminates and makes everything after the loop unreachable.
            while ((remainingMovements > 0 || !Executor.this._executionTaskManager.inExecutionTasks().isEmpty())
                    && !Executor.this._stopRequested.get()) {
                List<ExecutionTask> tasksToExecute = Executor.this._executionTaskManager.getIntraBrokerReplicaMovementTasks();
                Executor.LOG.info("Executor will execute {} task(s)", tasksToExecute.size());
                if (!tasksToExecute.isEmpty()) {
                    Executor.this._executionTaskManager.markTasksInProgress(tasksToExecute);
                    Executor.this.adminUtils.executeIntraBrokerReplicaMovements(tasksToExecute, Executor.this._executionTaskManager);
                }
                waitForExecutionTaskToFinish();
                remainingMovements = Executor.this._executionTaskManager.numRemainingIntraBrokerPartitionMovements();
                int finishedMovements = Executor.this._executionTaskManager.numFinishedIntraBrokerPartitionMovements();
                long finishedDataInMB = Executor.this._executionTaskManager.finishedIntraBrokerDataToMoveInMB();
                // When there is no data to move at all, report 100% instead of dividing by zero.
                Object dataPercentage = totalDataToMoveInMB == 0L
                        ? (Object) Integer.valueOf(100)
                        : String.format(Locale.US, "%.2f", (finishedDataInMB * 100.0d) / totalDataToMoveInMB);
                Executor.LOG.info("{}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.",
                        finishedMovements,
                        totalMovements,
                        String.format(Locale.US, "%.2f", (finishedMovements * 100.0d) / totalMovements),
                        finishedDataInMB,
                        totalDataToMoveInMB,
                        dataPercentage);
            }

            // Wait for whatever is still in execution (e.g. after a stop request) to finish.
            Set<ExecutionTask> inExecutionTasks = Executor.this._executionTaskManager.inExecutionTasks();
            while (!inExecutionTasks.isEmpty()) {
                Executor.LOG.info("Waiting for {} tasks moving {} MB to finish",
                        inExecutionTasks.size(),
                        Executor.this._executionTaskManager.inExecutionIntraBrokerDataMovementInMB());
                waitForExecutionTaskToFinish();
                inExecutionTasks = Executor.this._executionTaskManager.inExecutionTasks();
            }

            if (Executor.this._executionTaskManager.inExecutionTasks().isEmpty()) {
                Executor.LOG.info("Intra-broker partition movements finished.");
            } else if (Executor.this._stopRequested.get()) {
                ExecutionTaskTracker.ExecutionTasksSummary summary = Executor.this._executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
                Map<ExecutionTask.State, Integer> intraBrokerStats = summary.taskStat().get(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION);
                Executor.LOG.info("Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for leadership movements {} task cancelled.",
                        intraBrokerStats.get(ExecutionTask.State.PENDING),
                        intraBrokerStats.get(ExecutionTask.State.IN_PROGRESS),
                        intraBrokerStats.get(ExecutionTask.State.ABORTING),
                        intraBrokerStats.get(ExecutionTask.State.ABORTED),
                        intraBrokerStats.get(ExecutionTask.State.DEAD),
                        intraBrokerStats.get(ExecutionTask.State.COMPLETED),
                        summary.remainingIntraBrokerDataToMoveInMB(),
                        summary.taskStat().get(ExecutionTask.TaskType.LEADER_ACTION).get(ExecutionTask.State.PENDING));
            }
        }

        /**
         * Runs leadership movements in batches until none remain or a stop is requested, logging
         * progress after every batch and a final summary at the end.
         */
        private void moveLeaderships() {
            final int totalMovements = Executor.this._executionTaskManager.numRemainingLeadershipMovements();
            Executor.LOG.info("Starting {} leadership movements.", totalMovements);

            int completedMovements = 0;
            for (;;) {
                boolean nothingLeft = Executor.this._executionTaskManager.numRemainingLeadershipMovements() == 0;
                if (nothingLeft || Executor.this._stopRequested.get()) {
                    break;
                }
                updateOngoingExecutionState();
                completedMovements += moveLeadershipInBatch();
                Executor.LOG.info("{}/{} ({}%) leadership movements completed.",
                        completedMovements, totalMovements, (completedMovements * 100) / totalMovements);
            }

            if (Executor.this._executionTaskManager.inExecutionTasks().isEmpty()) {
                Executor.LOG.info("Leadership movements finished.");
                return;
            }
            if (!Executor.this._stopRequested.get()) {
                return;
            }
            // Stopped with tasks still in execution: report the per-state counts of leadership tasks.
            Map<ExecutionTask.State, Integer> leaderStats = Executor.this._executionTaskManager
                    .getExecutionTasksSummary(Collections.emptySet())
                    .taskStat()
                    .get(ExecutionTask.TaskType.LEADER_ACTION);
            Executor.LOG.info("Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed.",
                    leaderStats.get(ExecutionTask.State.PENDING),
                    leaderStats.get(ExecutionTask.State.IN_PROGRESS),
                    leaderStats.get(ExecutionTask.State.ABORTING),
                    leaderStats.get(ExecutionTask.State.ABORTED),
                    leaderStats.get(ExecutionTask.State.DEAD),
                    leaderStats.get(ExecutionTask.State.COMPLETED));
        }

        /**
         * Fetches one batch of leadership movement tasks, triggers the preferred leader election for
         * them and waits until no task is in execution (or a stop is requested).
         *
         * @return the number of tasks in the fetched batch, whether or not they were executed
         */
        private int moveLeadershipInBatch() {
            List<ExecutionTask> batch = Executor.this._executionTaskManager.getLeadershipMovementTasks();
            final int batchSize = batch.size();
            Executor.LOG.debug("Executing {} leadership movements in a batch.", batchSize);

            if (batch.isEmpty() || Executor.this._stopRequested.get()) {
                return batchSize;
            }

            Executor.this._executionTaskManager.markTasksInProgress(batch);
            try {
                Executor.this.executePreferredLeaderElection(batch);
            } catch (InterruptedException | ExecutionException e) {
                // The election could not be submitted; flag the whole batch as aborting.
                Executor.LOG.warn("Leadership tasks failed to execute, aborting: {}", batch, e);
                Executor.this._executionTaskManager.markTasksAborting(batch);
            }

            Executor.LOG.trace("Waiting for leadership movement batch to finish.");
            for (;;) {
                if (Executor.this._executionTaskManager.inExecutionTasks().isEmpty() || Executor.this._stopRequested.get()) {
                    break;
                }
                waitForExecutionTaskToFinish();
            }
            return batchSize;
        }

        /**
         * Polls the cluster until at least one in-execution task has finished, or until no task is in
         * execution any more.
         *
         * <p>Each polling round re-executes tasks if needed, sleeps for the status checking interval,
         * refreshes the cluster metadata and then classifies every in-execution task as done, dead or
         * aborting, or still running. When a stop was requested, the inter-broker reassignments of the
         * tasks marked dead in this round are cancelled through the admin utilities.
         *
         * @return the tasks that were observed as finished (done, dead or aborted) by this call
         */
        private List<ExecutionTask> waitForExecutionTaskToFinish() {
            // Tasks finished during this call; accumulated across polling rounds and returned.
            ArrayList arrayList = new ArrayList();
            do {
                maybeReexecuteTasks();
                try {
                    Thread.sleep(Executor.this._statusCheckingIntervalMs);
                } catch (InterruptedException e) {
                    // Interruption is ignored: the round simply proceeds without the full sleep.
                }
                Cluster cluster = Executor.this._metadataClient.refreshMetadata().cluster();
                // Log dir information is only looked up for the intra-broker tasks in execution.
                Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logdirInfoForExecutionTask = Executor.this.adminUtils.getLogdirInfoForExecutionTask(Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)));
                if (Executor.LOG.isDebugEnabled()) {
                    Executor.LOG.debug("Tasks in execution: {}", Executor.this._executionTaskManager.inExecutionTasks());
                }
                // arrayList2: non-leadership tasks marked dead/aborting in this round.
                ArrayList arrayList2 = new ArrayList();
                // arrayList3: partitions of inter-broker tasks whose reassignment must be cancelled.
                ArrayList arrayList3 = new ArrayList();
                // Snapshot of the stop flag used for the whole round.
                boolean z = Executor.this._stopRequested.get();
                if (z) {
                    Executor.LOG.info("User initiated a cancellation of all ongoing tasks.");
                }
                for (ExecutionTask executionTask : Executor.this._executionTaskManager.inExecutionTasks()) {
                    TopicPartition topicPartition = executionTask.proposal().topicPartition();
                    if (cluster.partition(topicPartition) == null) {
                        // The partition is absent from the refreshed metadata: abort and finish the task.
                        Executor.LOG.debug("Task {} is marked as finished because the topic has been deleted", executionTask);
                        arrayList.add(executionTask);
                        Executor.this._executionTaskManager.markTaskAborting(executionTask);
                        Executor.this._executionTaskManager.markTaskDone(executionTask);
                    } else if (isTaskDone(cluster, logdirInfoForExecutionTask, topicPartition, executionTask)) {
                        arrayList.add(executionTask);
                        Executor.this._executionTaskManager.markTaskDone(executionTask);
                    } else if (maybeMarkTaskAsDeadOrAborting(cluster, logdirInfoForExecutionTask, executionTask, z)) {
                        if (executionTask.type() != ExecutionTask.TaskType.LEADER_ACTION) {
                            arrayList2.add(executionTask);
                        }
                        if (z && executionTask.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION) {
                            arrayList3.add(topicPartition);
                        }
                        if (executionTask.state() == ExecutionTask.State.DEAD || executionTask.state() == ExecutionTask.State.ABORTED) {
                            arrayList.add(executionTask);
                        }
                    }
                }
                int size = arrayList3.size();
                if (z && size > 0) {
                    Executor.LOG.info("Cancelling {} partition reassignments", Integer.valueOf(size));
                    int cancelInterBrokerReplicaMovements = Executor.this.adminUtils.cancelInterBrokerReplicaMovements(arrayList3);
                    // Number of requested cancellations that did not succeed.
                    int i = size - cancelInterBrokerReplicaMovements;
                    Executor.LOG.info("Successfully cancelled {}/{} partition reassignments", Integer.valueOf(cancelInterBrokerReplicaMovements), Integer.valueOf(size));
                    Executor.this._numCancelledReassignments.addAndGet(cancelInterBrokerReplicaMovements);
                    if (i > 0) {
                        Executor.LOG.warn("Failed to cancel {}/{} partition reassignments", Integer.valueOf(i), Integer.valueOf(size));
                        Executor.this._numFailedReassignmentCancellations.addAndGet(i);
                    }
                }
                // A dead/aborting replica movement stops the whole execution unless a stop is already pending.
                if (!arrayList2.isEmpty() && !Executor.this._stopRequested.get()) {
                    Executor.this.stopExecution();
                }
                updateOngoingExecutionState();
                if (Executor.this._executionTaskManager.inExecutionTasks().isEmpty()) {
                    break;
                }
            } while (arrayList.isEmpty());
            Executor.LOG.info("Completed tasks: {}", arrayList);
            return arrayList;
        }

        /**
         * Dispatches the completion check to the handler matching the task type.
         *
         * @return the result of the type-specific check; {@code true} for any other task type
         */
        private boolean isTaskDone(Cluster cluster, Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, TopicPartition topicPartition, ExecutionTask executionTask) {
            final boolean done;
            switch (executionTask.type()) {
                case INTER_BROKER_REPLICA_ACTION:
                    done = isInterBrokerReplicaActionDone(cluster, topicPartition, executionTask);
                    break;
                case INTRA_BROKER_REPLICA_ACTION:
                    done = isIntraBrokerReplicaActionDone(map, executionTask);
                    break;
                case LEADER_ACTION:
                    done = isLeadershipMovementDone(cluster, topicPartition, executionTask);
                    break;
                default:
                    done = true;
                    break;
            }
            return done;
        }

        /**
         * Checks an inter-broker replica movement against the partition's current replicas and
         * observers in the given cluster.
         *
         * @return whether the movement completed (task in progress) or was rolled back (task
         *         aborting); always {@code true} for a dead task
         * @throws IllegalStateException if the task is in any other state
         */
        private boolean isInterBrokerReplicaActionDone(Cluster cluster, TopicPartition topicPartition, ExecutionTask executionTask) {
            Node[] currentReplicas = cluster.partition(topicPartition).replicas();
            Node[] currentObservers = cluster.partition(topicPartition).observers();
            final boolean done;
            switch (executionTask.state()) {
                case IN_PROGRESS:
                    done = executionTask.proposal().isInterBrokerMovementCompleted(currentReplicas, currentObservers);
                    break;
                case ABORTING:
                    done = executionTask.proposal().isInterBrokerMovementAborted(currentReplicas, currentObservers);
                    break;
                case DEAD:
                    done = true;
                    break;
                default:
                    throw new IllegalStateException("Should never be here. State " + executionTask.state());
            }
            return done;
        }

        /**
         * Checks whether the replica of an intra-broker movement already lives in its target log dir.
         *
         * @param map log dir information keyed by task
         * @return {@code false} when the map has no entry for the task; otherwise whether the current
         *         replica log dir equals the log dir the proposal targets on the task's broker
         */
        private boolean isIntraBrokerReplicaActionDone(Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, ExecutionTask executionTask) {
            if (!map.containsKey(executionTask)) {
                return false;
            }
            String currentLogdir = map.get(executionTask).getCurrentReplicaLogDir();
            String targetLogdir = executionTask.proposal()
                    .replicasToMoveBetweenDisksByBroker()
                    .get(Integer.valueOf(executionTask.brokerId()))
                    .logdir();
            return currentLogdir.equals(targetLogdir);
        }

        /**
         * Tells whether the broker with the given id is among the in-sync replicas of the partition.
         */
        private boolean isInIsr(Integer num, Cluster cluster, TopicPartition topicPartition) {
            for (Node inSyncReplica : cluster.partition(topicPartition).inSyncReplicas()) {
                if (inSyncReplica.id() == num.intValue()) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Decides whether a leadership movement needs no further waiting.
         *
         * <p>An in-progress task is done when the partition has no leader, when the leader is already
         * the proposed one, or when the proposed leader is not in the ISR. Aborting and dead tasks are
         * always done.
         *
         * @throws IllegalStateException if the task is in any other state
         */
        private boolean isLeadershipMovementDone(Cluster cluster, TopicPartition topicPartition, ExecutionTask executionTask) {
            Node currentLeader = cluster.leaderFor(topicPartition);
            switch (executionTask.state()) {
                case IN_PROGRESS:
                    if (currentLeader == null) {
                        return true;
                    }
                    if (currentLeader.id() == executionTask.proposal().newLeader().brokerId().intValue()) {
                        return true;
                    }
                    return !isInIsr(executionTask.proposal().newLeader().brokerId(), cluster, topicPartition);
                case ABORTING:
                case DEAD:
                    return true;
                default:
                    throw new IllegalStateException("Should never be here.");
            }
        }

        /**
         * Marks the task dead when it can no longer succeed.
         *
         * <p>An in-progress task is killed outright when {@code z} (stop requested) is set. Otherwise
         * only in-progress or aborting tasks are examined: an inter-broker task dies if one of its new
         * replicas' brokers is missing from the cluster, an intra-broker task dies if there is no log
         * dir information for it, and a leadership task dies if the target leader is missing or the
         * leader action timeout has elapsed since the task started.
         *
         * @param map log dir information keyed by task
         * @param z   whether a stop of the execution has been requested
         * @return {@code true} if the task was marked dead by this call
         * @throws IllegalStateException if the task type is unknown
         */
        private boolean maybeMarkTaskAsDeadOrAborting(Cluster cluster, Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, ExecutionTask executionTask, boolean z) {
            if (z && executionTask.state() == ExecutionTask.State.IN_PROGRESS) {
                Executor.LOG.info("Cancelling task {}", executionTask);
                Executor.this._executionTaskManager.markTaskDead(executionTask);
                return true;
            }

            boolean eligible = executionTask.state() == ExecutionTask.State.IN_PROGRESS
                    || executionTask.state() == ExecutionTask.State.ABORTING;
            if (!eligible) {
                return false;
            }

            switch (executionTask.type()) {
                case INTER_BROKER_REPLICA_ACTION: {
                    for (ReplicaPlacementInfo destination : executionTask.proposal().newReplicas()) {
                        boolean brokerMissing = cluster.nodeById(destination.brokerId().intValue()) == null;
                        if (brokerMissing) {
                            Executor.this._executionTaskManager.markTaskDead(executionTask);
                            Executor.LOG.warn("Killing execution for task {} because the new replica {} is down.", executionTask, destination);
                            return true;
                        }
                    }
                    return false;
                }
                case INTRA_BROKER_REPLICA_ACTION: {
                    if (!map.containsKey(executionTask)) {
                        Executor.this._executionTaskManager.markTaskDead(executionTask);
                        Executor.LOG.warn("Killing execution for task {} because the destination disk is down.", executionTask);
                        return true;
                    }
                    return false;
                }
                case LEADER_ACTION: {
                    if (cluster.nodeById(executionTask.proposal().newLeader().brokerId().intValue()) == null) {
                        Executor.this._executionTaskManager.markTaskDead(executionTask);
                        Executor.LOG.warn("Killing execution for task {} because the target leader is down", executionTask);
                        return true;
                    }
                    boolean timedOut = Executor.this._time.milliseconds() > executionTask.startTime() + Executor.LEADER_ACTION_TIMEOUT_MS;
                    if (timedOut) {
                        Executor.this._executionTaskManager.markTaskDead(executionTask);
                        Executor.LOG.warn("Failed task {} because it took longer than {} to finish.", executionTask, Executor.LEADER_ACTION_TIMEOUT_MS);
                        return true;
                    }
                    return false;
                }
                default:
                    throw new IllegalStateException("Unknown task type " + executionTask.type());
            }
        }

        /**
         * Tells whether the partition of every given task is contained in the given partition set.
         *
         * @return {@code true} if all task partitions are in {@code set} (also when there are no tasks)
         */
        private boolean isSubset(Set<TopicPartition> set, Collection<ExecutionTask> collection) {
            for (ExecutionTask task : collection) {
                if (!set.contains(task.proposal().topicPartition())) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Re-submits in-execution replica movements that the cluster no longer appears to be
         * carrying out.
         *
         * <p>Inter-broker tasks are all re-submitted when at least one of their partitions is not
         * among the partitions currently being reassigned. Intra-broker tasks are re-submitted unless
         * their target log dir is already the current or the future log dir of the replica.
         */
        private void maybeReexecuteTasks() {
            List<ExecutionTask> interBrokerTasks = new ArrayList<>(
                    Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)));
            if (!isSubset(Executor.this.partitionsBeingReassigned(), interBrokerTasks)) {
                Executor.LOG.info("Reexecuting tasks {}", interBrokerTasks);
                Executor.this.executeReplicaReassignmentTasks(interBrokerTasks);
            }

            List<ExecutionTask> intraBrokerTasks = new ArrayList<>(
                    Executor.this._executionTaskManager.inExecutionTasks(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)));
            // Drop every task whose movement is already done or still under way; what is left must be re-submitted.
            Executor.this.adminUtils.getLogdirInfoForExecutionTask(intraBrokerTasks).forEach((executionTask, replicaLogDirInfo) -> {
                String targetLogdir = executionTask.proposal().replicasToMoveBetweenDisksByBroker().get(Integer.valueOf(executionTask.brokerId())).logdir();
                if (targetLogdir.equals(replicaLogDirInfo.getCurrentReplicaLogDir()) || targetLogdir.equals(replicaLogDirInfo.getFutureReplicaLogDir())) {
                    intraBrokerTasks.remove(executionTask);
                }
            });
            if (intraBrokerTasks.isEmpty()) {
                return;
            }
            Executor.LOG.info("Reexecuting tasks {}", intraBrokerTasks);
            Executor.this.adminUtils.executeIntraBrokerReplicaMovements(intraBrokerTasks, Executor.this._executionTaskManager);
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ReservationHandle.class */
    /**
     * Handle on a reservation of the enclosing {@link Executor}.
     *
     * <p>Creating the handle attempts the reservation; closing it cancels the reservation.
     */
    public class ReservationHandle implements AutoCloseable {

        /**
         * Attempts to reserve the enclosing executor.
         *
         * @throws IllegalStateException if the reservation attempt is rejected
         */
        public ReservationHandle() {
            final boolean reserved = Executor.this._reservation.attemptReservation();
            if (reserved) {
                return;
            }
            throw new IllegalStateException("Cannot reserve the Executor because it is already reserved by another thread!");
        }

        /** Cancels the reservation on the enclosing executor. */
        @Override
        public void close() {
            Executor.this._reservation.cancelReservation();
        }

        /** Asks the enclosing executor to stop its execution on behalf of the user. */
        public void stopExecution() {
            Executor.this.userTriggeredStopExecution();
        }
    }

    /**
     * Creates an executor without an explicit metadata client or executor notifier; both are passed
     * as {@code null} to the delegate constructor.
     */
    public Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Option<ZKClientConfig> option, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, long j, long j2, AnomalyDetector anomalyDetector) {
        this(
            kafkaCruiseControlConfig,
            option,
            time,
            dataBalancerMetricsRegistry,
            null,
            j,
            j2,
            null,
            anomalyDetector);
    }

    /**
     * Creates an executor that runs proposals on a single-thread executor service whose threads
     * come from a {@link KafkaCruiseControlThreadFactory} named "ProposalExecutor".
     */
    Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Option<ZKClientConfig> option, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, MetadataClient metadataClient, long j, long j2, ExecutorNotifier executorNotifier, AnomalyDetector anomalyDetector, ConfluentAdmin confluentAdmin, ReplicationThrottleHelper replicationThrottleHelper) {
        this(
            kafkaCruiseControlConfig,
            option,
            time,
            dataBalancerMetricsRegistry,
            metadataClient,
            j,
            j2,
            executorNotifier,
            anomalyDetector,
            confluentAdmin,
            replicationThrottleHelper,
            Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG)));
    }

    /**
     * Fully parameterised constructor that all other constructors delegate to.
     *
     * <p>Initialises the counters and flags, registers the gauge sensors, creates the ZooKeeper
     * client, admin utilities and task manager from the configuration, and schedules the execution
     * history scanner. For {@code metadataClient}, {@code executorNotifier} and
     * {@code replicationThrottleHelper} a {@code null} argument means "create a default instance".
     *
     * @param j  stored as the demotion history retention time in milliseconds
     * @param j2 stored as the removal history retention time in milliseconds
     */
    Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Option<ZKClientConfig> option, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, MetadataClient metadataClient, long j, long j2, ExecutorNotifier executorNotifier, AnomalyDetector anomalyDetector, ConfluentAdmin confluentAdmin, ReplicationThrottleHelper replicationThrottleHelper, ExecutorService executorService) {
        this._reservation = new ExecutorReservation();
        this._numCancelledReassignments = new AtomicInteger(0);
        this._numFailedReassignmentCancellations = new AtomicInteger(0);
        this._numExecutionStopped = new AtomicInteger(0);
        this._numExecutionStoppedByUser = new AtomicInteger(0);
        this._executionStoppedByUser = new AtomicBoolean(false);
        registerGaugeSensors(dataBalancerMetricsRegistry);
        this._time = time;
        this._kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(kafkaCruiseControlConfig, ZK_EXECUTOR_METRIC_GROUP, ZK_EXECUTOR_METRIC_TYPE, option);
        this._adminClient = confluentAdmin;
        this.adminUtils = new SbkAdminUtils(this._adminClient, kafkaCruiseControlConfig);
        // Concurrency limits and the default movement strategies are read from the configuration.
        this._executionTaskManager = new ExecutionTaskManager(kafkaCruiseControlConfig.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG).intValue(), kafkaCruiseControlConfig.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG).intValue(), kafkaCruiseControlConfig.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG).intValue(), kafkaCruiseControlConfig.getList(KafkaCruiseControlConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG), this._adminClient, dataBalancerMetricsRegistry, time, kafkaCruiseControlConfig);
        // Use the supplied metadata client if any, otherwise create one from the configuration.
        this._metadataClient = metadataClient != null ? metadataClient : new MetadataClient(kafkaCruiseControlConfig, -1L, time);
        this._statusCheckingIntervalMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG).longValue();
        this._proposalExecutor = executorService;
        this._latestDemoteStartTimeMsByBrokerId = new ConcurrentHashMap();
        this._latestRemoveStartTimeMsByBrokerId = new ConcurrentHashMap();
        // Must come after the two maps above: the initial state is built from their key sets.
        this._executorState = ExecutorState.noTaskInProgress(recentlyDemotedBrokers(), recentlyRemovedBrokers());
        this._stopRequested = new AtomicBoolean(false);
        this._hasOngoingExecution = false;
        this._uuid = null;
        // Use the supplied notifier if any, otherwise instantiate the configured notifier class.
        this._executorNotifier = executorNotifier != null ? executorNotifier : (ExecutorNotifier) kafkaCruiseControlConfig.getConfiguredInstance(KafkaCruiseControlConfig.EXECUTOR_NOTIFIER_CLASS_CONFIG, ExecutorNotifier.class);
        this._anomalyDetector = anomalyDetector;
        this._demotionHistoryRetentionTimeMs = j;
        this._removalHistoryRetentionTimeMs = j2;
        this._executionHistoryScannerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("ExecutionHistoryScanner", true, null));
        // NOTE(review): the scanner is scheduled before _throttleHelper is assigned below — confirm the scanner does not read it.
        this._executionHistoryScannerExecutor.scheduleAtFixedRate(new ExecutionHistoryScanner(), EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS, EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS, TimeUnit.SECONDS);
        // Use the supplied throttle helper if any, otherwise build one from the ZK/admin clients and the configuration.
        this._throttleHelper = replicationThrottleHelper != null ? replicationThrottleHelper : new ReplicationThrottleHelper(this._kafkaZkClient, this._adminClient, kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.REPLICATION_THROTTLE_CONFIG), kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.OVERRIDE_STATIC_THROTTLES_CONFIG).booleanValue());
    }

    /**
     * Creates an executor whose admin client is built from the original configuration values and
     * whose replication throttle helper is passed as {@code null} to the delegate constructor.
     */
    Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Option<ZKClientConfig> option, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, MetadataClient metadataClient, long j, long j2, ExecutorNotifier executorNotifier, AnomalyDetector anomalyDetector) {
        this(
            kafkaCruiseControlConfig,
            option,
            time,
            dataBalancerMetricsRegistry,
            metadataClient,
            j,
            j2,
            executorNotifier,
            anomalyDetector,
            KafkaCruiseControlUtils.createAdmin(kafkaCruiseControlConfig.originals()),
            null);
    }

    /**
     * Starts the executor, cleaning up throttles left behind by a previous run.
     *
     * <p>If no partition reassignment is in progress the throttles are removed right away.
     * Otherwise the executor is flagged as having an ongoing execution and a background thread
     * polls the reassignments every status checking interval; once none remain it removes the
     * throttles and clears the flag.
     *
     * @throws ExecutionException   if the initial listing of partition reassignments fails
     * @throws InterruptedException if interrupted while waiting for the initial listing
     */
    public void startUp() throws ExecutionException, InterruptedException {
        if (this._adminClient.listPartitionReassignments().reassignments().get().isEmpty()) {
            removeThrottles();
            return;
        }
        this._hasOngoingExecution = true;
        LOG.info("Detected ongoing reassignment while starting up. Monitoring it to remove throttles once it completes.");
        new Thread(() -> {
            // The reassignment was just observed as ongoing, so sleep first and then poll again.
            boolean reassignmentOngoing = true;
            while (reassignmentOngoing) {
                // The poll has to live inside the try: get() throws checked exceptions that a
                // Runnable cannot propagate.
                try {
                    LOG.debug("Sleeping {} ms while waiting for ongoing reassignment to complete", this._statusCheckingIntervalMs);
                    Thread.sleep(this._statusCheckingIntervalMs);
                    reassignmentOngoing = !this._adminClient.listPartitionReassignments().reassignments().get().isEmpty();
                } catch (ExecutionException e) {
                    // Best effort: keep monitoring, the next round sleeps before retrying.
                    LOG.warn("Failed to list ongoing partition reassignments, will retry.", e);
                } catch (InterruptedException e) {
                    // Keep monitoring so that throttles are still removed once the reassignment ends.
                    LOG.warn("Interrupted while waiting for ongoing reassignment to complete, will retry.");
                }
            }
            removeThrottles();
            LOG.info("Ongoing reassignment that was detected while starting up has finished.");
            this._hasOngoingExecution = false;
        }).start();
    }

    /**
     * Reserves the executor and aborts any ongoing execution, waiting up to the given duration for
     * it to stop.
     *
     * <p>If aborting fails, the reservation taken by this call is released before the failure is
     * propagated.
     *
     * @param duration how long to wait for the ongoing execution to stop
     * @return the handle holding the reservation; the caller is responsible for closing it
     * @throws TimeoutException if the execution does not stop in time
     */
    public ReservationHandle reserveAndAbortOngoingExecutions(Duration duration) throws TimeoutException {
        ReservationHandle handle = null;
        boolean aborted = false;
        try {
            handle = new ReservationHandle();
            abortExecution(duration);
            aborted = true;
            return handle;
        } finally {
            // Only release the reservation when we are leaving because of a failure.
            if (!aborted && handle != null) {
                handle.close();
            }
        }
    }

    /**
     * Requests a stop of the execution on behalf of the user. The user-stop counter and flag are
     * only updated when this call is the one that actually issued the stop request.
     */
    public synchronized void userTriggeredStopExecution() {
        if (!stopExecution()) {
            return;
        }
        LOG.info("User requested to stop the ongoing proposal execution and cancel the existing reassignments.");
        _numExecutionStoppedByUser.incrementAndGet();
        _executionStoppedByUser.set(true);
    }

    /**
     * Removes the given broker ids from the record of latest removal start times.
     *
     * @param set ids of the brokers to drop
     */
    public void dropRecentlyRemovedBrokers(Set<Integer> set) {
        Set<Integer> trackedBrokerIds = _latestRemoveStartTimeMsByBrokerId.keySet();
        trackedBrokerIds.removeAll(set);
    }

    /**
     * Shuts the executor down: stops accepting proposals, waits for the proposal executor to
     * terminate, closes the metadata, ZooKeeper and admin clients, and stops the execution history
     * scanner.
     *
     * <p>If the calling thread is interrupted while waiting for the proposal executor, the remaining
     * resources are still closed and the thread's interrupt status is restored before returning.
     */
    public synchronized void shutdown() {
        LOG.info("Shutting down executor.");
        if (this._hasOngoingExecution) {
            LOG.warn("Shutdown executor may take long because execution is still in progress.");
        }
        this._proposalExecutor.shutdown();
        boolean interrupted = false;
        try {
            try {
                this._proposalExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("Interrupted while waiting for proposal executor to shutdown.");
            }
            this._metadataClient.close();
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(this._kafkaZkClient);
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(this._adminClient, EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS);
            this._executionHistoryScannerExecutor.shutdownNow();
            LOG.info("Executor shutdown completed.");
        } finally {
            // Restore the interrupt status only after the resources were closed, so that the
            // close calls above are not cut short by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return the current value of the ongoing-execution flag
     */
    public boolean hasOngoingExecution() {
        return _hasOngoingExecution;
    }

    /**
     * @return {@code true} if the executor is reserved and the reservation is not held by the caller
     */
    public boolean isReservedByOther() {
        if (!_reservation.isReserved()) {
            return false;
        }
        return !_reservation.isReservedByMe();
    }

    /**
     * Sets a new replication throttle rate and applies it through the throttle helper.
     *
     * @param j the new throttle rate; must not be negative
     * @return {@code true} if the throttle rate was updated on at least one broker
     * @throws IllegalArgumentException if {@code j} is negative
     */
    public boolean updateThrottle(long j) {
        // Compare against a literal zero: the lower bound of a throttle rate is unrelated to the
        // execution history scanner's initial delay constant.
        if (j < 0L) {
            throw new IllegalArgumentException("Cannot set a negative throttle");
        }
        final int numBrokersUpdated;
        // Set and apply under the helper's monitor so both steps happen as one unit.
        synchronized (this._throttleHelper) {
            this._throttleHelper.setThrottleRate(Long.valueOf(j));
            numBrokersUpdated = this._throttleHelper.updateOrRemoveThrottleRate(Long.valueOf(j));
        }
        if (numBrokersUpdated == 0) {
            LOG.info("No brokers had throttling rate updated");
        } else {
            LOG.info("Updated throttle rate config on {} brokers to {}", numBrokersUpdated, j);
        }
        return numBrokersUpdated != 0;
    }

    /**
     * @return {@code true} if at least one partition is currently being reassigned
     */
    public boolean hasOngoingPartitionReassignments() {
        final boolean noneReassigning = partitionsBeingReassigned().isEmpty();
        return !noneReassigning;
    }

    /**
     * @return an unmodifiable view of the broker ids that have a recorded demotion start time
     */
    public Set<Integer> recentlyDemotedBrokers() {
        Set<Integer> demotedBrokerIds = _latestDemoteStartTimeMsByBrokerId.keySet();
        return Collections.unmodifiableSet(demotedBrokerIds);
    }

    /**
     * @return an unmodifiable view of the broker ids that have a recorded removal start time
     */
    public Set<Integer> recentlyRemovedBrokers() {
        Set<Integer> removedBrokerIds = _latestRemoveStartTimeMsByBrokerId.keySet();
        return Collections.unmodifiableSet(removedBrokerIds);
    }

    /**
     * @return the executor state currently held by this executor
     */
    public ExecutorState state() {
        return _executorState;
    }

    /**
     * Wraps the request in a {@code ProposalExecutionRunnable} and hands it over for execution.
     *
     * @return the future of the submitted execution
     */
    public synchronized Future<?> executeProposals(Collection<ExecutionProposal> collection, Set<Integer> set, Set<Integer> set2, LoadMonitor loadMonitor, Integer num, Integer num2, Integer num3, ReplicaMovementStrategy replicaMovementStrategy, String str, BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback) {
        LOG.trace("executeProposals with completionCallback {}", balanceOpExecutionCompletionCallback);
        ProposalExecutionRunnable executionRunnable =
                new ProposalExecutionRunnable(loadMonitor, null, set2, balanceOpExecutionCompletionCallback);
        return doExecuteProposals(
                collection, set, loadMonitor, num, num2, num3, replicaMovementStrategy, str, executionRunnable);
    }

    /**
     * Validates the preconditions for a new execution, registers the proposals and starts the
     * given runnable.
     *
     * @return the future of the submitted execution
     * @throws IllegalStateException    if an execution is already ongoing, the executor is reserved
     *                                  by another thread, or the UUID is {@code null}
     * @throws IllegalArgumentException if the load monitor is {@code null}
     */
    private synchronized Future<?> doExecuteProposals(Collection<ExecutionProposal> collection, Set<Integer> set, LoadMonitor loadMonitor, Integer num, Integer num2, Integer num3, ReplicaMovementStrategy replicaMovementStrategy, String str, ProposalExecutionRunnable proposalExecutionRunnable) {
        // The order of the checks decides which exception the caller sees; keep it.
        if (_hasOngoingExecution) {
            throw new IllegalStateException("Cannot execute new proposals while there is an ongoing execution.");
        }
        if (isReservedByOther()) {
            throw new IllegalStateException("Cannot execute new proposals because the Executor is reserved by another thread.");
        }
        if (null == loadMonitor) {
            throw new IllegalArgumentException("Load monitor cannot be null.");
        }
        if (null == str) {
            throw new IllegalStateException("UUID of the execution cannot be null.");
        }

        initProposalExecution(collection, set, num, num2, num3, replicaMovementStrategy, str);
        Future<?> executionFuture = startExecution(proposalExecutionRunnable);
        return executionFuture;
    }

    /**
     * Registers the proposals with the task manager against freshly refreshed cluster metadata,
     * applies the requested movement concurrencies and records the execution UUID.
     */
    void initProposalExecution(Collection<ExecutionProposal> collection, Collection<Integer> collection2, Integer num, Integer num2, Integer num3, ReplicaMovementStrategy replicaMovementStrategy, String str) {
        Cluster refreshedCluster = _metadataClient.refreshMetadata().cluster();
        _executionTaskManager.addExecutionProposals(collection, collection2, refreshedCluster, replicaMovementStrategy);

        _executionTaskManager.setRequestedInterBrokerPartitionMovementConcurrency(num);
        _executionTaskManager.setRequestedIntraBrokerPartitionMovementConcurrency(num2);
        _executionTaskManager.setRequestedLeadershipMovementConcurrency(num3);

        _uuid = str;
    }

    /**
     * Marks the executor as running and submits the given runnable to the proposal executor.
     *
     * <p>Statement order is significant: the stopped-by-user flag is cleared, the ongoing-movement sanity
     * check runs (and may throw), and only then is the ongoing-execution flag raised, the anomaly detector
     * notified, and the stop flags cleared before the runnable is submitted.
     *
     * @param proposalExecutionRunnable the runnable to submit.
     * @return the future returned by the proposal executor.
     * @throws IllegalStateException if the sanity check finds ongoing partition movements.
     */
    Future<?> startExecution(ProposalExecutionRunnable proposalExecutionRunnable) {
        _executionStoppedByUser.set(false);
        // May throw; in that case the ongoing-execution flag below is never set.
        sanityCheckOngoingReplicaMovement();

        _hasOngoingExecution = true;
        _anomalyDetector.maybeClearOngoingAnomalyDetectionTimeMs();
        _stopRequested.set(false);
        _executionStoppedByUser.set(false);

        Future<?> submitted = _proposalExecutor.submit(proposalExecutionRunnable);
        return submitted;
    }

    /**
     * Requests a user-triggered stop and waits until no execution is ongoing.
     *
     * <p>The wait polls {@code hasOngoingExecution()} and sleeps 100 ms on {@code _time} between polls.
     * The deadline is computed once, right after the stop request.
     *
     * @param duration maximum time to wait for the execution to stop.
     * @throws TimeoutException if an execution is still ongoing once the deadline has been reached.
     */
    private void abortExecution(Duration duration) throws TimeoutException {
        userTriggeredStopExecution();
        final long deadlineMs = _time.milliseconds() + duration.toMillis();

        if (hasOngoingExecution()) {
            LOG.info("Aborted executions, waiting for them to stop for {}", duration);
        }

        // Poll loop: re-check, bail out on deadline, otherwise back off for 100 ms.
        for (; hasOngoingExecution(); _time.sleep(100L)) {
            if (deadlineMs <= _time.milliseconds()) {
                throw new TimeoutException(String.format("Timed out awaiting for execution to finish after %s", duration));
            }
        }
    }

    /**
     * Requests that the current execution stop.
     *
     * <p>Only the call that flips {@code _stopRequested} from {@code false} to {@code true} increments the
     * stopped-execution counter and tells the execution task manager to stop.
     *
     * @return {@code true} if this call flipped the stop flag, {@code false} if a stop was already requested.
     */
    synchronized boolean stopExecution() {
        final boolean flippedByThisCall = _stopRequested.compareAndSet(false, true);
        if (flippedByThisCall) {
            _numExecutionStopped.incrementAndGet();
            _executionTaskManager.setStopRequested();
        }
        return flippedByThisCall;
    }

    /**
     * Reads the cancelled-reassignments counter.
     *
     * @return the current value of {@code _numCancelledReassignments}.
     */
    int numCancelledReassignments() {
        final int cancelled = _numCancelledReassignments.get();
        return cancelled;
    }

    /**
     * Refuses to start an execution while the cluster is already moving replicas.
     *
     * <p>Inter-broker reassignments are checked first; intra-broker movements are only queried (for the ids
     * of all nodes in the metadata client's current cluster) when no inter-broker reassignment is found.
     * On failure the execution task manager is cleared and the stored UUID is reset before throwing.
     *
     * @throws IllegalStateException if an inter-broker or intra-broker movement is ongoing.
     */
    private void sanityCheckOngoingReplicaMovement() {
        String failure = null;
        if (hasOngoingPartitionReassignments()) {
            failure = "There are ongoing inter-broker partition movements.";
        } else {
            Collection<Integer> brokerIds = _metadataClient.cluster().nodes().stream()
                    .mapToInt(node -> node.id())
                    .boxed()
                    .collect(Collectors.toSet());
            if (adminUtils.isOngoingIntraBrokerReplicaMovement(brokerIds)) {
                failure = "There are ongoing intra-broker partition movements.";
            }
        }

        if (failure != null) {
            // Roll back whatever was registered for this execution before reporting the conflict.
            _executionTaskManager.clear();
            _uuid = null;
            throw new IllegalStateException(failure);
        }
    }

    /**
     * Registers this executor's three gauges with the given metrics registry.
     *
     * <p>Each gauge is registered under {@code Executor.class} and reads one of the executor's counters:
     * stopped executions, cancelled reassignments, and failed reassignment cancellations.
     *
     * @param dataBalancerMetricsRegistry registry the gauges are added to.
     */
    private void registerGaugeSensors(DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        final Class<Executor> owner = Executor.class;
        dataBalancerMetricsRegistry.newGauge(owner, "execution-stopped", this::numExecutionStopped);
        dataBalancerMetricsRegistry.newGauge(owner, GAUGE_CANCELLED_REASSIGNMENTS, this::numCancelledReassignments);
        dataBalancerMetricsRegistry.newGauge(owner, GAUGE_FAILED_REASSIGNMENT_CANCELLATIONS, this::numFailedReassignmentCancellations);
    }

    /**
     * Removes all throttles through the throttle helper.
     *
     * <p>The call is made while holding the throttle helper's monitor.
     */
    private void removeThrottles() {
        synchronized (_throttleHelper) {
            _throttleHelper.removeAllThrottles();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops demotion-history entries whose retention period has elapsed.
     *
     * <p>An entry is removed when its recorded start time plus {@code _demotionHistoryRetentionTimeMs} is
     * strictly less than the current time, which is read from {@code _time} once per entry.
     */
    public void removeExpiredDemotionHistory() {
        LOG.debug("Remove expired demotion history");
        _latestDemoteStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long expiresAtMs = ((Long) entry.getValue()).longValue() + _demotionHistoryRetentionTimeMs;
            return expiresAtMs < _time.milliseconds();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops broker-removal-history entries whose retention period has elapsed.
     *
     * <p>An entry is removed when its recorded start time plus {@code _removalHistoryRetentionTimeMs} is
     * strictly less than the current time, which is read from {@code _time} once per entry.
     */
    public void removeExpiredRemovalHistory() {
        LOG.debug("Remove expired broker removal history");
        _latestRemoveStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long expiresAtMs = ((Long) entry.getValue()).longValue() + _removalHistoryRetentionTimeMs;
            return expiresAtMs < _time.milliseconds();
        });
    }

    /**
     * Reads the stopped-executions counter (used as a gauge source).
     *
     * @return the current value of {@code _numExecutionStopped}.
     */
    private int numExecutionStopped() {
        final int stopped = _numExecutionStopped.get();
        return stopped;
    }

    /**
     * Reads the failed-reassignment-cancellations counter (used as a gauge source).
     *
     * @return the current value of {@code _numFailedReassignmentCancellations}.
     */
    private int numFailedReassignmentCancellations() {
        final int failed = _numFailedReassignmentCancellations.get();
        return failed;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits the replica reassignments required by the given tasks through the admin client.
     *
     * <p>For every task the target replicas/observers are computed by {@code replicasToWrite}, taking into
     * account what the cluster reports as currently being reassigned for the tasks' partitions. Tasks for
     * which {@code replicasToWrite} yields an empty replica list are skipped. If nothing remains, no request
     * is sent. The call blocks until the alter request completes; any failure is rethrown as-is via
     * {@code sneakyThrow}.
     *
     * @param list tasks to execute; a {@code null} or empty list is a no-op.
     */
    public void executeReplicaReassignmentTasks(List<ExecutionTask> list) {
        if (list == null || list.isEmpty()) {
            return;
        }

        Set<TopicPartition> taskPartitions = list.stream()
                .map(task -> task.proposal().topicPartition())
                .collect(Collectors.toSet());
        Map<TopicPartition, PartitionReplicas> inProgressTargets =
                fetchTargetReplicasBeingReassigned(Optional.of(taskPartitions));

        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
        for (ExecutionTask task : list) {
            TopicPartition partition = task.proposal().topicPartition();
            PartitionReplicas target = replicasToWrite(task, Optional.ofNullable(inProgressTargets.get(partition)));
            // An empty replica list means there is nothing to write for this task.
            if (!target.replicas().isEmpty()) {
                reassignments.put(partition,
                        Optional.of(NewPartitionReassignment.ofReplicasAndObservers(target.replicas(), target.observers())));
            }
        }

        if (reassignments.isEmpty()) {
            return;
        }
        try {
            _adminClient.alterPartitionReassignments(reassignments).all().get();
        } catch (Throwable th) {
            sneakyThrow(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Lists the partitions the cluster currently reports as being reassigned.
     *
     * @return the key set of the map returned by {@code fetchTargetReplicasBeingReassigned} when queried
     *         without a partition filter.
     */
    public Set<TopicPartition> partitionsBeingReassigned() {
        Map<TopicPartition, PartitionReplicas> allReassignments = fetchTargetReplicasBeingReassigned(Optional.empty());
        return allReassignments.keySet();
    }

    /**
     * Queries the admin client for ongoing partition reassignments and derives their target assignment.
     *
     * <p>For each reassigning partition the target replicas are the reported replicas minus the removing
     * replicas; the target observers are the reported observers minus the same removing replicas.
     * The call blocks on the admin client's future.
     *
     * @param optional partitions to query; when empty, all ongoing reassignments are listed.
     * @return a map from reassigning partition to its target replicas and observers.
     */
    private Map<TopicPartition, PartitionReplicas> fetchTargetReplicasBeingReassigned(Optional<Set<TopicPartition>> optional) {
        try {
            Map<TopicPartition, PartitionReassignment> ongoing = (optional.isPresent()
                    ? _adminClient.listPartitionReassignments(optional.get())
                    : _adminClient.listPartitionReassignments()).reassignments().get();

            return ongoing.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
                PartitionReassignment reassignment = entry.getValue();
                List<Integer> targetReplicas = new ArrayList<>(reassignment.replicas());
                targetReplicas.removeAll(reassignment.removingReplicas());
                List<Integer> targetObservers = new ArrayList<>(reassignment.observers());
                targetObservers.removeAll(reassignment.removingReplicas());
                return new PartitionReplicas(targetReplicas, targetObservers);
            }));
        } catch (Throwable th) {
            LOG.error("Fetching reassigning replicas through the listPartitionReassignments API failed with an exception", th);
            sneakyThrow(th);
            // Not reached at runtime: sneakyThrow always throws. Present to satisfy the compiler.
            return null;
        }
    }

    /**
     * Decides which replicas and observers (if any) should be written for a task's partition.
     *
     * <p>An empty replica list in the result means "write nothing" for this task.
     *
     * <p>When the partition is <em>not</em> currently being reassigned ({@code optional} is empty):
     * <ul>
     *   <li>tasks in state ABORTED, DEAD, ABORTING or COMPLETED need no write;</li>
     *   <li>otherwise the proposal's new replicas/observers are written, unless no replicas can be fetched
     *       for the partition, in which case nothing is written.</li>
     * </ul>
     *
     * <p>When the partition <em>is</em> being reassigned:
     * <ul>
     *   <li>ABORTING tasks write the proposal's old replicas/observers (rolling the reassignment back);</li>
     *   <li>DEAD, ABORTED and COMPLETED tasks write nothing;</li>
     *   <li>IN_PROGRESS tasks write nothing, provided the in-progress target matches the proposal for
     *       both replicas and observers; any mismatch is an error;</li>
     *   <li>any other state is unexpected.</li>
     * </ul>
     *
     * @param executionTask the task whose proposal is evaluated.
     * @param optional the target replicas/observers of the reassignment currently in progress for the
     *                 task's partition, or empty if the partition is not being reassigned.
     * @return the replicas and observers to write, or empty lists if nothing should be written.
     * @throws IllegalStateException if the partition is being reassigned and the task is in an unexpected state.
     * @throws RuntimeException if the task is IN_PROGRESS and the in-progress replicas or observers differ
     *                          from the proposal's new replicas or observers.
     */
    private PartitionReplicas replicasToWrite(ExecutionTask executionTask, Optional<PartitionReplicas> optional) {
        TopicPartition topicPartition = executionTask.proposal().topicPartition();
        List<Integer> oldReplicas = executionTask.proposal().oldReplicas().stream()
                .map(replica -> replica.brokerId())
                .collect(Collectors.toList());
        List<Integer> newReplicas = executionTask.proposal().newReplicas().stream()
                .map(replica -> replica.brokerId())
                .collect(Collectors.toList());
        List<Integer> oldObservers = executionTask.proposal().oldObservers().stream()
                .map(observer -> observer.brokerId())
                .collect(Collectors.toList());
        List<Integer> newObservers = executionTask.proposal().newObservers().stream()
                .map(observer -> observer.brokerId())
                .collect(Collectors.toList());

        if (!optional.isPresent()) {
            if (executionTask.state() == ExecutionTask.State.ABORTED || executionTask.state() == ExecutionTask.State.DEAD || executionTask.state() == ExecutionTask.State.ABORTING || executionTask.state() == ExecutionTask.State.COMPLETED) {
                LOG.warn("No need to abort tasks {} because the partition is not in reassignment", executionTask);
                return new PartitionReplicas(Collections.emptyList(), Collections.emptyList());
            }
            if (!this.adminUtils.getReplicasForPartition(topicPartition).isEmpty()) {
                return new PartitionReplicas(newReplicas, newObservers);
            }
            LOG.warn("Could not fetch the replicas for partition {}. It is possible the topic or partition doesn't exist.", topicPartition);
            return new PartitionReplicas(Collections.emptyList(), Collections.emptyList());
        }

        List<Integer> inProgressReplicas = optional.get().replicas();
        List<Integer> inProgressObservers = optional.get().observers();
        if (executionTask.state() == ExecutionTask.State.ABORTING) {
            return new PartitionReplicas(oldReplicas, oldObservers);
        }
        if (executionTask.state() == ExecutionTask.State.DEAD || executionTask.state() == ExecutionTask.State.ABORTED || executionTask.state() == ExecutionTask.State.COMPLETED) {
            return new PartitionReplicas(Collections.emptyList(), Collections.emptyList());
        }
        if (executionTask.state() != ExecutionTask.State.IN_PROGRESS) {
            throw new IllegalStateException("Should never be here, the state " + executionTask.state());
        }

        // Both lists must match the in-progress reassignment. Accepting a match on either one alone would
        // silently ignore a conflicting reassignment whenever the other list happens to be equal (for
        // example when neither side has observers).
        if (!newReplicas.equals(inProgressReplicas)) {
            throw new RuntimeException("The provided new replica list " + newReplicas + " is different from the in progress replica list " + inProgressReplicas + " for " + topicPartition);
        }
        if (!newObservers.equals(inProgressObservers)) {
            throw new RuntimeException("The provided new observer list " + newObservers + " is different from the in progress observer list " + inProgressObservers + " for " + topicPartition);
        }
        return new PartitionReplicas(Collections.emptyList(), Collections.emptyList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Triggers a preferred leader election for the partitions of the given tasks and waits for the result.
     *
     * <p>Per-partition errors of type {@code ElectionNotNeededException} are ignored. The first other
     * per-partition error encountered is rethrown wrapped in an {@link ExecutionException}.
     *
     * @param list tasks whose topic/partition identify the partitions to elect leaders for.
     * @throws ExecutionException if the election request fails or any partition reports an error other than
     *                            {@code ElectionNotNeededException}.
     * @throws InterruptedException if interrupted while waiting for the election result.
     */
    public void executePreferredLeaderElection(List<ExecutionTask> list) throws ExecutionException, InterruptedException {
        Set<TopicPartition> partitions = list.stream()
                .map(task -> new TopicPartition(task.proposal().topic(), task.proposal().partitionId()))
                .collect(Collectors.toSet());

        Map<TopicPartition, Optional<Throwable>> outcomes =
                _adminClient.electLeaders(ElectionType.PREFERRED, partitions).partitions().get();

        for (Optional<Throwable> outcome : outcomes.values()) {
            boolean realFailure = outcome.isPresent() && !(outcome.get() instanceof ElectionNotNeededException);
            if (realFailure) {
                throw new ExecutionException(outcome.get());
            }
        }
    }

    /**
     * Rethrows the given throwable unchanged, without wrapping it and without forcing callers to declare it.
     *
     * <p>The method is declared to throw the type variable {@code E} rather than {@link Throwable}. At call
     * sites where nothing else constrains {@code E}, the compiler infers it as {@link RuntimeException}, so
     * callers need no {@code throws} clause even when {@code th} is a checked exception. The cast is erased
     * at runtime, so the exact object passed in is what gets thrown.
     *
     * @param th the throwable to rethrow; passing {@code null} results in a {@link NullPointerException}.
     * @param <E> the exception type the compiler is told this method throws.
     * @throws E always; at runtime this is {@code th} itself.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(Throwable th) throws E {
        // Unchecked by design: E exists only to satisfy the compiler's checked-exception analysis.
        throw (E) th;
    }
}
