package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector.class */
public class AnomalyDetector {
    static final String METRIC_REGISTRY_NAME = "AnomalyDetector";
    private static final int INIT_JITTER_BOUND = 10000;
    private static final int NUM_ANOMALY_DETECTION_THREADS = 2;
    private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetector.class);
    private static final Logger OPERATION_LOG = LoggerFactory.getLogger(KafkaCruiseControlUtils.OPERATION_LOGGER);
    private final KafkaCruiseControl _kafkaCruiseControl;
    private final AnomalyNotifier _anomalyNotifier;
    private final ConfluentAdmin _adminClient;
    private final GoalViolationDetector _goalViolationDetector;
    private final BrokerFailureDetector _brokerFailureDetector;
    private final ScheduledExecutorService _detectorScheduler;
    private final long _anomalyDetectionIntervalMs;
    private final LinkedBlockingDeque<Anomaly> _anomalies;
    private volatile boolean _shutdown;
    private final LoadMonitor _loadMonitor;
    private final AnomalyDetectorState _anomalyDetectorState;
    private final List<String> _selfHealingGoals;
    private final ExecutorService _anomalyLoggerExecutor;
    private volatile Anomaly _anomalyInProgress;
    private AtomicLong _numCheckedWithDelay;
    private final Object _shutdownLock;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyNotificationResult$Action;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyType = new int[AnomalyType.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyType[AnomalyType.GOAL_VIOLATION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyType[AnomalyType.BROKER_FAILURE.ordinal()] = AnomalyDetector.NUM_ANOMALY_DETECTION_THREADS;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyNotificationResult$Action = new int[AnomalyNotificationResult.Action.values().length];
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyNotificationResult$Action[AnomalyNotificationResult.Action.FIX.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyNotificationResult$Action[AnomalyNotificationResult.Action.CHECK.ordinal()] = AnomalyDetector.NUM_ANOMALY_DETECTION_THREADS;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyNotificationResult$Action[AnomalyNotificationResult.Action.IGNORE.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector$AnomalyHandlerTask.class */
    class AnomalyHandlerTask implements Runnable {
        AnomalyHandlerTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            AnomalyDetector.LOG.info("Starting anomaly handler");
            while (true) {
                boolean z = false;
                AnomalyDetector.this._anomalyInProgress = null;
                try {
                    AnomalyDetector.this._anomalyInProgress = (Anomaly) AnomalyDetector.this._anomalies.takeFirst();
                    AnomalyDetector.LOG.trace("Processing anomaly {}.", AnomalyDetector.this._anomalyInProgress);
                } catch (KafkaCruiseControlException e) {
                    AnomalyDetector.LOG.warn("Anomaly handler received exception when trying to fix the anomaly {}.", AnomalyDetector.this._anomalyInProgress, e);
                    z = true;
                } catch (InterruptedException e2) {
                    AnomalyDetector.LOG.debug("Received interrupted exception.", e2);
                    z = true;
                } catch (Throwable th) {
                    AnomalyDetector.LOG.error("Uncaught exception in anomaly handler.", th);
                    z = true;
                }
                if (AnomalyDetector.this._anomalyInProgress == AnomalyDetectorUtils.SHUTDOWN_ANOMALY) {
                    AnomalyDetector.this._anomalyInProgress = null;
                    AnomalyDetector.LOG.info("Anomaly handler exited.");
                    return;
                } else {
                    handleAnomalyInProgress();
                    if (z && AnomalyDetector.this._anomalyInProgress != null) {
                        checkWithDelay(AnomalyDetector.this._anomalyDetectionIntervalMs);
                    }
                }
            }
        }

        private void handleAnomalyInProgress() throws Exception {
            AnomalyType anomalyType = AnomalyDetectorUtils.getAnomalyType(AnomalyDetector.this._anomalyInProgress);
            AnomalyDetector.this._anomalyDetectorState.addAnomalyDetection(anomalyType, AnomalyDetector.this._anomalyInProgress);
            ExecutorState.State executionState = AnomalyDetector.this._kafkaCruiseControl.executionState();
            if (executionState != ExecutorState.State.NO_TASK_IN_PROGRESS) {
                AnomalyDetector.LOG.debug("Schedule delayed check for anomaly {} because executor is in {} state", AnomalyDetector.this._anomalyInProgress, executionState);
                checkWithDelay(AnomalyDetector.this._anomalyDetectionIntervalMs);
            } else if (!AnomalyDetector.this._kafkaCruiseControl.executorIsReserved()) {
                processAnomalyInProgress(anomalyType);
            } else {
                AnomalyDetector.LOG.debug("Ignoring anomaly {} because the executor is reserved", AnomalyDetector.this._anomalyInProgress);
                AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, AnomalyState.Status.IGNORED);
            }
        }

        private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            AnomalyDetector.this._anomalyDetectorState.markAnomalyRate(anomalyType);
            AnomalyNotificationResult notifyAnomalyInProgress = notifyAnomalyInProgress(anomalyType);
            if (notifyAnomalyInProgress != null) {
                AnomalyDetector.this._anomalyDetectorState.maybeSetOngoingAnomalyDetectionTimeMs();
                switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyNotificationResult$Action[notifyAnomalyInProgress.action().ordinal()]) {
                    case 1:
                        fixAnomalyInProgress(anomalyType);
                        return;
                    case AnomalyDetector.NUM_ANOMALY_DETECTION_THREADS /* 2 */:
                        checkWithDelay(notifyAnomalyInProgress.delay());
                        return;
                    case 3:
                        AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, AnomalyState.Status.IGNORED);
                        return;
                    default:
                        throw new IllegalStateException("Unrecognized anomaly notification result.");
                }
            }
        }

        private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyType) {
            AnomalyNotificationResult onBrokerFailure;
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyType[anomalyType.ordinal()]) {
                case 1:
                    onBrokerFailure = AnomalyDetector.this._anomalyNotifier.onGoalViolation((GoalViolations) AnomalyDetector.this._anomalyInProgress);
                    break;
                case AnomalyDetector.NUM_ANOMALY_DETECTION_THREADS /* 2 */:
                    onBrokerFailure = AnomalyDetector.this._anomalyNotifier.onBrokerFailure((BrokerFailures) AnomalyDetector.this._anomalyInProgress);
                    break;
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
            AnomalyDetector.LOG.debug("Received notification result {}", onBrokerFailure);
            return onBrokerFailure;
        }

        private void checkWithDelay(long j) {
            if (AnomalyDetectorUtils.getAnomalyType(AnomalyDetector.this._anomalyInProgress) != AnomalyType.BROKER_FAILURE) {
                AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, AnomalyState.Status.IGNORED);
                return;
            }
            synchronized (AnomalyDetector.this._shutdownLock) {
                if (AnomalyDetector.this._shutdown) {
                    AnomalyDetector.LOG.debug("Skip delayed checking anomaly {}, because anomaly detector is shutting down.", AnomalyDetector.this._anomalyInProgress);
                } else {
                    AnomalyDetector.LOG.debug("Scheduling broker failure detection with delay of {} ms", Long.valueOf(j));
                    AnomalyDetector.this._numCheckedWithDelay.incrementAndGet();
                    ScheduledExecutorService scheduledExecutorService = AnomalyDetector.this._detectorScheduler;
                    BrokerFailureDetector brokerFailureDetector = AnomalyDetector.this._brokerFailureDetector;
                    brokerFailureDetector.getClass();
                    scheduledExecutorService.schedule(brokerFailureDetector::scheduleDetection, j, TimeUnit.MILLISECONDS);
                    AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
                }
            }
        }

        private boolean isAnomalyInProgressReadyToFix(AnomalyType anomalyType) {
            LoadMonitorTaskRunner.LoadMonitorTaskRunnerState taskRunnerState = AnomalyDetector.this._loadMonitor.taskRunnerState();
            if (!ViolationUtils.isLoadMonitorReady(taskRunnerState)) {
                AnomalyDetector.LOG.info("Skipping {} fix because load monitor is in {} state.", anomalyType, taskRunnerState);
                AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, AnomalyState.Status.LOAD_MONITOR_NOT_READY);
                return false;
            }
            if (AnomalyDetector.this._kafkaCruiseControl.meetCompletenessRequirements(AnomalyDetector.this._selfHealingGoals)) {
                return true;
            }
            AnomalyDetector.LOG.warn("Skipping {} fix because load completeness requirement is not met for goals.", anomalyType);
            AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, AnomalyState.Status.COMPLETENESS_NOT_READY);
            return false;
        }

        private String optimizationResult(AnomalyType anomalyType) {
            switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$detector$notifier$AnomalyType[anomalyType.ordinal()]) {
                case 1:
                case AnomalyDetector.NUM_ANOMALY_DETECTION_THREADS /* 2 */:
                    return ((KafkaAnomaly) AnomalyDetector.this._anomalyInProgress).optimizationResult(false);
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
        }

        private void logSelfHealingOperation(String str, OptimizationFailureException optimizationFailureException, String str2) {
            if (str2 != null) {
                AnomalyDetector.OPERATION_LOG.info("[{}] Self-healing started successfully:\n{}", str, str2);
            } else {
                AnomalyDetector.OPERATION_LOG.warn("[{}] Self-healing failed to start:\n{}", str, optimizationFailureException);
            }
        }

        private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            synchronized (AnomalyDetector.this._shutdownLock) {
                if (AnomalyDetector.this._shutdown) {
                    AnomalyDetector.LOG.info("Skip fixing anomaly {}, because anomaly detector is shutting down.", AnomalyDetector.this._anomalyInProgress);
                } else {
                    boolean isAnomalyInProgressReadyToFix = isAnomalyInProgressReadyToFix(anomalyType);
                    if (isAnomalyInProgressReadyToFix) {
                        AnomalyDetector.LOG.info("Fixing anomaly {}", AnomalyDetector.this._anomalyInProgress);
                        boolean z = false;
                        String anomalyId = AnomalyDetector.this._anomalyInProgress.anomalyId();
                        try {
                            try {
                                z = AnomalyDetector.this._anomalyInProgress.fix();
                                String optimizationResult = optimizationResult(anomalyType);
                                AnomalyDetector.this._anomalyLoggerExecutor.submit(() -> {
                                    logSelfHealingOperation(anomalyId, null, optimizationResult);
                                });
                                AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, z ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START);
                                if (z) {
                                    AnomalyDetector.this._anomalyDetectorState.incrementNumSelfHealingStarted();
                                    AnomalyDetector.LOG.info("[{}] Self-healing started successfully.", anomalyId);
                                } else {
                                    AnomalyDetector.LOG.warn("[{}] Self-healing failed to start.", anomalyId);
                                }
                            } catch (OptimizationFailureException e) {
                                AnomalyDetector.this._anomalyLoggerExecutor.submit(() -> {
                                    logSelfHealingOperation(anomalyId, e, null);
                                });
                                throw e;
                            }
                        } catch (Throwable th) {
                            AnomalyDetector.this._anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this._anomalyInProgress, z ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START);
                            if (z) {
                                AnomalyDetector.this._anomalyDetectorState.incrementNumSelfHealingStarted();
                                AnomalyDetector.LOG.info("[{}] Self-healing started successfully.", anomalyId);
                            } else {
                                AnomalyDetector.LOG.warn("[{}] Self-healing failed to start.", anomalyId);
                            }
                            throw th;
                        }
                    }
                    handlePostFixAnomaly(isAnomalyInProgressReadyToFix);
                }
            }
        }

        private void handlePostFixAnomaly(boolean z) {
            AnomalyDetector.this._anomalies.clear();
            ScheduledExecutorService scheduledExecutorService = AnomalyDetector.this._detectorScheduler;
            BrokerFailureDetector brokerFailureDetector = AnomalyDetector.this._brokerFailureDetector;
            brokerFailureDetector.getClass();
            scheduledExecutorService.schedule(brokerFailureDetector::scheduleDetection, z ? 0L : AnomalyDetector.this._anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    public AnomalyDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, Option<ZKClientConfig> option, LoadMonitor loadMonitor, KafkaCruiseControl kafkaCruiseControl, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this._anomalies = new LinkedBlockingDeque<>();
        this._adminClient = KafkaCruiseControlUtils.createAdmin(kafkaCruiseControlConfig.originals());
        this._anomalyDetectionIntervalMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.ANOMALY_DETECTION_INTERVAL_MS_CONFIG).longValue();
        this._anomalyNotifier = (AnomalyNotifier) kafkaCruiseControlConfig.getConfiguredInstance(KafkaCruiseControlConfig.ANOMALY_NOTIFIER_CLASS_CONFIG, AnomalyNotifier.class);
        this._loadMonitor = loadMonitor;
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._selfHealingGoals = AnomalyDetectorUtils.getSelfHealingGoalNames(kafkaCruiseControlConfig);
        this._goalViolationDetector = new GoalViolationDetector(kafkaCruiseControlConfig, this._loadMonitor, this._anomalies, time, this._kafkaCruiseControl, this._selfHealingGoals);
        this._brokerFailureDetector = new BrokerFailureDetector(kafkaCruiseControlConfig, option, this._loadMonitor, this._anomalies, time, this._kafkaCruiseControl, this._selfHealingGoals);
        this._detectorScheduler = Executors.newScheduledThreadPool(NUM_ANOMALY_DETECTION_THREADS, new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG));
        this._shutdown = false;
        int intValue = kafkaCruiseControlConfig.getInt(KafkaCruiseControlConfig.NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG).intValue();
        this._anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this._anomalyInProgress = null;
        this._numCheckedWithDelay = new AtomicLong();
        this._shutdownLock = new Object();
        GoalViolationDetector goalViolationDetector = this._goalViolationDetector;
        goalViolationDetector.getClass();
        dataBalancerMetricsRegistry.newGauge(AnomalyDetector.class, "balancedness-score", goalViolationDetector::balancednessScore);
        this._anomalyDetectorState = new AnomalyDetectorState(time, this._anomalyNotifier.selfHealingEnabled(), intValue, dataBalancerMetricsRegistry);
    }

    AnomalyDetector(LinkedBlockingDeque<Anomaly> linkedBlockingDeque, ConfluentAdmin confluentAdmin, long j, KafkaCruiseControl kafkaCruiseControl, AnomalyNotifier anomalyNotifier, GoalViolationDetector goalViolationDetector, BrokerFailureDetector brokerFailureDetector, ScheduledExecutorService scheduledExecutorService, LoadMonitor loadMonitor) {
        this._anomalies = linkedBlockingDeque;
        this._adminClient = confluentAdmin;
        this._anomalyDetectionIntervalMs = j;
        this._anomalyNotifier = anomalyNotifier;
        this._goalViolationDetector = goalViolationDetector;
        this._brokerFailureDetector = brokerFailureDetector;
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._detectorScheduler = scheduledExecutorService;
        this._shutdown = false;
        this._loadMonitor = loadMonitor;
        this._selfHealingGoals = Collections.emptyList();
        this._anomalyLoggerExecutor = Executors.newSingleThreadScheduledExecutor(new KafkaCruiseControlThreadFactory("AnomalyLogger", true, null));
        this._anomalyInProgress = null;
        this._numCheckedWithDelay = new AtomicLong();
        this._shutdownLock = new Object();
        this._anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), new HashMap(AnomalyType.cachedValues().size()), 10, null);
    }

    public void startDetection() {
        LOG.info("Starting anomaly detector.");
        this._brokerFailureDetector.startDetection();
        int nextInt = new Random().nextInt(INIT_JITTER_BOUND);
        LOG.debug("Starting goal violation detector with delay of {} ms", Integer.valueOf(nextInt));
        this._detectorScheduler.scheduleAtFixedRate(this._goalViolationDetector, (this._anomalyDetectionIntervalMs / 2) + nextInt, this._anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
        this._detectorScheduler.submit(new AnomalyHandlerTask());
    }

    public void shutdown() {
        LOG.info("Shutting down anomaly detector.");
        synchronized (this._shutdownLock) {
            this._shutdown = true;
        }
        this._anomalies.addFirst(AnomalyDetectorUtils.SHUTDOWN_ANOMALY);
        this._detectorScheduler.shutdown();
        KafkaCruiseControlUtils.closeAdminClientWithTimeout(this._adminClient, 0L);
        try {
            this._detectorScheduler.awaitTermination(this._anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
            if (!this._detectorScheduler.isTerminated()) {
                LOG.warn("The sampling scheduler failed to shutdown in " + this._anomalyDetectionIntervalMs + " ms.");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
        }
        this._brokerFailureDetector.shutdownNow();
        this._anomalyLoggerExecutor.shutdownNow();
        LOG.info("Anomaly detector shutdown completed.");
    }

    public synchronized AnomalyDetectorState anomalyDetectorState() {
        this._anomalyDetectorState.refreshMetrics(this._anomalyNotifier.selfHealingEnabledRatio(), this._goalViolationDetector.balancednessScore());
        return this._anomalyDetectorState;
    }

    long numSelfHealingStarted() {
        return this._anomalyDetectorState.numSelfHealingStarted();
    }

    public void maybeClearOngoingAnomalyDetectionTimeMs() {
        this._anomalyDetectorState.maybeClearOngoingAnomalyDetectionTimeMs();
    }

    public boolean setSelfHealingFor(AnomalyType anomalyType, boolean z) {
        boolean selfHealingFor = this._anomalyNotifier.setSelfHealingFor(anomalyType, z);
        this._anomalyDetectorState.setSelfHealingFor(anomalyType, z);
        return selfHealingFor;
    }

    public long numCheckedWithDelay() {
        return this._numCheckedWithDelay.get();
    }

    public void markSelfHealingFinished(String str) {
        this._anomalyDetectorState.markSelfHealingFinished(str);
    }
}
