package io.confluent.databalancer.integration;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import io.confluent.databalancer.KafkaDataBalanceManager;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.BrokerRemovalError;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InsufficientRebalancePlanMetricsException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/databalancer/integration/DataBalancerClusterTestHarness.class */
public abstract class DataBalancerClusterTestHarness {
    protected Logger logger;
    protected EmbeddedZookeeper zookeeper;
    protected ConfluentAdmin adminClient;
    protected Properties generalProperties;
    protected EmbeddedSBKKafkaCluster kafkaCluster;
    protected List<KafkaConfig> configs = null;
    protected List<KafkaServer> servers = null;
    protected String brokerList = null;
    private static Duration balancerStartTimeout = Duration.ofSeconds(120);
    private static Duration balancerStopTimeout = Duration.ofSeconds(120);
    protected static Duration removalFinishTimeout = Duration.ofMinutes(3);
    protected static Duration removalPollInterval = Duration.ofSeconds(2);

    protected abstract int initialBrokerCount();

    protected Properties injectTestSpecificProperties(Properties properties) {
        return properties;
    }

    protected Map<Integer, Map<String, String>> brokerOverrideProps() {
        return Collections.emptyMap();
    }

    @Before
    public void setUp() throws Exception {
        this.logger = LoggerFactory.getLogger(getClass());
        this.generalProperties = injectTestSpecificProperties(new Properties());
        this.kafkaCluster = new EmbeddedSBKKafkaCluster();
        this.kafkaCluster.startZooKeeper();
        this.kafkaCluster.startBrokers(initialBrokerCount(), this.generalProperties, brokerOverrideProps());
        this.servers = this.kafkaCluster.brokers();
        this.brokerList = TestUtils.getBrokerListStrFromServers(JavaConverters.asScalaBuffer(this.servers), SecurityProtocol.PLAINTEXT);
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.brokerList);
        this.adminClient = KafkaCruiseControlUtils.createAdmin(hashMap);
        awaitBalanceEngineActivation();
    }

    @After
    public void tearDown() throws Exception {
        if (this.kafkaCluster != null) {
            this.kafkaCluster.shutdown();
        }
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    public KafkaServer controllerKafkaServer() {
        return this.servers.stream().filter(kafkaServer -> {
            return kafkaServer.kafkaController().isActive();
        }).findFirst().get();
    }

    public KafkaServer notControllerKafkaServer() {
        return this.servers.stream().filter(kafkaServer -> {
            return !kafkaServer.kafkaController().isActive();
        }).findFirst().get();
    }

    private void awaitBalanceEngineActivation() throws InterruptedException {
        KafkaDataBalanceManager kafkaDataBalanceManager = (KafkaDataBalanceManager) controllerKafkaServer().kafkaController().dataBalancer().get();
        kafkaDataBalanceManager.getClass();
        org.apache.kafka.test.TestUtils.waitForCondition(kafkaDataBalanceManager::isActive, balancerStartTimeout.toMillis(), String.format("The databalancer did not start in %s", balancerStartTimeout));
    }

    private void awaitBalanceEngineDisabled() throws InterruptedException {
        KafkaDataBalanceManager kafkaDataBalanceManager = (KafkaDataBalanceManager) controllerKafkaServer().kafkaController().dataBalancer().get();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return !kafkaDataBalanceManager.isActive();
        }, balancerStopTimeout.toMillis(), String.format("The databalancer did not stop in %s", balancerStopTimeout));
    }

    public void addBroker(int i) {
        Properties properties = new Properties();
        Map<String, String> orDefault = brokerOverrideProps().getOrDefault(Integer.valueOf(i), Collections.emptyMap());
        properties.putAll(this.generalProperties);
        properties.putAll(orDefault);
        this.kafkaCluster.startBroker(i, properties);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void removeBroker(KafkaServer kafkaServer, AtomicBoolean atomicBoolean) throws InterruptedException, ExecutionException {
        int brokerId = kafkaServer.config().brokerId();
        Exit.setExitProcedure((i, str) -> {
            info("Shutting down {} as part of broker removal test", Integer.valueOf(kafkaServer.config().brokerId()));
            atomicBoolean.set(true);
            kafkaServer.shutdown();
        });
        info("Removing broker with id {}", Integer.valueOf(brokerId));
        this.adminClient.removeBrokers(Collections.singletonList(Integer.valueOf(brokerId))).all().get();
        AtomicReference atomicReference = new AtomicReference();
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            Map map = (Map) this.adminClient.describeBrokerRemovals().descriptions().get();
            if (map.isEmpty()) {
                return false;
            }
            BrokerRemovalDescription brokerRemovalDescription = (BrokerRemovalDescription) map.get(Integer.valueOf(brokerId));
            if (isCompletedRemoval(brokerRemovalDescription)) {
                return true;
            }
            if (isFailedPlanComputationInRemoval(brokerRemovalDescription)) {
                return retryRemoval(brokerRemovalDescription, brokerId);
            }
            if (!isFailedRemoval(brokerRemovalDescription)) {
                info("Removal is still pending. PAR: {} BSS: {}", brokerRemovalDescription.partitionReassignmentsStatus(), brokerRemovalDescription.brokerShutdownStatus());
                return false;
            }
            String format = String.format("Broker removal failed for an unexpected reason - description object %s", brokerRemovalDescription);
            atomicReference.set(((BrokerRemovalError) brokerRemovalDescription.removalError().get()).exception());
            info(format);
            return true;
        }, removalFinishTimeout.toMillis(), removalPollInterval.toMillis(), () -> {
            return "Broker removal did not complete successfully in time!";
        });
        if (atomicReference.get() != null) {
            throw ((ApiException) atomicReference.get());
        }
        Assert.assertTrue("Expected Exit to be called", atomicBoolean.get());
        org.apache.kafka.test.TestUtils.waitForCondition(() -> {
            return ((Collection) this.adminClient.describeCluster().nodes().get()).size() == initialBrokerCount() - 1;
        }, 60000L, "Cluster size did not shrink!");
        Assert.assertEquals("Expected one broker removal to be stored in memory", 1L, ((Map) this.adminClient.describeBrokerRemovals().descriptions().get()).size());
        Assert.assertEquals("Broker should have no partitions after removal", Collections.emptyList(), DataBalancerIntegrationTestUtils.partitionsOnBroker(brokerId, this.adminClient));
    }

    public void info(String str) {
        info(str, Collections.emptyList());
    }

    public void info(String str, Object... objArr) {
        String join = String.join("", Collections.nCopies(10, "-"));
        String format = String.format("%s-%s-%s", join, getClass().getName(), join);
        this.logger.info(format);
        this.logger.info(str, objArr);
        this.logger.info(format);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void disableSBK() throws InterruptedException, ExecutionException {
        this.adminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singleton(new AlterConfigOp(new ConfigEntry("confluent.balancer.enable", "false"), AlterConfigOp.OpType.SET)))).all().get();
        awaitBalanceEngineDisabled();
        info("SBK was disabled");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void enableSBK() throws InterruptedException, ExecutionException {
        this.adminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singleton(new AlterConfigOp(new ConfigEntry("confluent.balancer.enable", "true"), AlterConfigOp.OpType.SET)))).all().get();
        awaitBalanceEngineActivation();
        info("SBK was enabled via the dynamic config");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean retryRemoval(BrokerRemovalDescription brokerRemovalDescription, int i) throws ExecutionException, InterruptedException {
        info("Broker removal failed due to", ((BrokerRemovalError) brokerRemovalDescription.removalError().orElse(new BrokerRemovalError(Errors.NONE, (String) null))).exception());
        info("Re-scheduling removal...");
        this.adminClient.removeBrokers(Collections.singletonList(Integer.valueOf(i))).all().get();
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isCompletedRemoval(BrokerRemovalDescription brokerRemovalDescription) {
        return (brokerRemovalDescription.brokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.COMPLETE) && (brokerRemovalDescription.partitionReassignmentsStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.COMPLETE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isRemovalInProgress(BrokerRemovalDescription brokerRemovalDescription) {
        boolean z = brokerRemovalDescription.partitionReassignmentsStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.IN_PROGRESS;
        return (z && (brokerRemovalDescription.brokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.COMPLETE)) || (z && (brokerRemovalDescription.brokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.PENDING));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isFailedRemoval(BrokerRemovalDescription brokerRemovalDescription) {
        return (brokerRemovalDescription.brokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.FAILED || brokerRemovalDescription.brokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.CANCELED) || (brokerRemovalDescription.partitionReassignmentsStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.CANCELED || brokerRemovalDescription.partitionReassignmentsStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.FAILED);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isCanceledRemoval(BrokerRemovalDescription brokerRemovalDescription) {
        return (brokerRemovalDescription.partitionReassignmentsStatus() == BrokerRemovalDescription.PartitionReassignmentsStatus.CANCELED) || (brokerRemovalDescription.brokerShutdownStatus() == BrokerRemovalDescription.BrokerShutdownStatus.CANCELED);
    }

    protected boolean isFailedPlanComputationInRemoval(BrokerRemovalDescription brokerRemovalDescription) {
        return brokerRemovalDescription.removalError().isPresent() && (((BrokerRemovalError) brokerRemovalDescription.removalError().get()).exception() instanceof InsufficientRebalancePlanMetricsException);
    }
}
