/*
 * Decompiled with CFR 0.152.
 */
package io.specmesh.kafka.admin;

import io.specmesh.kafka.admin.SmAdminClient;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class SimpleAdminClient
implements SmAdminClient {
    public static final long TIMEOUT = 300L;
    private final Admin adminClient;

    SimpleAdminClient(Admin adminClient) {
        this.adminClient = adminClient;
    }

    @Override
    public List<SmAdminClient.ConsumerGroup> groupsForTopicPrefix(String topicPrefix) {
        try {
            List<ConsumerGroupDescription> groupDescriptions = this.groupsDescriptions(topicPrefix);
            Map<String, List<SmAdminClient.Partition>> groupOffsets = this.groupOffsets(groupDescriptions, topicPrefix);
            return groupDescriptions.stream().map(groupDescription -> {
                SmAdminClient.ConsumerGroup.ConsumerGroupBuilder groupBuilder = SmAdminClient.ConsumerGroup.builder();
                groupBuilder.id(groupDescription.groupId());
                groupBuilder.members(groupDescription.members().stream().map(member -> SmAdminClient.Member.builder().id(member.consumerId()).host(member.host()).clientId(member.clientId()).build()).collect(Collectors.toList()));
                groupBuilder.partitions(groupOffsets.getOrDefault(groupDescription.groupId(), List.of()));
                SmAdminClient.ConsumerGroup group = groupBuilder.build();
                group.calculateTotalOffset();
                return group;
            }).collect(Collectors.toList());
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new SmAdminClient.ClientException("Failed to list consumer-groups for:" + topicPrefix, e);
        }
    }

    private List<ConsumerGroupDescription> groupsDescriptions(String topicPrefix) throws InterruptedException, ExecutionException, TimeoutException {
        Collection allGroups = (Collection)this.adminClient.listConsumerGroups().all().get(300L, TimeUnit.SECONDS);
        List allGroupIds = allGroups.stream().filter(listing -> !listing.isSimpleConsumerGroup() && listing.state().isPresent() && ((ConsumerGroupState)listing.state().get()).equals((Object)ConsumerGroupState.STABLE)).map(ConsumerGroupListing::groupId).collect(Collectors.toList());
        Map allDescriptions = (Map)this.adminClient.describeConsumerGroups(allGroupIds).all().get(300L, TimeUnit.SECONDS);
        return allDescriptions.values().stream().filter(description -> this.isConsumingFromTopicPrefix((ConsumerGroupDescription)description, topicPrefix)).collect(Collectors.toList());
    }

    private Map<String, List<SmAdminClient.Partition>> groupOffsets(List<ConsumerGroupDescription> groupDescriptions, String topicPrefix) throws InterruptedException, ExecutionException, TimeoutException {
        Map specs = groupDescriptions.stream().map(ConsumerGroupDescription::groupId).collect(Collectors.toMap(Function.identity(), id -> new ListConsumerGroupOffsetsSpec()));
        Map offsets = (Map)this.adminClient.listConsumerGroupOffsets(specs).all().get(300L, TimeUnit.SECONDS);
        return offsets.entrySet().stream().filter(e -> ((Map)e.getValue()).keySet().stream().anyMatch(tp -> tp.topic().startsWith(topicPrefix))).collect(Collectors.toMap(Map.Entry::getKey, e -> ((Map)e.getValue()).entrySet().stream().map(o -> SmAdminClient.Partition.builder().id(((TopicPartition)o.getKey()).partition()).topic(((TopicPartition)o.getKey()).topic()).offset(((OffsetAndMetadata)o.getValue()).offset()).build()).collect(Collectors.toList())));
    }

    private boolean isConsumingFromTopicPrefix(ConsumerGroupDescription groupDescription, String prefix) {
        return groupDescription.members().stream().map(MemberDescription::assignment).map(MemberAssignment::topicPartitions).flatMap(Collection::stream).anyMatch(tp -> tp.topic().startsWith(prefix));
    }

    @Override
    public long topicVolumeUsingLogDirs(String topic) {
        try {
            List<Integer> brokers = this.brokerIds();
            DescribeLogDirsResult logDirsResult = this.adminClient.describeLogDirs(brokers);
            Map logDirsByBroker = (Map)logDirsResult.allDescriptions().get(300L, TimeUnit.SECONDS);
            long totalSize = 0L;
            for (Map logDirs : logDirsByBroker.values()) {
                for (LogDirDescription logDirInfo : logDirs.values()) {
                    for (Map.Entry replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
                        TopicPartition tp = (TopicPartition)replicaInfoEntry.getKey();
                        if (!topic.equals(tp.topic())) continue;
                        totalSize += ((ReplicaInfo)replicaInfoEntry.getValue()).size();
                    }
                }
            }
            return totalSize;
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new SmAdminClient.ClientException("Failed to get topicVolumeUsingLogDirs:" + topic, e);
        }
    }

    @Override
    public List<Integer> brokerIds() {
        try {
            return ((Collection)this.adminClient.describeCluster().nodes().get()).stream().mapToInt(Node::id).boxed().collect(Collectors.toList());
        }
        catch (InterruptedException | ExecutionException e) {
            throw new SmAdminClient.ClientException("Failed to describe cluster to get brokerIds", e);
        }
    }

    @Override
    public long topicVolumeOffsets(String topic) {
        try {
            DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(Collections.singleton(topic));
            TopicDescription topicDescription = (TopicDescription)((Map)describeTopicsResult.allTopicNames().get(300L, TimeUnit.SECONDS)).get(topic);
            HashMap<TopicPartition, OffsetSpec> endOffsetsQuery = new HashMap<TopicPartition, OffsetSpec>();
            HashMap<TopicPartition, OffsetSpec> startOffsetsQuery = new HashMap<TopicPartition, OffsetSpec>();
            for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
                TopicPartition partition = new TopicPartition(topic, partitionInfo.partition());
                endOffsetsQuery.put(partition, OffsetSpec.latest());
                startOffsetsQuery.put(partition, OffsetSpec.earliest());
            }
            ListOffsetsResult endOffsetsResult = this.adminClient.listOffsets(endOffsetsQuery);
            ListOffsetsResult startOffsetsResult = this.adminClient.listOffsets(startOffsetsQuery);
            long totalVolume = 0L;
            for (TopicPartition partition : endOffsetsQuery.keySet()) {
                long endOffset = ((ListOffsetsResult.ListOffsetsResultInfo)endOffsetsResult.partitionResult(partition).get()).offset();
                long startOffset = ((ListOffsetsResult.ListOffsetsResultInfo)startOffsetsResult.partitionResult(partition).get()).offset();
                totalVolume += endOffset - startOffset;
            }
            return totalVolume;
        }
        catch (ExecutionException e) {
            if (e.toString().contains(UnknownTopicOrPartitionException.class.getName())) {
                return 0L;
            }
            throw new SmAdminClient.ClientException("Failed to get topicVolumeOffsets for topic:" + topic, e);
        }
        catch (InterruptedException | TimeoutException e) {
            throw new SmAdminClient.ClientException("Failed to get topicVolumeOffsets for topic:" + topic, e);
        }
    }
}

