package io.confluent.databalancer.integration;

import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.TestConstants;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.clients.admin.RemoveBrokersOptions;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 5, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/databalancer/integration/RemoveBrokerCancellationTest.class */
public class RemoveBrokerCancellationTest extends DataBalancerClusterTestHarness {
    protected static final Logger log = LoggerFactory.getLogger(RemoveBrokerCancellationTest.class);
    protected static Duration removalFinishTimeout = Duration.ofMinutes(3);
    protected static Duration removalPollInterval = Duration.ofSeconds(2);
    protected AtomicBoolean exited = new AtomicBoolean(false);
    protected int brokerToRemoveId;
    protected KafkaServer serverToRemove;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public int initialBrokerCount() {
        return 5;
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.serverToRemove = notControllerKafkaServer();
        this.brokerToRemoveId = this.serverToRemove.config().brokerId();
        Exit.setExitProcedure((i, str) -> {
            info("Shutting down {} as part of broker removal test", Integer.valueOf(this.serverToRemove.config().brokerId()));
            this.serverToRemove.shutdown();
            this.exited.set(true);
            throw new RuntimeException("Thrown since the Exit wrapper expects the exit procedure to stop the code execution.");
        });
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
        Exit.resetExitProcedure();
        this.exited.set(false);
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put("confluent.balancer.throttle.bytes.per.second", 10L);
        return properties;
    }

    @Test
    public void testRemoveBroker_CancelSBCWhileRemovingResultsInPersistedCanceledState() throws ExecutionException, InterruptedException {
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 50, 2);
        this.kafkaCluster.produceData("test-topic", 200);
        this.adminClient.removeBrokers(Collections.singletonList(Integer.valueOf(this.brokerToRemoveId)), new RemoveBrokersOptions().shouldShutdownBrokers(true)).all().get();
        awaitRemovalInProgress(true);
        disableSelfBalancing();
        boolean z = false;
        try {
            this.adminClient.describeBrokerRemovals().descriptions().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof BalancerOfflineException)) {
                throw e;
            }
            z = true;
        }
        Assertions.assertTrue(z, "Expected BalancerOfflineException while SBK is disabled");
        enableSelfBalancing();
        Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
        Assertions.assertTrue(map.containsKey(Integer.valueOf(this.brokerToRemoveId)), String.format("Expected to be able to describe a broker removal after it was canceled, instead got map %s", map));
        BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(this.brokerToRemoveId));
        Assertions.assertTrue(isRemovalCanceled(brokerRemovalDescription), String.format("Expected removal for broker %s to be canceled. (removal: %s)", Integer.valueOf(this.brokerToRemoveId), brokerRemovalDescription));
    }

    @Test
    public void testRemoveBroker_BrokerRestartCancelsRemoval() throws InterruptedException, ExecutionException {
        testConditionDuringRemovalExecutionCancelsRemoval(kafkaServer -> {
            info("Simulating broker restart: shutting down {}", Integer.valueOf(this.brokerToRemoveId));
            this.serverToRemove.shutdown();
            info("Broker {} was shut down to simulate unexpected restart. Restarting its server", Integer.valueOf(this.brokerToRemoveId));
            this.serverToRemove.startup();
        });
    }

    @Test
    public void testRemoveBroker_SetReplicaExclusionCancelsRemoval() throws InterruptedException, ExecutionException {
        this.serverToRemove = notControllerKafkaServer();
        this.brokerToRemoveId = this.serverToRemove.config().brokerId();
        int brokerId = controllerKafkaServer().config().brokerId();
        testConditionDuringRemovalExecutionCancelsRemoval(kafkaServer -> {
            try {
                this.adminClient.alterBrokerReplicaExclusions(Collections.singletonMap(Integer.valueOf(brokerId), new ExclusionOp(ExclusionOp.OpType.SET, "test-exclusion-cancels-removal"))).result().get();
                info("Applied exclusion");
            } catch (Exception e) {
                this.logger.error("Failed to apply exclusion due to ", e);
            }
        });
    }

    @Test
    public void testRemoveBroker_RemoveReplicaExclusionCancelsRemoval() throws InterruptedException, ExecutionException {
        this.serverToRemove = notControllerKafkaServer();
        this.brokerToRemoveId = this.serverToRemove.config().brokerId();
        testConditionDuringRemovalExecutionCancelsRemoval(kafkaServer -> {
            try {
                this.adminClient.alterBrokerReplicaExclusions(Collections.singletonMap(Integer.valueOf(this.brokerToRemoveId), new ExclusionOp(ExclusionOp.OpType.DELETE, "test-exclusion-cancels-removal"))).result().get();
                info("Removed exclusion");
            } catch (Exception e) {
                this.logger.error("Failed to delete exclusion due to ", e);
            }
        });
    }

    private void testConditionDuringRemovalExecutionCancelsRemoval(Consumer<KafkaServer> consumer) throws InterruptedException, ExecutionException {
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 50, 2);
        this.kafkaCluster.produceData("test-topic", 200);
        KafkaDataBalanceManager dataBalancer = getDataBalancer();
        this.adminClient.removeBrokers(Collections.singletonList(Integer.valueOf(this.brokerToRemoveId)), new RemoveBrokersOptions().shouldShutdownBrokers(true)).all().get();
        awaitRemovalInProgress(true);
        Assertions.assertEquals(1, dataBalancer.brokerRemovals().size(), "Expected one broker removal to be stored in memory");
        consumer.accept(this.serverToRemove);
        TestUtils.waitForCondition(() -> {
            return ((BrokerRemovalDescriptionInternal) dataBalancer.brokerRemovals().get(0)).exception() instanceof BrokerRemovalCanceledException;
        }, 60000L, String.format("Broker removal status did not have the expected %s exception. Instead it has %s", BrokerRemovalCanceledException.class.getSimpleName(), ((BrokerRemovalDescriptionInternal) dataBalancer.brokerRemovals().get(0)).exception()));
        Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
        Assertions.assertTrue(map.containsKey(Integer.valueOf(this.brokerToRemoveId)), "Expected to be able to describe a broker removal after it was canceled");
        BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(this.brokerToRemoveId));
        Assertions.assertTrue(isRemovalCanceled(brokerRemovalDescription), String.format("Expected removal for broker %s to be canceled. (removal: %s)", Integer.valueOf(this.brokerToRemoveId), brokerRemovalDescription));
    }

    private void awaitRemovalInProgress(boolean z) throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
            if (map.isEmpty()) {
                return false;
            }
            BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(this.brokerToRemoveId));
            Assertions.assertEquals(Boolean.valueOf(z), Boolean.valueOf(brokerRemovalDescription.isShutdownScheduled()));
            if (isRemovalFailed(brokerRemovalDescription)) {
                return retryRemoval(brokerRemovalDescription, this.brokerToRemoveId, z);
            }
            if (isSuccessfulRemoval(brokerRemovalDescription)) {
                atomicReference.set("Broker removal finished successfully when it shouldn't have.");
                return true;
            }
            if (isRemovalInProgress(brokerRemovalDescription)) {
                return true;
            }
            info("Removal is in an unknown state. EXC: {} PAR: {} BSS: {}", brokerRemovalDescription.brokerReplicaExclusionStatus(), brokerRemovalDescription.reassignmentsStatus(), brokerRemovalDescription.shutdownStatus());
            return false;
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return "Broker removal did not become in progress in time!";
        });
        if (atomicReference.get() == null || ((String) atomicReference.get()).isEmpty()) {
            return;
        }
        Assertions.fail((String) atomicReference.get());
    }
}
