package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.TestConstants;
import io.confluent.databalancer.integration.DataBalancerClusterTestHarness;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.common.AliveBrokersSnapshot;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Integration tests for {@link BrokerFailureDetector} running inside a three-broker
 * data balancer test cluster.
 *
 * <p>Each test obtains the detector from the running data balancer, replaces its clock with a
 * {@link MockTime} and swaps its anomaly queue for a Mockito spy so that the anomaly handed to
 * {@code add} can be inspected through {@link #addedAnomalies}.
 */
@Tag(TestConstants.INTEGRATION_TEST)
public class BrokerFailureDetectorTest extends DataBalancerClusterTestHarness {
    /** Number of brokers the harness starts for every test. */
    private static final int BROKER_COUNT = 3;

    /** Wall-clock value the mock clock starts at; also the failure time the assertions expect. */
    private static final long INITIAL_TIME_MS = 100L;

    /** Upper bound on how long to wait for the data balancer to change its active state. */
    private static final long STATE_CHANGE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30L);

    /** Pause between two checks of the data balancer's active state. */
    private static final long STATE_CHANGE_POLL_MS = 1000L;

    Time mockTime;
    KafkaDataBalanceManager dataBalancer;
    BrokerFailureDetector detector;
    Queue<Anomaly> anomalies;
    List<Anomaly> addedAnomalies;

    /** @return the number of brokers the harness should start. */
    @Override
    public int initialBrokerCount() {
        return BROKER_COUNT;
    }

    /**
     * Starts the cluster through the harness and then instruments the broker failure detector
     * with the mock clock and the spied anomaly queue.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        mockTime = getMockTime();
        dataBalancer = getDataBalancer();
        detector = getBrokerFailureDetector();
    }

    /**
     * Sets the replication factor of the license topic to the broker count and that of the
     * balancer topic to one less than the broker count.
     *
     * @param properties broker properties prepared by the harness; modified in place
     * @return the same {@code properties} instance
     */
    @Override
    protected Properties injectTestSpecificProperties(Properties properties) {
        int brokerCount = initialBrokerCount();
        properties.put("confluent.license.topic.replication.factor", Short.valueOf((short) brokerCount));
        properties.put("confluent.balancer.topic.replication.factor", Short.valueOf((short) (brokerCount - 1)));
        return properties;
    }

    /**
     * Kills an excluded non-controller broker, expects exactly one {@link BrokerFailures}
     * anomaly for it, then restarts the broker and expects the failed broker list to drain.
     */
    @Test
    public void testFailureDetection() throws InterruptedException, ExecutionException {
        Assertions.assertTrue(anomalies.isEmpty());
        int brokerId = notControllerKafkaServer().config().brokerId();
        excludeBroker(brokerId);
        killBroker(brokerId);

        TestUtils.waitForCondition(() -> !addedAnomalies.isEmpty(),
                "One broker failure should have been detected before timeout.");
        Assertions.assertEquals(1, addedAnomalies.size(),
                "One broker failure should have been detected before timeout.");

        // Check the runtime type before down-casting so a wrong anomaly type fails the
        // assertion with its message instead of surfacing as a ClassCastException.
        Anomaly anomaly = addedAnomalies.get(0);
        Assertions.assertTrue(anomaly instanceof BrokerFailures, "The anomaly should be BrokerFailure");
        BrokerFailures brokerFailures = (BrokerFailures) anomaly;
        Assertions.assertEquals(Collections.singletonMap(brokerId, INITIAL_TIME_MS), brokerFailures.failedBrokers(),
                "The failed broker should be " + brokerId + " and time should be 100L");

        restartDeadBroker(brokerId);
        TestUtils.waitForCondition(() -> detector.failedBrokers().isEmpty(), "failed broker list is not empty.");
        Assertions.assertTrue(detector.failedBrokers().isEmpty());
    }

    /**
     * Excludes the controller broker, kills a different broker, and expects the detector to
     * track the failure without adding an anomaly to the queue.
     */
    @Test
    public void testFailureDetectionDoesNotProceedWhenThereAreExcludedBrokers() throws InterruptedException, ExecutionException {
        Assertions.assertTrue(anomalies.isEmpty());
        int brokerId = notControllerKafkaServer().config().brokerId();
        excludeBroker(controllerKafkaServer().config().brokerId());
        killBroker(brokerId);

        TestUtils.waitForCondition(() -> !detector.failedBrokers().isEmpty(),
                "The broker failure detector did not detect the failed broker in time.");
        Assertions.assertTrue(addedAnomalies.isEmpty(),
                "No broker failures should have been reported due to the present replica exclusion.");
    }

    /**
     * Kills every broker that is not the active controller, expects both to be listed as failed,
     * then restarts one of them and expects only the other to remain in the failed list.
     */
    @Test
    public void testPartialClusterFailure() throws Exception {
        Assertions.assertTrue(anomalies.isEmpty());
        Set<Integer> nonControllerIds = servers.stream()
                .filter(server -> !server.kafkaController().isActive())
                .map(server -> server.config().brokerId())
                .collect(Collectors.toSet());
        reassignApiPersistenceTopic(nonControllerIds);
        nonControllerIds.forEach(this::killBroker);

        TestUtils.waitForCondition(() -> detector.failedBrokers().size() == 2,
                "Expected two brokers in failed broker list.");
        Assertions.assertEquals(nonControllerIds, detector.failedBrokers().keySet());

        List<Integer> killedIds = new ArrayList<>(nonControllerIds);
        int restartedId = killedIds.get(0);
        Integer stillDeadId = killedIds.get(1);
        restartDeadBroker(restartedId);

        TestUtils.waitForCondition(() -> detector.failedBrokers().size() == 1,
                "Expected one broker in failed broker list.");
        Assertions.assertEquals(stillDeadId, detector.failedBrokers().keySet().iterator().next());
    }

    /**
     * Reassigns partition 0 of {@code _confluent_balancer_api_state} to the controller broker
     * followed by the given brokers, and blocks until the request completes.
     *
     * @param set broker ids to place after the controller in the replica list
     * @throws Exception if the reassignment request fails
     */
    private void reassignApiPersistenceTopic(Set<Integer> set) throws Exception {
        // The controller goes first in the replica list; the callers kill the other brokers.
        List<Integer> replicas = new ArrayList<>();
        replicas.add(controllerKafkaServer().config().brokerId());
        replicas.addAll(set);

        TopicPartition partition = new TopicPartition("_confluent_balancer_api_state", 0);
        adminClient.alterPartitionReassignments(
                Collections.singletonMap(partition, Optional.of(new NewPartitionReassignment(replicas))))
                .all()
                .get();
    }

    /**
     * Kills a broker while the detector is shut down and expects the restarted detector to
     * report exactly that broker as failed.
     */
    @Test
    public void testDetectorStartWithFailedBrokers() throws InterruptedException {
        shutdownDetector();
        int brokerId = notControllerKafkaServer().config().brokerId();
        killBroker(brokerId);
        startDetector();

        TestUtils.waitForCondition(() -> !detector.failedBrokers().isEmpty(), "failed broker list is empty.");
        Set<Integer> failedIds = detector.failedBrokers().keySet();
        Assertions.assertEquals(1, failedIds.size());
        Assertions.assertEquals(brokerId, failedIds.iterator().next().intValue());
    }

    /**
     * Expects the failure time recorded before a detector restart to still be reported after
     * the restart, even though the mock clock was advanced in between.
     */
    @Test
    public void testLoadFailedBrokersFromZK() throws InterruptedException {
        int brokerId = notControllerKafkaServer().config().brokerId();
        killBroker(brokerId);
        TestUtils.waitForCondition(() -> !addedAnomalies.isEmpty(),
                "One broker failure should have been detected before timeout.");
        Assertions.assertEquals(Collections.singletonMap(brokerId, INITIAL_TIME_MS), detector.failedBrokers());

        shutdownDetector();
        // Move the clock forward so a freshly stamped failure time would differ from the original.
        mockTime.sleep(100L);
        startDetector();

        TestUtils.waitForCondition(() -> !detector.failedBrokers().isEmpty(), "failed broker list is empty.");
        Assertions.assertEquals(Collections.singletonMap(brokerId, INITIAL_TIME_MS), detector.failedBrokers());
    }

    /**
     * Fetches the broker failure detector from the running data balancer and instruments it:
     * installs {@link #mockTime} and replaces the anomaly queue with a spy.
     *
     * <p>The spy's first {@code add} call is recorded in {@link #addedAnomalies} and answered
     * with {@code true} without reaching the underlying queue; any later {@code add} call is
     * delegated to the real queue and is not recorded.
     *
     * @return the instrumented detector
     */
    private BrokerFailureDetector getBrokerFailureDetector() {
        BrokerFailureDetector brokerFailureDetector = dataBalancer.getBalanceEngine()
                .getDataBalanceEngineContext()
                .getCruiseControl()
                .getAnomalyDetector()
                .getBrokerFailureDetector();
        brokerFailureDetector.setTime(mockTime);

        anomalies = Mockito.spy(brokerFailureDetector.getAnomalies());
        // NOTE(review): the answer below presumably runs on a detector thread while the test
        // thread polls this list, hence the synchronized wrapper - confirm the threading model.
        addedAnomalies = Collections.synchronizedList(new ArrayList<>());
        Mockito.doAnswer(invocation -> {
            addedAnomalies.add(invocation.getArgument(0));
            return true;
        }).doCallRealMethod().when(anomalies).add(ArgumentMatchers.any());

        brokerFailureDetector.setAnomalies(anomalies);
        return brokerFailureDetector;
    }

    /**
     * Activates the data balancer, waits up to 30 seconds for it to report active, and then
     * re-instruments the detector.
     */
    private void startDetector() throws InterruptedException {
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        awaitDataBalancerActive(true);
        Assertions.assertTrue(dataBalancer.isActive(), "Unable to start data balancer");
        detector = getBrokerFailureDetector();
    }

    /** Deactivates the data balancer and waits up to 30 seconds for it to report inactive. */
    private void shutdownDetector() throws InterruptedException {
        dataBalancer.onResignation();
        awaitDataBalancerActive(false);
        Assertions.assertFalse(dataBalancer.isActive(), "Unable to shutdown data balancer");
    }

    /**
     * Polls the data balancer until its active flag equals {@code expectedActive} or the timeout
     * elapses. Returns silently in both cases; callers assert on the final state.
     *
     * @param expectedActive the active state to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void awaitDataBalancerActive(boolean expectedActive) throws InterruptedException {
        long startNanos = System.nanoTime();
        // Compare elapsed time rather than absolute readings: System.nanoTime() values are only
        // meaningful as differences and may wrap around.
        while (dataBalancer.isActive() != expectedActive
                && System.nanoTime() - startNanos < STATE_CHANGE_TIMEOUT_NANOS) {
            Thread.sleep(STATE_CHANGE_POLL_MS);
        }
    }

    /**
     * Shuts down the given broker, waits for the shutdown to finish and asserts that the broker
     * is no longer among the brokers reported for the cluster.
     *
     * @param i id of the broker to kill
     */
    private void killBroker(int i) {
        KafkaServer server = server(i);
        server.shutdown();
        server.awaitShutdown();

        int deadBrokerId = server.config().brokerId();
        Collection<Integer> aliveBrokers = KafkaCruiseControlUtils.getAllBrokersInCluster(adminClient);
        Assertions.assertFalse(aliveBrokers.contains(deadBrokerId),
                "List of alive brokers " + aliveBrokers + " contains dead broker " + deadBrokerId);
    }

    /**
     * Starts a previously killed broker again.
     *
     * @param i id of the broker to restart
     */
    private void restartDeadBroker(int i) {
        server(i).startup();
    }

    /**
     * @return a mock clock with no auto-tick whose millisecond and high-resolution readings
     *         both start at {@link #INITIAL_TIME_MS}
     */
    private MockTime getMockTime() {
        return new MockTime(0L, INITIAL_TIME_MS, TimeUnit.MILLISECONDS.toNanos(INITIAL_TIME_MS));
    }
}
