package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.RebalanceResult;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest.class */
public class AnomalyDetectorTest {
    /** Detection interval in milliseconds; passed to every {@code AnomalyDetector} under test and matched on the scheduler mock. */
    private static final long MOCK_ANOMALY_DETECTION_INTERVAL_MS = 3000;
    /** Delay in milliseconds that the notifier mock requests when it answers with a delayed check. */
    private static final long MOCK_DELAY_CHECK_MS = 1000;
    /** Value returned by the notifier mock for {@code selfHealingEnabledRatio()}; filled in by the static initializer of this class. */
    private static final Map<AnomalyType, Float> MOCK_SELF_HEALING_ENABLED_RATIO = new HashMap<>(AnomalyType.cachedValues().size());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest$ExecutionNotPossibleMocker.class */
    /**
     * Callback used by {@code assertExecutorNotAvailable} to record, on a {@link KafkaCruiseControl} mock that is
     * still in record state, the expectations describing why an execution cannot start.
     */
    @FunctionalInterface
    public interface ExecutionNotPossibleMocker {
        /**
         * Records the expectations on the given mock.
         *
         * @param kafkaCruiseControl the mock to record expectations on; it has not been replayed yet.
         */
        void mockExecutionNotPossible(KafkaCruiseControl kafkaCruiseControl);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest$FixAnomalyFailureType.class */
    /** Scenario selector consumed by {@code testFixGoalViolation}. */
    public enum FixAnomalyFailureType {
        // The mocked rebalance returns a result that reports it was executed.
        SUCCESSFUL_FIX,
        // The mocked rebalance throws an OptimizationFailureException.
        PLAN_COMPUTATION_EXCEPTION,
        // The mocked rebalance marks self-healing as finished from another thread before returning its result.
        FAST_EXECUTOR_COMPLETION
    }

    /**
     * Records the scheduler interactions expected when detection starts: the goal violation detector is scheduled
     * at a fixed rate of {@code MOCK_ANOMALY_DETECTION_INTERVAL_MS}, and the anomaly handler task is submitted, with
     * that submission delegated to a real executor so the handler actually runs.
     *
     * @param scheduledExecutorService  the scheduler mock (still in record state).
     * @param goalViolationDetector     the detector expected to be scheduled periodically.
     * @param scheduledExecutorService2 the real executor that receives the delegated {@code submit} call.
     */
    private static void startPeriodicDetectors(ScheduledExecutorService scheduledExecutorService, GoalViolationDetector goalViolationDetector, ScheduledExecutorService scheduledExecutorService2) {
        // andReturn(null): a bare null literal is required here; an Object-typed null does not match the expectation's type.
        EasyMock.expect(scheduledExecutorService.scheduleAtFixedRate((Runnable) EasyMock.eq(goalViolationDetector),
                                                                     EasyMock.anyLong(),
                                                                     EasyMock.eq(MOCK_ANOMALY_DETECTION_INTERVAL_MS),
                                                                     EasyMock.eq(TimeUnit.MILLISECONDS)))
                .andReturn(null);
        // The Runnable cast selects the submit(Runnable) overload.
        EasyMock.expect(scheduledExecutorService.submit((Runnable) EasyMock.isA(AnomalyDetector.AnomalyHandlerTask.class)))
                .andDelegateTo(scheduledExecutorService2);
    }

    /**
     * Records the scheduler interactions expected during detector shutdown ({@code shutdown},
     * {@code awaitTermination} and {@code isTerminated}) and delegates each of them to a real executor.
     *
     * @param scheduledExecutorService  the scheduler mock (still in record state).
     * @param scheduledExecutorService2 the real executor that receives the delegated calls.
     * @throws InterruptedException declared by {@code awaitTermination}.
     */
    private static void shutdownDetector(ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2) throws InterruptedException {
        // shutdown() is void, so its expectation is attached through expectLastCall().
        scheduledExecutorService.shutdown();
        EasyMock.expectLastCall().andDelegateTo(scheduledExecutorService2);

        EasyMock.expect(scheduledExecutorService.awaitTermination(MOCK_ANOMALY_DETECTION_INTERVAL_MS, TimeUnit.MILLISECONDS))
                .andDelegateTo(scheduledExecutorService2);

        EasyMock.expect(scheduledExecutorService.isTerminated())
                .andDelegateTo(scheduledExecutorService2);
    }

