package io.confluent.databalancer.integration;

import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.TestConstants;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.BrokerShutdownStatus;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignmentsStatus;
import org.apache.kafka.clients.admin.RemoveBrokersOptions;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BalancerOperationFailedException;
import org.apache.kafka.common.errors.InvalidBrokerRemovalException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 7, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/databalancer/integration/RemoveBrokerTest.class */
public class RemoveBrokerTest extends DataBalancerClusterTestHarness {
    /** SLF4J logger shared by all tests in this class. */
    protected static final Logger log = LoggerFactory.getLogger(RemoveBrokerTest.class);
    /**
     * Flag handed to the broker-removal helpers and cleared back to {@code false} in {@link #tearDown()}.
     * NOTE(review): presumably set when a broker triggers the (overridden) exit procedure - confirm in the harness.
     */
    protected AtomicBoolean exited = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports how many brokers this test class wants in its cluster.
     *
     * @return the fixed broker count used by every test in this class
     */
    @Override
    public int initialBrokerCount() {
        final int brokersInCluster = 5;
        return brokersInCluster;
    }

    /**
     * Tears the harness down, then restores the default exit procedure and clears the {@code exited} flag.
     *
     * <p>The cleanup runs in a {@code finally} block: the exit procedure is static state, so it must be reset
     * even when {@code super.tearDown()} fails, otherwise the override would stay installed for later tests.
     *
     * @throws Exception whatever the harness tear-down throws
     */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            Exit.resetExitProcedure();
            this.exited.set(false);
        }
    }

    /**
     * Adds the balancer throttle setting used by this test class to the supplied broker properties.
     *
     * @param properties the properties to augment; modified in place
     * @return the same {@code properties} instance that was passed in
     */
    @Override
    protected Properties injectTestSpecificProperties(Properties properties) {
        final String throttleKey = "confluent.balancer.throttle.bytes.per.second";
        final String throttleBytesPerSecond = "10000000";
        properties.put(throttleKey, throttleBytesPerSecond);
        return properties;
    }

    /**
     * Verifies that the broker-removal admin APIs fail with {@link BalancerOfflineException} once self-balancing
     * has been disabled.
     *
     * <p>NOTE(review): in the source this was recovered from, both lambdas were empty (and the broker id unused),
     * so the assertions could never observe the expected exception. The calls below are reconstructed from the
     * admin APIs used elsewhere in this class - confirm them against the original test, and confirm that the
     * functional interface accepted by {@code assertAdminApiThrows} permits checked exceptions.
     */
    @Test
    public void testRemoveBroker_DisabledBalancerShouldThrowBalancerOfflineException() throws InterruptedException, ExecutionException {
        int brokerToRemoveId = notControllerKafkaServer().config().brokerId();
        disableSelfBalancing();
        // Submitting a removal must be rejected while the balancer is offline.
        DataBalancerIntegrationTestUtils.assertAdminApiThrows(BalancerOfflineException.class, () -> {
            this.adminClient.removeBrokers(Collections.singletonList(brokerToRemoveId)).all().get();
        });
        // Describing removals must be rejected as well.
        DataBalancerIntegrationTestUtils.assertAdminApiThrows(BalancerOfflineException.class, () -> {
            this.adminClient.describeBrokerRemovals().descriptions().get();
        });
    }

    /**
     * Removes two non-controller brokers, waits for the reduced broker count to propagate, and then re-issues
     * the removal to check that the reported removal status is unaffected.
     */
    @Test
    public void testRemoveBrokersIdempotency() throws Throwable {
        final int brokersToRemove = 2;
        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 20, 2);

        List<KafkaServer> removalTargets = notControllerKafkaServers(brokersToRemove);
        String reason = String.format("removing %d brokers", brokersToRemove);
        removeBrokers(removalTargets, true, true, this.exited, reason, true);

        int remainingBrokers = initialBrokerCount() - brokersToRemove;
        awaitBrokerMetadataPropagation(remainingBrokers);
        assertRemovalIdempotency(removalTargets);
    }

    /**
     * Re-submits a removal (without broker shutdown) for brokers that were already removed and asserts that the
     * removal description of one of them still reports a successful removal.
     *
     * @param list the servers whose removal has already completed; must not be empty
     * @throws ExecutionException if an admin call fails
     * @throws InterruptedException if interrupted while waiting on an admin call
     */
    private void assertRemovalIdempotency(List<KafkaServer> list) throws ExecutionException, InterruptedException {
        Assertions.assertFalse(list.isEmpty(), "Expected at least one removed broker to check idempotency against");
        List<Integer> removedBrokerIds = list.stream()
                .map(server -> server.config().brokerId())
                .collect(Collectors.toList());
        // Any removed broker works as the sample; the first one keeps the check deterministic.
        Integer sampledBrokerId = removedBrokerIds.get(0);

        this.adminClient.removeBrokers(removedBrokerIds, new RemoveBrokersOptions().shouldShutdownBrokers(false)).all().get();

        Map<Integer, BrokerRemovalDescription> descriptions = this.adminClient.describeBrokerRemovals().descriptions().get();
        BrokerRemovalDescription description = descriptions.get(sampledBrokerId);
        // Fail with a readable message instead of a NullPointerException further down.
        Assertions.assertNotNull(description, String.format("Expected a removal description for broker %s to exist", sampledBrokerId));
        assertBrokerRemovalStatus(description);
        Assertions.assertTrue(isSuccessfulRemoval(description), String.format("Expected the removal status for broker %s to not have changed after the API was called again to test idempotency", sampledBrokerId));
    }

    /**
     * Drains every replica from a non-controller broker via manual reassignments and then removes that broker.
     *
     * <p>The broker is resolved once and reused, so the broker that gets drained is guaranteed to be the broker
     * that gets removed.
     */
    @Test
    public void testRemoveBroker_NoProposalsShouldComplete() throws Throwable {
        KafkaServer brokerToRemove = notControllerKafkaServer();
        int brokerToRemoveId = brokerToRemove.config().brokerId();
        // Repeat the reassignment until no partition reports a replica on the broker.
        while (!moveReplicasOffBroker(brokerToRemoveId).isEmpty()) {
            info("Moving replicas off of broker {}", brokerToRemoveId);
        }
        removeBroker(brokerToRemove, this.exited, "removing a broker without any replicas on it");
    }

    /**
     * Removes two non-controller brokers in a single removal request.
     *
     * <p>The description passed to the helper previously read "removing a broker without any replicas on it",
     * which was copied from another test and did not describe this scenario.
     */
    @Test
    public void testRemoveMultipleBrokerWithShutdown() throws Throwable {
        final int brokersToRemove = 2;
        removeBrokers(notControllerKafkaServers(brokersToRemove), true, true, this.exited, "removing multiple brokers with shutdown");
    }

    /**
     * Starts removing a non-controller broker and, while that is in flight, restarts a third broker that is
     * neither the controller nor the broker being removed. The shared helper then checks the removal completes.
     */
    @Test
    public void testRemoveBroker_OtherBrokerRestartShouldNotCancelRemoval() throws Throwable {
        KafkaServer brokerToRemove = notControllerKafkaServer();
        int brokerToRemoveId = brokerToRemove.config().brokerId();
        int controllerId = controllerKafkaServer().config().brokerId();

        // Pick the first broker that is neither being removed nor acting as controller.
        KafkaServer bystander = this.servers.stream()
                .filter(candidate -> {
                    int candidateId = candidate.config().brokerId();
                    return candidateId != brokerToRemoveId && candidateId != controllerId;
                })
                .findFirst()
                .get();
        int bystanderId = bystander.config().brokerId();

        Runnable restartBystander = () -> {
            info("Simulating broker restart of another broker: shutting down {}", bystanderId);
            bystander.shutdown();
            info("Broker {} was shut down to simulate unexpected restart. Restarting its server", bystanderId);
            bystander.startup();
        };
        testConditionDuringRemovalExecutionDoesNotCancelRemoval(restartBystander, brokerToRemove);
    }

    /**
     * Validates removal input handling, shuts down one of two non-controller brokers, and then requests removal
     * of both the stopped and the running broker together.
     */
    @Test
    public void testRemoveOneDeadOneAliveBroker() throws Throwable {
        final String reason = "a dead broker can be removed (removing 1 dead, 1 alive broker)";

        KafkaTestUtils.createTopic(this.adminClient, "test-topic", 20, 2);
        assertRemovalInputValidation();

        List<KafkaServer> brokersToRemove = notControllerKafkaServers(2);
        KafkaServer brokerToStop = brokersToRemove.get(0);
        info("Shutting down broker {} as part of testing that {}", brokerToStop, reason);
        brokerToStop.shutdown();
        info("Successfully shut down broker {} as part of testing that {}", brokerToStop, reason);

        // Last argument is the number of requested brokers minus the one that was stopped above.
        int requestedMinusStopped = brokersToRemove.size() - 1;
        removeBrokers(brokersToRemove, true, true, this.exited, reason, false, requestedMinusStopped);
    }

    /**
     * Checks that the removal API rejects obviously invalid input: an empty broker list, a negative broker id,
     * and a broker id (1000) that does not exist in the cluster.
     *
     * @throws InterruptedException if interrupted while waiting on an admin call
     */
    private void assertRemovalInputValidation() throws InterruptedException {
        assertRemovalRejected(Collections.emptyList(), "Able to remove broker with empty list.");
        assertRemovalRejected(Collections.singletonList(-1), "Able to remove broker with negative id.");
        assertRemovalRejected(Collections.singletonList(1000), "Able to remove non existent broker with id: 1000");
    }

    /**
     * Submits a removal for {@code brokerIds} and asserts that it fails with an
     * {@link InvalidBrokerRemovalException} as the cause of the {@link ExecutionException}.
     *
     * <p>The cause is held as a {@link Throwable} because that is what {@code getCause()} returns; assigning it
     * straight to an {@code InvalidBrokerRemovalException} variable does not compile.
     *
     * @param brokerIds the invalid broker ids to submit
     * @param messageIfAccepted assertion message used when the request unexpectedly succeeds
     * @throws InterruptedException if interrupted while waiting on the admin call
     */
    private void assertRemovalRejected(List<Integer> brokerIds, String messageIfAccepted) throws InterruptedException {
        Throwable cause = null;
        try {
            this.adminClient.removeBrokers(brokerIds).all().get();
        } catch (ExecutionException e) {
            cause = e.getCause();
        }
        Assertions.assertNotNull(cause, messageIfAccepted);
        Assertions.assertTrue(cause instanceof InvalidBrokerRemovalException,
                String.format("Expected an InvalidBrokerRemovalException for broker ids %s but got %s", brokerIds, cause));
    }

    /**
     * Checks that a removal whose shutdown step fails surfaces a {@link BalancerOperationFailedException} and
     * that the stored removal description reports completed reassignments with an errored shutdown.
     *
     * <p>Currently disabled; the reason is not recorded in this file.
     */
    @Disabled
    @Test
    public void testRemoveBroker_FailureInShutdownShouldShowBalancerFailedOperation() throws ExecutionException, InterruptedException {
        KafkaServer controllerKafkaServer = controllerKafkaServer();
        KafkaServer notControllerKafkaServer = notControllerKafkaServer();
        int brokerId = notControllerKafkaServer.config().brokerId();
        // Drop the broker from the controller's live-broker set before requesting its removal.
        // NOTE(review): presumably this is what makes the shutdown request fail - confirm against the controller code.
        HashSet hashSet = new HashSet();
        hashSet.add(Integer.valueOf(brokerId));
        controllerKafkaServer.kafkaController().controllerContext().removeLiveBrokers(JavaConverters.asScalaSet(hashSet).toSet());
        Assertions.assertThrows(BalancerOperationFailedException.class, () -> {
            removeBroker(notControllerKafkaServer, true, true, this.exited, "the operation returns an error when the shutdown request part of the removal fails");
        });
        // The failed removal must still be visible through the describe API.
        Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
        if (map.isEmpty()) {
            Assertions.fail("Expected to have broker removals to describe");
        }
        BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(brokerId));
        // Reassignments are expected to have finished; only the shutdown phase should report an error.
        Assertions.assertEquals(PartitionReassignmentsStatus.COMPLETED, brokerRemovalDescription.reassignmentsStatus());
        Assertions.assertEquals(BrokerShutdownStatus.ERROR, brokerRemovalDescription.shutdownStatus());
        // Both timestamps must be populated, and the update must be strictly later than the creation.
        Assertions.assertTrue(brokerRemovalDescription.lastUpdateTimeMs() > 0, "Should have last update time populated");
        Assertions.assertTrue(brokerRemovalDescription.createTimeMs() > 0, "Should have last create time populated");
        Assertions.assertTrue(brokerRemovalDescription.lastUpdateTimeMs() > brokerRemovalDescription.createTimeMs(), "Last update time should be later than start time");
    }

    /**
     * Returns the ids of all brokers in {@code servers} except the given one.
     *
     * <p>The id is excluded by value with a stream filter. Calling {@code list.remove(i)} with an {@code int}
     * resolves to {@code List.remove(int index)}, which deletes the element at <em>position</em> {@code i}
     * rather than the broker whose id is {@code i}.
     *
     * @param i the id of the broker to leave out
     * @return the ids of every other broker, in {@code servers} order
     */
    private List<Integer> brokerIdsWithoutRemovedBroker(int i) {
        return this.servers.stream()
                .map(server -> server.config().brokerId())
                .filter(brokerId -> brokerId != i)
                .collect(Collectors.toList());
    }

    /**
     * Reassigns every partition currently on broker {@code i} to the list of all other brokers, waits up to
     * 60 seconds for the reassignments to drain, and reports which partitions are still on the broker.
     *
     * @param i the id of the broker to move replicas away from
     * @return the partitions still reported on broker {@code i} after the reassignments finished
     * @throws ExecutionException if an admin call fails
     * @throws InterruptedException if interrupted while waiting
     */
    private List<TopicPartition> moveReplicasOffBroker(int i) throws ExecutionException, InterruptedException {
        List<Integer> targetReplicas = brokerIdsWithoutRemovedBroker(i);
        List<TopicPartition> hostedPartitions = DataBalancerIntegrationTestUtils.partitionsOnBroker(i, this.adminClient);

        // Every hosted partition gets the same target replica list.
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = hostedPartitions.stream()
                .collect(Collectors.toMap(
                        partition -> partition,
                        partition -> Optional.of(new NewPartitionReassignment(targetReplicas))));
        this.adminClient.alterPartitionReassignments(reassignments).all().get();

        TestUtils.waitForCondition(
                () -> this.adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
                60000L,
                () -> "Expected all ongoing partition reassignments to finish");

        return DataBalancerIntegrationTestUtils.partitionsOnBroker(i, this.adminClient);
    }

    /**
     * Shared scenario: creates and populates a topic, kicks off removal of {@code kafkaServer}, runs the supplied
     * action while exactly one removal is tracked by the balancer, and then asserts the removal completes.
     *
     * @param runnable the disturbance to run after the removal has been submitted
     * @param kafkaServer the broker to remove
     * @throws Throwable anything thrown by the harness helpers or the assertions
     */
    private void testConditionDuringRemovalExecutionDoesNotCancelRemoval(Runnable runnable, KafkaServer kafkaServer) throws Throwable {
        final String topic = "test-topic";
        final String reason = "test broker removal is not cancelled when unrelated broker restarts";
        int removedBrokerId = kafkaServer.config().brokerId();

        KafkaTestUtils.createTopic(this.adminClient, topic, 50, 2);
        this.kafkaCluster.produceData(topic, 200);

        KafkaDataBalanceManager balancer = getDataBalancer();
        removeBroker(kafkaServer, false, false, this.exited, reason);
        int trackedRemovals = balancer.brokerRemovals().size();
        Assertions.assertEquals(1, trackedRemovals, "Expected one broker removal to be stored in memory");

        runnable.run();

        assertRemovalCompletion(Collections.singletonList(removedBrokerId), removedBrokerId, false, false, this.exited, reason);
    }
}
