package com.linkedin.kafka.cruisecontrol.server;

import com.linkedin.kafka.cruisecontrol.BrokerShutdownIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.common.AdminClientResult;
import com.linkedin.kafka.cruisecontrol.common.KafkaCluster;
import com.linkedin.kafka.cruisecontrol.common.SbkAdminUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.TestConstants;
import io.confluent.kafka.test.cluster.EmbeddedKafka;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 2, unit = TimeUnit.MINUTES)
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/server/BrokerShutdownManagerIntegrationTest.class */
public class BrokerShutdownManagerIntegrationTest extends BrokerShutdownIntegrationTestHarness {
    private BrokerShutdownManager shutdownManager;
    private static String apiTimeoutMs = "30000";
    private static String removalShutdownMs = "60000";
    private SbkAdminUtils adminUtils;
    private int numTotalBrokers = 3;
    private List<Integer> brokerIdsToShutdown = Arrays.asList(0, 1);

    @Override // com.linkedin.kafka.cruisecontrol.BrokerShutdownIntegrationTestHarness
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        SystemTime systemTime = new SystemTime();
        KafkaCruiseControlConfig config = config();
        BlockingSendClient.Builder builder = new BlockingSendClient.Builder(embeddedKafka(0).kafkaServer().config(), systemTime, "client-id", new LogContext());
        this.adminUtils = new SbkAdminUtils(KafkaCruiseControlUtils.createAdmin(config.originals()), config);
        this.shutdownManager = new BrokerShutdownManager(this.adminUtils, config, builder, systemTime);
    }

    @Override // com.linkedin.kafka.cruisecontrol.BrokerShutdownIntegrationTestHarness
    protected int numBrokers() {
        return this.numTotalBrokers;
    }

    @Override // com.linkedin.kafka.cruisecontrol.BrokerShutdownIntegrationTestHarness
    protected void exitProcedure(int i) {
        embeddedKafka(this.brokerIdsToShutdown.get(i - 1).intValue()).shutdown();
    }

    private Map<Integer, Optional<Long>> epochsByBrokerId(List<Integer> list) {
        return (Map) list.stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return Optional.of(brokerCurrentEpoch(embeddedKafka(num2.intValue())));
        }));
    }

    @Test
    public void testMaybeShutdownShutsDownBrokers() throws Exception {
        Assertions.assertEquals((Map) this.brokerIdsToShutdown.stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return true;
        })), this.shutdownManager.maybeShutdownBrokers(epochsByBrokerId(this.brokerIdsToShutdown)));
        assertBrokerShutdown(this.brokerIdsToShutdown.size());
    }

    @Disabled
    @Test
    public void testMaybeShutdownShutsDownBrokerAfterRestart() throws Exception {
        Map map = (Map) this.brokerIdsToShutdown.stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return true;
        }));
        Assertions.assertEquals(map, this.shutdownManager.maybeShutdownBrokers(epochsByBrokerId(this.brokerIdsToShutdown)));
        assertBrokerShutdown(2);
        TestUtils.waitForCondition(() -> {
            return getClusterSize() == numBrokers() - this.brokerIdsToShutdown.size();
        }, String.format("Failed to wait for all brokers to leave cluster", this.brokerIdsToShutdown));
        Iterator<Integer> it = this.brokerIdsToShutdown.iterator();
        while (it.hasNext()) {
            EmbeddedKafka embeddedKafka = embeddedKafka(it.next().intValue());
            embeddedKafka.shutdown();
            embeddedKafka.startBroker(this.clusterTime);
        }
        TestUtils.waitForCondition(() -> {
            return getClusterSize() == numBrokers();
        }, "Failed to wait for all brokers to rejoin the cluster");
        resetExitCounts();
        Assertions.assertEquals(map, this.shutdownManager.maybeShutdownBrokers(epochsByBrokerId(this.brokerIdsToShutdown)));
        assertBrokerShutdown(2);
    }

    private int getClusterSize() throws Exception {
        AdminClientResult describeCluster = this.adminUtils.describeCluster(10000);
        if (describeCluster.hasException()) {
            throw new ExecutionException("Failed to describe the cluster", describeCluster.exception());
        }
        return ((KafkaCluster) describeCluster.result()).nodes().size();
    }

    private KafkaCruiseControlConfig config() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.kafkaCluster.bootstrapServers());
        properties.setProperty("zookeeper.connect", this.kafkaCluster.zkConnect());
        properties.setProperty("default.api.timeout.ms", apiTimeoutMs);
        properties.setProperty("broker.removal.shutdown.timeout.ms", removalShutdownMs);
        return new KafkaCruiseControlConfig(properties);
    }
}
