package com.linkedin.kafka.cruisecontrol.statemachine;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlContext;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationResult;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.AdminClientResult;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalContext;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalPhase;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalPhaseBuilder;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalRestartablePhase;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.EvenClusterLoadStateMachine;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.admin.AlterBrokerReplicaExclusionsResult;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.common.errors.BalancerBrokerExcludedForReplicaPlacementException;
import org.apache.kafka.common.errors.BalancerOperationFailedException;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/statemachine/BrokerRemovalTask.class */
/**
 * A {@link Task} that removes a set of brokers from the cluster.
 *
 * <p>The whole operation is composed at construction time into a {@link BrokerRemovalFuture}
 * made of the following phases, passed to
 * {@link BrokerRemovalPhaseBuilder} in this order:
 * <ol>
 *   <li>reserve the executor and abort ongoing executions,</li>
 *   <li>initial drain-plan computation,</li>
 *   <li>placing replica exclusions on the brokers being removed,</li>
 *   <li>drain-plan computation (with retries),</li>
 *   <li>drain-plan execution,</li>
 *   <li>broker shutdown,</li>
 *   <li>removal of the replica exclusions,</li>
 *   <li>notification of the completion callback.</li>
 * </ol>
 * The composed future is executed when {@link #run()} is invoked.
 */
