package io.confluent.databalancer.integration;

import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/databalancer/integration/RemoveBrokerCancellationTest.class */
public class RemoveBrokerCancellationTest extends DataBalancerClusterTestHarness {

    @Rule
    public final Timeout globalTimeout = Timeout.millis(Duration.ofMinutes(5).toMillis());
    protected AtomicBoolean exited = new AtomicBoolean(false);
    protected int brokerToRemoveId;
    protected KafkaServer serverToRemove;
    protected static final Logger log = LoggerFactory.getLogger(RemoveBrokerCancellationTest.class);
    protected static Duration removalFinishTimeout = Duration.ofMinutes(3);
    protected static Duration removalPollInterval = Duration.ofSeconds(2);

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public void setUp() throws Exception {
        super.setUp();
        this.serverToRemove = notControllerKafkaServer();
        this.brokerToRemoveId = this.serverToRemove.config().brokerId();
        Exit.setExitProcedure((i, str) -> {
            info("Shutting down {} as part of broker removal test", Integer.valueOf(this.serverToRemove.config().brokerId()));
            this.serverToRemove.shutdown();
            this.exited.set(true);
        });
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public void tearDown() throws Exception {
        super.tearDown();
        Exit.resetExitProcedure();
        this.exited.set(false);
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected int initialBrokerCount() {
        return 3;
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put("confluent.balancer.throttle.bytes.per.second", 10L);
        return properties;
    }

    @Test
    public void testRemoveBroker_CancelSBKWhileRemovingResultsInPersistedCanceledState() throws ExecutionException, InterruptedException {
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 50, 2);
        this.kafkaCluster.produceData("test-topic", 200);
        this.adminClient.removeBrokers(Collections.singletonList(Integer.valueOf(this.brokerToRemoveId))).all().get();
        awaitRemovalInProgress();
        disableSBK();
        boolean z = false;
        try {
            this.adminClient.describeBrokerRemovals().descriptions().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof BalancerOfflineException)) {
                throw e;
            }
            z = true;
        }
        Assert.assertTrue("Expected BalancerOfflineException while SBK is disabled", z);
        enableSBK();
        Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
        Assert.assertTrue(String.format("Expected to be able to describe a broker removal after it was canceled, instead got map %s", map), map.containsKey(Integer.valueOf(this.brokerToRemoveId)));
        BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(this.brokerToRemoveId));
        Assert.assertTrue(String.format("Expected removal for broker %s to be canceled. (removal: %s)", Integer.valueOf(this.brokerToRemoveId), brokerRemovalDescription), isCanceledRemoval(brokerRemovalDescription));
    }

    @Test
    public void testRemoveBroker_BrokerRestartCancelsRemoval() throws InterruptedException, ExecutionException {
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 50, 2);
        this.kafkaCluster.produceData("test-topic", 200);
        KafkaDataBalanceManager kafkaDataBalanceManager = (KafkaDataBalanceManager) controllerKafkaServer().kafkaController().dataBalancer().get();
        this.adminClient.removeBrokers(Collections.singletonList(Integer.valueOf(this.brokerToRemoveId))).all().get();
        AtomicReference atomicReference = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
            if (map.isEmpty()) {
                return false;
            }
            BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(this.brokerToRemoveId));
            if (isFailedRemoval(brokerRemovalDescription)) {
                return retryRemoval(brokerRemovalDescription, this.brokerToRemoveId);
            }
            if (this.exited.get()) {
                info("Broker {} was shut down due to removal. Restarting its server", Integer.valueOf(this.brokerToRemoveId));
                this.serverToRemove.startup();
                return true;
            }
            if (isCompletedRemoval(brokerRemovalDescription)) {
                atomicReference.set("Broker removal finished successfully despite the broker restarting.");
                return true;
            }
            info("Removal is still pending. PAR: {} BSS: {}", brokerRemovalDescription.partitionReassignmentsStatus(), brokerRemovalDescription.brokerShutdownStatus());
            return false;
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return "Broker removal did not complete successfully in time!";
        });
        if (atomicReference.get() != null && !((String) atomicReference.get()).isEmpty()) {
            Assert.fail((String) atomicReference.get());
        }
        Assert.assertEquals("Expected one broker removal to be stored in memory", 1L, kafkaDataBalanceManager.brokerRemovals().size());
        TestUtils.waitForCondition(() -> {
            return ((BrokerRemovalDescriptionInternal) kafkaDataBalanceManager.brokerRemovals().get(0)).exception() instanceof BrokerRemovalCanceledException;
        }, 60000L, String.format("Broker removal status did not have the expected %s exception. Instead it has %s", BrokerRemovalCanceledException.class.getSimpleName(), ((BrokerRemovalDescriptionInternal) kafkaDataBalanceManager.brokerRemovals().get(0)).exception()));
    }

    private void awaitRemovalInProgress() throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
            if (map.isEmpty()) {
                return false;
            }
            BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(this.brokerToRemoveId));
            if (isFailedRemoval(brokerRemovalDescription)) {
                return retryRemoval(brokerRemovalDescription, this.brokerToRemoveId);
            }
            if (isCompletedRemoval(brokerRemovalDescription)) {
                atomicReference.set("Broker removal finished successfully when it shouldn't have.");
                return true;
            }
            if (isRemovalInProgress(brokerRemovalDescription)) {
                return true;
            }
            info("Removal is in an unknown state. PAR: {} BSS: {}", brokerRemovalDescription.partitionReassignmentsStatus(), brokerRemovalDescription.brokerShutdownStatus());
            return false;
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return "Broker removal did not become in progress in time!";
        });
        if (atomicReference.get() == null || ((String) atomicReference.get()).isEmpty()) {
            return;
        }
        Assert.fail((String) atomicReference.get());
    }
}
