package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest.class */
public class AnomalyDetectorTest {
    /** Period, in milliseconds, used for the mocked periodic detection and as the detector's detection interval. */
    private static final long MOCK_ANOMALY_DETECTION_INTERVAL_MS = 3000;
    /** Delay, in milliseconds, that the mocked notifier requests before an anomaly is checked again. */
    private static final long MOCK_DELAY_CHECK_MS = 1000;
    /** Ratio map handed out by the mocked notifier; filled for every anomaly type in the static initializer. */
    private static final Map<AnomalyType, Float> MOCK_SELF_HEALING_ENABLED_RATIO = new HashMap<>(AnomalyType.cachedValues().size());

    /**
     * Callback used by {@link #assertExecutorNotAvailable(ExecutionNotPossibleMocker)} to record, on a
     * {@link KafkaCruiseControl} mock, the scenario-specific expectations under which no self-healing
     * is expected to start.
     */
    @FunctionalInterface
    public interface ExecutionNotPossibleMocker {
        /**
         * Records the scenario's expectations on the given mock.
         *
         * @param kafkaCruiseControl the mock to record on; it has not been replayed yet when this is invoked
         */
        void mockExecutionNotPossible(KafkaCruiseControl kafkaCruiseControl);
    }

    /**
     * Records the executor interactions expected when detection is started: the goal violation detector
     * is scheduled at a fixed rate of {@link #MOCK_ANOMALY_DETECTION_INTERVAL_MS}, and the anomaly handler
     * task is submitted. The submit call is delegated to a real executor so the handler actually runs.
     *
     * @param scheduledExecutorService  the mocked executor, still in record state
     * @param goalViolationDetector     the detector expected to be scheduled periodically
     * @param scheduledExecutorService2 the real executor that receives the delegated {@code submit}
     */
    private static void startPeriodicDetectors(ScheduledExecutorService scheduledExecutorService, GoalViolationDetector goalViolationDetector, ScheduledExecutorService scheduledExecutorService2) {
        // The periodic task is never executed in these tests, so the returned future can be null.
        // A plain null literal is used because an Object-typed null does not match andReturn's parameter type.
        EasyMock.expect(scheduledExecutorService.scheduleAtFixedRate(
                (Runnable) EasyMock.eq(goalViolationDetector),
                EasyMock.anyLong(),
                EasyMock.eq(MOCK_ANOMALY_DETECTION_INTERVAL_MS),
                EasyMock.eq(TimeUnit.MILLISECONDS)))
            .andReturn(null);
        EasyMock.expect(scheduledExecutorService.submit((Runnable) EasyMock.isA(AnomalyDetector.AnomalyHandlerTask.class)))
            .andDelegateTo(scheduledExecutorService2);
    }

    /**
     * Records the executor interactions expected on detector shutdown. Each of {@code shutdown()},
     * {@code awaitTermination(...)} and {@code isTerminated()} is expected once and delegated to the
     * real executor.
     *
     * @param scheduledExecutorService  the mocked executor, still in record state
     * @param scheduledExecutorService2 the real executor that receives the delegated calls
     * @throws InterruptedException declared by {@code awaitTermination}
     */
    private static void shutdownDetector(ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2) throws InterruptedException {
        // void method: record the call first, then attach the behavior to the last call.
        scheduledExecutorService.shutdown();
        EasyMock.expectLastCall().andDelegateTo(scheduledExecutorService2);

        boolean recordedAwait = scheduledExecutorService.awaitTermination(MOCK_ANOMALY_DETECTION_INTERVAL_MS, TimeUnit.MILLISECONDS);
        EasyMock.expect(recordedAwait).andDelegateTo(scheduledExecutorService2);

        boolean recordedTerminated = scheduledExecutorService.isTerminated();
        EasyMock.expect(recordedTerminated).andDelegateTo(scheduledExecutorService2);
    }

