package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.operation.EvenClusterLoadStateManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.SelfHealingEvenClusterLoadStateManager;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector.class */
public class AnomalyDetector {
    static final String METRIC_REGISTRY_NAME = "AnomalyDetector";
    // Exclusive upper bound (ms) of the random jitter added to the first goal violation check.
    private static final int INIT_JITTER_BOUND = 10000;
    // Size of the scheduler pool shared by the goal violation detector, delayed broker failure
    // checks and the (long-running) anomaly handler task.
    private static final int NUM_ANOMALY_DETECTION_THREADS = 2;
    private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetector.class);
    private final KafkaCruiseControl kafkaCruiseControl;
    private AnomalyNotifier anomalyNotifier;
    private GoalViolationDetector goalViolationDetector;
    private BrokerFailureDetector brokerFailureDetector;
    private ScheduledExecutorService detectorScheduler;
    private final long anomalyDetectionIntervalMs;
    // Anomalies produced by the detectors; consumed one at a time by AnomalyHandlerTask.
    private final LinkedBlockingDeque<Anomaly> anomalies;
    // Guarded by shutdownLock for writes; volatile so readers outside the lock see the latest value.
    private volatile boolean shutdown;
    private final LoadMonitor loadMonitor;
    private AnomalyDetectorState anomalydetectorstate;
    private List<String> selfHealingGoals;
    // Off-loads the (potentially large) self-healing operation logs from the handler thread.
    private ExecutorService anomalyLoggerExecutor;
    // Anomaly currently being handled by AnomalyHandlerTask; null when the handler is idle.
    private volatile Anomaly anomalyInProgress;
    private final AtomicLong numCheckedWithDelay;
    private final Object shutdownLock;
    private final KafkaCruiseControlConfig config;
    private final Time time;
    private final ConfluentAdmin adminClient;
    private final DataBalancerMetricsRegistry metricRegistry;
    private final ApiStatePersistenceStore persistenceStore;

    /**
     * Consumes anomalies from the detector's queue one at a time until the shutdown sentinel
     * ({@code AnomalyDetectorUtils.SHUTDOWN_ANOMALY}) is taken. For each anomaly it decides,
     * based on the executor state and the notifier's verdict, whether to fix it, re-check it
     * later, or ignore it.
     */
    public class AnomalyHandlerTask implements Runnable {
        AnomalyHandlerTask() {
        }

        /**
         * Handler loop. Any failure while handling an anomaly is logged and, when an anomaly was
         * in progress, turned into a delayed re-check rather than terminating the loop.
         */
        @Override // java.lang.Runnable
        public void run() {
            LOG.info("Starting anomaly handler");
            while (true) {
                boolean retryHandling = false;
                anomalyInProgress = null;
                try {
                    anomalyInProgress = anomalies.takeFirst();
                    LOG.trace("Processing anomaly {}.", anomalyInProgress);
                    if (anomalyInProgress == AnomalyDetectorUtils.SHUTDOWN_ANOMALY) {
                        // Sentinel enqueued by AnomalyDetector#shutdown(): stop handling anomalies.
                        anomalyInProgress = null;
                        break;
                    }
                    handleAnomalyInProgress();
                } catch (KafkaCruiseControlException e) {
                    LOG.warn("Anomaly handler received exception when trying to fix the anomaly {}.", anomalyInProgress, e);
                    retryHandling = true;
                } catch (InterruptedException e) {
                    // The interrupt status is deliberately not restored: takeFirst() would then throw
                    // immediately on every iteration and the loop would spin. Shutdown is signalled via
                    // the sentinel anomaly instead.
                    LOG.debug("Received interrupted exception.", e);
                    retryHandling = true;
                } catch (Throwable t) {
                    LOG.error("Uncaught exception in anomaly handler.", t);
                    retryHandling = true;
                }
                if (retryHandling && anomalyInProgress != null) {
                    checkWithDelay(anomalyDetectionIntervalMs);
                }
            }
            LOG.info("Anomaly handler exited.");
        }

        /**
         * Records the detection of the current anomaly and routes it. The anomaly is re-checked
         * later while the executor is busy, ignored while the executor is reserved, and otherwise
         * passed on to the notifier.
         */
        private void handleAnomalyInProgress() throws Exception {
            AnomalyType anomalyType = AnomalyDetectorUtils.getAnomalyType(anomalyInProgress);
            anomalydetectorstate.addAnomalyDetection(anomalyType, anomalyInProgress);
            ExecutorState.State executionState = kafkaCruiseControl.executionState();
            if (executionState != ExecutorState.State.NO_TASK_IN_PROGRESS) {
                LOG.debug("Schedule delayed check for anomaly {} because executor is in {} state", anomalyInProgress, executionState);
                checkWithDelay(anomalyDetectionIntervalMs);
            } else if (!kafkaCruiseControl.executorIsReserved()) {
                processAnomalyInProgress(anomalyType);
            } else {
                LOG.debug("Ignoring anomaly {} because the executor is reserved", anomalyInProgress);
                anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.IGNORED);
            }
        }

        /**
         * Asks the notifier what to do with the current anomaly and carries out the requested
         * action (fix, delayed check, or ignore). A {@code null} notification result is a no-op.
         *
         * @throws IllegalStateException if the notifier returns an unknown action
         */
        private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            anomalydetectorstate.markAnomalyRate(anomalyType);
            AnomalyNotificationResult result = notifyAnomalyInProgress(anomalyType);
            if (result == null) {
                return;
            }
            anomalydetectorstate.maybeSetOngoingAnomalyDetectionTimeMs();
            switch (result.action()) {
                case FIX:
                    fixAnomalyInProgress(anomalyType);
                    return;
                case CHECK:
                    checkWithDelay(result.delay());
                    return;
                case IGNORE:
                    anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.IGNORED);
                    return;
                default:
                    throw new IllegalStateException("Unrecognized anomaly notification result.");
            }
        }

        /**
         * Dispatches the current anomaly to the notifier callback matching its type.
         *
         * @return the notifier's verdict (may be {@code null})
         * @throws IllegalStateException for anomaly types other than goal violation / broker failure
         */
        private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyType) {
            AnomalyNotificationResult result;
            switch (anomalyType) {
                case GOAL_VIOLATION:
                    result = anomalyNotifier.onGoalViolation((GoalViolations) anomalyInProgress);
                    break;
                case BROKER_FAILURE:
                    result = anomalyNotifier.onBrokerFailure((BrokerFailures) anomalyInProgress);
                    break;
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
            LOG.debug("Received notification result {}", result);
            return result;
        }

        /**
         * Schedules a broker failure re-detection after {@code delayMs}. Only broker failures are
         * re-checked; any other anomaly type is marked IGNORED. Nothing is scheduled once the
         * detector is shutting down.
         *
         * @param delayMs delay before the re-detection, in milliseconds
         */
        private void checkWithDelay(long delayMs) {
            if (AnomalyDetectorUtils.getAnomalyType(anomalyInProgress) != AnomalyType.BROKER_FAILURE) {
                anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.IGNORED);
                return;
            }
            // Hold the lock so the scheduler cannot be shut down between the flag check and schedule().
            synchronized (shutdownLock) {
                if (shutdown) {
                    LOG.debug("Skip delayed checking anomaly {}, because anomaly detector is shutting down.", anomalyInProgress);
                    return;
                }
                LOG.debug("Scheduling broker failure detection with delay of {} ms", delayMs);
                numCheckedWithDelay.incrementAndGet();
                detectorScheduler.schedule(brokerFailureDetector::scheduleDetection, delayMs, TimeUnit.MILLISECONDS);
                anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
            }
        }

        /**
         * Checks that the load monitor is ready and that load completeness requirements hold
         * for the self-healing goals. On failure, records why the fix was skipped.
         *
         * @return {@code true} if the current anomaly can be fixed now
         */
        private boolean isAnomalyInProgressReadyToFix(AnomalyType anomalyType) {
            LoadMonitorTaskRunner.LoadMonitorTaskRunnerState taskRunnerState = loadMonitor.taskRunnerState();
            if (!ViolationUtils.isLoadMonitorReady(taskRunnerState)) {
                LOG.info("Skipping {} fix because load monitor is in {} state.", anomalyType, taskRunnerState);
                anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.LOAD_MONITOR_NOT_READY);
                return false;
            }
            if (!kafkaCruiseControl.meetCompletenessRequirements(selfHealingGoals)) {
                LOG.warn("Skipping {} fix because load completeness requirement is not met for goals.", anomalyType);
                anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.COMPLETENESS_NOT_READY);
                return false;
            }
            return true;
        }

        /**
         * Returns the optimization result text of the current anomaly.
         *
         * @throws IllegalStateException for anomaly types other than goal violation / broker failure
         */
        private String optimizationResult(AnomalyType anomalyType) {
            switch (anomalyType) {
                case GOAL_VIOLATION:
                case BROKER_FAILURE:
                    return ((KafkaAnomaly) anomalyInProgress).optimizationResult();
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
        }

        /**
         * Writes the outcome of a self-healing attempt to the log. Runs on the anomaly logger
         * executor.
         *
         * @param anomalyId          id of the anomaly being fixed
         * @param ofe                optimization failure, or {@code null}
         * @param optimizationResult optimization result of a started fix, or {@code null}
         */
        private void logSelfHealingOperation(String anomalyId, OptimizationFailureException ofe, String optimizationResult) {
            if (optimizationResult != null) {
                LOG.info("[{}] Self-healing started successfully:\n{}", anomalyId, optimizationResult);
            } else if (ofe != null) {
                LOG.warn("[{}] Self-healing failed to start:\n{}", anomalyId, ofe);
            } else {
                LOG.warn("[{}] Self-healing failed to start: no optimization result is available.", anomalyId);
            }
        }

        /**
         * Attempts to fix the current anomaly unless the detector is shutting down. Whether or not
         * a fix was attempted, the pending anomalies are then dropped and a broker failure
         * detection is scheduled. This post-fix step is skipped if the fix throws.
         */
        private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            synchronized (shutdownLock) {
                if (shutdown) {
                    LOG.info("Skip fixing anomaly {}, because anomaly detector is shutting down.", anomalyInProgress);
                    return;
                }
                boolean readyToFix = isAnomalyInProgressReadyToFix(anomalyType);
                if (readyToFix) {
                    attemptFixAnomalyInProgress(anomalyType);
                }
                handlePostFixAnomaly(readyToFix);
            }
        }

        // Invokes fix() on the current anomaly; the outcome is always recorded via the finally block.
        private void attemptFixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            LOG.info("Fixing anomaly {}", anomalyInProgress);
            String anomalyId = anomalyInProgress.anomalyId();
            anomalydetectorstate.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.ATTEMPTING_FIX);
            boolean fixStarted = false;
            try {
                fixStarted = anomalyInProgress.fix();
                // Only report an optimization result for a fix that actually started; otherwise the
                // operation log would claim success while recordFixOutcome reports a failure.
                String optimizationResult = fixStarted ? optimizationResult(anomalyType) : null;
                anomalyLoggerExecutor.submit(() -> logSelfHealingOperation(anomalyId, null, optimizationResult));
            } catch (OptimizationFailureException e) {
                anomalyLoggerExecutor.submit(() -> logSelfHealingOperation(anomalyId, e, null));
                throw e;
            } finally {
                recordFixOutcome(anomalyId, fixStarted);
            }
        }

        // Updates detector state and counters to reflect whether the fix started.
        private void recordFixOutcome(String anomalyId, boolean fixStarted) {
            anomalydetectorstate.onAnomalyHandle(anomalyInProgress,
                    fixStarted ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START);
            if (fixStarted) {
                anomalydetectorstate.incrementNumSelfHealingStarted();
                LOG.info("[{}] Self-healing started successfully.", anomalyId);
            } else {
                markSelfHealingFinished(anomalyId);
                anomalydetectorstate.incrementNumSelfHealingErrors();
                LOG.warn("[{}] Self-healing failed to start.", anomalyId);
            }
        }

        /**
         * Drops all queued anomalies and schedules a broker failure detection. The detection runs
         * immediately if a fix was attempted, or after one detection interval otherwise.
         */
        private void handlePostFixAnomaly(boolean fixAttempted) {
            anomalies.clear();
            long delayMs = fixAttempted ? 0L : anomalyDetectionIntervalMs;
            detectorScheduler.schedule(brokerFailureDetector::scheduleDetection, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Creates a detector whose collaborators (notifier, detectors, schedulers, state) are built
     * later by {@link #init}.
     */
    public AnomalyDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, ConfluentAdmin confluentAdmin, LoadMonitor loadMonitor, KafkaCruiseControl kafkaCruiseControl, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, ApiStatePersistenceStore apiStatePersistenceStore) {
        this.anomalies = new LinkedBlockingDeque<>();
        this.config = kafkaCruiseControlConfig;
        this.adminClient = confluentAdmin;
        this.anomalyDetectionIntervalMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.ANOMALY_DETECTION_INTERVAL_MS_CONFIG).longValue();
        this.loadMonitor = loadMonitor;
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.time = time;
        this.metricRegistry = dataBalancerMetricsRegistry;
        this.persistenceStore = apiStatePersistenceStore;
        this.shutdown = false;
        this.anomalyInProgress = null;
        this.numCheckedWithDelay = new AtomicLong();
        this.shutdownLock = new Object();
    }

    /**
     * Creates a fully wired detector from injected collaborators without calling {@link #init}.
     * There is no config, admin client, metric registry or persistence store.
     * NOTE(review): presumably intended for tests (package-private) — confirm.
     */
    AnomalyDetector(LinkedBlockingDeque<Anomaly> linkedBlockingDeque, long j, KafkaCruiseControl kafkaCruiseControl, AnomalyNotifier anomalyNotifier, GoalViolationDetector goalViolationDetector, BrokerFailureDetector brokerFailureDetector, ScheduledExecutorService scheduledExecutorService, LoadMonitor loadMonitor) {
        this.config = null;
        this.adminClient = null;
        this.metricRegistry = null;
        this.persistenceStore = null;
        this.anomalies = linkedBlockingDeque;
        this.anomalyDetectionIntervalMs = j;
        this.anomalyNotifier = anomalyNotifier;
        this.goalViolationDetector = goalViolationDetector;
        this.brokerFailureDetector = brokerFailureDetector;
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.time = Time.SYSTEM;
        this.detectorScheduler = scheduledExecutorService;
        this.shutdown = false;
        this.loadMonitor = loadMonitor;
        this.selfHealingGoals = Collections.emptyList();
        this.anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this.anomalyInProgress = null;
        this.numCheckedWithDelay = new AtomicLong();
        this.shutdownLock = new Object();
        // 10 takes the place of NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG used by init().
        this.anomalydetectorstate = new AnomalyDetectorState(new SystemTime(), new HashMap<>(AnomalyType.cachedValues().size()), 10, null);
    }

    /**
     * Builds the notifier, goal violation and broker failure detectors, schedulers and detector
     * state from the configuration, and registers the {@code balancedness-score} gauge.
     *
     * @param num            forwarded (unboxed, so must be non-null) to the returned state manager
     * @param ccStartupMode  startup mode passed to the goal violation detector
     * @return a new {@link SelfHealingEvenClusterLoadStateManager}
     */
    public EvenClusterLoadStateManager init(Integer num, KafkaCruiseControl.CcStartupMode ccStartupMode) {
        this.anomalyNotifier = (AnomalyNotifier) this.config.getConfiguredInstance(KafkaCruiseControlConfig.ANOMALY_NOTIFIER_CLASS_CONFIG, AnomalyNotifier.class);
        this.selfHealingGoals = AnomalyDetectorUtils.getSelfHealingGoalNames(this.config);
        this.goalViolationDetector = new GoalViolationDetector(this.config, this.loadMonitor, this.anomalies, this.time, this.kafkaCruiseControl, this.selfHealingGoals, ccStartupMode);
        this.brokerFailureDetector = new BrokerFailureDetector(this.config, this.adminClient, this.loadMonitor, this.anomalies, this.time, this.kafkaCruiseControl, this.selfHealingGoals, this.persistenceStore);
        this.detectorScheduler = Executors.newScheduledThreadPool(NUM_ANOMALY_DETECTION_THREADS, new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG));
        int numCachedRecentAnomalyStates = this.config.getInt(KafkaCruiseControlConfig.NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG).intValue();
        this.anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this.metricRegistry.newGauge(AnomalyDetector.class, "balancedness-score", this.goalViolationDetector::balancednessScore);
        this.anomalydetectorstate = new AnomalyDetectorState(this.time, this.anomalyNotifier.selfHealingEnabled(), numCachedRecentAnomalyStates, this.metricRegistry);
        return new SelfHealingEvenClusterLoadStateManager(num.intValue(), this.time, this.persistenceStore);
    }

    /**
     * Starts the broker failure detector and schedules the goal violation detector at a fixed
     * rate. The first goal check runs after half an interval plus a random jitter below
     * {@link #INIT_JITTER_BOUND} ms. Also submits the anomaly handler task.
     */
    public void startDetection() {
        LOG.info("Starting anomaly detector.");
        this.brokerFailureDetector.startDetection();
        int jitterMs = new Random().nextInt(INIT_JITTER_BOUND);
        LOG.debug("Starting goal violation detector with delay of {} ms", Integer.valueOf(jitterMs));
        long initialDelayMs = (this.anomalyDetectionIntervalMs / 2) + jitterMs;
        this.detectorScheduler.scheduleAtFixedRate(this.goalViolationDetector, initialDelayMs, this.anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
        this.detectorScheduler.submit(new AnomalyHandlerTask());
    }

    /**
     * Stops anomaly detection. Once the shutdown flag is set, no further fixes or delayed checks
     * are started. The broker failure detector and logger executor are stopped, the handler is
     * woken with the shutdown sentinel, and the detector scheduler is drained.
     */
    public void shutdown() {
        LOG.info("Shutting down anomaly detector.");
        synchronized (this.shutdownLock) {
            this.shutdown = true;
        }
        KafkaCruiseControlUtils.executeSilently(this.brokerFailureDetector, detector -> {
            detector.shutdownNow();
        });
        KafkaCruiseControlUtils.executeSilently(this.anomalyLoggerExecutor, executor -> {
            executor.shutdownNow();
        });
        // Put the sentinel at the head so the handler exits before processing queued anomalies.
        this.anomalies.addFirst(AnomalyDetectorUtils.SHUTDOWN_ANOMALY);
        KafkaCruiseControlUtils.executeSilently(this.detectorScheduler, this::shutdownDetectorScheduler);
        LOG.info("Anomaly detector shutdown completed.");
    }

    BrokerFailureDetector getBrokerFailureDetector() {
        return this.brokerFailureDetector;
    }

    /**
     * Shuts the scheduler down gracefully, waiting up to one detection interval for termination.
     * Restores the caller's interrupt status if the wait is interrupted.
     */
    private void shutdownDetectorScheduler(ScheduledExecutorService scheduler) {
        try {
            scheduler.shutdown();
            if (!scheduler.awaitTermination(this.anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS)) {
                LOG.warn("The anomaly detector scheduler failed to shutdown in {} ms.", this.anomalyDetectionIntervalMs);
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Refreshes the self-healing ratio and balancedness metrics, then returns the detector state.
     */
    public synchronized AnomalyDetectorState anomalyDetectorState() {
        this.anomalydetectorstate.refreshMetrics(this.anomalyNotifier.selfHealingEnabledRatio(), this.goalViolationDetector.balancednessScore());
        return this.anomalydetectorstate;
    }

    long numSelfHealingStarted() {
        return this.anomalydetectorstate.numSelfHealingStarted();
    }

    long numSelfHealingErrors() {
        return this.anomalydetectorstate.numSelfHealingErrors();
    }

    Anomaly ongoingSelfHealingAnomaly() {
        return this.anomalydetectorstate.ongoingSelfHealingAnomaly();
    }

    /** Delegates to {@link AnomalyDetectorState#maybeClearOngoingAnomalyDetectionTimeMs()}. */
    public void maybeClearOngoingAnomalyDetectionTimeMs() {
        this.anomalydetectorstate.maybeClearOngoingAnomalyDetectionTimeMs();
    }

    /**
     * Enables or disables self-healing for an anomaly type in both the notifier and the
     * detector state.
     *
     * @return the value returned by the notifier's {@code setSelfHealingFor}
     */
    public boolean setSelfHealingFor(AnomalyType anomalyType, boolean z) {
        boolean notifierResult = this.anomalyNotifier.setSelfHealingFor(anomalyType, z);
        this.anomalydetectorstate.setSelfHealingFor(anomalyType, z);
        return notifierResult;
    }

    /** Returns how many delayed broker failure checks have been scheduled. */
    public long numCheckedWithDelay() {
        return this.numCheckedWithDelay.get();
    }

    /** Marks the self-healing operation for the given anomaly id as finished in the state. */
    public void markSelfHealingFinished(String str) {
        this.anomalydetectorstate.markSelfHealingFinished(str);
    }

    /** Forwards newly added broker ids to both detectors. */
    public void notifyNewBrokers(Set<Integer> set) {
        this.brokerFailureDetector.notifyNewBrokers(set);
        this.goalViolationDetector.notifyNewBrokers(set);
    }

    /** Forwards dead broker ids to both detectors. */
    public void notifyDeadBrokers(Set<Integer> set) {
        this.brokerFailureDetector.notifyDeadBrokers(set);
        this.goalViolationDetector.notifyDeadBrokers(set);
    }
}
