package io.confluent.databalancer.integration;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.TestConstants;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BalancerOperationError;
import org.apache.kafka.clients.admin.BrokerAdditionDescription;
import org.apache.kafka.clients.admin.EvenClusterLoadStatusDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 7, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/databalancer/integration/BrokerReplicaExclusionTest.class */
public class BrokerReplicaExclusionTest extends DataBalancerClusterTestHarness {
    private static final String TEST_TOPIC = "broker_rebalance_test_topic";
    protected AtomicBoolean exited = new AtomicBoolean(false);

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
        Exit.resetExitProcedure();
        this.exited.set(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public int initialBrokerCount() {
        return 4;
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put("confluent.balancer.goal.violation.delay.on.new.brokers.ms", KafkaCruiseControlConfig.GOAL_VIOLATION_DELAY_ON_NEW_BROKERS_MS_DEFAULT.toString());
        properties.put("confluent.balancer.heal.uneven.load.trigger", ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
        return properties;
    }

    @Test
    public void testBalancerOperationsShouldFailDueToExclusion() throws Throwable {
        info("Disabling self-balancing dynamically");
        disableSelfBalancing();
        info("Creating test topic broker_rebalance_test_topic");
        createLopsidedTopic(TEST_TOPIC, 40);
        info("Test topic {} created", TEST_TOPIC);
        int brokerId = notControllerKafkaServer().config().brokerId();
        info("Excluding broker " + brokerId);
        excludeBroker(brokerId);
        rebalanceAndAssertFailureDueToExclusion();
        addBrokerAndAssertFailureDueToExclusion();
        assertCantRemoveNonExcludedBroker(controllerKafkaServer().config().brokerId());
    }

    @Test
    public void testCanRemoveExcludedBroker() throws Throwable {
        info("Creating test topic broker_rebalance_test_topic");
        createLopsidedTopic(TEST_TOPIC, 40);
        info("Test topic {} created", TEST_TOPIC);
        KafkaServer notControllerKafkaServer = notControllerKafkaServer();
        int brokerId = notControllerKafkaServer.config().brokerId();
        info("Excluding broker " + brokerId);
        excludeBroker(brokerId);
        removeBroker(notControllerKafkaServer, this.exited, "removing an already-excluded broker");
    }

    private void rebalanceAndAssertFailureDueToExclusion() throws ExecutionException, InterruptedException {
        TopicDescription topicDescription = (TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(TEST_TOPIC)).all().get()).get(TEST_TOPIC);
        info("Enabling Self-Balancing dynamically");
        enableSelfBalancing();
        info("Self Balancing enabled, waiting to see an error when trying to rebalance");
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            EvenClusterLoadStatusDescription evenClusterLoadStatusDescription = (EvenClusterLoadStatusDescription) this.adminClient.describeEvenClusterLoadStatus().description().get();
            boolean z = evenClusterLoadStatusDescription.evenClusterLoadError().isPresent() && ((BalancerOperationError) evenClusterLoadStatusDescription.evenClusterLoadError().get()).errorCode() == Errors.BALANCER_EXCLUDED_BROKER_FOR_REPLICA_PLACEMENT_ERROR.code();
            Optional evenClusterLoadError = evenClusterLoadStatusDescription.evenClusterLoadError();
            atomicReference.set(evenClusterLoadStatusDescription.currentEvenClusterLoadStatus());
            atomicReference2.set(evenClusterLoadError);
            return z;
        }, 30000L, () -> {
            return String.format("Even load cluster status does not surface error due to present exclusion. Status: %s, ErrorOpt: %s ", atomicReference.get(), atomicReference2.get());
        });
        TopicDescription topicDescription2 = (TopicDescription) ((Map) this.adminClient.describeTopics(Collections.singletonList(TEST_TOPIC)).all().get()).get(TEST_TOPIC);
        Assertions.assertEquals(topicDescription.partitions(), topicDescription2.partitions(), String.format("Expected lopsided topic %s to not have been rebalanced but instead it did. Before: %s After: %s", TEST_TOPIC, topicDescription.partitions(), topicDescription2.partitions()));
    }

    private void addBrokerAndAssertFailureDueToExclusion() throws Throwable {
        int initialBrokerCount = initialBrokerCount();
        info("Adding a new broker (broker id {})", Integer.valueOf(initialBrokerCount));
        addBroker(initialBrokerCount);
        DataBalancerIntegrationTestUtils.verifyBrokerAddIsCanceled(this.adminClient, initialBrokerCount);
        Optional additionError = ((BrokerAdditionDescription) ((Optional) this.adminClient.describeBrokerAdditions().description(initialBrokerCount).get()).get()).additionError();
        Assertions.assertTrue(additionError.isPresent(), "Expected the broker addition to have an error");
        Assertions.assertEquals(Errors.BALANCER_EXCLUDED_BROKER_FOR_REPLICA_PLACEMENT_ERROR.code(), ((BalancerOperationError) additionError.get()).errorCode());
    }

    private void assertCantRemoveNonExcludedBroker(int i) {
        DataBalancerIntegrationTestUtils.assertAdminApiThrows(Errors.BALANCER_EXCLUDED_BROKER_FOR_REPLICA_PLACEMENT_ERROR.exception().getClass(), () -> {
        });
    }
}
