package io.confluent.databalancer.integration;

import io.confluent.databalancer.TestConstants;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BrokerAdditionDescription;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests that add brokers to a running cluster and check, through the helpers in
 * {@link DataBalancerIntegrationTestUtils}, that the broker addition is reported and that replicas
 * of the test topic end up on the added broker.
 *
 * <p>Each test creates {@link #TEST_TOPIC} and then starts brokers with ids at or above
 * {@code initialBrokerCount()} via the harness's {@code addBroker}.
 */
@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 5, unit = TimeUnit.MINUTES)
public class AddBrokerTest extends DataBalancerClusterTestHarness {
    private static final String TEST_TOPIC = "broker_addition_test_topic";

    /**
     * Placement constraint applied to and then verified on the test topic: two replicas constrained
     * to rack "0" and two observers constrained to rack "1".
     */
    private static final String OBSERVER_PLACEMENT_JSON =
            "{\"version\":1,"
                    + "\"replicas\":[{\"count\": 2, \"constraints\":{\"rack\":\"0\"}}], "
                    + "\"observers\": [{\"count\": 2, \"constraints\":{\"rack\":\"1\"}}]}";

    /** Returns the upper bound (exclusive) on broker ids used by these tests. */
    protected int maxBrokerCount() {
        return 6;
    }

    /** Returns the supplied properties unchanged; this test needs no extra configuration. */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        return properties;
    }

    /**
     * Assigns every broker id below {@link #maxBrokerCount()} to a rack: even ids to rack "0",
     * odd ids to rack "1".
     *
     * @return per-broker configuration overrides keyed by broker id
     */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Map<Integer, Map<String, String>> brokerOverrideProps() {
        Map<String, String> rackZero = Collections.singletonMap(KafkaConfig.RackProp(), "0");
        Map<String, String> rackOne = Collections.singletonMap(KafkaConfig.RackProp(), "1");
        Map<Integer, Map<String, String>> overrides = new HashMap<>();
        for (int brokerId = 0; brokerId < maxBrokerCount(); brokerId++) {
            overrides.put(brokerId, brokerId % 2 == 0 ? rackZero : rackOne);
        }
        return overrides;
    }

    /**
     * Adds a second broker while the first addition is still in progress and checks that the first
     * broker's addition description then reports a different creation time, before both additions
     * are verified as completed.
     */
    @Test
    public void testSubsequentBrokerAddition() throws Throwable {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);
        int firstBrokerId = initialBrokerCount();
        int secondBrokerId = firstBrokerId + 1;

        info("Adding new broker");
        addBroker(firstBrokerId);
        BrokerAdditionDescription firstAddition =
                DataBalancerIntegrationTestUtils.verifyBrokerAddIsInProgress(this.adminClient, firstBrokerId);

        info("Adding another broker");
        addBroker(secondBrokerId);
        DataBalancerIntegrationTestUtils.verifyBrokerAddIsInProgress(this.adminClient, secondBrokerId);

        // Re-read the first broker's description after the second addition was requested.
        BrokerAdditionDescription firstAdditionAfterSecond =
                DataBalancerIntegrationTestUtils.verifyBrokerAddIsInProgress(this.adminClient, firstBrokerId);
        Assertions.assertNotEquals(firstAdditionAfterSecond.createTimeMs(), firstAddition.createTimeMs());

        DataBalancerIntegrationTestUtils.verifyBrokerAdded(this.adminClient, firstBrokerId);
        DataBalancerIntegrationTestUtils.verifyBrokerAdded(this.adminClient, secondBrokerId);
    }

    /** Adds a single broker and verifies that replicas move to it and the addition is reported. */
    @Test
    public void testBrokerAddition() throws Throwable {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);
        int newBrokerId = initialBrokerCount();
        info("Adding new broker");
        addBroker(newBrokerId);
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, newBrokerId);
        DataBalancerIntegrationTestUtils.verifyBrokerAdded(this.adminClient, newBrokerId);
    }

    /**
     * Adds every broker id from {@code initialBrokerCount()} up to {@link #maxBrokerCount()} and then
     * verifies each of them, walking the ids in descending order.
     */
    @Test
    public void testAddMultipleBrokers() throws Throwable {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);
        for (int brokerId = initialBrokerCount(); brokerId < maxBrokerCount(); brokerId++) {
            info("Adding new broker {}", brokerId);
            addBroker(brokerId);
        }
        for (int brokerId = maxBrokerCount() - 1; brokerId >= initialBrokerCount(); brokerId--) {
            info("Validating data presence on {}", brokerId);
            DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, brokerId);
            DataBalancerIntegrationTestUtils.verifyBrokerAdded(this.adminClient, brokerId);
        }
    }

    /**
     * Removes a non-controller broker, shuts it down, clears its exclusion, and verifies that the
     * same broker id can be added again.
     */
    @Test
    public void testRemovedBrokerCanBeAdded() throws Throwable {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);
        KafkaServer removedServer = notControllerKafkaServer();
        removeBroker(removedServer, new AtomicBoolean(false), "a removed broker can be added back to the cluster");
        this.kafkaCluster.shutdownBrokerIds(Collections.singletonList(removedServer.config().brokerId()));
        removeBrokerExclusion(removedServer.config().brokerId());

        int brokerId = removedServer.config().brokerId();
        addBroker(brokerId);
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, brokerId);
        DataBalancerIntegrationTestUtils.verifyBrokerAdded(this.adminClient, brokerId);
    }

    /**
     * Applies {@link #OBSERVER_PLACEMENT_JSON} to the test topic, adds a broker, and verifies that
     * replicas moved to it while the topic placement still matches the same constraint.
     */
    @Test
    public void testBrokerAddCreateObservers() throws Throwable {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 3);
        DataBalancerIntegrationTestUtils.alterTopicPlacementConfig(this.adminClient, TEST_TOPIC, OBSERVER_PLACEMENT_JSON);
        int newBrokerId = initialBrokerCount();
        addBroker(newBrokerId);
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, newBrokerId);
        DataBalancerIntegrationTestUtils.verifyTopicPlacement(this.adminClient, TEST_TOPIC, OBSERVER_PLACEMENT_JSON);
        DataBalancerIntegrationTestUtils.verifyBrokerAdded(this.adminClient, newBrokerId);
    }

    /**
     * Disables self-balancing and expects the supplied admin call to fail with
     * {@link BalancerOfflineException}.
     */
    @Test
    public void testDescribeBrokerAdditionsDisabledBalancerShouldThrowBalancerOfflineException() throws InterruptedException, ExecutionException {
        disableSelfBalancing();
        // NOTE(review): the lambda body is empty, so no admin API is actually invoked here. Going by
        // the test name it presumably should issue a describe-broker-additions request through
        // adminClient and wait on the result - TODO restore the intended call and confirm the API.
        DataBalancerIntegrationTestUtils.assertAdminApiThrows(BalancerOfflineException.class, () -> {
        });
    }
}
