package com.linkedin.kafka.cruisecontrol.server;

import com.linkedin.kafka.cruisecontrol.BrokerShutdownIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.common.AdminClientResult;
import com.linkedin.kafka.cruisecontrol.common.KafkaCluster;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests for {@link BrokerShutdownManager#maybeShutdownBroker}, run against the
 * embedded Kafka cluster exposed by {@link BrokerShutdownIntegrationTestHarness}.
 *
 * <p>NOTE(review): the admin client created in {@link #setUp()} is not closed anywhere in this
 * class; confirm whether the harness or {@code SbkAdminUtils} is responsible for releasing it.
 */
@Category({IntegrationTest.class})
public class BrokerShutdownManagerIntegrationTest extends BrokerShutdownIntegrationTestHarness {
    /** Value written to {@code default.api.timeout.ms} in the test configuration. */
    private static final String API_TIMEOUT_MS = "30000";

    /** Value written to {@code broker.removal.shutdown.timeout.ms} in the test configuration. */
    private static final String REMOVAL_SHUTDOWN_MS = "60000";

    /**
     * Timeout handed to {@code SbkAdminUtils#describeCluster} when polling the cluster size.
     * Presumably milliseconds -- confirm against the {@code describeCluster} contract.
     */
    private static final long DESCRIBE_CLUSTER_TIMEOUT = 10000L;

    private BrokerShutdownManager shutdownManager;
    private SbkAdminUtils adminUtils;

    /**
     * Runs the harness set-up, then builds the admin utilities and the
     * {@link BrokerShutdownManager} under test from the configuration returned by
     * {@link #config()}.
     *
     * @throws Exception if the harness set-up or any of the collaborators fails to initialize
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        KafkaCruiseControlConfig config = config();
        // The send-client builder is configured from the server config of the broker that the
        // tests will ask the manager to shut down.
        BlockingSendClient.Builder builder = new BlockingSendClient.Builder(
            brokerToShutdown().kafkaServer().config(), new SystemTime(), "client-id", new LogContext());
        this.adminUtils = new SbkAdminUtils(KafkaCruiseControlUtils.createAdmin(config.originals()), config);
        this.shutdownManager = new BrokerShutdownManager(this.adminUtils, config, builder, new SystemTime());
    }

    /**
     * Harness hook; this implementation shuts down the broker returned by
     * {@code brokerToShutdown()}.
     */
    @Override
    protected void exitProcedure() {
        brokerToShutdown().shutdown();
    }

    /**
     * Verifies that {@code maybeShutdownBroker} reports success for the target broker at its
     * current epoch and that the harness then observes the broker as shut down.
     */
    @Test
    public void testMaybeShutdownShutsDownBroker() throws Exception {
        Assert.assertTrue("Expected broker to have been proactively shutdown",
            this.shutdownManager.maybeShutdownBroker(brokerToShutdownId(), Optional.of(brokerToShutdownCurrentEpoch())));
        assertBrokerShutdown();
    }

    /**
     * Verifies that a broker can be shut down a second time after it has been restarted: the
     * broker is shut down, the test waits for it to leave the cluster, restarts it, waits for
     * it to rejoin, and then requests another shutdown using the epoch read from the restarted
     * broker.
     */
    @Test
    public void testMaybeShutdownShutsDownBrokerAfterRestart() throws Exception {
        int brokerId = brokerToShutdownId();
        Assert.assertTrue("Expected broker to have been proactively shutdown",
            this.shutdownManager.maybeShutdownBroker(brokerId, Optional.of(brokerToShutdownCurrentEpoch())));
        TestUtils.waitForCondition(() -> getClusterSize() == numBrokers() - 1,
            "Failed to wait for broker to leave cluster");

        // Explicitly stop the embedded broker before starting it again.
        EmbeddedKafka embeddedKafka = (EmbeddedKafka) this.kafkaCluster.kafkas().get(brokerId);
        embeddedKafka.shutdown();
        embeddedKafka.startBroker(this.clusterTime);
        TestUtils.waitForCondition(() -> getClusterSize() == numBrokers(),
            "Failed to wait for broker to rejoin cluster");

        // The second request uses the epoch read from the restarted broker's controller
        // rather than the one used for the first shutdown.
        Assert.assertTrue("Expected broker to have been proactively shutdown",
            this.shutdownManager.maybeShutdownBroker(brokerId,
                Optional.of(Long.valueOf(embeddedKafka.kafkaServer().kafkaController().brokerEpoch()))));
        assertBrokerShutdown();
    }

    /**
     * Returns the number of nodes reported by a {@code describeCluster} call.
     *
     * @return the size of the node collection in the describe-cluster result
     * @throws ExecutionException if the describe-cluster result carries an exception
     */
    private int getClusterSize() throws Exception {
        AdminClientResult describeCluster = this.adminUtils.describeCluster(DESCRIBE_CLUSTER_TIMEOUT);
        if (describeCluster.hasException()) {
            throw new ExecutionException("Failed to describe the cluster", describeCluster.exception());
        }
        return ((KafkaCluster) describeCluster.result()).nodes().size();
    }

    /**
     * Builds the Cruise Control configuration used by the tests: the bootstrap servers and
     * ZooKeeper connect string come from the embedded cluster, and the two timeouts come from
     * the constants declared on this class.
     *
     * @return a new {@link KafkaCruiseControlConfig} built from those properties
     */
    private KafkaCruiseControlConfig config() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.kafkaCluster.bootstrapServers());
        properties.setProperty("zookeeper.connect", this.kafkaCluster.zkConnect());
        properties.setProperty("default.api.timeout.ms", API_TIMEOUT_MS);
        properties.setProperty("broker.removal.shutdown.timeout.ms", REMOVAL_SHUTDOWN_MS);
        return new KafkaCruiseControlConfig(properties);
    }
}
