package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.BalancerBrokerExcludedForReplicaPlacementException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor.class */
public class Executor {
    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
    // NOTE(review): presumably the period / initial delay (seconds) used to schedule ExecutionHistoryScanner;
    // the scheduling call is not visible in this part of the file -- confirm.
    private static final long EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS = 5;
    private static final long EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS = 0;
    // Handed to every replica/leadership movement helper and queried for pending / in-execution task counts.
    private ExecutionTaskManager executionTaskManager;
    private MetadataClient metadataClient;
    // Sleep interval (ms) used by ProposalExecutionRunnable while waiting for the load monitor to accept a sampling pause.
    private final long statusCheckingIntervalMs;
    private final long leaderActionTimeoutMs;
    // Passed through to ExecutorInterBrokerReplicaMovement.
    private final long invalidReplicaAssignmentRetryMs;
    private ExecutorService proposalExecutor;
    private ConfluentAdmin adminClient;
    private SbkAdminUtils adminUtils;
    // Shared with the movement helpers and reported in the ExecutorNotification sent when an execution ends.
    private final AtomicBoolean stopRequested;
    // Clock used for execution start/end timestamps.
    private final Time time;
    private volatile boolean hasOngoingExecution;
    // Lock-backed reservation of this executor; see ExecutorReservation.
    final ExecutorReservation reservation;
    // Latest externally visible state; replaced by the execution thread as it moves through its phases.
    private volatile ExecutorState executorState;
    // Identifier of the current execution; ProposalExecutionRunnable checks whether it starts with an AnomalyType name.
    private volatile String uuid;
    // Its cluster() is passed to ExecutorInterBrokerReplicaMovement when the inter-broker phase starts.
    private volatile MetadataClient.ClusterAndGeneration clusterMetadataAtProposalInitialization;
    // Receives an ExecutorNotification when an execution finishes.
    private ExecutorNotifier executorNotifier;
    // NOTE(review): the counters below look like the backing values for the GAUGE_* metrics further down;
    // their registration is not visible here -- confirm.
    private AtomicInteger numExecutionsStarted;
    private AtomicInteger numExecutionStopped;
    private AtomicInteger numExecutionStoppedByUser;
    // Reported in the ExecutorNotification alongside stopRequested.
    private AtomicBoolean executionStoppedByUser;
    private AtomicInteger numCancelledReassignments;
    private AtomicInteger numFailedReassignmentCancellations;
    private static final String GAUGE_EXECUTION_STARTED = "execution-started";
    private static final String GAUGE_EXECUTION_STOPPED = "execution-stopped";
    private static final String GAUGE_CANCELLED_REASSIGNMENTS = "cancelled-reassignments";
    private static final String GAUGE_FAILED_REASSIGNMENT_CANCELLATIONS = "failed-reassignment-cancellations";
    // NOTE(review): presumably how long (ms) entries of the two maps below are kept before
    // removeExpiredDemotionHistory() / removeExpiredRemovalHistory() drop them -- those methods are not visible here.
    private final long demotionHistoryRetentionTimeMs;
    private final long removalHistoryRetentionTimeMs;
    // Keyed by broker id; going by the names, the values are the latest demotion / removal start time in ms.
    private final ConcurrentMap<Integer, Long> latestDemoteStartTimeMsByBrokerId;
    private final ConcurrentMap<Integer, Long> latestRemoveStartTimeMsByBrokerId;
    private ScheduledExecutorService executionHistoryScannerExecutor;
    // Must be non-null before a ProposalExecutionRunnable can be constructed; notified when a self-healing execution ends.
    private final AnomalyDetector anomalyDetector;
    // Handed to every replica/leadership movement helper.
    private ReplicationThrottleHelper throttleHelper;
    private final KafkaCruiseControlConfig config;
    private final DataBalancerMetricsRegistry metricRegistry;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.kafka.cruisecontrol.executor.Executor$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$1.class */
    // Compiler-generated holder (originally Executor$1) for the lookup tables behind `switch` statements over
    // ExecutionTask.State, ExecutionTask.TaskType and ExecutorState.State. Each table is indexed by the enum
    // constant's ordinal() and yields the small positive case label assigned below; constants that are not
    // listed keep the array default of 0, i.e. they match no case.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$State = new int[ExecutionTask.State.values().length];

