package com.linkedin.kafka.cruisecontrol.common;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convenience wrapper around a {@link ConfluentAdmin} client for the admin operations used here:
 * describing the cluster, looking up partition replicas, cancelling inter-broker reassignments and
 * moving replicas between log directories of a broker.
 *
 * <p>All instance fields are final and assigned once in the constructor. Every blocking wait on an
 * admin-client future is bounded by one of the timeouts read from {@link KafkaCruiseControlConfig},
 * except in {@link #cancelInterBrokerReplicaMovements(List)}, which waits without a timeout.
 */
@ThreadSafe
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/common/SbkAdminUtils.class */
public class SbkAdminUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SbkAdminUtils.class);
    private final ConfluentAdmin adminClient;
    private final long logDirResponseTimeoutMs;
    private final long describeTopicsResponseTimeoutMs;
    private final long describeClusterResponseTimeoutMs;

    /**
     * Creates the utility around the given admin client.
     *
     * @param confluentAdmin the admin client every request is issued through
     * @param kafkaCruiseControlConfig configuration supplying the log-dir, describe-topics and
     *        describe-cluster response timeouts (in milliseconds)
     */
    public SbkAdminUtils(ConfluentAdmin confluentAdmin, KafkaCruiseControlConfig kafkaCruiseControlConfig) {
        this.adminClient = confluentAdmin;
        this.logDirResponseTimeoutMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG).longValue();
        this.describeTopicsResponseTimeoutMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.DESCRIBE_TOPICS_RESPONSE_TIMEOUT_MS_CONFIG).longValue();
        this.describeClusterResponseTimeoutMs = kafkaCruiseControlConfig.getLong(KafkaCruiseControlConfig.DESCRIBE_CLUSTER_RESPONSE_TIMEOUT_MS_CONFIG).longValue();
    }

    /**
     * Describes the cluster using the configured describe-cluster response timeout.
     *
     * @return a result holding either the cluster description or the exception that prevented it
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public AdminClientResult<KafkaCluster> describeCluster() throws InterruptedException {
        return describeCluster(this.describeClusterResponseTimeoutMs);
    }

    /**
     * Describes the cluster (nodes, controller, cluster id and authorized operations).
     *
     * <p>The timeout is applied to the request itself and, separately, to the wait on each of the
     * four result futures, so the worst-case total wait can exceed {@code timeoutMs}.
     *
     * @param timeoutMs timeout in milliseconds; values outside the {@code int} range are clamped
     *        when passed as the request timeout
     * @return a result holding either the cluster description or the exception that prevented it
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public AdminClientResult<KafkaCluster> describeCluster(long timeoutMs) throws InterruptedException {
        // The request option only accepts an int; clamp instead of letting a narrowing cast wrap around.
        int requestTimeoutMs = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMs));
        try {
            DescribeClusterResult clusterResult =
                this.adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(requestTimeoutMs));
            KafkaCluster cluster = new KafkaCluster(
                clusterResult.nodes().get(timeoutMs, TimeUnit.MILLISECONDS),
                clusterResult.controller().get(timeoutMs, TimeUnit.MILLISECONDS),
                clusterResult.clusterId().get(timeoutMs, TimeUnit.MILLISECONDS),
                clusterResult.authorizedOperations().get(timeoutMs, TimeUnit.MILLISECONDS));
            return new AdminClientResult<>(cluster);
        } catch (ExecutionException | TimeoutException | KafkaException e) {
            LOG.error("Encountered exception while describing the Kafka cluster", e);
            return new AdminClientResult<>(e);
        }
    }

    /**
     * Looks up the broker ids hosting the replicas of the given partition.
     *
     * @param topicPartition the partition to look up
     * @return the replica broker ids, or an empty list if the topic could not be described, the
     *         description is missing, or the partition is not part of the description
     */
    public List<Integer> getReplicasForPartition(TopicPartition topicPartition) {
        // Stays null when the describe call fails, which maps to the empty-list result below.
        TopicDescription topicDescription = null;
        try {
            topicDescription = this.adminClient.describeTopics(Collections.singletonList(topicPartition.topic()))
                .all()
                .get(this.describeTopicsResponseTimeoutMs, TimeUnit.MILLISECONDS)
                .get(topicPartition.topic());
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; this method has no checked way to report it.
            Thread.currentThread().interrupt();
            LOG.warn("Encountered exception while fetching the replicas for topic partition {}", topicPartition, e);
        } catch (ExecutionException | TimeoutException | KafkaException e) {
            LOG.warn("Encountered exception while fetching the replicas for topic partition {}", topicPartition, e);
        }
        if (topicDescription == null) {
            return Collections.emptyList();
        }
        return topicDescription.partitions().stream()
            .filter(partitionInfo -> partitionInfo.partition() == topicPartition.partition())
            .findAny()
            .map(partitionInfo -> partitionInfo.replicas().stream().map(Node::id).collect(Collectors.toList()))
            .orElse(Collections.emptyList());
    }

    /**
     * Requests cancellation of the ongoing reassignment of each given partition by submitting an
     * empty reassignment for it, then waits (without timeout) for each per-partition result.
     *
     * <p>Processing stops early if a failure is caused by {@link UnsupportedVersionException}.
     * If the thread is interrupted, the remaining results are still awaited and the interrupt
     * status is restored before returning.
     *
     * @param partitionsToCancel the partitions whose reassignment should be cancelled
     * @return the number of partitions whose cancellation completed without error
     */
    public int cancelInterBrokerReplicaMovements(List<TopicPartition> partitionsToCancel) {
        Map<TopicPartition, KafkaFuture<Void>> cancellationFutures = this.adminClient
            .alterPartitionReassignments(partitionsToCancel.stream()
                .collect(Collectors.toMap(partition -> partition, partition -> Optional.empty())))
            .values();
        int numCancelled = 0;
        boolean interrupted = false;
        try {
            for (Map.Entry<TopicPartition, KafkaFuture<Void>> cancellation : cancellationFutures.entrySet()) {
                TopicPartition partition = cancellation.getKey();
                try {
                    cancellation.getValue().get();
                    numCancelled++;
                } catch (InterruptedException e) {
                    // Remember the interrupt but keep draining the remaining futures.
                    interrupted = true;
                    LOG.warn("Interrupted while cancelling partition reassignments.");
                } catch (ExecutionException | ApiException e) {
                    if (e.getCause() instanceof NoReassignmentInProgressException
                        || e instanceof NoReassignmentInProgressException) {
                        LOG.debug("Could not cancel reassignment of {} because none was in progress", partition);
                    } else if (e.getCause() instanceof UnsupportedVersionException) {
                        LOG.warn("Kafka does not support the AlterPartitionReassignments API.Cannot cancel the current partition reassignments.");
                        // No point in checking the remaining partitions: the API itself is unavailable.
                        break;
                    } else {
                        LOG.warn("Reassignment cancellation for {} failed.", partition, e);
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return numCancelled;
    }

    /**
     * Fetches the log-dir information of the replica each execution task refers to.
     *
     * <p>Tasks whose lookup fails or times out are logged and left out of the result. If the
     * thread is interrupted, the remaining lookups are still awaited and the interrupt status is
     * restored before returning.
     *
     * @param tasks the execution tasks to look up
     * @return a mutable map from task to the log-dir information of its replica
     */
    public Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> getLogdirInfoForExecutionTask(Collection<ExecutionTask> tasks) {
        Map<TopicPartitionReplica, ExecutionTask> taskByReplica = new HashMap<>(tasks.size());
        for (ExecutionTask task : tasks) {
            taskByReplica.put(replicaOf(task), task);
        }
        Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logDirInfoByTask = new HashMap<>(tasks.size());
        Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> logDirFutures =
            this.adminClient.describeReplicaLogDirs(taskByReplica.keySet()).values();
        boolean interrupted = false;
        try {
            for (Map.Entry<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> lookup
                : logDirFutures.entrySet()) {
                try {
                    logDirInfoByTask.put(taskByReplica.get(lookup.getKey()),
                        lookup.getValue().get(this.logDirResponseTimeoutMs, TimeUnit.MILLISECONDS));
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.warn("Encounter exception {} when fetching logdir information for replica {}", e.getMessage(), lookup.getKey());
                } catch (ExecutionException | TimeoutException e) {
                    LOG.warn("Encounter exception {} when fetching logdir information for replica {}", e.getMessage(), lookup.getKey());
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return logDirInfoByTask;
    }

    /**
     * Submits the intra-broker (between log dirs) replica movements described by the given tasks.
     *
     * <p>For every task whose request fails or times out, the task is marked aborting and then
     * dead on the given task manager. If the thread is interrupted, the remaining results are
     * still awaited and the interrupt status is restored before returning.
     *
     * @param tasks the tasks to execute; each task's proposal must contain a target log dir for
     *        the task's broker
     * @param executionTaskManager the manager notified about failed tasks
     */
    public void executeIntraBrokerReplicaMovements(List<ExecutionTask> tasks, ExecutionTaskManager executionTaskManager) {
        Map<TopicPartitionReplica, String> targetLogDirByReplica = new HashMap<>(tasks.size());
        Map<TopicPartitionReplica, ExecutionTask> taskByReplica = new HashMap<>(tasks.size());
        for (ExecutionTask task : tasks) {
            TopicPartitionReplica replica = replicaOf(task);
            targetLogDirByReplica.put(replica,
                task.proposal().replicasToMoveBetweenDisksByBroker().get(task.brokerId()).logdir());
            taskByReplica.put(replica, task);
        }
        Map<TopicPartitionReplica, KafkaFuture<Void>> movementFutures =
            this.adminClient.alterReplicaLogDirs(targetLogDirByReplica).values();
        boolean interrupted = false;
        try {
            for (Map.Entry<TopicPartitionReplica, KafkaFuture<Void>> movement : movementFutures.entrySet()) {
                try {
                    movement.getValue().get(this.logDirResponseTimeoutMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException | LogDirNotFoundException
                         | KafkaStorageException | ReplicaNotAvailableException e) {
                    if (e instanceof InterruptedException) {
                        interrupted = true;
                    }
                    ExecutionTask failedTask = taskByReplica.get(movement.getKey());
                    LOG.warn("Encounter exception {} when trying to execute task {}, mark task dead.", e.getMessage(), failedTask);
                    executionTaskManager.markTaskAborting(failedTask);
                    executionTaskManager.markTaskDead(failedTask);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Checks whether any of the given brokers reports a "future" replica in one of its healthy
     * log dirs.
     *
     * <p>Brokers whose log dirs cannot be described within the log-dir timeout are logged and
     * skipped. If the thread is interrupted, the remaining brokers are still checked and the
     * interrupt status is restored before returning.
     *
     * @param brokersToCheck ids of the brokers to inspect
     * @return {@code true} if at least one log dir without error contains a future replica
     */
    public boolean isOngoingIntraBrokerReplicaMovement(Collection<Integer> brokersToCheck) {
        Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> logDirFutures =
            this.adminClient.describeLogDirs(brokersToCheck).values();
        boolean interrupted = false;
        try {
            for (Map.Entry<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> brokerLogDirs
                : logDirFutures.entrySet()) {
                try {
                    Map<String, DescribeLogDirsResponse.LogDirInfo> logDirs =
                        brokerLogDirs.getValue().get(this.logDirResponseTimeoutMs, TimeUnit.MILLISECONDS);
                    for (DescribeLogDirsResponse.LogDirInfo logDirInfo : logDirs.values()) {
                        if (logDirInfo.error == Errors.NONE
                            && logDirInfo.replicaInfos.values().stream().anyMatch(replicaInfo -> replicaInfo.isFuture)) {
                            return true;
                        }
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.warn("Interrupted while describing log dirs of broker {}", brokerLogDirs.getKey());
                } catch (ExecutionException | TimeoutException e) {
                    // Best effort: a broker that cannot be described is skipped, not treated as moving.
                    LOG.warn("Encountered exception while describing log dirs of broker {}", brokerLogDirs.getKey(), e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return false;
    }

    /** Builds the replica identifier (topic, partition, broker) an execution task refers to. */
    private static TopicPartitionReplica replicaOf(ExecutionTask task) {
        return new TopicPartitionReplica(task.proposal().topic(), task.proposal().partitionId(), task.brokerId());
    }
}
