package io.confluent.databalancer.integration;

import com.linkedin.kafka.cruisecontrol.operation.EvenClusterLoadStateManager;
import io.confluent.databalancer.ConfluentDataBalanceEngineContext;
import io.confluent.databalancer.TestConstants;
import io.confluent.databalancer.operation.EvenClusterLoadStateMachine;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import org.apache.kafka.clients.admin.EvenClusterLoadStatus;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 5, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/databalancer/integration/EvenClusterLoadStatusTest.class */
public class EvenClusterLoadStatusTest extends DataBalancerClusterTestHarness {
    private static final String TEST_TOPIC = "broker_rebalance_test_topic";
    private static final Logger LOG = LoggerFactory.getLogger(EvenClusterLoadStatusTest.class);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public int initialBrokerCount() {
        return 4;
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put("confluent.balancer.throttle.bytes.per.second", 10000L);
        return properties;
    }

    @Test
    public void testInitialSelfHealingStatus() throws Exception {
        ConfluentDataBalanceEngineContext dataBalanceEngineContext = getDataBalanceEngine().getDataBalanceEngineContext();
        enableAnyUnevenLoad();
        EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatusDescription = dataBalanceEngineContext.getCruiseControl().context().currentEvenClusterLoadStateManager().evenClusterLoadStatusDescription();
        Assertions.assertEquals(EvenClusterLoadStatus.BALANCED, evenClusterLoadStatusDescription.currentStatus());
        Assertions.assertNull(evenClusterLoadStatusDescription.previousStatus());
    }

    @Test
    public void testBrokerRebalanceWithSelfHealing() throws Exception {
        ConfluentDataBalanceEngineContext dataBalanceEngineContext = getDataBalanceEngine().getDataBalanceEngineContext();
        AtomicReference atomicReference = new AtomicReference();
        LOG.info("Disabling goal violation self healing");
        disableAnyUnevenLoad();
        atomicReference.set(dataBalanceEngineContext.getCruiseControl().context().currentEvenClusterLoadStateManager());
        Assertions.assertEquals(EvenClusterLoadStateMachine.EvenClusterLoadState.DISABLED, ((EvenClusterLoadStateManager) atomicReference.get()).currentState());
        LOG.info("Creating test topic broker_rebalance_test_topic");
        createLopsidedTopic(TEST_TOPIC, 40);
        LOG.info("Test topic created, enabling any uneven load");
        enableAnyUnevenLoad();
        atomicReference.set(dataBalanceEngineContext.getCruiseControl().context().currentEvenClusterLoadStateManager());
        LOG.info("Goal Violation self healing enabled, waiting for action");
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, 2);
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, 3);
        AtomicReference atomicReference2 = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            atomicReference2.set(((EvenClusterLoadStateManager) atomicReference.get()).currentState());
            return atomicReference2.get() == EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCED;
        }, 30000L, () -> {
            return "Even load cluster status is not balanced: " + atomicReference2.get();
        });
    }

    @Test
    public void testBrokerRebalanceAbortsOnRemoveBroker() throws Exception {
        ConfluentDataBalanceEngineContext dataBalanceEngineContext = getDataBalanceEngine().getDataBalanceEngineContext();
        AtomicReference atomicReference = new AtomicReference();
        LOG.info("Disabling goal violation self healing");
        disableAnyUnevenLoad();
        atomicReference.set(dataBalanceEngineContext.getCruiseControl().context().currentEvenClusterLoadStateManager());
        Assertions.assertEquals(EvenClusterLoadStateMachine.EvenClusterLoadState.DISABLED, ((EvenClusterLoadStateManager) atomicReference.get()).currentState());
        LOG.info("Creating test topic broker_rebalance_test_topic");
        createLopsidedTopic(TEST_TOPIC, 40);
        this.kafkaCluster.produceData(TEST_TOPIC, 200);
        LOG.info("Test topic created, enabling any uneven load");
        enableAnyUnevenLoad();
        atomicReference.set(dataBalanceEngineContext.getCruiseControl().context().currentEvenClusterLoadStateManager());
        LOG.info("Goal Violation self healing enabled, waiting for action");
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, 2);
        DataBalancerIntegrationTestUtils.verifyReplicasMovedToBroker(this.adminClient, TEST_TOPIC, 3);
        getDataBalanceEngine().removeBrokers(Collections.singletonMap(Integer.valueOf(notControllerKafkaServer().config().brokerId()), Optional.empty()), false, "test");
        AtomicReference atomicReference2 = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            atomicReference2.set(((EvenClusterLoadStateManager) atomicReference.get()).currentState());
            return atomicReference2.get() == EvenClusterLoadStateMachine.EvenClusterLoadState.ABORTED;
        }, 30000L, () -> {
            return "Even load cluster status is not balanced: " + atomicReference2.get();
        });
    }
}