    /**
     * Switches all given mocks from record state to replay state, in parameter order.
     *
     * @param anomalyNotifier          notifier mock
     * @param brokerFailureDetector    broker failure detector mock
     * @param goalViolationDetector    goal violation detector mock
     * @param scheduledExecutorService executor mock
     * @param kafkaCruiseControl       Cruise Control mock
     */
    private static void replayMocks(AnomalyNotifier anomalyNotifier, BrokerFailureDetector brokerFailureDetector, GoalViolationDetector goalViolationDetector, ScheduledExecutorService scheduledExecutorService, KafkaCruiseControl kafkaCruiseControl) {
        EasyMock.replay(anomalyNotifier, brokerFailureDetector, goalViolationDetector, scheduledExecutorService, kafkaCruiseControl);
    }

    /**
     * Feeds a broker failure to the detector while the notifier answers with
     * {@code AnomalyNotificationResult.check(MOCK_DELAY_CHECK_MS)}. The test expects one delayed
     * check to be scheduled on the executor, no self-healing to start, and the anomaly to be recorded
     * under {@link AnomalyType#BROKER_FAILURE}.
     *
     * @throws InterruptedException if interrupted while waiting for the detector or the executor
     */
    @Test
    public void testDelayedCheck() throws InterruptedException {
        // Raw deque kept on purpose: its element type is not visible from this file.
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
        ConfluentAdmin confluentAdmin = EasyMock.createNiceMock(ConfluentAdmin.class);
        AnomalyNotifier anomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
        BrokerFailureDetector brokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector goalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        ScheduledExecutorService scheduledExecutorService = EasyMock.mock(ScheduledExecutorService.class);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            EasyMock.expect(anomalyNotifier.onBrokerFailure(EasyMock.isA(BrokerFailures.class))).andReturn(AnomalyNotificationResult.check(MOCK_DELAY_CHECK_MS));
            EasyMock.expect(anomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
            KafkaCruiseControl mockKafkaCruiseControl = mockKafkaCruiseControl(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties());
            startPeriodicDetectors(scheduledExecutorService, goalViolationDetector, newSingleThreadScheduledExecutor);
            // The delayed re-check is only expected to be scheduled, never run, so a null future suffices.
            EasyMock.expect(scheduledExecutorService.schedule((Runnable) EasyMock.isA(Runnable.class), EasyMock.eq(MOCK_DELAY_CHECK_MS), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(null);
            shutdownDetector(scheduledExecutorService, newSingleThreadScheduledExecutor);
            EasyMock.expect(mockKafkaCruiseControl.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);
            replayMocks(anomalyNotifier, brokerFailureDetector, goalViolationDetector, scheduledExecutorService, mockKafkaCruiseControl);
            AnomalyDetector anomalyDetector = new AnomalyDetector(linkedBlockingDeque, confluentAdmin, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl, anomalyNotifier, goalViolationDetector, brokerFailureDetector, scheduledExecutorService, (LoadMonitor) EasyMock.mock(LoadMonitor.class));

            anomalyDetector.startDetection();
            linkedBlockingDeque.add(new BrokerFailures(mockKafkaCruiseControl, Collections.singletonMap(0, 100L), false, true, true, Collections.emptyList()));
            // Bounded wait instead of an unbounded spin, so a regression fails the test rather than hanging it.
            TestUtils.waitForCondition(() -> anomalyDetector.numCheckedWithDelay() >= 1, "Anomaly was not checked with delay in due time");
            anomalyDetector.shutdown();
            Assert.assertEquals(0L, anomalyDetector.numSelfHealingStarted());
            Assert.assertEquals(1L, anomalyDetector.numCheckedWithDelay());
            Assert.assertTrue(newSingleThreadScheduledExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS));
            // Fetch the state exactly once and reuse it for all assertions below.
            // NOTE(review): assumes anomalyDetectorState() returns the same-package AnomalyDetectorState type - confirm.
            AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
            Assert.assertEquals(0L, ((Long) anomalyDetectorState.metrics().get("numSelfHealingStarted")).longValue());
            Assert.assertEquals(1L, ((Map) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.BROKER_FAILURE)).size());
            Assert.assertEquals(0L, ((Map) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.GOAL_VIOLATION)).size());
            EasyMock.verify(anomalyNotifier, scheduledExecutorService, mockKafkaCruiseControl);
        } finally {
            // Always release the real executor thread, whether the test passed or failed.
            newSingleThreadScheduledExecutor.shutdown();
        }
    }

    /**
     * Feeds a goal violation to the detector while the notifier answers with
     * {@code AnomalyNotificationResult.fix()}. The test expects a rebalance to be requested on the
     * Cruise Control mock, exactly one self-healing to start, no delayed check, and the anomaly to be
     * recorded under {@link AnomalyType#GOAL_VIOLATION}.
     *
     * @throws InterruptedException        if interrupted while waiting for the detector or the executor
     * @throws KafkaCruiseControlException declared by the mocked Cruise Control calls
     */
    @Test
    public void testFixGoalViolation() throws InterruptedException, KafkaCruiseControlException {
        // Raw deque kept on purpose: its element type is not visible from this file.
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
        ConfluentAdmin confluentAdmin = EasyMock.createNiceMock(ConfluentAdmin.class);
        AnomalyNotifier anomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
        BrokerFailureDetector brokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector goalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        ScheduledExecutorService scheduledExecutorService = EasyMock.mock(ScheduledExecutorService.class);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            KafkaCruiseControl mockKafkaCruiseControl = mockKafkaCruiseControl(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties());
            startPeriodicDetectors(scheduledExecutorService, goalViolationDetector, newSingleThreadScheduledExecutor);
            shutdownDetector(scheduledExecutorService, newSingleThreadScheduledExecutor);
            EasyMock.expect(mockKafkaCruiseControl.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);
            EasyMock.expect(anomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
            EasyMock.expect(anomalyNotifier.onGoalViolation(EasyMock.isA(GoalViolations.class))).andReturn(AnomalyNotificationResult.fix());
            // The rebalance result is not inspected by this test, so the mock returns null.
            EasyMock.expect(mockKafkaCruiseControl.rebalance(
                    (List) EasyMock.eq(Collections.emptyList()),
                    EasyMock.eq(false),
                    (ModelCompletenessRequirements) EasyMock.eq((Object) null),
                    (OperationProgress) EasyMock.anyObject(OperationProgress.class),
                    EasyMock.eq(true),
                    (Integer) EasyMock.eq((Object) null),
                    (Integer) EasyMock.eq((Object) null),
                    (Integer) EasyMock.eq((Object) null),
                    (ReplicaMovementStrategy) EasyMock.eq((Object) null),
                    EasyMock.anyString(),
                    EasyMock.eq(true),
                    EasyMock.eq(true),
                    EasyMock.eq(false),
                    (Set) EasyMock.eq(Collections.emptySet()),
                    EasyMock.eq(false)))
                .andReturn(null);
            EasyMock.expect(mockKafkaCruiseControl.meetCompletenessRequirements(Collections.emptyList())).andReturn(true);
            EasyMock.expect(scheduledExecutorService.schedule((Runnable) EasyMock.isA(Runnable.class), EasyMock.eq(0L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(null);
            replayMocks(anomalyNotifier, brokerFailureDetector, goalViolationDetector, scheduledExecutorService, mockKafkaCruiseControl);
            AnomalyDetector anomalyDetector = new AnomalyDetector(linkedBlockingDeque, confluentAdmin, MOCK_ANOMALY_DETECTION_INTERVAL_MS, mockKafkaCruiseControl, anomalyNotifier, goalViolationDetector, brokerFailureDetector, scheduledExecutorService, (LoadMonitor) EasyMock.mock(LoadMonitor.class));

            anomalyDetector.startDetection();
            GoalViolations goalViolations = new GoalViolations(mockKafkaCruiseControl, true, true, true, Collections.emptyList());
            goalViolations.addViolation("RackAwareGoal", true);
            linkedBlockingDeque.add(goalViolations);
            TestUtils.waitForCondition(() -> anomalyDetector.numSelfHealingStarted() >= 1, 120000L, "Anomaly was not fixed in due time");
            anomalyDetector.shutdown();
            Assert.assertEquals(1L, anomalyDetector.numSelfHealingStarted());
            Assert.assertEquals(0L, anomalyDetector.numCheckedWithDelay());
            Assert.assertTrue(newSingleThreadScheduledExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS));
            // Fetch the state exactly once and reuse it for all assertions below.
            // NOTE(review): assumes anomalyDetectorState() returns the same-package AnomalyDetectorState type - confirm.
            AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
            Assert.assertEquals(1L, ((Long) anomalyDetectorState.metrics().get("numSelfHealingStarted")).longValue());
            Assert.assertEquals(0L, ((Map) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.BROKER_FAILURE)).size());
            Assert.assertEquals(1L, ((Map) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.GOAL_VIOLATION)).size());
            EasyMock.verify(anomalyNotifier, scheduledExecutorService, mockKafkaCruiseControl);
        } finally {
            // Always release the real executor thread, whether the test passed or failed.
            newSingleThreadScheduledExecutor.shutdown();
        }
    }

    /**
     * Runs the "executor not available" scenario with the execution state reported as
     * {@code INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS}.
     *
     * @throws InterruptedException if interrupted while the shared scenario waits
     */
    @Test
    public void testExecutionInProgress() throws InterruptedException {
        assertExecutorNotAvailable(cruiseControlMock ->
            EasyMock.expect(cruiseControlMock.executionState())
                .andReturn(ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS));
    }

    /**
     * Runs the "executor not available" scenario with no task in progress but the executor
     * reported as reserved.
     *
     * @throws InterruptedException if interrupted while the shared scenario waits
     */
    @Test
    public void testExecutorIsReserved() throws InterruptedException {
        assertExecutorNotAvailable(cruiseControlMock -> {
            EasyMock.expect(cruiseControlMock.executionState()).andReturn(ExecutorState.State.NO_TASK_IN_PROGRESS);
            EasyMock.expect(cruiseControlMock.executorIsReserved()).andReturn(true);
        });
    }

    /**
     * Creates a {@link KafkaCruiseControl} mock, still in record state, that answers {@code config()}
     * with a config built from the given properties and {@code executorIsReserved()} with
     * {@code false}, each any number of times.
     *
     * @param properties properties used to build the returned config
     * @return the mock; the caller records further expectations and replays it
     */
    private KafkaCruiseControl mockKafkaCruiseControl(Properties properties) {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(properties);
        KafkaCruiseControl cruiseControlMock = EasyMock.mock(KafkaCruiseControl.class);
        EasyMock.expect(cruiseControlMock.config()).andReturn(config).anyTimes();
        EasyMock.expect(cruiseControlMock.executorIsReserved()).andReturn(false).anyTimes();
        return cruiseControlMock;
    }

    /**
     * Shared scenario for tests where an execution cannot be started. The supplied mocker records
     * the condition on the Cruise Control mock, a goal violation is queued, and once the queue has
     * drained the test asserts that neither self-healing nor a delayed check happened while the
     * anomaly was still recorded under {@link AnomalyType#GOAL_VIOLATION}.
     *
     * @param executionNotPossibleMocker records the scenario-specific expectations before replay
     * @throws InterruptedException if interrupted while waiting for the queue or the executor
     */
    public void assertExecutorNotAvailable(ExecutionNotPossibleMocker executionNotPossibleMocker) throws InterruptedException {
        // Raw deque kept on purpose: its element type is not visible from this file.
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();
        ConfluentAdmin confluentAdmin = EasyMock.createNiceMock(ConfluentAdmin.class);
        AnomalyNotifier anomalyNotifier = EasyMock.mock(AnomalyNotifier.class);
        BrokerFailureDetector brokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector goalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        ScheduledExecutorService scheduledExecutorService = EasyMock.mock(ScheduledExecutorService.class);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Built by hand rather than via mockKafkaCruiseControl(...) so the mocker fully controls
            // what executorIsReserved() answers.
            KafkaCruiseControl kafkaCruiseControl = EasyMock.mock(KafkaCruiseControl.class);
            EasyMock.expect(kafkaCruiseControl.config()).andReturn(new KafkaCruiseControlConfig(KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties())).anyTimes();
            startPeriodicDetectors(scheduledExecutorService, goalViolationDetector, newSingleThreadScheduledExecutor);
            shutdownDetector(scheduledExecutorService, newSingleThreadScheduledExecutor);
            EasyMock.expect(anomalyNotifier.selfHealingEnabledRatio()).andReturn(MOCK_SELF_HEALING_ENABLED_RATIO);
            executionNotPossibleMocker.mockExecutionNotPossible(kafkaCruiseControl);
            replayMocks(anomalyNotifier, brokerFailureDetector, goalViolationDetector, scheduledExecutorService, kafkaCruiseControl);
            AnomalyDetector anomalyDetector = new AnomalyDetector(linkedBlockingDeque, confluentAdmin, MOCK_ANOMALY_DETECTION_INTERVAL_MS, kafkaCruiseControl, anomalyNotifier, goalViolationDetector, brokerFailureDetector, scheduledExecutorService, (LoadMonitor) EasyMock.mock(LoadMonitor.class));

            anomalyDetector.startDetection();
            linkedBlockingDeque.add(new GoalViolations(kafkaCruiseControl, true, true, true, Collections.emptyList()));
            TestUtils.waitForCondition(linkedBlockingDeque::isEmpty, "Expected the anomalies to be emptied out;");
            anomalyDetector.shutdown();
            Assert.assertEquals(0L, anomalyDetector.numSelfHealingStarted());
            Assert.assertEquals(0L, anomalyDetector.numCheckedWithDelay());
            Assert.assertTrue(newSingleThreadScheduledExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS));
            // Fetch the state exactly once and reuse it for all assertions below.
            // NOTE(review): assumes anomalyDetectorState() returns the same-package AnomalyDetectorState type - confirm.
            AnomalyDetectorState anomalyDetectorState = anomalyDetector.anomalyDetectorState();
            Assert.assertEquals(0L, ((Long) anomalyDetectorState.metrics().get("numSelfHealingStarted")).longValue());
            Assert.assertEquals(0L, ((Map) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.BROKER_FAILURE)).size());
            Assert.assertEquals(1L, ((Map) anomalyDetectorState.recentAnomaliesByType().get(AnomalyType.GOAL_VIOLATION)).size());
            EasyMock.verify(anomalyNotifier, scheduledExecutorService, kafkaCruiseControl);
        } finally {
            // Always release the real executor thread, whether the test passed or failed.
            newSingleThreadScheduledExecutor.shutdown();
        }
    }

    /**
     * Calls {@code shutdown()} on a detector that was never started, then calls it again from a
     * second thread. The test asserts that the second call returns, that no self-healing or delayed
     * check was counted, and that the backing executor is terminated.
     *
     * @throws InterruptedException if interrupted while joining the second shutdown thread
     */
    @Test
    public void testShutdown() throws InterruptedException {
        AnomalyNotifier anomalyNotifier = EasyMock.createNiceMock(AnomalyNotifier.class);
        ConfluentAdmin confluentAdmin = EasyMock.createNiceMock(ConfluentAdmin.class);
        BrokerFailureDetector brokerFailureDetector = EasyMock.createNiceMock(BrokerFailureDetector.class);
        GoalViolationDetector goalViolationDetector = EasyMock.createNiceMock(GoalViolationDetector.class);
        KafkaCruiseControl kafkaCruiseControl = EasyMock.createNiceMock(KafkaCruiseControl.class);
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("AnomalyDetector", false, (Logger) null));
        AnomalyDetector anomalyDetector = new AnomalyDetector(new LinkedBlockingDeque(), confluentAdmin, MOCK_ANOMALY_DETECTION_INTERVAL_MS, kafkaCruiseControl, anomalyNotifier, goalViolationDetector, brokerFailureDetector, newScheduledThreadPool, (LoadMonitor) EasyMock.mock(LoadMonitor.class));
        anomalyDetector.shutdown();
        Thread thread = new Thread(anomalyDetector::shutdown);
        thread.start();
        thread.join(30000L);
        // join(timeout) returns silently on timeout, so check explicitly that the second shutdown finished.
        Assert.assertFalse("Second shutdown did not complete in time", thread.isAlive());
        Assert.assertEquals(0L, anomalyDetector.numSelfHealingStarted());
        Assert.assertEquals(0L, anomalyDetector.numCheckedWithDelay());
        Assert.assertTrue(newScheduledThreadPool.isTerminated());
    }

    // Give every known anomaly type the same mock self-healing-enabled ratio.
    static {
        for (Object cachedType : AnomalyType.cachedValues()) {
            MOCK_SELF_HEALING_ENABLED_RATIO.put((AnomalyType) cachedType, 0.99f);
        }
    }
}