    /**
     * Switches the given mocks from record state to replay state, in parameter order.
     *
     * @param anomalyNotifier          the notifier mock.
     * @param brokerFailureDetector    the broker failure detector mock.
     * @param goalViolationDetector    the goal violation detector mock.
     * @param scheduledExecutorService the scheduler mock.
     * @param kafkaCruiseControl       the Cruise Control mock.
     */
    private static void replayMocks(AnomalyNotifier anomalyNotifier, BrokerFailureDetector brokerFailureDetector, GoalViolationDetector goalViolationDetector, ScheduledExecutorService scheduledExecutorService, KafkaCruiseControl kafkaCruiseControl) {
        EasyMock.replay(anomalyNotifier,
                        brokerFailureDetector,
                        goalViolationDetector,
                        scheduledExecutorService,
                        kafkaCruiseControl);
    }

    /**
     * Queues a broker failure for which the notifier mock answers with a delayed check of
     * {@code MOCK_DELAY_CHECK_MS}, then asserts that exactly one delayed check and no self-healing were
     * recorded and that the failure shows up among the recent broker-failure anomalies.
     *
     * @throws InterruptedException if waiting for the detector or the executor is interrupted.
     */
    @Test
    public void testDelayedCheck() throws InterruptedException {
        LinkedBlockingDeque<Anomaly> anomalies = new LinkedBlockingDeque<>();
        AnomalyNotifier mockAnomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
        BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        ScheduledExecutorService mockDetectorScheduler = EasyMock.mock(ScheduledExecutorService.class);
        // Real executor that runs the anomaly handler task delegated by the scheduler mock.
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        LoadMonitor mockLoadMonitor = EasyMock.mock(LoadMonitor.class);

        EasyMock.expect(mockAnomalyNotifier.onBrokerFailure(EasyMock.isA(BrokerFailures.class)))
                .andReturn(AnomalyNotificationResult.check(MOCK_DELAY_CHECK_MS));
        EasyMock.expect(mockAnomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
        KafkaCruiseControl mockKafkaCruiseControl = mockKafkaCruiseControl(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties());
        startPeriodicDetectors(mockDetectorScheduler, mockGoalViolationDetector, executorService);
        // The delayed check is expected to be re-scheduled with the delay requested by the notifier.
        EasyMock.expect(mockDetectorScheduler.schedule(EasyMock.isA(Runnable.class),
                                                       EasyMock.eq(MOCK_DELAY_CHECK_MS),
                                                       EasyMock.eq(TimeUnit.MILLISECONDS)))
                .andReturn(null);
        shutdownDetector(mockDetectorScheduler, executorService);
        EasyMock.expect(mockKafkaCruiseControl.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);
        replayMocks(mockAnomalyNotifier, mockBrokerFailureDetector, mockGoalViolationDetector, mockDetectorScheduler, mockKafkaCruiseControl);

        AnomalyDetector anomalyDetector = new AnomalyDetector(anomalies, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl,
                                                              mockAnomalyNotifier, mockGoalViolationDetector, mockBrokerFailureDetector,
                                                              mockDetectorScheduler, mockLoadMonitor);
        try {
            anomalyDetector.startDetection();
            anomalies.add(new BrokerFailures(mockKafkaCruiseControl, Collections.singletonMap(0, 100L), false, true, true, Collections.emptyList()));
            // Bounded wait instead of an unbounded spin loop: a regression now fails the test rather than hanging it.
            TestUtils.waitForCondition(() -> anomalyDetector.numCheckedWithDelay() >= 1,
                                       "The broker failure was not checked with delay in due time");
            anomalyDetector.shutdown();
            Assertions.assertEquals(0L, anomalyDetector.numSelfHealingStarted());
            Assertions.assertEquals(1L, anomalyDetector.numCheckedWithDelay());
            Assertions.assertTrue(executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS));

            AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
            Assertions.assertEquals(0L, ((Long) anomalyDetectorState.metrics().get("numSelfHealingStarted")).longValue());
            Assertions.assertEquals(1, ((Map<?, ?>) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.BROKER_FAILURE)).size());
            Assertions.assertEquals(0, ((Map<?, ?>) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.GOAL_VIOLATION)).size());
            EasyMock.verify(mockAnomalyNotifier, mockDetectorScheduler, mockKafkaCruiseControl);
        } finally {
            // Always release the real executor thread, whether the assertions passed or not.
            executorService.shutdown();
        }
    }

    /** Runs the goal-violation fix scenario with {@code FixAnomalyFailureType.SUCCESSFUL_FIX}. */
    @Test
    public void testSuccessfulFixGoalViolation() throws InterruptedException, KafkaCruiseControlException {
        testFixGoalViolation(FixAnomalyFailureType.SUCCESSFUL_FIX);
    }

    /**
     * Runs the goal-violation fix scenario with {@code FixAnomalyFailureType.FAST_EXECUTOR_COMPLETION}.
     * NOTE(review): the method name says "execution fails", but the scenario selected here is the
     * fast-executor-completion one; confirm which name reflects the intent.
     */
    @Test
    public void testExecutionFailsFixGoalViolation() throws InterruptedException, KafkaCruiseControlException {
        testFixGoalViolation(FixAnomalyFailureType.FAST_EXECUTOR_COMPLETION);
    }

    /** Runs the goal-violation fix scenario with {@code FixAnomalyFailureType.PLAN_COMPUTATION_EXCEPTION}. */
    @Test
    public void testExceptionalFixGoalViolation() throws InterruptedException, KafkaCruiseControlException {
        testFixGoalViolation(FixAnomalyFailureType.PLAN_COMPUTATION_EXCEPTION);
    }

    /**
     * Drives one goal-violation self-healing scenario end to end: the notifier mock answers the violation with
     * a fix request, the mocked {@code rebalance} call behaves as dictated by {@code fixAnomalyFailureType}, and
     * the resulting detector counters, anomaly status and ongoing self-healing anomaly are asserted.
     *
     * @param fixAnomalyFailureType the scenario to simulate.
     * @throws InterruptedException         if waiting for the detector or the executor is interrupted.
     * @throws KafkaCruiseControlException  declared by the mocked {@code rebalance} call.
     */
    private void testFixGoalViolation(FixAnomalyFailureType fixAnomalyFailureType) throws InterruptedException, KafkaCruiseControlException {
        final boolean expectSelfHealingStarted = fixAnomalyFailureType != FixAnomalyFailureType.PLAN_COMPUTATION_EXCEPTION;
        LinkedBlockingDeque<Anomaly> anomalies = new LinkedBlockingDeque<>();
        AnomalyNotifier mockAnomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
        BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        ScheduledExecutorService mockDetectorScheduler = EasyMock.mock(ScheduledExecutorService.class);
        // Real executor that runs the anomaly handler task delegated by the scheduler mock.
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        KafkaCruiseControl mockKafkaCruiseControl = mockKafkaCruiseControl(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties());
        RebalanceResult mockRebalanceResult = EasyMock.mock(RebalanceResult.class);
        LoadMonitor mockLoadMonitor = EasyMock.mock(LoadMonitor.class);
        // Holders let the lambdas recorded below reach objects that are only created later in the test.
        AtomicReference<AnomalyDetector> detectorRef = new AtomicReference<>();
        AtomicReference<Anomaly> anomalyRef = new AtomicReference<>();
        AtomicReference<Exception> asyncFailure = new AtomicReference<>();

        startPeriodicDetectors(mockDetectorScheduler, mockGoalViolationDetector, executorService);
        shutdownDetector(mockDetectorScheduler, executorService);
        EasyMock.expect(mockKafkaCruiseControl.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);
        EasyMock.expect(mockAnomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
        EasyMock.expect(mockAnomalyNotifier.onGoalViolation(EasyMock.isA(GoalViolations.class))).andReturn(AnomalyNotificationResult.fix());
        EasyMock.expect(mockKafkaCruiseControl.meetCompletenessRequirements(Collections.emptyList())).andReturn(true);
        // Raw collection casts on the matchers are kept as found: the rebalance signature is not visible from this file.
        IExpectationSetters<RebalanceResult> rebalanceExpectation = EasyMock.expect(
            mockKafkaCruiseControl.rebalance((List) EasyMock.eq(Collections.emptyList()),
                                             EasyMock.eq(false),
                                             EasyMock.eq((ModelCompletenessRequirements) null),
                                             EasyMock.anyObject(OperationProgress.class),
                                             EasyMock.eq(true),
                                             EasyMock.anyString(),
                                             EasyMock.eq(true),
                                             EasyMock.eq(true),
                                             EasyMock.eq(false),
                                             (Collection) EasyMock.eq(Collections.emptyList()),
                                             (Set) EasyMock.eq(Collections.emptySet()),
                                             EasyMock.eq(false)));
        Assertions.assertNotNull(rebalanceExpectation);

        final Supplier<Long> healingCounter;
        final AnomalyState.Status expectedStatus;
        final Supplier<Anomaly> expectedOngoingAnomaly;
        switch (fixAnomalyFailureType) {
            case SUCCESSFUL_FIX:
                rebalanceExpectation.andReturn(mockRebalanceResult);
                expectExecutedRebalance(mockRebalanceResult, mockDetectorScheduler);
                healingCounter = () -> detectorRef.get().numSelfHealingStarted();
                expectedStatus = AnomalyState.Status.FIX_STARTED;
                // The fix is still considered ongoing, so the queued anomaly itself is expected.
                expectedOngoingAnomaly = anomalyRef::get;
                break;
            case PLAN_COMPUTATION_EXCEPTION:
                rebalanceExpectation.andThrow(new OptimizationFailureException("test failure"));
                healingCounter = () -> detectorRef.get().numSelfHealingErrors();
                expectedStatus = AnomalyState.Status.IGNORED;
                expectedOngoingAnomaly = () -> null;
                break;
            case FAST_EXECUTOR_COMPLETION:
                rebalanceExpectation.andAnswer(() -> {
                    // Report completion from another thread while the rebalance call is still returning.
                    new Thread(() -> {
                        try {
                            detectorRef.get().markSelfHealingFinished(anomalyRef.get().anomalyId());
                        } catch (Exception e) {
                            // Surfaced to the test thread through the assertNull check at the end of the test.
                            asyncFailure.set(e);
                        }
                    }).start();
                    return mockRebalanceResult;
                });
                expectExecutedRebalance(mockRebalanceResult, mockDetectorScheduler);
                healingCounter = () -> detectorRef.get().numSelfHealingStarted();
                expectedStatus = AnomalyState.Status.FIX_STARTED;
                expectedOngoingAnomaly = () -> null;
                break;
            default:
                throw new AssertionError("Test was configured incorrectly, with a wrong failure type!");
        }

        replayMocks(mockAnomalyNotifier, mockBrokerFailureDetector, mockGoalViolationDetector, mockDetectorScheduler, mockKafkaCruiseControl);
        EasyMock.replay(mockRebalanceResult);
        AnomalyDetector anomalyDetector = new AnomalyDetector(anomalies, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl,
                                                              mockAnomalyNotifier, mockGoalViolationDetector, mockBrokerFailureDetector,
                                                              mockDetectorScheduler, mockLoadMonitor);
        detectorRef.set(anomalyDetector);
        try {
            anomalyDetector.startDetection();
            GoalViolations violations = new GoalViolations(mockKafkaCruiseControl, true, true, true, Collections.emptyList(), Collections.emptyList());
            violations.addViolation("RackAwareGoal", true);
            anomalyRef.set(violations);
            anomalies.add(violations);
            TestUtils.waitForCondition(() -> healingCounter.get() >= 1, 120000L, "Anomaly was not fixed in due time");
            anomalyDetector.shutdown();
            Assertions.assertEquals(1L, healingCounter.get().longValue());
            Assertions.assertEquals(0L, anomalyDetector.numCheckedWithDelay());
            Assertions.assertTrue(executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS));

            AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
            Assertions.assertEquals(expectSelfHealingStarted ? 1L : 0L,
                                    ((Long) anomalyDetectorState.metrics().get("numSelfHealingStarted")).longValue());
            Assertions.assertEquals(0, ((Map<?, ?>) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.BROKER_FAILURE)).size());
            Map<?, ?> recentGoalViolations = (Map<?, ?>) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.GOAL_VIOLATION);
            Assertions.assertEquals(1, recentGoalViolations.size());
            Assertions.assertNull(asyncFailure.get(), "Expected no exception, but got one:");
            Assertions.assertEquals(expectedStatus,
                                    ((AnomalyState) recentGoalViolations.get(anomalyRef.get().anomalyId())).status(),
                                    "Anomaly status doesn't match what was expected");
            Assertions.assertEquals(expectedOngoingAnomaly.get(), anomalyDetector.ongoingSelfHealingAnomaly(),
                                    "Current self-healing anomaly isn't what was expected");
            EasyMock.verify(mockAnomalyNotifier, mockDetectorScheduler, mockKafkaCruiseControl);
        } finally {
            // Always release the real executor thread, whether the assertions passed or not.
            executorService.shutdown();
        }
    }

    /** Records the expectations shared by the scenarios in which the mocked rebalance reports that it was executed. */
    private static void expectExecutedRebalance(RebalanceResult rebalanceResult, ScheduledExecutorService detectorScheduler) {
        EasyMock.expect(rebalanceResult.optimizerResult()).andReturn(null);
        EasyMock.expect(rebalanceResult.wasExecuted()).andReturn(true);
        // A zero-delay task is expected to be scheduled on the detector scheduler in these scenarios.
        EasyMock.expect(detectorScheduler.schedule(EasyMock.isA(Runnable.class), EasyMock.eq(0L), EasyMock.eq(TimeUnit.MILLISECONDS)))
                .andReturn(null);
    }

    /**
     * Checks that a queued goal violation is not self-healed when the Cruise Control mock reports an
     * inter-broker replica movement task in progress.
     */
    @Test
    public void testExecutionInProgress() throws InterruptedException {
        ExecutionNotPossibleMocker ongoingExecution = cruiseControl ->
            EasyMock.expect(cruiseControl.executionState())
                    .andReturn(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS);
        assertExecutorNotAvailable(ongoingExecution);
    }

    /**
     * Checks that a queued goal violation is not self-healed when the Cruise Control mock reports no task in
     * progress but a reserved executor.
     */
    @Test
    public void testExecutorIsReserved() throws InterruptedException {
        ExecutionNotPossibleMocker reservedExecutor = cruiseControl -> {
            EasyMock.expect(cruiseControl.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);
            EasyMock.expect(cruiseControl.executorIsReserved()).andReturn(true);
        };
        assertExecutorNotAvailable(reservedExecutor);
    }

    /**
     * Creates a {@link KafkaCruiseControl} mock, still in record state, whose {@code executorIsReserved()}
     * answers {@code false} any number of times.
     *
     * @param properties not read by this method.
     * @return the mock; the caller records further expectations and replays it.
     */
    private KafkaCruiseControl mockKafkaCruiseControl(Properties properties) {
        final KafkaCruiseControl cruiseControlMock = EasyMock.mock(KafkaCruiseControl.class);
        EasyMock.expect(cruiseControlMock.executorIsReserved())
                .andReturn(false)
                .anyTimes();
        return cruiseControlMock;
    }

    /**
     * Queues a goal violation while the Cruise Control mock is set up, through the given callback, so that an
     * execution cannot start. Asserts that the anomaly is drained from the queue without any self-healing or
     * delayed check, and that it is still listed among the recent goal-violation anomalies.
     *
     * @param executionNotPossibleMocker records on the Cruise Control mock why an execution is not possible.
     * @throws InterruptedException if waiting for the detector or the executor is interrupted.
     */
    public void assertExecutorNotAvailable(ExecutionNotPossibleMocker executionNotPossibleMocker) throws InterruptedException {
        LinkedBlockingDeque<Anomaly> anomalies = new LinkedBlockingDeque<>();
        AnomalyNotifier mockAnomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
        BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        ScheduledExecutorService mockDetectorScheduler = EasyMock.mock(ScheduledExecutorService.class);
        // Real executor that runs the anomaly handler task delegated by the scheduler mock.
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        // A bare mock is used here: the callback decides what executorIsReserved() answers, if it is expected at all.
        KafkaCruiseControl mockKafkaCruiseControl = EasyMock.mock(KafkaCruiseControl.class);
        LoadMonitor mockLoadMonitor = EasyMock.mock(LoadMonitor.class);

        startPeriodicDetectors(mockDetectorScheduler, mockGoalViolationDetector, executorService);
        shutdownDetector(mockDetectorScheduler, executorService);
        EasyMock.expect(mockAnomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
        executionNotPossibleMocker.mockExecutionNotPossible(mockKafkaCruiseControl);
        replayMocks(mockAnomalyNotifier, mockBrokerFailureDetector, mockGoalViolationDetector, mockDetectorScheduler, mockKafkaCruiseControl);

        AnomalyDetector anomalyDetector = new AnomalyDetector(anomalies, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl,
                                                              mockAnomalyNotifier, mockGoalViolationDetector, mockBrokerFailureDetector,
                                                              mockDetectorScheduler, mockLoadMonitor);
        try {
            anomalyDetector.startDetection();
            anomalies.add(new GoalViolations(mockKafkaCruiseControl, true, true, true, Collections.emptyList(), Collections.emptyList()));
            TestUtils.waitForCondition(anomalies::isEmpty, "Expected the anomalies to be emptied out;");
            anomalyDetector.shutdown();
            Assertions.assertEquals(0L, anomalyDetector.numSelfHealingStarted());
            Assertions.assertEquals(0L, anomalyDetector.numCheckedWithDelay());
            Assertions.assertTrue(executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS));

            AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
            Assertions.assertEquals(0L, ((Long) anomalyDetectorState.metrics().get("numSelfHealingStarted")).longValue());
            Assertions.assertEquals(0, ((Map<?, ?>) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.BROKER_FAILURE)).size());
            Assertions.assertEquals(1, ((Map<?, ?>) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.GOAL_VIOLATION)).size());
            EasyMock.verify(mockAnomalyNotifier, mockDetectorScheduler, mockKafkaCruiseControl);
        } finally {
            // Always release the real executor thread, whether the assertions passed or not.
            executorService.shutdown();
        }
    }

    /**
     * Shuts the detector down twice, the second time from a separate thread, and asserts that the second call
     * returns, that no self-healing or delayed check was recorded, and that the scheduler ends up terminated.
     *
     * @throws InterruptedException if shutting down or joining the second thread is interrupted.
     */
    @Test
    public void testShutdown() throws InterruptedException {
        AnomalyNotifier mockAnomalyNotifier = EasyMock.createNiceMock(AnomalyNotifier.class);
        BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        KafkaCruiseControl mockKafkaCruiseControl = EasyMock.createNiceMock(KafkaCruiseControl.class);
        LoadMonitor mockLoadMonitor = EasyMock.mock(LoadMonitor.class);
        ScheduledExecutorService detectorScheduler =
            Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("AnomalyDetector", false, (Logger) null));
        AnomalyDetector anomalyDetector = new AnomalyDetector(new LinkedBlockingDeque<>(), MOCK_ANOMALY_DETECTION_INTERVAL_MS,
                                                              mockKafkaCruiseControl, mockAnomalyNotifier, mockGoalViolationDetector,
                                                              mockBrokerFailureDetector, detectorScheduler, mockLoadMonitor);

        anomalyDetector.shutdown();
        // The repeated shutdown runs on its own thread so that a hang cannot block the test thread forever.
        Thread secondShutdown = new Thread(anomalyDetector::shutdown);
        secondShutdown.start();
        secondShutdown.join(30000L);
        // join(timeout) returns silently on timeout, so check explicitly that the thread finished.
        Assertions.assertFalse(secondShutdown.isAlive(), "Second shutdown did not complete in due time");

        Assertions.assertEquals(0L, anomalyDetector.numSelfHealingStarted());
        Assertions.assertEquals(0L, anomalyDetector.numCheckedWithDelay());
        Assertions.assertTrue(detectorScheduler.isTerminated());
    }

    /**
     * Checks that new-broker and dead-broker notifications given to the anomaly detector are passed on, with the
     * same broker sets, to both the goal violation detector and the broker failure detector.
     *
     * @throws InterruptedException if shutting the detector down is interrupted.
     */
    @Test
    public void testBrokerNotification() throws InterruptedException {
        AnomalyNotifier mockAnomalyNotifier = EasyMock.createNiceMock(AnomalyNotifier.class);
        BrokerFailureDetector mockBrokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector mockGoalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        KafkaCruiseControl mockKafkaCruiseControl = EasyMock.createNiceMock(KafkaCruiseControl.class);
        ScheduledExecutorService detectorScheduler =
            Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("AnomalyDetector", false, (Logger) null));
        LoadMonitor mockLoadMonitor = EasyMock.mock(LoadMonitor.class);
        AnomalyDetector anomalyDetector = new AnomalyDetector(new LinkedBlockingDeque<>(), MOCK_ANOMALY_DETECTION_INTERVAL_MS,
                                                              mockKafkaCruiseControl, mockAnomalyNotifier, mockGoalViolationDetector,
                                                              mockBrokerFailureDetector, detectorScheduler, mockLoadMonitor);

        Set<Integer> newBrokers = new HashSet<>(Arrays.asList(11, 12));
        Set<Integer> deadBrokers = new HashSet<>(Arrays.asList(8, 9));

        // Record the notifications that each downstream detector is expected to receive.
        mockGoalViolationDetector.notifyNewBrokers(EasyMock.eq(newBrokers));
        mockGoalViolationDetector.notifyDeadBrokers(EasyMock.eq(deadBrokers));
        mockBrokerFailureDetector.notifyNewBrokers(EasyMock.eq(newBrokers));
        mockBrokerFailureDetector.notifyDeadBrokers(EasyMock.eq(deadBrokers));
        EasyMock.replay(mockBrokerFailureDetector, mockGoalViolationDetector);

        anomalyDetector.notifyNewBrokers(newBrokers);
        anomalyDetector.notifyDeadBrokers(deadBrokers);
        anomalyDetector.shutdown();

        EasyMock.verify(mockBrokerFailureDetector, mockGoalViolationDetector);
    }

    // Give every cached anomaly type the same mock self-healing enabled ratio.
    static {
        for (AnomalyType anomalyType : AnomalyType.cachedValues()) {
            MOCK_SELF_HEALING_ENABLED_RATIO.put(anomalyType, 0.99f);
        }
    }
}