        static {
            // Every assignment is guarded separately: a NoSuchFieldError for one enum constant is ignored and
            // simply leaves that constant without a case label instead of failing class initialisation.
            // ExecutionTask.State: IN_PROGRESS -> 1, ABORTING -> 2, DEAD -> 3.
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$State[ExecutionTask.State.IN_PROGRESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$State[ExecutionTask.State.ABORTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$State[ExecutionTask.State.DEAD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            // ExecutionTask.TaskType: INTER_BROKER_REPLICA_ACTION -> 1, INTRA_BROKER_REPLICA_ACTION -> 2, LEADER_ACTION -> 3.
            $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType = new int[ExecutionTask.TaskType.values().length];
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType[ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType[ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType[ExecutionTask.TaskType.LEADER_ACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            // ExecutorState.State: LEADER_MOVEMENT -> 1, INTER_BROKER -> 2, INTRA_BROKER -> 3, STOPPING_EXECUTION -> 4.
            // These labels are the ones switched on in ProposalExecutionRunnable.updateOngoingExecutionState().
            $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State = new int[ExecutorState.State.values().length];
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State[ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State[ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State[ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State[ExecutorState.State.STOPPING_EXECUTION.ordinal()] = 4;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ExecutionHistoryScanner.class */
    /**
     * Periodic task that asks the enclosing {@link Executor} to drop expired demotion and removal history.
     */
    public class ExecutionHistoryScanner implements Runnable {
        private ExecutionHistoryScanner() {
        }

        /* synthetic */ ExecutionHistoryScanner(Executor executor, AnonymousClass1 anonymousClass1) {
            this();
        }

        /**
         * Expires demotion history first, then removal history. Any {@link Throwable} is logged at WARN and
         * swallowed, so a failing pass never propagates to whoever runs this task.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                removeExpiredDemotionHistory();
                removeExpiredRemovalHistory();
            } catch (Throwable failure) {
                LOG.warn("Received exception when trying to expire execution history.", failure);
            }
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ExecutionTaskWaiter.class */
    /**
     * Strategy used by the replica/leadership movement helpers to block until some in-flight task completes.
     * It is supplied as a method reference by {@code ProposalExecutionRunnable}, hence the
     * {@link FunctionalInterface} annotation, which makes the compiler reject a second abstract method.
     */
    @FunctionalInterface
    public interface ExecutionTaskWaiter {
        /**
         * Waits on behalf of the given movement.
         *
         * @param abstractExecutorReplicaMovement the movement whose tasks are being waited on
         * @return a list of tasks; presumably the ones that finished during this wait (the implementation is
         *         outside this part of the file -- confirm)
         */
        List<ExecutionTask> waitForAnyTaskToFinish(AbstractExecutorReplicaMovement abstractExecutorReplicaMovement);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ExecutorReservation.class */
    /**
     * Thread-owned reservation of the executor, backed by a {@link ReentrantLock}. A reservation belongs to
     * the thread that obtained it and can only be cancelled by that thread.
     */
    public static final class ExecutorReservation {
        // final: the lock is never replaced, and a final field is safely published to other threads.
        private final ReentrantLock lock = new ReentrantLock();

        /**
         * Tries to reserve without blocking.
         *
         * @return {@code true} if the calling thread now holds the reservation, {@code false} if another
         *         thread currently holds it
         * @throws IllegalStateException if the calling thread already holds the reservation
         */
        boolean attemptReservation() {
            // Reject re-entrant acquisition explicitly; ReentrantLock would otherwise just bump its hold count.
            if (this.lock.isHeldByCurrentThread()) {
                throw new IllegalStateException("Cannot reserve the Executor again while already holding a reservation.");
            }
            return this.lock.tryLock();
        }

        /**
         * Releases the reservation held by the calling thread.
         *
         * @throws IllegalMonitorStateException if the calling thread does not hold the reservation
         */
        void cancelReservation() {
            this.lock.unlock();
        }

        /** @return {@code true} if the calling thread holds the reservation */
        boolean isReservedByMe() {
            return this.lock.isHeldByCurrentThread();
        }

        /** @return {@code true} if any thread holds the reservation */
        boolean isReserved() {
            return this.lock.isLocked();
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$PartitionReplicas.class */
    /**
     * Pair of broker-id lists for one partition: its replicas and its observers. The lists are stored as
     * given (no copy) and only ever handed out as read-only views.
     */
    public static class PartitionReplicas {
        private final List<Integer> replicas;
        private final List<Integer> observers;

        /**
         * @param list  replica broker ids; kept by reference
         * @param list2 observer broker ids; kept by reference
         */
        public PartitionReplicas(List<Integer> list, List<Integer> list2) {
            replicas = list;
            observers = list2;
        }

        /** @return an unmodifiable view of the replica list */
        public List<Integer> replicas() {
            return readOnly(replicas);
        }

        /** @return an unmodifiable view of the observer list */
        public List<Integer> observers() {
            return readOnly(observers);
        }

        /** Wraps {@code backing} in a fresh unmodifiable view; later changes to {@code backing} show through. */
        private static List<Integer> readOnly(List<Integer> backing) {
            return Collections.unmodifiableList(backing);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ProposalExecutionRunnable.class */
    public class ProposalExecutionRunnable implements Runnable {
        // Metric sampling on this monitor is paused before the execution and resumed during cleanup.
        private final LoadMonitor loadMonitor;
        // May be null; when set it is invoked during cleanup with the success flag and executionException.
        private final BalanceOpExecutionCompletionCallback completionCallback;
        // Snapshots taken in the constructor from Executor.recentlyDemotedBrokers() / recentlyRemovedBrokers().
        private final Set<Integer> recentlyDemotedBrokers;
        private final Set<Integer> recentlyRemovedBrokers;
        // Copy of the removed-broker ids given to the constructor; an empty set when none were given.
        private final Set<Integer> removedBrokers;
        // Executor.time.milliseconds() at construction; passed to the ExecutorNotification built during cleanup.
        private final long executionStartMs;
        // Phase this runnable is currently in; updateOngoingExecutionState() switches on it.
        private ExecutorState.State state = ExecutorState.State.NO_TASK_IN_PROGRESS;
        // Throwable caught by execute(), if any; a non-null value makes the execution count as unsuccessful.
        protected Throwable executionException = null;

        /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ProposalExecutionRunnable$ProposalExecutionAutoCloseable.class */
        /**
         * Scope guard around one proposal execution: constructing it publishes the "execution started" state,
         * closing it publishes "no task in progress" and clears the executor's execution state.
         */
        private class ProposalExecutionAutoCloseable implements AutoCloseable {
            /**
             * @param executionUuid  id of the execution that is starting (used for the state and the log line)
             * @param demotedBrokers recently demoted broker ids to embed in the published state
             * @param removedBrokers recently removed broker ids to embed in the published state
             */
            public ProposalExecutionAutoCloseable(String executionUuid, Set<Integer> demotedBrokers, Set<Integer> removedBrokers) {
                final ProposalExecutionRunnable owner = ProposalExecutionRunnable.this;
                owner.state = ExecutorState.State.STARTING_EXECUTION;
                final ExecutorState started = ExecutorState.executionStarted(executionUuid, demotedBrokers, removedBrokers);
                Executor.this.executorState = started;
                Executor.LOG.info("Task [{}] execution starts.", executionUuid);
            }

            /** Resets the runnable and the executor to the idle state, then clears the execution state. */
            @Override // java.lang.AutoCloseable
            public void close() {
                final ProposalExecutionRunnable owner = ProposalExecutionRunnable.this;
                owner.state = ExecutorState.State.NO_TASK_IN_PROGRESS;
                final Set<Integer> demoted = owner.recentlyDemotedBrokers;
                final Set<Integer> removed = owner.recentlyRemovedBrokers;
                Executor.this.executorState = ExecutorState.noTaskInProgress(demoted, removed);
                Executor.this.clearExecutionState();
            }
        }

        /**
         * Prepares one proposal execution.
         *
         * @param loadMonitor load monitor whose metric sampling is paused/resumed around the execution
         * @param collection  ids of brokers being demoted by this execution, or {@code null} for none
         * @param collection2 ids of brokers being removed by this execution, or {@code null} for none
         * @param balanceOpExecutionCompletionCallback callback invoked when the execution ends, or {@code null}
         * @throws IllegalStateException if the enclosing Executor has no anomaly detector
         */
        ProposalExecutionRunnable(LoadMonitor loadMonitor, Collection<Integer> collection, Collection<Integer> collection2, BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback) {
            this.loadMonitor = loadMonitor;
            this.executionStartMs = Executor.this.time.milliseconds();
            this.completionCallback = balanceOpExecutionCompletionCallback;
            if (Executor.this.anomalyDetector == null) {
                throw new IllegalStateException("AnomalyDetector is not specified in Executor.");
            }
            // NOTE(review): both forEach bodies were empty here, so these brokers were never recorded and the
            // "recently demoted/removed" snapshots taken below could not include them. The bodies are restored
            // as "add/overwrite the latest start time per broker" -- confirm against the original sources.
            if (collection != null) {
                collection.forEach(brokerId -> Executor.this.latestDemoteStartTimeMsByBrokerId.put(brokerId, Executor.this.time.milliseconds()));
            }
            if (collection2 != null) {
                collection2.forEach(brokerId -> Executor.this.latestRemoveStartTimeMsByBrokerId.put(brokerId, Executor.this.time.milliseconds()));
                // Diamond instead of the raw HashSet: same runtime behaviour, no unchecked conversion.
                this.removedBrokers = new HashSet<>(collection2);
            } else {
                this.removedBrokers = Collections.emptySet();
            }
            // Taken after the bookkeeping above so the snapshots can reflect this execution's brokers.
            this.recentlyDemotedBrokers = Executor.this.recentlyDemotedBrokers();
            this.recentlyRemovedBrokers = Executor.this.recentlyRemovedBrokers();
        }

        /**
         * Entry point of the execution thread: publishes the "started" state, runs {@link #execute()} and
         * always restores the idle state afterwards.
         *
         * <p>The guard is managed with try-with-resources so that {@code close()} runs even when
         * {@code execute()} throws; the previous hand-expanded form only closed it on the success path,
         * leaving the executor state un-reset after a failure. Exceptions are logged, never propagated.
         */
        @Override // java.lang.Runnable
        public void run() {
            try (ProposalExecutionAutoCloseable ignored = new ProposalExecutionAutoCloseable(Executor.this.uuid, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers)) {
                Executor.LOG.info("Starting execution of balancing proposals.");
                execute();
                Executor.LOG.info("Execution finished.");
            } catch (Exception e) {
                Executor.LOG.error("Exception during execution of ProposalExecutionRunnable", e);
            }
        }

        /**
         * Runs the execution phases in order -- inter-broker replica movement, intra-broker replica movement,
         * leadership movement -- stopping early when a stop is requested after either of the first two.
         *
         * <p>Whatever happens (normal completion, early stop, or any {@link Throwable}), the cleanup in
         * {@link #cleanUpAndNotify(boolean)} runs exactly once. A Throwable raised by any phase is logged,
         * stored in {@code executionException} and not rethrown.
         *
         * <p>This replaces a version in which the cleanup sequence was duplicated on five exit paths, the
         * {@code IllegalStateException} retry of the sampling pause was unreachable behind a broader catch,
         * and failures inside the movement phases skipped cleanup, notification and the completion callback.
         */
        protected void execute() {
            // Self-healing executions carry a uuid that starts with the name of the anomaly type that triggered them.
            final boolean triggeredByAnomaly = AnomalyType.cachedValues().stream().anyMatch(anomalyType -> {
                return Executor.this.uuid.startsWith(anomalyType.toString());
            });
            try {
                pauseMetricSamplingWhenReady();

                if (interBrokerMove()) {
                    Executor.LOG.info("Stopping execution {} after inter-broker movements - stopRequested: {}", Executor.this.uuid, Boolean.valueOf(Executor.this.stopRequested.get()));
                    return;
                }

                ExecutorIntraBrokerReplicaMovement intraBrokerMovement = new ExecutorIntraBrokerReplicaMovement(Executor.this.uuid, Executor.this.executionTaskManager, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers, Executor.this.throttleHelper, Executor.this.adminClient, Executor.this.adminUtils, Executor.this.stopRequested);
                this.state = intraBrokerMovement.state();
                Executor.this.executorState = intraBrokerMovement.executorState();
                intraBrokerMovement.move(this::waitForAnyExecutionTaskToFinish);
                // Check the stop flag before refreshing the published state, as the inter-broker phase does.
                boolean stopAfterIntraBroker = checkStopRequested();
                updateOngoingExecutionState();
                if (stopAfterIntraBroker) {
                    Executor.LOG.info("Stopping execution {} after intra-broker movements - stopRequested: {}", Executor.this.uuid, Boolean.valueOf(Executor.this.stopRequested.get()));
                    return;
                }

                ExecutorLeadershipReplicaMovement leadershipMovement = new ExecutorLeadershipReplicaMovement(Executor.this.uuid, Executor.this.executionTaskManager, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers, Executor.this.throttleHelper, Executor.this.adminClient, Executor.this.adminUtils, Executor.this.stopRequested);
                this.state = leadershipMovement.state();
                Executor.this.executorState = leadershipMovement.executorState();
                leadershipMovement.move(this::waitForAnyExecutionTaskToFinish);
                checkStopRequested();
                updateOngoingExecutionState();
            } catch (Throwable t) {
                // Remember the failure so the cleanup reports the execution as unsuccessful.
                Executor.LOG.error("Executor got exception during execution", t);
                this.executionException = t;
            } finally {
                cleanUpAndNotify(triggeredByAnomaly);
            }
        }

        /**
         * Pauses metric sampling, retrying for as long as the load monitor rejects the pause with an
         * {@link IllegalStateException}; sleeps {@code statusCheckingIntervalMs} between attempts.
         *
         * @throws InterruptedException if the thread is interrupted while sleeping between attempts
         */
        private void pauseMetricSamplingWhenReady() throws InterruptedException {
            while (true) {
                try {
                    this.loadMonitor.pauseMetricSampling(String.format("Paused-By-Cruise-Control-Before-Starting-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()));
                    return;
                } catch (IllegalStateException e) {
                    Thread.sleep(Executor.this.statusCheckingIntervalMs);
                    Executor.LOG.debug("Waiting for the load monitor to be ready to initialize the execution.", e);
                }
            }
        }

        /**
         * End-of-execution housekeeping. Each step is best-effort (see {@code tryRun}), so a failing step
         * does not prevent the following ones: refresh cluster metadata, mark self-healing finished when the
         * execution was anomaly-triggered, resume metric sampling, send the executor notification and, if a
         * completion callback was supplied, invoke it.
         *
         * <p>The execution counts as successful only when the executor is not in {@code STOPPING_EXECUTION}
         * and no exception was captured.
         *
         * @param triggeredByAnomaly whether the execution's uuid identified it as a self-healing run
         */
        private void cleanUpAndNotify(boolean triggeredByAnomaly) {
            final String operation = String.format("for operation %s", Executor.this.uuid);
            Executor.LOG.info("Cleaning up execution {}", operation);
            tryRun(() -> {
                this.loadMonitor.forceRefreshClusterAndGeneration();
            }, "Unable to refresh cluster metadata after execution of " + operation);
            if (triggeredByAnomaly) {
                tryRun(() -> {
                    Executor.this.anomalyDetector.markSelfHealingFinished(Executor.this.uuid);
                }, "Caught exception while marking self healing finished " + operation);
            }
            tryRun(() -> {
                this.loadMonitor.resumeMetricSampling(String.format("Resumed-By-Cruise-Control-After-Completed-Execution (Date: %s)", KafkaCruiseControlUtils.currentUtcDate()));
            }, "Caught exception while resuming metric sampling " + operation);
            final boolean succeeded = Executor.this.executorState.state() != ExecutorState.State.STOPPING_EXECUTION && this.executionException == null;
            Executor.LOG.info("ProposalExecutionRunnable finishes with success state {} {} and exception: ", new Object[]{Boolean.valueOf(succeeded), operation, this.executionException});
            final ExecutorNotification notification = new ExecutorNotification(this.executionStartMs, Executor.this.time.milliseconds(), Executor.this.uuid, Executor.this.stopRequested.get(), Executor.this.executionStoppedByUser.get(), this.executionException, succeeded);
            tryRun(() -> {
                Executor.this.executorNotifier.sendNotification(notification);
            }, String.format("Caught exception while sending a notification (notification: %s) ", notification) + operation);
            Executor.LOG.info("Task [{}] execution finishes.", Executor.this.uuid);
            if (this.completionCallback != null) {
                Executor.LOG.info("executionRunnable invoking completion callback");
                tryRun(() -> {
                    this.completionCallback.accept(succeeded, this.executionException);
                }, String.format("Caught exception while invoking completion callback (succeeded: %s, exception: %s) ", Boolean.valueOf(succeeded), this.executionException) + operation);
            }
        }

        /**
         * Runs the inter-broker replica movement phase and logs how it ended.
         *
         * @return {@code true} if a stop was requested and the execution should end after this phase,
         *         {@code false} otherwise (whether or not tasks were left over)
         * @throws InterruptedException declared for callers; the calls made here are outside this view
         */
        private boolean interBrokerMove() throws InterruptedException {
            ExecutorInterBrokerReplicaMovement interBrokerMovement = new ExecutorInterBrokerReplicaMovement(
                    Executor.this.uuid,
                    Executor.this.executionTaskManager,
                    this.recentlyDemotedBrokers,
                    this.recentlyRemovedBrokers,
                    Executor.this.throttleHelper,
                    Executor.this.adminClient,
                    Executor.this.adminUtils,
                    Executor.this.stopRequested,
                    this.removedBrokers,
                    this.loadMonitor,
                    Executor.this.clusterMetadataAtProposalInitialization.cluster(),
                    Executor.this.time,
                    Executor.this.invalidReplicaAssignmentRetryMs);
            this.state = interBrokerMovement.state();
            Executor.this.executorState = interBrokerMovement.executorState();

            // Pending count before the phase, so the stop log can report how many movements were performed.
            final int pendingBefore = Executor.this.executionTaskManager.numPendingInterBrokerPartitionMovements();
            interBrokerMovement.move(this::waitForAnyExecutionTaskToFinish);
            final boolean stopNow = checkStopRequested();
            updateOngoingExecutionState();
            final int pendingAfter = Executor.this.executionTaskManager.numPendingInterBrokerPartitionMovements();
            final int stillExecuting = Executor.this.executionTaskManager.inExecutionTasks().size();

            if (!stopNow) {
                if (pendingAfter != 0 || stillExecuting != 0) {
                    Executor.LOG.info("Inter-broker partition movements finished with leftover tasks to execute - {} remaining partitions to move, {} tasks remaining in execution", pendingAfter, stillExecuting);
                } else {
                    Executor.LOG.info("Inter-broker partition movements finished successfully.");
                }
                return false;
            }

            ExecutionTaskTracker.ExecutionTasksSummary summary = Executor.this.executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
            Executor.LOG.info("Stopping execution {} after {} inter-broker movements - stopRequested: {}. {}, {} remaining data to move; for intra-broker partition movement {} tasks cancelled; for leadership movements {} task cancelled.",
                    Executor.this.uuid,
                    pendingBefore - pendingAfter,
                    Executor.this.stopRequested.get(),
                    summary.summarize(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION),
                    summary.remainingInterBrokerDataToMoveInMB(),
                    summary.taskStat().get(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION).get(ExecutionTask.State.PENDING),
                    summary.taskStat().get(ExecutionTask.TaskType.LEADER_ACTION).get(ExecutionTask.State.PENDING));
            return true;
        }

        /**
         * Runs the given action and logs, rather than propagates, any {@link Exception} it throws.
         *
         * @param runnable the action to run
         * @param str the message logged at ERROR level (together with the exception) if the action fails
         */
        private void tryRun(Runnable runnable, String str) {
            try {
                runnable.run();
            } catch (Exception e) {
                // Best-effort: the failure is recorded but never rethrown to the caller.
                Executor.LOG.error(str, e);
            }
        }

        /**
         * Rebuilds {@code Executor.this.executorState} as an "operation in progress" state selected by
         * the current value of {@code this.state}.
         *
         * <p>Every branch passes the task summary for the relevant task type(s), the three movement
         * concurrency values, the execution uuid and the recently demoted/removed broker sets.
         *
         * @throws IllegalStateException if {@code this.state} is not one of the four handled states
         */
        private void updateOngoingExecutionState() {
            // NOTE(review): the switch goes through a compiler-generated ordinal map; each case label
            // presumably corresponds to the ExecutorState.State constant used inside the branch - confirm.
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutorState$State[this.state.ordinal()]) {
                case 1:
                    // Summary restricted to leadership movement tasks.
                    Executor.this.executorState = ExecutorState.operationInProgress(ExecutorState.State.LEADER_MOVEMENT_TASK_IN_PROGRESS, Executor.this.executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.LEADER_ACTION)), Executor.this.executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.leadershipMovementConcurrency(), Executor.this.uuid, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers);
                    return;
                case KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS /* 2 */:
                    // Summary restricted to inter-broker replica movement tasks.
                    Executor.this.executorState = ExecutorState.operationInProgress(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this.executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION)), Executor.this.executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.leadershipMovementConcurrency(), Executor.this.uuid, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers);
                    return;
                case 3:
                    // Summary restricted to intra-broker replica movement tasks.
                    Executor.this.executorState = ExecutorState.operationInProgress(ExecutorState.State.INTRA_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, Executor.this.executionTaskManager.getExecutionTasksSummary(Collections.singleton(ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION)), Executor.this.executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.leadershipMovementConcurrency(), Executor.this.uuid, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers);
                    return;
                case 4:
                    // While stopping, the summary covers every task type.
                    Executor.this.executorState = ExecutorState.operationInProgress(ExecutorState.State.STOPPING_EXECUTION, Executor.this.executionTaskManager.getExecutionTasksSummary(new HashSet(ExecutionTask.TaskType.cachedValues())), Executor.this.executionTaskManager.interBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.intraBrokerPartitionMovementConcurrency(), Executor.this.executionTaskManager.leadershipMovementConcurrency(), Executor.this.uuid, this.recentlyDemotedBrokers, this.recentlyRemovedBrokers);
                    return;
                default:
                    throw new IllegalStateException("Unexpected ongoing execution state " + this.state);
            }
        }

        /**
         * Reads the executor-wide stop flag and, when it is set, switches this runnable's state to
         * {@code STOPPING_EXECUTION}.
         *
         * @return {@code true} if a stop has been requested, {@code false} otherwise
         */
        private boolean checkStopRequested() {
            if (!Executor.this.stopRequested.get()) {
                return false;
            }
            this.state = ExecutorState.State.STOPPING_EXECUTION;
            return true;
        }

        /**
         * Polls the cluster until at least one in-execution task has finished (completed, aborted or
         * dead) or until no task is in execution any more.
         *
         * <p>Each round: lets the movement re-execute tasks, sleeps for the status-checking interval,
         * refreshes cluster metadata, classifies every in-execution task, cancels inter-broker
         * reassignments if a stop was requested, and refreshes the published executor state.
         *
         * @param abstractExecutorReplicaMovement the movement phase currently being driven; its task type
         *        decides whether log-dir information is fetched
         * @return the tasks that finished while waiting (accumulated across polling rounds)
         */
        private List<ExecutionTask> waitForAnyExecutionTaskToFinish(AbstractExecutorReplicaMovement abstractExecutorReplicaMovement) {
            boolean z;
            // Finished tasks; created outside the loop so results accumulate across rounds.
            ArrayList arrayList = new ArrayList();
            do {
                abstractExecutorReplicaMovement.maybeReexecuteTasks();
                try {
                    Thread.sleep(Executor.this.statusCheckingIntervalMs);
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt is swallowed and the flag is not restored; the loop
                    // simply proceeds to the next status check - confirm this is intentional.
                }
                Cluster cluster = Executor.this.metadataClient.refreshMetadata().cluster();
                // Log-dir info is only looked up for intra-broker movements; other phases use an empty map.
                Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logdirInfoForExecutionTask = abstractExecutorReplicaMovement.taskType() == ExecutionTask.TaskType.INTRA_BROKER_REPLICA_ACTION ? Executor.this.adminUtils.getLogdirInfoForExecutionTask(Executor.this.executionTaskManager.inExecutionTasks(Collections.singleton(abstractExecutorReplicaMovement.taskType()))) : Collections.emptyMap();
                if (Executor.LOG.isDebugEnabled()) {
                    Executor.LOG.debug("Tasks in execution: {}", Executor.this.executionTaskManager.inExecutionTasks());
                }
                // arrayList2: non-leadership tasks that maybeMarkTaskAsDead reported on this round.
                ArrayList arrayList2 = new ArrayList();
                // arrayList3: partitions whose inter-broker reassignment must be cancelled on stop.
                ArrayList arrayList3 = new ArrayList();
                // Snapshot of the stop flag used for the whole classification pass below.
                boolean z2 = Executor.this.stopRequested.get();
                if (z2) {
                    Executor.LOG.info("User initiated a cancellation of all ongoing tasks.");
                }
                for (ExecutionTask executionTask : Executor.this.executionTaskManager.inExecutionTasks()) {
                    TopicPartition topicPartition = executionTask.proposal().topicPartition();
                    if (cluster.partition(topicPartition) == null) {
                        // Partition no longer exists in metadata: move the task through ABORTING to done.
                        Executor.LOG.debug("Task {} is marked as finished because the topic has been deleted", executionTask);
                        arrayList.add(executionTask);
                        Executor.this.executionTaskManager.markTaskAborting(executionTask);
                        Executor.this.executionTaskManager.markTaskDone(executionTask);
                    } else if (isTaskDone(cluster, logdirInfoForExecutionTask, topicPartition, executionTask)) {
                        arrayList.add(executionTask);
                        Executor.this.executionTaskManager.markTaskDone(executionTask);
                    } else if (maybeMarkTaskAsDead(cluster, logdirInfoForExecutionTask, executionTask, z2)) {
                        if (executionTask.type() != ExecutionTask.TaskType.LEADER_ACTION) {
                            arrayList2.add(executionTask);
                        }
                        if (z2 && executionTask.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION) {
                            arrayList3.add(topicPartition);
                        }
                        if (executionTask.state() == ExecutionTask.State.DEAD || executionTask.state() == ExecutionTask.State.ABORTED) {
                            arrayList.add(executionTask);
                        }
                    }
                }
                int size = arrayList3.size();
                if (z2 && size > 0) {
                    Executor.LOG.info("Cancelling {} partition reassignments", Integer.valueOf(size));
                    int cancelInterBrokerReplicaMovements = Executor.this.adminUtils.cancelInterBrokerReplicaMovements(arrayList3);
                    // i = number of requested cancellations that did not succeed.
                    int i = size - cancelInterBrokerReplicaMovements;
                    Executor.LOG.info("Successfully cancelled {}/{} partition reassignments", Integer.valueOf(cancelInterBrokerReplicaMovements), Integer.valueOf(size));
                    Executor.this.numCancelledReassignments.addAndGet(cancelInterBrokerReplicaMovements);
                    if (i > 0) {
                        Executor.LOG.warn("Failed to cancel {}/{} partition reassignments", Integer.valueOf(i), Integer.valueOf(size));
                        Executor.this.numFailedReassignmentCancellations.addAndGet(i);
                    }
                }
                // A non-leadership task reported by maybeMarkTaskAsDead stops the whole execution,
                // unless a stop is already in flight.
                if (!arrayList2.isEmpty() && !Executor.this.stopRequested.get()) {
                    Executor.this.stopExecution();
                }
                checkStopRequested();
                updateOngoingExecutionState();
                z = !arrayList.isEmpty();
                // Nothing left in execution: stop waiting even if no task finished on this round.
                if (!(!Executor.this.executionTaskManager.inExecutionTasks().isEmpty())) {
                    break;
                }
            } while (!z);
            Executor.LOG.info("Completed tasks: {}", arrayList);
            return arrayList;
        }

        /**
         * Dispatches to the completion check that matches the task's type.
         *
         * @param cluster the cluster metadata to check against
         * @param map log-dir information per task, consulted by the intra-broker check
         * @param topicPartition the partition the task operates on
         * @param executionTask the task to check
         * @return the result of the type-specific check; {@code true} for any type not handled below
         */
        private boolean isTaskDone(Cluster cluster, Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, TopicPartition topicPartition, ExecutionTask executionTask) {
            // NOTE(review): case labels index a compiler-generated ordinal map for ExecutionTask.TaskType;
            // presumably 1/2/3 are the inter-broker, intra-broker and leader action types - confirm.
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType[executionTask.type().ordinal()]) {
                case 1:
                    return isInterBrokerReplicaActionDone(cluster, topicPartition, executionTask);
                case KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS /* 2 */:
                    return isIntraBrokerReplicaActionDone(map, executionTask);
                case 3:
                    return isLeadershipMovementDone(cluster, topicPartition, executionTask);
                default:
                    return true;
            }
        }

        /**
         * Checks whether an inter-broker replica movement has reached its terminal condition, based on
         * the partition's current replicas and observers and on the task's state.
         *
         * @param cluster the cluster metadata to read the partition from
         * @param topicPartition the partition being moved; must be present in {@code cluster}
         * @param executionTask the inter-broker task to check
         * @return whether the movement is completed / aborted for the first two handled states, and
         *         {@code true} unconditionally for the third
         * @throws IllegalStateException if the task is in a state not handled below
         */
        private boolean isInterBrokerReplicaActionDone(Cluster cluster, TopicPartition topicPartition, ExecutionTask executionTask) {
            Node[] replicas = cluster.partition(topicPartition).replicas();
            Node[] observers = cluster.partition(topicPartition).observers();
            // NOTE(review): case labels index a compiler-generated ordinal map for ExecutionTask.State;
            // the concrete state behind each label is not visible here - confirm against the enum.
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$State[executionTask.state().ordinal()]) {
                case 1:
                    return executionTask.proposal().isInterBrokerMovementCompleted(replicas, observers);
                case KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS /* 2 */:
                    return executionTask.proposal().isInterBrokerMovementAborted(replicas, observers);
                case 3:
                    return true;
                default:
                    throw new IllegalStateException("Should never be here. State " + executionTask.state());
            }
        }

        /**
         * Checks whether an intra-broker (disk-to-disk) movement is done by comparing the replica's
         * current log directory with the target log directory from the task's proposal.
         *
         * @param map log-dir information per task
         * @param executionTask the intra-broker task to check
         * @return {@code false} when no log-dir information is available for the task; otherwise whether
         *         the current log directory equals the proposal's target log directory for the task's broker
         */
        private boolean isIntraBrokerReplicaActionDone(Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, ExecutionTask executionTask) {
            if (!map.containsKey(executionTask)) {
                return false;
            }
            DescribeReplicaLogDirsResult.ReplicaLogDirInfo logDirInfo = map.get(executionTask);
            Integer brokerId = Integer.valueOf(executionTask.brokerId());
            return logDirInfo.getCurrentReplicaLogDir().equals(executionTask.proposal().replicasToMoveBetweenDisksByBroker().get(brokerId).logdir());
        }

        /**
         * Tells whether the given broker is among the in-sync replicas of the partition.
         *
         * @param num the broker id to look for
         * @param cluster the cluster metadata to read the partition from
         * @param topicPartition the partition whose in-sync replicas are inspected
         * @return {@code true} if any in-sync replica has the given broker id
         */
        private boolean isInIsr(Integer num, Cluster cluster, TopicPartition topicPartition) {
            for (Node inSyncReplica : cluster.partition(topicPartition).inSyncReplicas()) {
                // Unboxed per element, so an empty ISR never dereferences num.
                if (inSyncReplica.id() == num.intValue()) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Checks whether a leadership movement no longer needs to be waited for.
         *
         * @param cluster the cluster metadata to read the current leader and ISR from
         * @param topicPartition the partition whose leadership is being moved
         * @param executionTask the leadership task to check
         * @return for the first handled state: {@code true} if the current leader is the proposal's new
         *         leader, if the partition currently has no leader, or if the new leader is not in the
         *         ISR; {@code true} unconditionally for the other two handled states
         * @throws IllegalStateException if the task is in a state not handled below
         */
        private boolean isLeadershipMovementDone(Cluster cluster, TopicPartition topicPartition, ExecutionTask executionTask) {
            Node leaderFor = cluster.leaderFor(topicPartition);
            // NOTE(review): case labels index a compiler-generated ordinal map for ExecutionTask.State;
            // the concrete state behind each label is not visible here - confirm against the enum.
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$State[executionTask.state().ordinal()]) {
                case 1:
                    // Done when leadership moved, or when it cannot move (no leader / target out of ISR).
                    return (leaderFor != null && leaderFor.id() == executionTask.proposal().newLeader().brokerId().intValue()) || leaderFor == null || !isInIsr(executionTask.proposal().newLeader().brokerId(), cluster, topicPartition);
                case KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS /* 2 */:
                case 3:
                    return true;
                default:
                    throw new IllegalStateException("Should never be here.");
            }
        }

        /**
         * Marks the task dead when it can no longer make progress, and reports whether it did so.
         *
         * <p>A task is marked dead when a stop was requested while it is IN_PROGRESS, or - for tasks
         * that are IN_PROGRESS or ABORTING - when the type-specific liveness check below fails.
         *
         * @param cluster the cluster metadata used to look up brokers
         * @param map log-dir information per task, consulted for the second handled task type
         * @param executionTask the task to inspect
         * @param z whether a stop of the execution has been requested
         * @return {@code true} if the task was marked dead by this call, {@code false} otherwise
         * @throws IllegalStateException if the task type is not handled below
         */
        private boolean maybeMarkTaskAsDead(Cluster cluster, Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> map, ExecutionTask executionTask, boolean z) {
            // Stop requested: every task still in progress is cancelled, whatever its type.
            if (z && executionTask.state() == ExecutionTask.State.IN_PROGRESS) {
                Executor.LOG.info("Cancelling task {}", executionTask);
                Executor.this.executionTaskManager.markTaskDead(executionTask);
                return true;
            }
            // Only IN_PROGRESS and ABORTING tasks are candidates for the liveness checks.
            if (executionTask.state() != ExecutionTask.State.IN_PROGRESS && executionTask.state() != ExecutionTask.State.ABORTING) {
                return false;
            }
            // NOTE(review): case labels index a compiler-generated ordinal map for ExecutionTask.TaskType;
            // judging by the log messages, 1/2/3 are inter-broker, intra-broker and leader actions - confirm.
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$executor$ExecutionTask$TaskType[executionTask.type().ordinal()]) {
                case 1:
                    // Dead if any broker in the proposal's new replica list is missing from the metadata.
                    for (ReplicaPlacementInfo replicaPlacementInfo : executionTask.proposal().newReplicas()) {
                        if (cluster.nodeById(replicaPlacementInfo.brokerId().intValue()) == null) {
                            Executor.this.executionTaskManager.markTaskDead(executionTask);
                            Executor.LOG.warn("Killing execution for task {} because the new replica {} is down.", executionTask, replicaPlacementInfo);
                            return true;
                        }
                    }
                    return false;
                case KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS /* 2 */:
                    // Dead if no log-dir information could be obtained for the task.
                    if (map.containsKey(executionTask)) {
                        return false;
                    }
                    Executor.this.executionTaskManager.markTaskDead(executionTask);
                    Executor.LOG.warn("Killing execution for task {} because the destination disk is down.", executionTask);
                    return true;
                case 3:
                    // Dead if the target leader is missing from the metadata ...
                    if (cluster.nodeById(executionTask.proposal().newLeader().brokerId().intValue()) == null) {
                        Executor.this.executionTaskManager.markTaskDead(executionTask);
                        Executor.LOG.warn("Killing execution for task {} because the target leader is down", executionTask);
                        return true;
                    }
                    // ... or if the task has been running for longer than the leader action timeout.
                    if (Executor.this.time.milliseconds() <= executionTask.startTime() + Executor.this.leaderActionTimeoutMs) {
                        Executor.LOG.info("Evaluating timeout of start: {} now: {} timeout: {}", new Object[]{Long.valueOf(executionTask.startTime()), Long.valueOf(Executor.this.time.milliseconds()), Long.valueOf(Executor.this.leaderActionTimeoutMs)});
                        return false;
                    }
                    Executor.this.executionTaskManager.markTaskDead(executionTask);
                    Executor.LOG.warn("Failed task {} because it took longer than {} to finish.", executionTask, Long.valueOf(Executor.this.leaderActionTimeoutMs));
                    return true;
                default:
                    throw new IllegalStateException("Unknown task type " + executionTask.type());
            }
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/Executor$ReservationHandle.class */
    /**
     * Handle representing a reservation of the enclosing {@code Executor}. Creating the handle attempts
     * the reservation; closing it cancels the reservation, so it can be used in try-with-resources.
     */
    public class ReservationHandle implements AutoCloseable {
        /**
         * Attempts to reserve the enclosing Executor.
         *
         * @throws IllegalStateException if the reservation attempt is rejected
         */
        public ReservationHandle() {
            boolean acquired = Executor.this.reservation.attemptReservation();
            if (!acquired) {
                throw new IllegalStateException("Cannot reserve the Executor because it is already reserved by another thread!");
            }
        }

        /** Cancels the reservation held on the enclosing Executor. */
        @Override // java.lang.AutoCloseable
        public void close() {
            Executor.this.reservation.cancelReservation();
        }

        /** Requests a user-triggered stop of the execution on the enclosing Executor. */
        public void stopExecution() {
            Executor.this.userTriggeredStopExecution();
        }
    }

    /**
     * Creates an Executor without a pre-supplied proposal executor service; delegates to the
     * package-private constructor with a {@code null} executor service.
     *
     * @param kafkaCruiseControlConfig the configuration to read executor settings from
     * @param time the clock used by the executor
     * @param dataBalancerMetricsRegistry the registry that executor gauges are registered with
     * @param metadataClient the metadata client to use
     * @param j the demotion history retention time, in milliseconds
     * @param j2 the removal history retention time, in milliseconds
     * @param executorNotifier the notifier to use
     * @param anomalyDetector the anomaly detector consulted when an execution starts
     * @param confluentAdmin the admin client used for cluster operations
     * @param replicationThrottleHelper the helper used to manage replication throttles
     */
    public Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, MetadataClient metadataClient, long j, long j2, ExecutorNotifier executorNotifier, AnomalyDetector anomalyDetector, ConfluentAdmin confluentAdmin, ReplicationThrottleHelper replicationThrottleHelper) {
        this(kafkaCruiseControlConfig, time, dataBalancerMetricsRegistry, metadataClient, j, j2, executorNotifier, anomalyDetector, confluentAdmin, replicationThrottleHelper, null);
    }

    /**
     * Creates an Executor, optionally with a caller-supplied executor service for running proposals.
     * Only fields are initialised here; {@code init()} creates the remaining collaborators.
     *
     * @param kafkaCruiseControlConfig the configuration; the progress-check interval, leader action
     *        timeout and invalid-replica-assignment retry timeout are read from it
     * @param time the clock used by the executor
     * @param dataBalancerMetricsRegistry the registry that executor gauges are registered with
     * @param metadataClient the metadata client to use (may be replaced in {@code init()} when null)
     * @param j the demotion history retention time, in milliseconds
     * @param j2 the removal history retention time, in milliseconds
     * @param executorNotifier the notifier to use (may be replaced in {@code init()} when null)
     * @param anomalyDetector the anomaly detector consulted when an execution starts
     * @param confluentAdmin the admin client used for cluster operations
     * @param replicationThrottleHelper the throttle helper (may be replaced in {@code init()} when null)
     * @param executorService the service that runs proposal executions, or {@code null} to have
     *        {@code init()} create one
     */
    Executor(KafkaCruiseControlConfig kafkaCruiseControlConfig, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, MetadataClient metadataClient, long j, long j2, ExecutorNotifier executorNotifier, AnomalyDetector anomalyDetector, ConfluentAdmin confluentAdmin, ReplicationThrottleHelper replicationThrottleHelper, ExecutorService executorService) {
        // Collaborators handed in by the caller.
        this.config = kafkaCruiseControlConfig;
        this.time = time;
        this.metricRegistry = dataBalancerMetricsRegistry;
        this.metadataClient = metadataClient;
        this.executorNotifier = executorNotifier;
        this.anomalyDetector = anomalyDetector;
        this.adminClient = confluentAdmin;
        this.throttleHelper = replicationThrottleHelper;
        this.proposalExecutor = executorService;

        // Timing settings.
        this.statusCheckingIntervalMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG).longValue();
        this.leaderActionTimeoutMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.LEADER_ACTION_TIMEOUT_MS_CONFIG).longValue();
        this.invalidReplicaAssignmentRetryMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.INVALID_REPLICA_ASSIGNMENT_RETRY_TIMEOUT_MS_CONFIG).longValue();
        this.demotionHistoryRetentionTimeMs = j;
        this.removalHistoryRetentionTimeMs = j2;

        // Counters and flags.
        this.numExecutionsStarted = new AtomicInteger(0);
        this.numExecutionStopped = new AtomicInteger(0);
        this.numExecutionStoppedByUser = new AtomicInteger(0);
        this.numCancelledReassignments = new AtomicInteger(0);
        this.numFailedReassignmentCancellations = new AtomicInteger(0);
        this.executionStoppedByUser = new AtomicBoolean(false);
        this.stopRequested = new AtomicBoolean(false);

        // Execution bookkeeping starts out empty / idle.
        this.reservation = new ExecutorReservation();
        this.latestDemoteStartTimeMsByBrokerId = new ConcurrentHashMap();
        this.latestRemoveStartTimeMsByBrokerId = new ConcurrentHashMap();
        this.hasOngoingExecution = false;
        this.uuid = null;
    }

    /**
     * Completes the Executor's set-up: creates the proposal executor if none was supplied, registers
     * gauges, builds the admin utilities and the execution task manager, fills in the metadata client,
     * notifier and throttle helper when they were not supplied, publishes the initial "no task in
     * progress" state and schedules the execution history scanner.
     */
    public void init() {
        if (this.proposalExecutor == null) {
            this.proposalExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("ProposalExecutor", false, LOG));
        }
        registerGaugeSensors(this.metricRegistry);
        this.adminUtils = new SbkAdminUtils(this.adminClient, this.config);

        int interBrokerConcurrency = this.config.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG).intValue();
        int intraBrokerConcurrency = this.config.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG).intValue();
        int leadershipConcurrency = this.config.getInt(KafkaCruiseControlConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG).intValue();
        this.executionTaskManager = new ExecutionTaskManager(interBrokerConcurrency, intraBrokerConcurrency, leadershipConcurrency, this.config.getList(KafkaCruiseControlConfig.DEFAULT_REPLICA_MOVEMENT_STRATEGIES_CONFIG), this.adminClient, this.metricRegistry, this.time, this.config);

        // Keep an injected metadata client; otherwise build one from the configuration.
        if (this.metadataClient == null) {
            this.metadataClient = new MetadataClient(this.config, -1L, this.time, this.adminClient);
        }
        this.executorState = ExecutorState.noTaskInProgress(recentlyDemotedBrokers(), recentlyRemovedBrokers());

        // Keep an injected notifier; otherwise instantiate the configured class.
        if (this.executorNotifier == null) {
            this.executorNotifier = (ExecutorNotifier) this.config.getConfiguredInstance(KafkaCruiseControlConfig.EXECUTOR_NOTIFIER_CLASS_CONFIG, ExecutorNotifier.class);
        }

        this.executionHistoryScannerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("ExecutionHistoryScanner", true, null));
        this.executionHistoryScannerExecutor.scheduleAtFixedRate(new ExecutionHistoryScanner(this, null), EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS, EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS, TimeUnit.SECONDS);

        // Keep an injected throttle helper; otherwise build one from the admin client and config.
        if (this.throttleHelper == null) {
            this.throttleHelper = new ReplicationThrottleHelper(this.adminClient, this.config);
        }
    }

    /**
     * Reconciles throttles with the cluster's reassignment state at start-up.
     *
     * <p>If no partition reassignment is in flight, all throttles are removed immediately. Otherwise
     * the executor is flagged as having an ongoing execution and a background thread polls the
     * reassignment list every {@code statusCheckingIntervalMs} until it is empty, then removes the
     * throttles.
     *
     * <p>The monitoring thread retries when listing reassignments fails, stops waiting if it is
     * interrupted (restoring the interrupt flag and leaving throttles untouched), and always clears
     * the ongoing-execution flag on exit so the executor cannot stay blocked forever.
     *
     * @throws ExecutionException if the initial listing of partition reassignments fails
     * @throws InterruptedException if the calling thread is interrupted during the initial listing
     */
    public void startUp() throws ExecutionException, InterruptedException {
        if (((Map) this.adminClient.listPartitionReassignments().reassignments().get()).isEmpty()) {
            removeThrottles();
            return;
        }
        this.hasOngoingExecution = true;
        LOG.info("Detected ongoing reassignment while starting up. Monitoring it to remove throttles once it completes.");
        new Thread(() -> {
            try {
                while (true) {
                    try {
                        if (((Map) this.adminClient.listPartitionReassignments().reassignments().get()).isEmpty()) {
                            break;
                        }
                    } catch (ExecutionException e) {
                        // The listing itself failed; back off for one interval and try again.
                        LOG.warn("Failed to list ongoing partition reassignments, will retry.", e);
                    }
                    LOG.debug("Sleeping {} ms while waiting for ongoing reassignment to complete", Long.valueOf(this.statusCheckingIntervalMs));
                    Thread.sleep(this.statusCheckingIntervalMs);
                }
                removeThrottles();
                LOG.info("Ongoing reassignment that was detected while starting up has finished.");
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and stop monitoring.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for ongoing reassignment to complete. Throttles were not removed.");
            } finally {
                this.hasOngoingExecution = false;
            }
        }).start();
    }

    /**
     * Reserves the Executor and aborts any ongoing execution, waiting up to the given duration for it
     * to stop.
     *
     * <p>If aborting fails for any reason, the reservation is released before the failure propagates,
     * so a failed call never leaves the Executor reserved.
     *
     * @param duration the maximum time to wait for the ongoing execution to stop
     * @return the handle holding the reservation; the caller is responsible for closing it
     * @throws TimeoutException if the ongoing execution does not stop within {@code duration}
     * @throws IllegalStateException if the Executor cannot be reserved
     */
    public ReservationHandle reserveAndAbortOngoingExecutions(Duration duration) throws TimeoutException {
        // If the reservation itself fails there is nothing to release, so it sits outside the try.
        ReservationHandle reservationHandle = new ReservationHandle();
        boolean aborted = false;
        try {
            abortExecution(duration);
            aborted = true;
            return reservationHandle;
        } finally {
            if (!aborted) {
                reservationHandle.close();
            }
        }
    }

    /**
     * Requests a stop of the ongoing execution on behalf of a user. When this call is the one that
     * actually triggers the stop, the user-stop counter is incremented and the user-stop flag is set.
     */
    public synchronized void userTriggeredStopExecution() {
        boolean stopTriggered = stopExecution();
        if (!stopTriggered) {
            return;
        }
        LOG.info("User requested to stop the ongoing proposal execution and cancel the existing reassignments.");
        this.numExecutionStoppedByUser.incrementAndGet();
        this.executionStoppedByUser.set(true);
    }

    /**
     * Forgets the given brokers in the removal history.
     *
     * @param set the ids of the brokers to drop from the recently-removed bookkeeping
     */
    public void dropRecentlyRemovedBrokers(Set<Integer> set) {
        Set<Integer> trackedBrokerIds = this.latestRemoveStartTimeMsByBrokerId.keySet();
        // The key set is a view, so removing from it removes the map entries.
        trackedBrokerIds.removeAll(set);
    }

    /**
     * Shuts the Executor down: stops accepting new proposal executions, waits for the proposal
     * executor to terminate, then stops the execution history scanner.
     *
     * <p>If the waiting thread is interrupted, the wait ends early, the interrupt flag is restored
     * for the caller, and the remaining shutdown steps still run.
     */
    public synchronized void shutdown() {
        LOG.info("Shutting down executor.");
        if (this.hasOngoingExecution) {
            LOG.warn("Shutdown executor may take long because execution is still in progress.");
        }
        KafkaCruiseControlUtils.executeSilently(this.proposalExecutor, (executor) -> {
            executor.shutdown();
        });
        // proposalExecutor is only created in init(); guard against shutdown() being called first.
        if (this.proposalExecutor != null) {
            try {
                this.proposalExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for proposal executor to shutdown.");
            }
        }
        KafkaCruiseControlUtils.executeSilently(this.executionHistoryScannerExecutor, (scanner) -> {
            scanner.shutdownNow();
        });
        LOG.info("Executor shutdown completed.");
    }

    /**
     * @return the current value of the ongoing-execution flag
     */
    public boolean hasOngoingExecution() {
        return this.hasOngoingExecution;
    }

    /**
     * @return {@code true} if the Executor is reserved and the reservation is not reported as held by
     *         the caller; {@code false} if it is unreserved or reserved by the caller
     */
    public boolean isReservedByOther() {
        if (!this.reservation.isReserved()) {
            return false;
        }
        return !this.reservation.isReservedByMe();
    }

    /**
     * Sets a new replication throttle rate and applies it to the brokers via the throttle helper.
     *
     * @param j the new throttle rate; must not be negative
     * @return {@code true} if the throttle rate config was updated on at least one broker
     * @throws IllegalArgumentException if {@code j} is negative
     */
    public boolean updateThrottle(long j) {
        int updateOrRemoveBrokerThrottleRate;
        // Compare against a literal zero: the bound is "non-negative", independent of any
        // scheduling constant that happens to share the value.
        if (j < 0L) {
            throw new IllegalArgumentException("Cannot set a negative throttle");
        }
        // Setting the rate and pushing it to the brokers happen under the helper's lock so the two
        // steps are applied together.
        synchronized (this.throttleHelper) {
            this.throttleHelper.setThrottleRate(Long.valueOf(j));
            updateOrRemoveBrokerThrottleRate = this.throttleHelper.updateOrRemoveBrokerThrottleRate(Long.valueOf(j));
        }
        if (updateOrRemoveBrokerThrottleRate == 0) {
            LOG.info("No brokers had throttling rate updated");
        } else {
            LOG.info("Updated throttle rate config on {} brokers to {}", Integer.valueOf(updateOrRemoveBrokerThrottleRate), Long.valueOf(j));
        }
        return updateOrRemoveBrokerThrottleRate != 0;
    }

    /**
     * @return {@code true} if {@code partitionsBeingReassigned} reports at least one partition
     */
    public boolean hasOngoingPartitionReassignments() {
        Set<TopicPartition> partitionsInFlight = partitionsBeingReassigned(this.adminUtils);
        return !partitionsInFlight.isEmpty();
    }

    /**
     * @return an unmodifiable view of the ids of the brokers currently in the demotion history
     */
    public Set<Integer> recentlyDemotedBrokers() {
        Set<Integer> demotedBrokerIds = this.latestDemoteStartTimeMsByBrokerId.keySet();
        return Collections.unmodifiableSet(demotedBrokerIds);
    }

    /**
     * @return an unmodifiable view of the ids of the brokers currently in the removal history
     */
    public Set<Integer> recentlyRemovedBrokers() {
        Set<Integer> removedBrokerIds = this.latestRemoveStartTimeMsByBrokerId.keySet();
        return Collections.unmodifiableSet(removedBrokerIds);
    }

    /**
     * @return the most recently published executor state
     */
    public ExecutorState state() {
        return this.executorState;
    }

    /**
     * Initialises and starts the execution of the given proposals.
     *
     * @param collection the proposals to execute
     * @param set broker ids handed to the task manager together with the proposals
     * @param set2 the brokers being removed as part of this execution
     * @param loadMonitor the load monitor; must not be null
     * @param str the uuid of the execution; must not be null
     * @param balanceOpExecutionCompletionCallback callback handed to the execution runnable
     * @return the future of the submitted execution
     * @throws IllegalStateException if an execution is already ongoing, the Executor is reserved by
     *         another thread, or the uuid is null
     * @throws IllegalArgumentException if the load monitor is null
     */
    public synchronized Future<?> executeProposals(Collection<ExecutionProposal> collection, Set<Integer> set, Set<Integer> set2, LoadMonitor loadMonitor, String str, BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback) {
        try {
            LOG.trace("executeProposals with completionCallback {}", balanceOpExecutionCompletionCallback);
            ProposalExecutionRunnable executionRunnable = new ProposalExecutionRunnable(loadMonitor, null, set2, balanceOpExecutionCompletionCallback);
            return doExecuteProposals(collection, set, loadMonitor, str, executionRunnable);
        } catch (Exception e) {
            // NOTE(review): state is cleared on every failure, including the "ongoing execution"
            // rejection - confirm that resetting the state of a running execution is intended.
            clearExecutionState();
            throw e;
        }
    }

    /**
     * Validates the preconditions for a new execution, then initialises and starts it.
     *
     * @param collection the proposals to execute
     * @param set broker ids handed to the task manager together with the proposals
     * @param loadMonitor the load monitor; must not be null
     * @param str the uuid of the execution; must not be null
     * @param proposalExecutionRunnable the runnable that will carry out the execution
     * @return the future of the submitted execution
     * @throws IllegalStateException if an execution is already ongoing, the Executor is reserved by
     *         another thread, or the uuid is null
     * @throws IllegalArgumentException if the load monitor is null
     */
    private synchronized Future<?> doExecuteProposals(Collection<ExecutionProposal> collection, Set<Integer> set, LoadMonitor loadMonitor, String str, ProposalExecutionRunnable proposalExecutionRunnable) {
        // The order of these checks decides which exception a caller sees when several apply.
        if (this.hasOngoingExecution) {
            throw new IllegalStateException("Cannot execute new proposals while there is an ongoing execution.");
        }
        if (isReservedByOther()) {
            throw new IllegalStateException("Cannot execute new proposals because the Executor is reserved by another thread.");
        }
        if (loadMonitor == null) {
            throw new IllegalArgumentException("Load monitor cannot be null.");
        }
        if (str == null) {
            throw new IllegalStateException("UUID of the execution cannot be null.");
        }
        initProposalExecution(collection, set, str);
        return startExecution(proposalExecutionRunnable);
    }

    /**
     * Prepares a new execution: refreshes and remembers the cluster metadata, registers the proposals
     * with the task manager against that metadata, and records the execution uuid.
     *
     * @param collection the proposals to execute
     * @param collection2 broker ids handed to the task manager together with the proposals
     * @param str the uuid of the execution
     */
    void initProposalExecution(Collection<ExecutionProposal> collection, Collection<Integer> collection2, String str) {
        this.clusterMetadataAtProposalInitialization = this.metadataClient.refreshMetadata();
        Cluster clusterAtInitialization = this.clusterMetadataAtProposalInitialization.cluster();
        this.executionTaskManager.addExecutionProposals(collection, collection2, clusterAtInitialization);
        this.uuid = str;
    }

    /**
     * Runs the pre-flight sanity checks, marks the Executor as busy and submits the execution.
     *
     * @param proposalExecutionRunnable the runnable that will carry out the execution
     * @return the future of the submitted execution
     */
    Future<?> startExecution(ProposalExecutionRunnable proposalExecutionRunnable) {
        // Both sanity checks may throw, in which case nothing is submitted.
        sanityCheckBrokerReplicaExclusions(proposalExecutionRunnable.removedBrokers);
        // NOTE(review): the started counter is incremented before the ongoing-movement check, so a
        // rejected start is still counted - confirm this is intended.
        this.numExecutionsStarted.incrementAndGet();
        sanityCheckOngoingReplicaMovement();
        this.hasOngoingExecution = true;
        this.anomalyDetector.maybeClearOngoingAnomalyDetectionTimeMs();
        // Reset the stop flags left over from any previous execution.
        this.stopRequested.set(false);
        this.executionStoppedByUser.set(false);
        return this.proposalExecutor.submit(proposalExecutionRunnable);
    }

    /**
     * Requests a user-triggered stop and waits, polling every 100 ms, until no execution is ongoing.
     *
     * @param duration the maximum time to wait for the execution to stop
     * @throws TimeoutException if an execution is still ongoing once the deadline has been reached
     */
    private void abortExecution(Duration duration) throws TimeoutException {
        userTriggeredStopExecution();
        long deadlineMs = this.time.milliseconds() + duration.toMillis();
        if (hasOngoingExecution()) {
            LOG.info("Aborted executions, waiting for them to stop for {}", duration);
        }
        while (hasOngoingExecution()) {
            boolean deadlineReached = this.time.milliseconds() >= deadlineMs;
            if (deadlineReached) {
                throw new TimeoutException(String.format("Timed out awaiting for execution to finish after %s", duration));
            }
            this.time.sleep(100L);
        }
    }

    /**
     * Requests a stop of the ongoing execution. Only the call that flips the stop flag from unset to
     * set increments the stop counter and notifies the task manager.
     *
     * @return {@code true} if this call set the stop flag, {@code false} if it was already set
     */
    synchronized boolean stopExecution() {
        boolean firstRequest = this.stopRequested.compareAndSet(false, true);
        if (firstRequest) {
            this.numExecutionStopped.incrementAndGet();
            this.executionTaskManager.setStopRequested();
        }
        return firstRequest;
    }

    /**
     * @return the current value of the cancelled-reassignments counter
     */
    int numCancelledReassignments() {
        return this.numCancelledReassignments.get();
    }

    /**
     * Refuses to start an execution while replica movements are already in flight in the cluster.
     * Inter-broker reassignments are checked first; intra-broker movements are only checked (across
     * all brokers in the current metadata) when no inter-broker reassignment is found.
     *
     * <p>On a conflict the task manager is cleared and the uuid reset before the exception is thrown.
     *
     * @throws IllegalStateException if an inter-broker or intra-broker movement is ongoing
     */
    private void sanityCheckOngoingReplicaMovement() {
        String conflict = null;
        if (hasOngoingPartitionReassignments()) {
            conflict = "There are ongoing inter-broker partition movements.";
        } else {
            Collection<Integer> brokerIds = this.metadataClient.cluster().nodes().stream().mapToInt(node -> node.id()).boxed().collect(Collectors.toSet());
            if (this.adminUtils.isOngoingIntraBrokerReplicaMovement(brokerIds)) {
                conflict = "There are ongoing intra-broker partition movements.";
            }
        }
        if (conflict == null) {
            return;
        }
        this.executionTaskManager.clear();
        this.uuid = null;
        throw new IllegalStateException(conflict);
    }

    /**
     * Verifies that every broker excluded for replica placement (per the metadata captured at proposal
     * initialisation) is either being removed by this execution or was recently removed.
     *
     * @param set the brokers being removed as part of this execution
     * @throws BalancerBrokerExcludedForReplicaPlacementException if any excluded broker is unaccounted for
     */
    private void sanityCheckBrokerReplicaExclusions(Set<Integer> set) {
        Set<Integer> accountedForBrokers = new HashSet<>(set);
        accountedForBrokers.addAll(recentlyRemovedBrokers());

        // Copy the exclusion keys so the subtraction does not touch the captured metadata.
        Set<Object> unexpectedExclusions = new HashSet<>(this.clusterMetadataAtProposalInitialization.replicaExclusions().keySet());
        unexpectedExclusions.removeAll(accountedForBrokers);

        if (!unexpectedExclusions.isEmpty()) {
            LOG.warn("Found {} brokers that are excluded for replica placement but not removed as part of this execution (broker ids {}). Aborting execution.", Integer.valueOf(unexpectedExclusions.size()), unexpectedExclusions);
            throw new BalancerBrokerExcludedForReplicaPlacementException(String.format("Cannot execute reassignment plan because brokers %s were excluded for replica placement. Please remove the brokers that are excluded, or remove the exclusions themselves.", unexpectedExclusions));
        }
    }

    /**
     * Registers the Executor's gauges - executions started, executions stopped, cancelled
     * reassignments and failed reassignment cancellations - each backed by the matching accessor.
     *
     * @param dataBalancerMetricsRegistry the registry to register the gauges with
     */
    private void registerGaugeSensors(DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        dataBalancerMetricsRegistry.newGauge(Executor.class, GAUGE_EXECUTION_STARTED, this::numExecutionsStarted);
        dataBalancerMetricsRegistry.newGauge(Executor.class, GAUGE_EXECUTION_STOPPED, this::numExecutionStopped);
        dataBalancerMetricsRegistry.newGauge(Executor.class, GAUGE_CANCELLED_REASSIGNMENTS, this::numCancelledReassignments);
        dataBalancerMetricsRegistry.newGauge(Executor.class, GAUGE_FAILED_REASSIGNMENT_CANCELLATIONS, this::numFailedReassignmentCancellations);
    }

    /**
     * Removes all throttles through the throttle helper, holding the helper's monitor while doing so
     * (the same monitor {@code updateThrottle} uses).
     */
    private void removeThrottles() {
        synchronized (this.throttleHelper) {
            this.throttleHelper.removeAllThrottles();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops every demotion-history entry whose start time plus the demotion retention time lies
     * strictly before the current time.
     */
    public void removeExpiredDemotionHistory() {
        LOG.debug("Remove expired demotion history");
        this.latestDemoteStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long expiresAtMs = ((Long) entry.getValue()).longValue() + this.demotionHistoryRetentionTimeMs;
            return expiresAtMs < this.time.milliseconds();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops every removal-history entry whose start time plus the removal retention time lies
     * strictly before the current time.
     */
    public void removeExpiredRemovalHistory() {
        LOG.debug("Remove expired broker removal history");
        this.latestRemoveStartTimeMsByBrokerId.entrySet().removeIf(entry -> {
            long expiresAtMs = ((Long) entry.getValue()).longValue() + this.removalHistoryRetentionTimeMs;
            return expiresAtMs < this.time.milliseconds();
        });
    }

    /**
     * @return the current value of the executions-started counter
     */
    int numExecutionsStarted() {
        return this.numExecutionsStarted.get();
    }

    /**
     * @return the current value of the executions-stopped counter
     */
    int numExecutionStopped() {
        return this.numExecutionStopped.get();
    }

    /**
     * @return the current value of the failed-reassignment-cancellations counter
     */
    private int numFailedReassignmentCancellations() {
        return this.numFailedReassignmentCancellations.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resets the per-execution state: clears the task manager, forgets the execution uuid, and resets
     * the ongoing-execution, stop-requested and stopped-by-user flags to {@code false}.
     */
    public void clearExecutionState() {
        this.executionTaskManager.clear();
        this.uuid = null;
        this.hasOngoingExecution = false;
        this.stopRequested.set(false);
        this.executionStoppedByUser.set(false);
    }

    /**
     * Lists the partitions that currently have a reassignment in progress.
     *
     * @param sbkAdminUtils the admin utilities used to query the reassignments
     * @return the key set of the target-replica listing obtained with an empty timeout/filter argument
     *         (NOTE(review): the meaning of the {@code Optional.empty()} argument is defined by
     *         {@code SbkAdminUtils} - confirm there)
     */
    public static Set<TopicPartition> partitionsBeingReassigned(SbkAdminUtils sbkAdminUtils) {
        return sbkAdminUtils.listTargetReplicasBeingReassigned(Optional.empty()).keySet();
    }
}
