package io.confluent.kafkarest.integration.v3;

import com.google.common.collect.ImmutableList;
import com.linkedin.kafka.cruisecontrol.monitor.MockSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.NoopSampleStore;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.kafkarest.KafkaRestMain;
import io.confluent.kafkarest.KafkaRestResourceExtension;
import io.confluent.kafkarest.entities.v3.GetRemoveBrokerTaskResponse;
import io.confluent.kafkarest.entities.v3.ListRemoveBrokerTaskResponse;
import io.confluent.kafkarest.entities.v3.RemoveBrokerTaskData;
import io.confluent.kafkarest.entities.v3.RemoveBrokerTaskDataList;
import io.confluent.kafkarest.entities.v3.Resource;
import io.confluent.kafkarest.entities.v3.ResourceCollection;
import io.confluent.kafkarest.integration.ClusterTestHarness;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/integration/v3/RemoveBrokerTaskResourceIntegrationTest.class */
public class RemoveBrokerTaskResourceIntegrationTest extends ClusterTestHarness {
    private static final int NUM_BROKERS = 5;
    private volatile boolean listBrokerRemovalTasks;
    private volatile boolean brokerRemovalWithAdmin;
    private List<Integer> nonControllerBrokers;
    private int[] brokerPorts;
    private static final Logger log = LoggerFactory.getLogger(KafkaRestMain.class);
    private static final Duration BALANCER_START_TIMEOUT = Duration.ofSeconds(120);
    private static final Duration REMOVAL_FINISH_TIMEOUT = Duration.ofMinutes(3);
    private static final Duration REMOVAL_POLL_INTERVAL = Duration.ofSeconds(2);

    public RemoveBrokerTaskResourceIntegrationTest() {
        super(NUM_BROKERS, false);
        this.listBrokerRemovalTasks = false;
        this.brokerRemovalWithAdmin = false;
    }

    public void setUp() throws Exception {
        this.brokerPorts = choosePorts(NUM_BROKERS);
        super.setUp();
        awaitBalanceEngineActivation();
        getNonControllerNodes();
    }

