/*
 * Decompiled with CFR 0.152.
 */
package de.otto.eventsourcing.monitor;

import de.otto.eventsourcing.monitor.TopicUpdateEvent;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import kafka.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import scala.collection.JavaConversions;
import scala.collection.Map;

public class TopicsMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(TopicsMonitor.class);
    private static final ConcurrentHashMap<Integer, Long> EMPTY_CONCURRENT_MAP = new ConcurrentHashMap();
    private static final String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> serverPartitionOffsets = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>();
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> consumerPartitionOffsets = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>();
    private final AdminClient adminClient;
    private final Set<String> groupIds = Collections.synchronizedSet(new HashSet());
    private final long acceptedLag;

    public TopicsMonitor(AdminClient adminClient, long acceptedLag) {
        LOG.info("Starting TopicsMonitor");
        this.adminClient = adminClient;
        this.acceptedLag = acceptedLag;
    }

    public void registerGroupId(String groupId) {
        this.groupIds.add(groupId);
    }

    public void unregisterGroupId(String groupId) {
        this.groupIds.remove(groupId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @EventListener
    public void onApplicationEvent(TopicUpdateEvent event) {
        ConcurrentMap<String, ConcurrentMap<Integer, Long>> concurrentMap = this.consumerPartitionOffsets;
        synchronized (concurrentMap) {
            this.consumerPartitionOffsets.putIfAbsent(event.getTopic(), new ConcurrentHashMap());
            ((ConcurrentMap)this.consumerPartitionOffsets.get(event.getTopic())).put(event.getPartition(), event.getOffset());
        }
    }

    public boolean isUpToDate(String topic) {
        return this.lag(topic) <= this.acceptedLag;
    }

    public long lag(String topic) {
        if (this.serverPartitionOffsets.containsKey(topic)) {
            return ((ConcurrentMap)this.serverPartitionOffsets.get(topic)).keySet().stream().map(partition -> {
                long serverOffset = (Long)((ConcurrentMap)this.serverPartitionOffsets.get(topic)).get(partition);
                long consumerOffset = ((ConcurrentMap)this.consumerPartitionOffsets.getOrDefault(topic, EMPTY_CONCURRENT_MAP)).getOrDefault(partition, 0L);
                return serverOffset - consumerOffset - 1L;
            }).max(Comparator.naturalOrder()).orElse(Long.MAX_VALUE);
        }
        return Long.MAX_VALUE;
    }

    @Scheduled(initialDelayString="${eventsourcing.topics-monitor.initial-delay:500}", fixedDelayString="${eventsourcing.topics-monitor.fetch-delay:5000}")
    public void fetchGroupOffsets() {
        try {
            this.groupIds.forEach(consumerGroup -> {
                java.util.Map topicPartitionMap = JavaConversions.mapAsJavaMap((Map)this.adminClient.listGroupOffsets(consumerGroup));
                topicPartitionMap.keySet().forEach(topicPartition -> {
                    String topic = topicPartition.topic();
                    if (!topic.equals(CONSUMER_OFFSETS_TOPIC)) {
                        Long offset = (Long)topicPartitionMap.get(topicPartition);
                        int partition = topicPartition.partition();
                        this.serverPartitionOffsets.putIfAbsent(topic, new ConcurrentHashMap());
                        ((ConcurrentMap)this.serverPartitionOffsets.get(topic)).put(partition, offset);
                        LOG.trace("Group={}, Topic={}, Partition={}, Offset={}, Lag={}, isUpToDate={}", new Object[]{consumerGroup, topic, partition, offset, this.lag(topic), this.isUpToDate(topic)});
                    }
                });
            });
        }
        catch (RuntimeException e) {
            LOG.error("Exception caught while fetching group offsets: " + e.getMessage(), (Throwable)e);
        }
    }

    public Set<String> getTopics() {
        return this.serverPartitionOffsets.keySet();
    }

    public int getNumberOfPartitions(String topic) {
        return ((ConcurrentMap)this.serverPartitionOffsets.getOrDefault(topic, EMPTY_CONCURRENT_MAP)).keySet().size();
    }
}