public class BrokerRemovalTask implements Task {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerRemovalTask.class);
    private final String taskId;
    private final KafkaCruiseControlContext kafkaCruiseControlContext;
    private final BrokerRemovalFuture brokerRemovalFuture;

    /**
     * Creates the task and eagerly composes (but does not start) the broker removal operation.
     *
     * @param str the task id; also used for the log prefix and as the exclusion operation reason
     * @param kafkaCruiseControlContext the context providing access to the executor, admin utilities, etc.
     * @param z flag forwarded unchanged to {@link BrokerRemovalContext}
     *          (NOTE(review): presumably "should shut down brokers" - confirm against BrokerRemovalContext)
     * @param map the brokers to remove, keyed by broker id; the values are forwarded to the
     *            broker shutdown manager (NOTE(review): presumably broker epochs - confirm)
     * @param balanceOpExecutionCompletionCallback callback invoked by the final phase with the plan
     *                                             execution outcome
     * @param brokerRemovalCallback callback/state tracker handed to the removal context and phases
     */
    public BrokerRemovalTask(String str, KafkaCruiseControlContext kafkaCruiseControlContext, boolean z, Map<Integer, Optional<Long>> map, @Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, @Nonnull BrokerRemovalCallback brokerRemovalCallback) {
        // The fields below must be assigned first: createBrokerRemoval() reads both of them.
        this.taskId = str;
        this.kafkaCruiseControlContext = kafkaCruiseControlContext;
        this.brokerRemovalFuture = createBrokerRemoval(map, z, balanceOpExecutionCompletionCallback, brokerRemovalCallback);
    }

    /**
     * Composes all removal phases into a single {@link BrokerRemovalFuture}.
     *
     * <p>The first and last phases are marked "always execute"; every phase except the first is
     * registered with {@code brokerRemovalCallback} as its state tracker.
     */
    private BrokerRemovalFuture createBrokerRemoval(Map<Integer, Optional<Long>> map, boolean z, @Nonnull BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, @Nonnull BrokerRemovalCallback brokerRemovalCallback) {
        OperationProgress operationProgress = new OperationProgress();
        // Snapshot of the broker ids, used only for log/description strings.
        Set<Integer> brokerIds = new HashSet<>(map.keySet());
        String logPrefix = BrokerRemovalCallback.logPrefix(this.taskId);
        BrokerRemovalContext brokerRemovalContext = new BrokerRemovalContext(map, z, brokerRemovalCallback, this.taskId, this.kafkaCruiseControlContext.defaultPlanComputationOptions(), operationProgress);
        return new BrokerRemovalPhaseBuilder().composeRemoval(
            brokerRemovalContext,
            brokerRemovalCallback,
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setAlwaysExecute(true)
                .setPhase(reserveExecutorPhase(logPrefix, brokerRemovalContext))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setPhase(initialPlanComputationPhase(logPrefix))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setPhase(brokerExclusionPhase(logPrefix))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setPhase(planComputationPhase(logPrefix, brokerIds))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setPhase(planExecutionPhase(logPrefix))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setPhase(brokerShutdownPhase(logPrefix, map, brokerIds))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setPhase(exclusionRemovalPhase(logPrefix))
                .build(),
            new BrokerRemovalRestartablePhase.BrokerRemovalRestartablePhaseBuilder()
                .setBrokerRemovalStateTracker(brokerRemovalCallback)
                .setAlwaysExecute(true)
                .setPhase(completionCallbackPhase(balanceOpExecutionCompletionCallback))
                .build());
    }

    /**
     * Phase 1: aborts any even-cluster-load operation, reserves the executor (aborting ongoing
     * executions) and stores the reservation handle in {@code removalContext}.
     */
    private BrokerRemovalPhase<Void> reserveExecutorPhase(final String logPrefix, final BrokerRemovalContext removalContext) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                LOG.info("{} Reserving the Executor and aborting ongoing executions", logPrefix);
                kafkaCruiseControlContext.activeEvenClusterLoadStateManager().registerEvent(
                    EvenClusterLoadStateMachine.EvenClusterLoadEvent.REMOVE_BROKER_TRIGGERED,
                    new BalancerOperationOverriddenException("Even cluster load balancing operation was aborted by a higher priority 'Remove Broker' operation."));
                long startMs = kafkaCruiseControlContext.time().hiResClockMs();
                // Deliberately writes to the context captured at composition time rather than the
                // 'context' argument, as the original code did.
                removalContext.reservationHandle().set(
                    kafkaCruiseControlContext.executor().reserveAndAbortOngoingExecutions(Duration.ofMinutes(1L)));
                long elapsedMs = kafkaCruiseControlContext.time().hiResClockMs() - startMs;
                LOG.info("{} Successfully reserved the Executor in {} ms", logPrefix, elapsedMs);
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                // This phase has no associated state-machine state.
                return null;
            }
        };
    }

    /**
     * Phase 2: computes a drain plan once and discards the result.
     * NOTE(review): presumably a fail-fast check that a plan can be computed before exclusions are
     * placed - confirm.
     */
    private BrokerRemovalPhase<Void> initialPlanComputationPhase(final String logPrefix) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                KafkaCruiseControl.computeDrainBrokersPlan(context.brokersToRemove(), Collections.emptyList(), context.operationProgress, context.planComputationOptions, kafkaCruiseControlContext);
                LOG.info("{} Successfully computed the remove broker plan", logPrefix);
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                return BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED;
            }
        };
    }

    /**
     * Phase 3: places replica exclusions on the brokers being removed. Fails if any broker that is
     * not being removed is currently excluded, or if the exclusions cannot be set.
     */
    private BrokerRemovalPhase<Void> brokerExclusionPhase(final String logPrefix) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                LOG.info("{} Attempting to exclude brokers {} as part of broker removal operation", logPrefix, context.brokersToRemove());
                AdminClientResult<Set<Integer>> currentlyExcluded = kafkaCruiseControlContext.sbkAdminUtils().describeCurrentlyExcludedBrokers();
                if (currentlyExcluded.hasException()) {
                    throw (Exception) currentlyExcluded.exception();
                }
                if (!context.brokersToRemove().containsAll(currentlyExcluded.result())) {
                    // Report only the excluded brokers that are not part of this removal.
                    Set<Integer> unrelatedExclusions = new HashSet<>(currentlyExcluded.result());
                    unrelatedExclusions.removeAll(context.brokersToRemove());
                    throw new BalancerBrokerExcludedForReplicaPlacementException(String.format("Removal of brokers %s failed because exclusions are present on brokers not being removed (%s)", context.brokersToRemove(), unrelatedExclusions));
                }
                long startMs = kafkaCruiseControlContext.time().hiResClockMs();
                AdminClientResult<AlterBrokerReplicaExclusionsResult.ExclusionsResult> alterResult = kafkaCruiseControlContext.sbkAdminUtils().alterBrokerReplicaExclusions(new ExclusionOp(ExclusionOp.OpType.SET, taskId), context.brokersToRemove());
                long elapsedMs = kafkaCruiseControlContext.time().hiResClockMs() - startMs;
                if (alterResult.hasException()) {
                    throw (Exception) alterResult.exception();
                }
                if (!alterResult.result().isSuccessful()) {
                    throw new BalancerOperationFailedException(String.format("Unable to set broker exclusions on brokers %s", context.brokersToRemove()));
                }
                // Logged only once the result is known to be successful, so the message is never
                // emitted for a failed attempt.
                LOG.info("{} Brokers {} had replica exclusion emplaced in {} milliseconds", logPrefix, context.brokersToRemove(), elapsedMs);
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                return BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_INITIATED;
            }
        };
    }

    /**
     * Phase 4: computes the drain plan (with retries) and stores its goal proposals in the context.
     */
    private BrokerRemovalPhase<Void> planComputationPhase(final String logPrefix, final Set<Integer> brokerIds) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                LOG.info("{} Beginning computation of broker removal plan", logPrefix);
                OptimizerResult plan = kafkaCruiseControlContext.computationUtils().generatePlanWithRetries(
                    () -> KafkaCruiseControl.computeDrainBrokersPlan(context.brokersToRemove(), Collections.emptyList(), context.operationProgress, context.planComputationOptions, kafkaCruiseControlContext),
                    String.format("broker removal plan for broker %s", brokerIds));
                LOG.info("{} Computed broker removal plan {}", logPrefix, new OptimizationResult(plan).proposalSummary("broker removal"));
                context.proposals(plan.goalProposals());
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                return BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_INITIATED;
            }
        };
    }

    /**
     * Phase 5: submits the computed plan for execution, blocks until the returned future completes
     * and rethrows the execution exception recorded by the completion callback, if any.
     */
    private BrokerRemovalPhase<Void> planExecutionPhase(final String logPrefix) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                Future<?> executionFuture = KafkaCruiseControl.executeRemoval(context.proposals, context.brokersToRemove(), taskId, (success, error) -> {
                    // Record the outcome in the context so this phase and the final phase can read it.
                    context.planExecutionSuccess(success);
                    context.planExecutionException(maybeWrapException(error));
                }, kafkaCruiseControlContext);
                context.executorFuture(executionFuture);
                LOG.info("{} Successfully submitted the broker removal plan", logPrefix);
                executionFuture.get();
                Exception failure = context.planExecutionException();
                if (failure != null) {
                    // 'failure' is the trailing argument without a placeholder, so SLF4J logs it
                    // as the exception.
                    LOG.info("{} execution completed with value: {}, exception", logPrefix, context.planExecutionSuccess(), failure);
                    throw failure;
                }
                LOG.info("{} execution completed with value: {}", logPrefix, context.planExecutionSuccess());
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                return BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_INITIATED;
            }
        };
    }

    /**
     * Phase 6: asks the broker shutdown manager to shut down the brokers and logs which brokers
     * were and were not shut down. Fails if the manager reports no broker at all.
     */
    private BrokerRemovalPhase<Void> brokerShutdownPhase(final String logPrefix, final Map<Integer, Optional<Long>> map, final Set<Integer> brokerIds) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                LOG.info("{} Attempting to shut down brokers {} as part of broker removal operation", logPrefix, brokerIds);
                long startMs = kafkaCruiseControlContext.time().hiResClockMs();
                Map<Integer, Boolean> shutdownResults = kafkaCruiseControlContext.brokerShutdownManager().maybeShutdownBrokers(map);
                long endMs = kafkaCruiseControlContext.time().hiResClockMs();
                Set<Integer> shutDown = shutdownResults.entrySet().stream()
                    .filter(Map.Entry::getValue)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
                Set<Integer> notShutDown = shutdownResults.entrySet().stream()
                    .filter(entry -> !entry.getValue())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
                if (!shutDown.isEmpty()) {
                    LOG.info("{} Brokers {} were shut down successfully in {} milliseconds (brokers being removed: {}) ", logPrefix, shutDown, endMs - startMs, brokerIds);
                }
                if (!notShutDown.isEmpty()) {
                    LOG.info("{} Brokers {} were not shut down (already offline?) (brokers being removed: {})", logPrefix, notShutDown, brokerIds);
                }
                if (shutDown.isEmpty() && notShutDown.isEmpty()) {
                    // String.format requires %s placeholders; SLF4J-style "{}" would be left
                    // verbatim and the arguments silently dropped.
                    throw new IllegalStateException(String.format("%s shutdown of brokers %s had empty values for brokers shutdown/not-shutdown", logPrefix, map.keySet()));
                }
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                return BrokerRemovalStateMachine.BrokerRemovalState.BROKER_SHUTDOWN_INITIATED;
            }
        };
    }

    /**
     * Phase 7: deletes the replica exclusions that were placed on the removed brokers.
     */
    private BrokerRemovalPhase<Void> exclusionRemovalPhase(final String logPrefix) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) throws Exception {
                long startMs = kafkaCruiseControlContext.time().hiResClockMs();
                AdminClientResult<AlterBrokerReplicaExclusionsResult.ExclusionsResult> alterResult = kafkaCruiseControlContext.sbkAdminUtils().alterBrokerReplicaExclusions(new ExclusionOp(ExclusionOp.OpType.DELETE, taskId), context.brokersToRemove());
                long elapsedMs = kafkaCruiseControlContext.time().hiResClockMs() - startMs;
                if (alterResult.hasException()) {
                    LOG.error("{} Unexpected exception while removing broker replica exclusions:", logPrefix, alterResult.exception());
                    throw new BalancerOperationFailedException("Exception while removing broker replica exclusions", alterResult.exception());
                }
                if (!alterResult.result().isSuccessful()) {
                    LOG.error("{} Exclusion removal had errors after {} ms: {}", logPrefix, elapsedMs, alterResult.result().exclusionResultByBroker());
                    throw new BalancerOperationFailedException(String.format("Unable to remove all broker exclusions on brokers %s with errors %s", context.brokersToRemove(), alterResult.result().exclusionResultByBroker()));
                }
                LOG.info("{} Broker replica exclusions removed in {} milliseconds (for brokers {})", logPrefix, elapsedMs, context.brokersToRemove());
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                return BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_REMOVAL_INITIATED;
            }
        };
    }

    /**
     * Phase 8: reports the plan execution outcome (success flag, defaulting to {@code false} when
     * absent, and the recorded exception) to the completion callback.
     */
    private BrokerRemovalPhase<Void> completionCallbackPhase(final BalanceOpExecutionCompletionCallback completionCallback) {
        return new BrokerRemovalPhase<Void>() {
            @Override
            public Void execute(BrokerRemovalContext context) {
                completionCallback.accept(context.planExecutionSuccess().orElse(false), context.planExecutionException());
                return null;
            }

            @Override
            public BrokerRemovalStateMachine.BrokerRemovalState startState() {
                // This phase has no associated state-machine state.
                return null;
            }
        };
    }

    /**
     * Converts a {@link Throwable} into an {@link Exception}.
     *
     * @param th the throwable to convert; may be {@code null}
     * @return {@code null} if {@code th} is {@code null}; {@code th} itself if it already is an
     *         {@link Exception}; otherwise a new {@link Exception} with {@code th} as its cause
     */
    public Exception maybeWrapException(Throwable th) {
        if (th == null) {
            return null;
        }
        return th instanceof Exception ? (Exception) th : new Exception(th);
    }

    /**
     * @return the composed broker removal operation created by the constructor
     */
    public BrokerRemovalFuture brokerRemovalFuture() {
        return this.brokerRemovalFuture;
    }

    /**
     * @return the id of this task, as passed to the constructor
     */
    @Override
    public String taskId() {
        return this.taskId;
    }

    /**
     * Executes the composed broker removal, passing a 60-minute duration to
     * {@link BrokerRemovalFuture#execute} (NOTE(review): presumably a timeout - confirm).
     * Any failure is logged and not propagated to the caller.
     */
    @Override
    public void run() {
        try {
            this.brokerRemovalFuture.execute(Duration.ofMinutes(60L));
        } catch (Throwable th) {
            LOG.error("Error when executing broker removal task.", th);
        }
    }
}
