package io.confluent.databalancer.integration;

import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BalancerOperationFailedException;
import org.apache.kafka.common.errors.InvalidBrokerRemovalException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/**
 * Integration tests that exercise broker removal through the admin client's
 * {@code removeBrokers} / {@code describeBrokerRemovals} calls on a three-broker cluster
 * provided by {@link DataBalancerClusterTestHarness}.
 *
 * <p>The actual removal-and-verification flow lives in the harness's {@code removeBroker}
 * helper, which is not visible from this file; the tests here only prepare cluster state
 * and assert on the outcome.
 */
@Category({IntegrationTest.class})
public class RemoveBrokerTest extends DataBalancerClusterTestHarness {
    protected static final Logger log = LoggerFactory.getLogger(RemoveBrokerTest.class);

    private static final String TEST_TOPIC = "test-topic";
    private static final String BALANCER_ENABLE_CONFIG = "confluent.balancer.enable";
    private static final String BALANCER_THROTTLE_CONFIG = "confluent.balancer.throttle.bytes.per.second";
    /** How long to wait for the balancer to report itself inactive after being disabled. */
    private static final long BALANCER_DEACTIVATION_TIMEOUT_MS = 15000L;
    /** How long to wait for manually submitted partition reassignments to drain. */
    private static final long REASSIGNMENT_TIMEOUT_MS = 60000L;

    @Rule
    public final Timeout globalTimeout = Timeout.millis(Duration.ofMinutes(7).toMillis());
    /** Flag handed to the harness's {@code removeBroker} helper; reset after every test. */
    protected AtomicBoolean exited = new AtomicBoolean(false);