    public void tearDown() throws Exception {
        super.tearDown();
        Exit.resetExitProcedure();
    }

    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("kafka.rest.resource.extension.class", KafkaRestResourceExtension.class.getName());
    }

    public Properties overrideBrokerProperties(int i, Properties properties) {
        properties.put("confluent.balancer.enable", "true");
        properties.put("confluent.balancer.network.in.max.bytes.per.second", "5000000");
        properties.put("confluent.balancer.network.out.max.bytes.per.second", "5000000");
        properties.put(KafkaConfig.ListenersProp(), getBrokerSecurityProtocol().name + "://localhost:" + this.brokerPorts[i]);
        properties.put(confluentBalancerConfig("bootstrap.servers"), Arrays.stream(this.brokerPorts).mapToObj(i2 -> {
            return "localhost:" + i2;
        }).collect(Collectors.joining(",")));
        properties.put(confluentBalancerConfig("metadata.max.age.ms"), "500");
        properties.put(confluentBalancerConfig("partition.sample.store.topic.partition.count"), "1");
        properties.put(confluentBalancerConfig("broker.sample.store.topic.partition.count"), "1");
        properties.put("confluent.metrics.reporter.topic.replicas", "2");
        properties.put(confluentBalancerConfig("num.concurrent.partition.movements.per.broker"), "50");
        properties.put(confluentBalancerConfig("num.concurrent.leader.movements"), "50");
        properties.setProperty(confluentBalancerConfig("num.partition.metrics.windows"), Integer.toString(1));
        properties.setProperty(confluentBalancerConfig("metric.sampler.class"), MockSampler.class.getName());
        properties.setProperty(confluentBalancerConfig("sample.store.class"), NoopSampleStore.class.getName());
        properties.setProperty(confluentBalancerConfig("metadata.max.age.ms"), "500");
        properties.setProperty(confluentBalancerConfig("metric.sampling.interval.ms"), "501");
        properties.setProperty(confluentBalancerConfig("num.broker.metrics.windows"), "1");
        properties.setProperty(confluentBalancerConfig("partition.metrics.window.ms"), "700");
        properties.setProperty(confluentBalancerConfig("broker.metrics.window.ms"), "700");
        return properties;
    }

    private String confluentBalancerConfig(String str) {
        return "confluent.balancer." + str;
    }

    @Test
    public void deleteBroker_listRemoveBrokerTasks_existingCluster_deletesBroker() throws InterruptedException {
        String clusterId = getClusterId();
        int intValue = this.nonControllerBrokers.get(0).intValue();
        setExitProcedure((KafkaServer) this.servers.get(intValue));
        requestBrokerRemoval(clusterId, intValue);
        this.listBrokerRemovalTasks = true;
        this.brokerRemovalWithAdmin = false;
        brokerRemovalWithRetries(clusterId, intValue);
        awaitBrokerRemoval(intValue);
        verifyBrokerRemoval(getBrokerPath(clusterId, intValue));
    }

    @Test
    public void deleteBroker_getRemoveBrokerTask_existingCluster_deletesBroker() throws InterruptedException {
        String clusterId = getClusterId();
        int intValue = this.nonControllerBrokers.get(1).intValue();
        setExitProcedure((KafkaServer) this.servers.get(intValue));
        requestBrokerRemoval(clusterId, intValue);
        this.listBrokerRemovalTasks = false;
        this.brokerRemovalWithAdmin = false;
        brokerRemovalWithRetries(clusterId, intValue);
        awaitBrokerRemoval(intValue);
        verifyBrokerRemoval(getBrokerPath(clusterId, intValue));
    }

    @Test
    public void listRemoveBrokerTasks_existingCluster_listsTasks() throws InterruptedException {
        String clusterId = getClusterId();
        int intValue = this.nonControllerBrokers.get(1).intValue();
        setExitProcedure((KafkaServer) this.servers.get(intValue));
        requestBrokerRemovalWithAdmin(intValue);
        this.listBrokerRemovalTasks = true;
        this.brokerRemovalWithAdmin = true;
        brokerRemovalWithRetries(clusterId, intValue);
        awaitBrokerRemoval(intValue);
        verifyBrokerRemoval(getBrokerPath(clusterId, intValue));
    }

    @Test
    public void getRemoveBrokerTasks_existingCluster_returnsTasks() throws InterruptedException {
        String clusterId = getClusterId();
        int intValue = this.nonControllerBrokers.get(1).intValue();
        setExitProcedure((KafkaServer) this.servers.get(intValue));
        requestBrokerRemovalWithAdmin(intValue);
        this.listBrokerRemovalTasks = false;
        this.brokerRemovalWithAdmin = true;
        brokerRemovalWithRetries(clusterId, intValue);
        awaitBrokerRemoval(intValue);
        verifyBrokerRemoval(getBrokerPath(clusterId, intValue));
    }

    @Test
    public void listRemoveBrokerTasks_noBrokerRemoval_existingCluster_returnsEmpty() {
        Response response = request(getRemoveBrokerTasksPath(getClusterId())).accept(new String[]{"application/json"}).get();
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        Assert.assertTrue(((ListRemoveBrokerTaskResponse) response.readEntity(ListRemoveBrokerTaskResponse.class)).getValue().getData().isEmpty());
    }

    @Test
    public void getRemoveBrokerTasks_noBrokerRemoval_existingCluster_throwsNotFound() {
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request(getRemoveBrokerTasksPath(getClusterId()) + 1).accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void listRemoveBrokerTasks_nonExistingCluster_throwsNotFound() {
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request(getRemoveBrokerTasksPath("foobar")).accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void getRemoveBrokerTasks_nonExistingCluster_throwsNotFound() {
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request(getRemoveBrokerTasksPath("foobar") + 1).accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void getRemoveBrokerTasks_nonExistingBroker_throwsNotFound() {
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request(getRemoveBrokerTasksPath(getClusterId()) + 100).accept(new String[]{"application/json"}).get().getStatus());
    }

    private void brokerRemovalWithRetries(String str, int i) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            RemoveBrokerTaskData value;
            GetRemoveBrokerTaskResponse getRemoveBrokerTaskResponse = null;
            ListRemoveBrokerTaskResponse listRemoveBrokerTaskResponse = null;
            if (this.listBrokerRemovalTasks) {
                Response response = request(getRemoveBrokerTasksPath(str)).accept(new String[]{"application/json"}).get();
                Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                listRemoveBrokerTaskResponse = (ListRemoveBrokerTaskResponse) response.readEntity(ListRemoveBrokerTaskResponse.class);
                ImmutableList data = listRemoveBrokerTaskResponse.getValue().getData();
                if (data.isEmpty()) {
                    return false;
                }
                value = (RemoveBrokerTaskData) data.get(0);
            } else {
                Response response2 = request(getRemoveBrokerTasksPath(str) + i).accept(new String[]{"application/json"}).get();
                Assert.assertEquals(Response.Status.OK.getStatusCode(), response2.getStatus());
                getRemoveBrokerTaskResponse = (GetRemoveBrokerTaskResponse) response2.readEntity(GetRemoveBrokerTaskResponse.class);
                value = getRemoveBrokerTaskResponse.getValue();
            }
            if (value == null) {
                return false;
            }
            if (isRemovalCompleted(value)) {
                if (this.listBrokerRemovalTasks) {
                    Assert.assertEquals(getExpectedListRemoveBrokerTaskResponse(str, i), listRemoveBrokerTaskResponse);
                    return true;
                }
                Assert.assertEquals(getExpectedGetRemoveBrokerTaskResponse(str, i), getRemoveBrokerTaskResponse);
                return true;
            }
            if (isFailedPlanComputationInRemoval(value)) {
                log.info("Broker removal failed due to ", Errors.forCode(((Short) value.getErrorCode().orElse(Short.valueOf(Errors.NONE.code()))).shortValue()).exception());
                log.info("Re-trying broker removal...");
                return this.brokerRemovalWithAdmin ? requestBrokerRemovalWithAdmin(i) : requestBrokerRemoval(str, i);
            }
            if (isFailedRemoval(value)) {
                throw Errors.forCode(((Short) value.getErrorCode().get()).shortValue()).exception();
            }
            log.info("Broker removal is in pending state. PartitionReassignmentStatus: {} BrokerShutdownStatus: {}", value.getPartitionReassignmentStatus(), value.getBrokerShutdownStatus());
            return false;
        }, REMOVAL_FINISH_TIMEOUT.toMillis(), REMOVAL_POLL_INTERVAL.toMillis(), () -> {
            return "Broker removal did not complete in time";
        });
    }

    private ListRemoveBrokerTaskResponse getExpectedListRemoveBrokerTaskResponse(String str, int i) {
        String str2 = this.restConnect;
        return ListRemoveBrokerTaskResponse.create(RemoveBrokerTaskDataList.builder().setMetadata(ResourceCollection.Metadata.builder().setSelf(str2 + "/v3/clusters/" + str + "/remove-broker-tasks").build()).setData(Collections.singletonList(RemoveBrokerTaskData.builder().setMetadata(Resource.Metadata.builder().setSelf(str2 + "/v3/clusters/" + str + "/remove-broker-tasks/" + i).setResourceName("crn:///kafka=" + str + "/remove-broker-task=" + i).build()).setClusterId(str).setBrokerId(i).setBrokerShutdownStatus(BrokerRemovalDescription.BrokerShutdownStatus.COMPLETE).setPartitionReassignmentStatus(BrokerRemovalDescription.PartitionReassignmentsStatus.COMPLETE).setBroker(Resource.Relationship.create(str2 + getBrokerPath(str, i))).build())).build());
    }

    private GetRemoveBrokerTaskResponse getExpectedGetRemoveBrokerTaskResponse(String str, int i) {
        String str2 = this.restConnect;
        return GetRemoveBrokerTaskResponse.create(RemoveBrokerTaskData.builder().setMetadata(Resource.Metadata.builder().setSelf(str2 + "/v3/clusters/" + str + "/remove-broker-tasks/" + i).setResourceName("crn:///kafka=" + str + "/remove-broker-task=" + i).build()).setClusterId(str).setBrokerId(i).setBrokerShutdownStatus(BrokerRemovalDescription.BrokerShutdownStatus.COMPLETE).setPartitionReassignmentStatus(BrokerRemovalDescription.PartitionReassignmentsStatus.COMPLETE).setBroker(Resource.Relationship.create(str2 + getBrokerPath(str, i))).build());
    }

    private void setExitProcedure(KafkaServer kafkaServer) {
        Exit.setExitProcedure((i, str) -> {
            log.info("Shutting down {} as part of broker removal test", Integer.valueOf(kafkaServer.config().brokerId()));
            kafkaServer.shutdown();
        });
    }

    private void awaitBrokerRemoval(int i) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return !((Set) getBrokers().stream().map((v0) -> {
                return v0.id();
            }).collect(Collectors.toSet())).contains(Integer.valueOf(i));
        }, 60000L, "Broker not removed from the cluster in time.");
    }

    private void verifyBrokerRemoval(String str) {
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request(str).accept(new String[]{"application/json"}).get().getStatus());
    }

    private boolean requestBrokerRemoval(String str, int i) {
        Response delete = request(getBrokerPath(str, i)).accept(new String[]{"application/json"}).delete();
        Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), delete.getStatus());
        Assert.assertTrue(((String) delete.readEntity(String.class)).isEmpty());
        Assert.assertEquals(delete.getLocation().getPath(), getRemoveBrokerTasksPath(str) + i);
        return false;
    }

    private boolean requestBrokerRemovalWithAdmin(int i) {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        ConfluentAdmin.create(adminProperties).removeBrokers(Arrays.asList(Integer.valueOf(i)));
        return false;
    }

    private boolean isRemovalCompleted(RemoveBrokerTaskData removeBrokerTaskData) {
        return removeBrokerTaskData.getBrokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.COMPLETE && removeBrokerTaskData.getPartitionReassignmentStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.COMPLETE;
    }

    private boolean isFailedPlanComputationInRemoval(RemoveBrokerTaskData removeBrokerTaskData) {
        return removeBrokerTaskData.getErrorCode().isPresent() && ((Short) removeBrokerTaskData.getErrorCode().get()).shortValue() == Errors.INSUFFICIENT_REBALANCE_PLAN_METRICS.code();
    }

    private boolean isFailedRemoval(RemoveBrokerTaskData removeBrokerTaskData) {
        return (removeBrokerTaskData.getPartitionReassignmentStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.CANCELED || removeBrokerTaskData.getPartitionReassignmentStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.FAILED) || (removeBrokerTaskData.getBrokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.CANCELED || removeBrokerTaskData.getBrokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.FAILED);
    }

    private void awaitBalanceEngineActivation() throws InterruptedException {
        KafkaDataBalanceManager kafkaDataBalanceManager = (KafkaDataBalanceManager) controllerKafkaServer().kafkaController().dataBalancer().get();
        kafkaDataBalanceManager.getClass();
        TestUtils.waitForCondition(kafkaDataBalanceManager::isActive, BALANCER_START_TIMEOUT.toMillis(), String.format("The databalancer did not start in %s", BALANCER_START_TIMEOUT));
    }

    private void getNonControllerNodes() {
        int controllerID = getControllerID();
        this.nonControllerBrokers = (List) getBrokers().stream().map((v0) -> {
            return v0.id();
        }).filter(num -> {
            return num.intValue() != controllerID;
        }).collect(Collectors.toList());
    }

    private String getBrokerPath(String str, int i) {
        return "/v3/clusters/" + str + "/brokers/" + i;
    }

    private String getRemoveBrokerTasksPath(String str) {
        return "/v3/clusters/" + str + "/remove-broker-tasks/";
    }

    private KafkaServer controllerKafkaServer() {
        return (KafkaServer) this.servers.stream().filter(kafkaServer -> {
            return kafkaServer.kafkaController().isActive();
        }).findFirst().get();
    }
}
