package io.confluent.databalancer.integration;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import io.confluent.databalancer.TestConstants;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration test that shuts down one broker of a four-broker cluster and waits for the
 * data balancer to move every partition off it.
 *
 * <p>The broker-failure heal threshold is lowered to one second via
 * {@link #injectTestSpecificProperties(Properties)}, and the whole class is bounded by a
 * three-minute JUnit timeout.
 */
@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 3, unit = TimeUnit.MINUTES)
public class BrokerFailureTest extends DataBalancerClusterTestHarness {
    private static final String TEST_TOPIC = "broker_failure_test_topic";

    /** Config key whose value this test sets to {@code "1000"}. */
    private static final String HEAL_BROKER_FAILURE_THRESHOLD_MS_CONFIG =
        "confluent.balancer.heal.broker.failure.threshold.ms";

    /** Config key whose value this test sets to {@code "100000000"}. */
    private static final String THROTTLE_BYTES_PER_SECOND_CONFIG =
        "confluent.balancer.throttle.bytes.per.second";

    /** Upper bound, in milliseconds, on waiting for the stopped broker to leave the cluster. */
    private static final long CLUSTER_SHRINK_TIMEOUT_MS = 60000L;

    /**
     * Number of brokers the harness should start with.
     *
     * @return always {@code 4}
     */
    @Override
    public int initialBrokerCount() {
        return 4;
    }

    /**
     * Adds the settings this test relies on to the supplied properties.
     *
     * @param properties the properties object to add to; it is mutated in place
     * @return the same {@code properties} instance that was passed in
     */
    @Override
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put(HEAL_BROKER_FAILURE_THRESHOLD_MS_CONFIG, "1000");
        properties.put(THROTTLE_BYTES_PER_SECOND_CONFIG, "100000000");
        return properties;
    }

    /**
     * Stops one broker, waits for the cluster to report one node fewer, then waits until the
     * stopped broker hosts no partitions and the Cruise Control executor is idle.
     *
     * @throws Throwable if any cluster operation fails or a wait condition is not met in time
     */
    @Test
    public void testBrokerFailureHandling() throws Throwable {
        // Trailing arguments are presumably partition count and replication factor -- confirm
        // against KafkaTestUtils.createTopic.
        KafkaTestUtils.createTopic(adminClient, TEST_TOPIC, 20, 2);

        final int brokerId = notControllerKafkaServer().config().brokerId();
        info("Shutting down broker {}", Integer.valueOf(brokerId));

        // NOTE(review): the broker id is used directly as the lookup key into kafkas().
        EmbeddedKafka stoppedBroker = (EmbeddedKafka) kafkaCluster.kafkas().get(brokerId);
        stoppedBroker.shutdown();

        // Phase 1: the admin client must see exactly one node fewer than we started with.
        TestUtils.waitForCondition(
            () -> {
                Collection<?> liveNodes = (Collection<?>) adminClient.describeCluster().nodes().get();
                return liveNodes.size() == initialBrokerCount() - 1;
            },
            CLUSTER_SHRINK_TIMEOUT_MS,
            "Cluster size did not shrink!");

        // Phase 2: the stopped broker must hold no partitions and the executor must be idle.
        KafkaCruiseControl cruiseControl =
            getDataBalanceEngine().getDataBalanceEngineContext().getCruiseControl();
        long maxWaitMs = removalFinishTimeout.toMillis();
        long pollIntervalMs = removalPollInterval.toMillis();
        TestUtils.waitForCondition(
            () -> {
                if (!DataBalancerIntegrationTestUtils.partitionsOnBroker(brokerId, adminClient).isEmpty()) {
                    return false;
                }
                return cruiseControl.executionState() == ExecutorState.State.NO_TASK_IN_PROGRESS;
            },
            maxWaitMs,
            pollIntervalMs,
            () -> "Broker failure not detected and acted on in time!");
    }
}