    /**
     * Tears down the harness, then restores the default exit procedure and clears
     * {@link #exited}. The cleanup runs in a {@code finally} block so that a failure in the
     * harness tear-down cannot leak an overridden exit procedure into later tests.
     */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            Exit.resetExitProcedure();
            this.exited.set(false);
        }
    }

    /**
     * Adds the balancer throttle setting used by these tests to the supplied properties.
     *
     * @param properties properties to extend; mutated in place
     * @return the same {@code properties} instance
     */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected Properties injectTestSpecificProperties(Properties properties) {
        properties.put(BALANCER_THROTTLE_CONFIG, "10000000");
        return properties;
    }

    /** @return the number of brokers these tests run against (three). */
    @Override // io.confluent.databalancer.integration.DataBalancerClusterTestHarness
    protected int initialBrokerCount() {
        return 3;
    }

    /**
     * Disables the balancer via a cluster-wide dynamic config, waits for it to report
     * inactive, and verifies that a removal request then fails with
     * {@link BalancerOfflineException} as the cause.
     */
    @Test
    public void testRemoveBroker_DisabledBalancerShouldThrowBalancerOfflineException() throws InterruptedException, ExecutionException {
        int brokerId = notControllerKafkaServer().config().brokerId();

        // An empty resource name addresses the cluster-wide broker default.
        ConfigResource clusterResource = new ConfigResource(ConfigResource.Type.BROKER, "");
        AlterConfigOp disableBalancer =
                new AlterConfigOp(new ConfigEntry(BALANCER_ENABLE_CONFIG, "false"), AlterConfigOp.OpType.SET);
        this.adminClient.incrementalAlterConfigs(
                Utils.mkMap(Utils.mkEntry(clusterResource, Collections.singleton(disableBalancer)))).all().get();

        KafkaDataBalanceManager balanceManager =
                (KafkaDataBalanceManager) controllerKafkaServer().kafkaController().dataBalancer().get();
        TestUtils.waitForCondition(
                () -> !balanceManager.isActive(),
                BALANCER_DEACTIVATION_TIMEOUT_MS,
                String.format("The databalancer did not become inactive within %d ms", BALANCER_DEACTIVATION_TIMEOUT_MS));

        // The removal request itself must be issued inside the lambda; an empty lambda can never throw.
        ExecutionException executionException = Assert.assertThrows(
                ExecutionException.class,
                () -> this.adminClient.removeBrokers(Collections.singletonList(brokerId)).all().get());
        Assert.assertNotNull("Expected to have a cause for the execution exception", executionException.getCause());
        Assert.assertEquals(BalancerOfflineException.class, executionException.getCause().getClass());
    }

    /** Removes a non-controller broker while a replicated topic exists. */
    @Test
    public void testRemoveBroker() throws InterruptedException, ExecutionException {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);
        removeBroker(notControllerKafkaServer(), this.exited);
    }

    /**
     * Manually drains every replica off a non-controller broker first, then removes it, so the
     * removal runs with nothing left on the broker to move.
     */
    @Test
    public void testRemoveBroker_NoProposalsShouldComplete() throws InterruptedException, ExecutionException {
        int brokerId = notControllerKafkaServer().config().brokerId();
        // Repeat until no partition reports a replica on the broker; bounded by globalTimeout.
        while (!moveReplicasOffBroker(brokerId).isEmpty()) {
            info("Moving replicas off of broker {}", brokerId);
        }
        removeBroker(notControllerKafkaServer(), this.exited);
    }

    /** Removes the broker that currently acts as controller while a replicated topic exists. */
    @Test
    public void testRemoveController() throws InterruptedException, ExecutionException {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);
        removeBroker(controllerKafkaServer(), this.exited);
    }

    /**
     * Verifies that invalid removal requests (empty list, negative id, unknown id) are rejected,
     * then shuts a non-controller broker down and removes it while it is already dead.
     */
    @Test
    public void testDeadBroker() throws Exception {
        KafkaTestUtils.createTopic(this.adminClient, TEST_TOPIC, 20, 2);

        assertRemovalRejected(Collections.emptyList(), "Able to remove broker with empty list.");
        assertRemovalRejected(Collections.singletonList(-1), "Able to remove broker with negative id.");
        assertRemovalRejected(Collections.singletonList(1000), "Able to remove non existent broker with id: 1000");

        KafkaServer deadBroker = notControllerKafkaServer();
        deadBroker.shutdown();
        // The broker is already down, so the flag is pre-set before delegating to the harness helper.
        this.exited.set(true);
        removeBroker(deadBroker, this.exited);
    }

    /**
     * Drops a broker from the controller's live-broker view so the removal fails, then checks
     * that the removal is reported with canceled reassignments and a failed shutdown.
     */
    @Test
    public void testRemoveBroker_FailureInShutdownShouldShowBalancerFailedOperation() throws ExecutionException, InterruptedException {
        KafkaServer controller = controllerKafkaServer();
        KafkaServer brokerToRemove = notControllerKafkaServer();
        int brokerId = brokerToRemove.config().brokerId();

        HashSet<Integer> brokersToForget = new HashSet<>();
        brokersToForget.add(brokerId);
        controller.kafkaController().controllerContext()
                .removeLiveBrokers(JavaConverters.asScalaSet(brokersToForget).toSet());

        Assert.assertThrows(BalancerOperationFailedException.class, () -> {
            removeBroker(brokerToRemove, this.exited);
        });

        Map<Integer, BrokerRemovalDescription> removals =
                this.adminClient.describeBrokerRemovals().descriptions().get();
        Assert.assertFalse("Expected to have broker removals to describe", removals.isEmpty());

        BrokerRemovalDescription removal = removals.get(brokerId);
        Assert.assertNotNull("Expected a removal description for broker " + brokerId, removal);
        Assert.assertEquals(BrokerRemovalDescription.PartitionReassignmentsStatus.CANCELED, removal.partitionReassignmentsStatus());
        Assert.assertEquals(BrokerRemovalDescription.BrokerShutdownStatus.FAILED, removal.brokerShutdownStatus());
    }

    /**
     * Issues a removal request that is expected to be rejected and asserts that it failed with
     * an {@link InvalidBrokerRemovalException} as the cause.
     *
     * @param brokerIds      broker ids to pass to {@code removeBrokers}
     * @param failureMessage assertion message used when the request unexpectedly succeeds
     * @throws InterruptedException if interrupted while waiting for the request to complete
     */
    private void assertRemovalRejected(List<Integer> brokerIds, String failureMessage) throws InterruptedException {
        Throwable cause = null;
        try {
            this.adminClient.removeBrokers(brokerIds).all().get();
        } catch (ExecutionException e) {
            cause = e.getCause();
        }
        Assert.assertNotNull(failureMessage, cause);
        Assert.assertTrue(
                "Expected InvalidBrokerRemovalException but got " + cause.getClass().getName(),
                cause instanceof InvalidBrokerRemovalException);
    }

    /**
     * Lists the ids of every broker in the harness except the given one.
     *
     * <p>The id is filtered out by value; calling {@code List.remove(int)} with a primitive
     * would instead remove the element at that index.
     *
     * @param i id of the broker to leave out
     * @return a new list of the remaining broker ids
     */
    private List<Integer> brokerIdsWithoutRemovedBroker(int i) {
        return this.servers.stream()
                .map(server -> server.config().brokerId())
                .filter(id -> id != i)
                .collect(Collectors.toList());
    }

    /**
     * Reassigns every partition currently hosted on the given broker to all other brokers and
     * waits for the cluster to report no ongoing reassignments.
     *
     * @param i id of the broker to drain
     * @return the partitions still reported on the broker after the reassignments finished
     * @throws ExecutionException   if the reassignment request fails
     * @throws InterruptedException if interrupted while waiting
     */
    private List<TopicPartition> moveReplicasOffBroker(int i) throws ExecutionException, InterruptedException {
        List<Integer> targetBrokers = brokerIdsWithoutRemovedBroker(i);
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments =
                DataBalancerIntegrationTestUtils.partitionsOnBroker(i, this.adminClient).stream()
                        .collect(Collectors.toMap(
                                partition -> partition,
                                partition -> Optional.of(new NewPartitionReassignment(targetBrokers))));
        this.adminClient.alterPartitionReassignments(reassignments).all().get();

        TestUtils.waitForCondition(
                () -> this.adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
                REASSIGNMENT_TIMEOUT_MS,
                () -> "Expected all ongoing partition reassignments to finish");
        return DataBalancerIntegrationTestUtils.partitionsOnBroker(i, this.adminClient);
    }
}
