package io.confluent.databalancer.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.common.TopicPlacement;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/databalancer/integration/DataBalancerIntegrationTestUtils.class */
public class DataBalancerIntegrationTestUtils {
    private static final Duration OPERATION_FINISH_TIMEOUT = Duration.ofMinutes(2);
    private static final Duration OPERATION_POLL_INTERVAL = Duration.ofSeconds(2);
    private static final Logger LOG = LoggerFactory.getLogger(DataBalancerIntegrationTestUtils.class);

    public static void verifyReplicasMovedToBroker(ConfluentAdmin confluentAdmin, String str, int i) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            if (((Map) confluentAdmin.listPartitionReassignments().reassignments().get()).isEmpty()) {
                return ((Map) confluentAdmin.describeTopics(Collections.singletonList(str)).all().get()).values().stream().anyMatch(topicDescription -> {
                    return topicDescription.partitions().stream().anyMatch(topicPartitionInfo -> {
                        return topicPartitionInfo.replicas().stream().anyMatch(node -> {
                            return node.id() == i;
                        });
                    });
                });
            }
            return false;
        }, OPERATION_FINISH_TIMEOUT.toMillis(), OPERATION_POLL_INTERVAL.toMillis(), () -> {
            return "Replicas were not balanced onto the new broker";
        });
    }

    public static List<TopicPartition> partitionsOnBroker(int i, Admin admin) throws ExecutionException, InterruptedException {
        List<TopicPartition> list = (List) ((Map) admin.describeTopics((Set) admin.listTopics().names().get()).all().get()).entrySet().stream().flatMap(entry -> {
            return ((TopicDescription) entry.getValue()).partitions().stream().filter(topicPartitionInfo -> {
                return topicPartitionInfo.replicas().stream().map((v0) -> {
                    return v0.id();
                }).anyMatch(num -> {
                    return num.intValue() == i;
                });
            }).map(topicPartitionInfo2 -> {
                return new TopicPartition((String) entry.getKey(), topicPartitionInfo2.partition());
            });
        }).collect(Collectors.toList());
        LOG.info("Partitions on broker {} are {}", Integer.valueOf(i), list);
        return list;
    }

    public static void verifyTopicPlacement(Admin admin, String str, String str2) throws InterruptedException {
        TopicPlacement topicPlacement = (TopicPlacement) TopicPlacement.parse(str2).get();
        LOG.info("Verifying that replica distribution for {} matches {}", str, topicPlacement);
        TestUtils.waitForCondition(() -> {
            TopicDescription topicDescription = (TopicDescription) ((Map) admin.describeTopics(Collections.singleton(str)).all().get()).get(str);
            Assert.assertTrue("Expected partitions to be described for topic", !topicDescription.partitions().isEmpty());
            for (TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
                List observers = topicPartitionInfo.observers();
                ArrayList arrayList = new ArrayList(topicPartitionInfo.replicas());
                arrayList.removeAll(observers);
                for (TopicPlacement.ConstraintCount constraintCount : topicPlacement.replicas()) {
                    LOG.info("Verifying sync replicas {} match constraints {}", arrayList, constraintCount);
                    new ArrayList(arrayList).removeIf(node -> {
                        return !constraintCount.matches(attributes(node));
                    });
                    Assert.assertEquals("Expected number of sync replicas to match topic placement", constraintCount.count(), r0.size());
                }
                for (TopicPlacement.ConstraintCount constraintCount2 : topicPlacement.observers()) {
                    LOG.info("Verifying observers {} match constraints {}", observers, constraintCount2);
                    new ArrayList(observers).removeIf(node2 -> {
                        return !constraintCount2.matches(attributes(node2));
                    });
                    Assert.assertEquals("Expected number of observers to match topic placement", constraintCount2.count(), r0.size());
                }
            }
            return true;
        }, OPERATION_FINISH_TIMEOUT.toMillis(), OPERATION_POLL_INTERVAL.toMillis(), () -> {
            return "Change in TopicPlacement did not trigger and complete cluster self-healing in time.";
        });
    }

    public static void alterTopicPlacementConfig(Admin admin, String str, String str2) throws ExecutionException, InterruptedException {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("confluent.placement.constraints", str2), AlterConfigOp.OpType.SET);
        LOG.info("Changing topic placement for topic {} to {}", str, str2);
        admin.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singleton(alterConfigOp))).all().get();
        TestUtils.waitForCondition(() -> {
            Optional findFirst = ((Config) ((Map) admin.describeConfigs(Collections.singleton(configResource)).all().get()).get(configResource)).entries().stream().filter(configEntry -> {
                return configEntry.name().equals("confluent.placement.constraints");
            }).findFirst();
            return findFirst.isPresent() && ((ConfigEntry) findFirst.get()).value().equals(str2);
        }, "TopicPlacement change did not complete in time");
        LOG.info("Topic placement change complete");
    }

    private static Map<String, String> attributes(Node node) {
        return Collections.singletonMap("rack", node.rack());
    }
}
