package io.confluent.databalancer.integration;

import io.confluent.databalancer.TestConstants;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.PartitionReassignmentsStatus;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 7, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/databalancer/integration/RemoveBrokerFailoverTest.class */
public class RemoveBrokerFailoverTest extends DataBalancerClusterTestHarness {
    protected static final Logger log = LoggerFactory.getLogger(RemoveBrokerFailoverTest.class);
    protected AtomicBoolean exited = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public int initialBrokerCount() {
        return 6;
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
        Exit.resetExitProcedure();
        this.exited.set(false);
    }

    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put("confluent.balancer.throttle.bytes.per.second", "10000000");
        return properties;
    }

    @Test
    public void testRemoveMultipleIncludingController() throws Throwable {
        List<KafkaServer> notControllerKafkaServers = notControllerKafkaServers(1);
        notControllerKafkaServers.add(controllerKafkaServer());
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 20, 2);
        removeBrokers(notControllerKafkaServers, true, true, this.exited, "removing the controller broker");
    }

    @Test
    void testRemoveBroker_RemovedBrokerShutdownAndFailoverDuringReassignmentDoesntCancelRemoval() throws Throwable {
        List<KafkaServer> notControllerKafkaServers = notControllerKafkaServers(2);
        List<Integer> list = (List) notControllerKafkaServers.stream().map(kafkaServer -> {
            return Integer.valueOf(kafkaServer.config().brokerId());
        }).collect(Collectors.toList());
        KafkaServer controllerKafkaServer = controllerKafkaServer();
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 20, 2);
        removeBrokers(notControllerKafkaServers, true, false, this.exited, "testing failover and shutdown during removal", false, notControllerKafkaServers.size() - 1);
        TestUtils.waitForCondition(() -> {
            Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
            return (map.isEmpty() || ((BrokerRemovalDescription) map.get(list.get(0))).reassignmentsStatus() == PartitionReassignmentsStatus.PENDING) ? false : true;
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return "Broker removal did not start reassigning in time.";
        });
        info("Simulating shutdown of removing broker {}", notControllerKafkaServers.get(0));
        this.servers.stream().filter(kafkaServer2 -> {
            return kafkaServer2 == notControllerKafkaServers.get(0);
        }).findFirst().get().shutdown();
        info("Now forcing failover by restarting the controller node.");
        controllerKafkaServer.shutdown();
        controllerKafkaServer.startup();
        assertRemovalCompletion(list, notControllerKafkaServers.get(1).config().brokerId(), false, true, this.exited, "test failover and broker shutdown");
    }

    @Test
    public void testRemoveBrokerFailoverDuringReassignment() throws Throwable {
        KafkaServer notControllerKafkaServer = notControllerKafkaServer();
        int brokerId = notControllerKafkaServer.config().brokerId();
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 20, 2);
        removeBroker(notControllerKafkaServer, false, false, this.exited, "command failover during reassignment");
        TestUtils.waitForCondition(() -> {
            Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
            return !map.isEmpty() && ((BrokerRemovalDescription) map.get(Integer.valueOf(brokerId))).reassignmentsStatus() == PartitionReassignmentsStatus.IN_PROGRESS;
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return "Broker removal did not go into the reassignments phase in time!";
        });
        this.logger.info("Shutting down controller broker {} as part of testing {}", Integer.valueOf(controllerKafkaServer().config().brokerId()), "command failover during reassignment");
        controllerKafkaServer().shutdown();
        this.logger.info("Successfully shut down controller broker {} as part of testing {}.Asserting that the new Balancer picks up the in-progress broker removal operation", Integer.valueOf(controllerKafkaServer().config().brokerId()), "command failover during reassignment");
        AtomicReference atomicReference = new AtomicReference("");
        TestUtils.waitForCondition(() -> {
            try {
                Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
                if (map.isEmpty()) {
                    atomicReference.set("No broker removal descriptions were present");
                    return false;
                }
                BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(brokerId));
                boolean z = brokerRemovalDescription.reassignmentsStatus() == PartitionReassignmentsStatus.IN_PROGRESS || brokerRemovalDescription.reassignmentsStatus() == PartitionReassignmentsStatus.ERROR;
                if (!z) {
                    atomicReference.set(String.format("The broker removal operation for broker %d did not successfully fail over - the status was %s", Integer.valueOf(notControllerKafkaServer.config().brokerId()), brokerRemovalDescription));
                }
                return z;
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof BalancerOfflineException)) {
                    throw e;
                }
                atomicReference.set("Balancer was offline");
                return false;
            }
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return String.format("Broker removal did not start back from the plan execution phase! Reason: %s", atomicReference.get());
        });
    }
}
