package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetectorTest.class */
public class BrokerFailureDetectorTest extends CCKafkaIntegrationTestHarness {
    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness
    public int clusterSize() {
        return 2;
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @After
    public void tearDown() {
        super.tearDown();
    }

    @Test
    public void testFailureDetection() throws Exception {
        MockTime mockTime = getMockTime();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        BrokerFailureDetector createBrokerFailureDetector = createBrokerFailureDetector(concurrentLinkedQueue, mockTime);
        try {
            createBrokerFailureDetector.startDetection();
            Assert.assertTrue(concurrentLinkedQueue.isEmpty());
            killBroker(0);
            long currentTimeMillis = System.currentTimeMillis();
            while (concurrentLinkedQueue.isEmpty() && System.currentTimeMillis() < currentTimeMillis + 30000) {
            }
            Assert.assertEquals("One broker failure should have been detected before timeout.", 1L, concurrentLinkedQueue.size());
            Anomaly peek = concurrentLinkedQueue.peek();
            Assert.assertTrue("The anomaly should be BrokerFailure", peek instanceof BrokerFailures);
            Assert.assertEquals("The failed broker should be 0 and time should be 100L", Collections.singletonMap(0, 100L), ((BrokerFailures) peek).failedBrokers());
            restartDeadBroker(0);
            long currentTimeMillis2 = System.currentTimeMillis();
            while (createBrokerFailureDetector.failedBrokers().isEmpty() && System.currentTimeMillis() < currentTimeMillis2 + 30000) {
            }
            Assert.assertTrue(createBrokerFailureDetector.failedBrokers().isEmpty());
            createBrokerFailureDetector.shutdownNow();
        } catch (Throwable th) {
            createBrokerFailureDetector.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testDetectorStartWithFailedBrokers() throws Exception {
        BrokerFailureDetector createBrokerFailureDetector = createBrokerFailureDetector(new ConcurrentLinkedQueue(), getMockTime());
        try {
            killBroker(0);
            createBrokerFailureDetector.startDetection();
            long currentTimeMillis = System.currentTimeMillis();
            while (createBrokerFailureDetector.failedBrokers().isEmpty() && System.currentTimeMillis() < currentTimeMillis + 30000) {
            }
            Assert.assertEquals(Collections.singletonMap(0, 100L), createBrokerFailureDetector.failedBrokers());
            createBrokerFailureDetector.shutdownNow();
        } catch (Throwable th) {
            createBrokerFailureDetector.shutdownNow();
            throw th;
        }
    }

    @Test
    public void testLoadFailedBrokersFromZK() throws Exception {
        MockTime mockTime = getMockTime();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        BrokerFailureDetector createBrokerFailureDetector = createBrokerFailureDetector(concurrentLinkedQueue, mockTime);
        try {
            createBrokerFailureDetector.startDetection();
            killBroker(0);
            long currentTimeMillis = System.currentTimeMillis();
            while (concurrentLinkedQueue.isEmpty() && System.currentTimeMillis() < currentTimeMillis + 30000) {
            }
            Assert.assertEquals(Collections.singletonMap(0, 100L), createBrokerFailureDetector.failedBrokers());
            createBrokerFailureDetector.shutdownNow();
            mockTime.sleep(100L);
            createBrokerFailureDetector = createBrokerFailureDetector(concurrentLinkedQueue, mockTime);
            createBrokerFailureDetector.startDetection();
            long currentTimeMillis2 = System.currentTimeMillis();
            while (createBrokerFailureDetector.failedBrokers().isEmpty() && System.currentTimeMillis() < currentTimeMillis2 + 30000) {
            }
            Assert.assertEquals(Collections.singletonMap(0, 100L), createBrokerFailureDetector.failedBrokers());
            createBrokerFailureDetector.shutdownNow();
        } catch (Throwable th) {
            createBrokerFailureDetector.shutdownNow();
            throw th;
        }
    }

    private BrokerFailureDetector createBrokerFailureDetector(Queue<Anomaly> queue, Time time) {
        LoadMonitor loadMonitor = (LoadMonitor) EasyMock.mock(LoadMonitor.class);
        KafkaCruiseControl kafkaCruiseControl = (KafkaCruiseControl) EasyMock.mock(KafkaCruiseControl.class);
        EasyMock.expect(loadMonitor.brokersWithReplicas(EasyMock.anyInt())).andAnswer(() -> {
            return new HashSet(Arrays.asList(0, 1));
        }).anyTimes();
        EasyMock.replay(new Object[]{loadMonitor});
        Properties kafkaCruiseControlProperties = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
        kafkaCruiseControlProperties.setProperty("zookeeper.connect", zookeeper().connectionString());
        kafkaCruiseControlProperties.setProperty("zookeeper.security.enabled", "false");
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(kafkaCruiseControlProperties);
        EasyMock.expect(kafkaCruiseControl.config()).andReturn(kafkaCruiseControlConfig).atLeastOnce();
        EasyMock.replay(new Object[]{kafkaCruiseControl});
        return new BrokerFailureDetector(kafkaCruiseControlConfig, Option.empty(), loadMonitor, queue, time, kafkaCruiseControl, Collections.emptyList());
    }

    private void killBroker(int i) throws Exception {
        CCEmbeddedBroker cCEmbeddedBroker = this._brokers.get(Integer.valueOf(i));
        cCEmbeddedBroker.shutdown();
        cCEmbeddedBroker.awaitShutdown();
    }

    private void restartDeadBroker(int i) throws Exception {
        this._brokers.get(Integer.valueOf(i)).startup();
    }

    private MockTime getMockTime() {
        return new MockTime(0L, 100L, TimeUnit.NANOSECONDS.convert(100L, TimeUnit.MILLISECONDS));
    }
}
