/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.compatibility;

import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Bookkeeping for a produce/consume round trip: remembers which messages were
 * handed to a producer, which were acknowledged, and which have been read back
 * by a consumer.
 *
 * <p>A message moves through three states:
 * <ol>
 *   <li><b>outstanding</b> - registered via {@link #recordOutstandingMessage(String)}
 *       but not yet acknowledged;</li>
 *   <li><b>produced</b> - acknowledged via
 *       {@link #recordProducedMessage(String, RecordMetadata)} and waiting to be consumed;</li>
 *   <li><b>consumed</b> - seen in a batch passed to
 *       {@link #recordConsumedMessageBatch(ConsumerRecords)}.</li>
 * </ol>
 *
 * <p>Every public method of this class is {@code synchronized} on the tracker
 * instance. Note that {@link #getProducerTimestampOffsetMap()} and
 * {@link #getConsumptionInfo()} hand out the live internal maps, and
 * {@link PerPartitionConsumptionInfo} itself is not synchronized, so callers
 * that read those structures concurrently with updates must coordinate on
 * their own.
 */
public class ProducerConsumerTracker {
    /** Acknowledged message -> whether it has already been consumed. */
    private final Map<String, Boolean> tracker = new HashMap<String, Boolean>();
    /** Messages registered for sending that have not been acknowledged yet. */
    private final Set<String> outstandingMessages = new HashSet<String>();
    private long numProducedMessagesYetToBeConsumed = 0L;
    private long numConsumedMessages = 0L;
    /** Per partition: producer-reported timestamp -> offset of acknowledged records. */
    private final Map<TopicPartition, Map<Long, Long>> producerTimestampOffsetMap = new HashMap<TopicPartition, Map<Long, Long>>();
    /** Per partition: number of acknowledged messages not yet consumed, ordered by topic then partition. */
    private final Map<TopicPartition, Long> perPartitionMsgCount = new TreeMap<TopicPartition, Long>((o1, o2) -> {
        int topicResult = o1.topic().compareTo(o2.topic());
        if (topicResult != 0) {
            return topicResult;
        }
        // Integer.compare instead of subtraction: same sign, no overflow risk.
        return Integer.compare(o1.partition(), o2.partition());
    });
    /** Per partition: what has been consumed so far. */
    private final Map<TopicPartition, PerPartitionConsumptionInfo> consumptionInfo = new HashMap<TopicPartition, PerPartitionConsumptionInfo>();

    /**
     * Marks a previously registered outstanding message as successfully produced.
     *
     * @param msg      the message text, as earlier passed to {@link #recordOutstandingMessage(String)}
     * @param metadata the producer acknowledgement for that message
     * @return the total number of produced messages tracked so far
     * @throws IllegalStateException if {@code msg} was never registered as outstanding,
     *                               or if it is already tracked as produced
     */
    public synchronized long recordProducedMessage(String msg, RecordMetadata metadata) {
        // Set.remove reports whether the element was present, so one call both checks and removes.
        if (!this.outstandingMessages.remove(msg)) {
            throw new IllegalStateException(String.format("Message %s was never attempted to be sent in topic %s, partition %d. RecordMetadata: %s", msg, metadata.topic(), metadata.partition(), metadata));
        }
        if (this.tracker.get(msg) != null) {
            throw new IllegalStateException(String.format("Message %s already exists in topic %s, partition %d. RecordMetadata: %s", msg, metadata.topic(), metadata.partition(), metadata));
        }

        TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
        this.tracker.put(msg, false);
        ++this.numProducedMessagesYetToBeConsumed;
        this.producerTimestampOffsetMap
                .computeIfAbsent(tp, key -> new HashMap<Long, Long>())
                .put(metadata.timestamp(), metadata.offset());
        this.perPartitionMsgCount.merge(tp, 1L, Long::sum);
        return this.tracker.size();
    }

    /**
     * Accounts for every record of a consumed batch.
     *
     * <p>Each record is identified by the string {@code "Key=<key>,Value=<value>"};
     * presumably producers register messages in that same form (verify against callers).
     * For each record:
     * <ul>
     *   <li>produced and not yet consumed: it is marked consumed and all counters are updated;</li>
     *   <li>produced and already consumed: it is ignored (duplicate delivery);</li>
     *   <li>still outstanding (never acknowledged): it is dropped from the outstanding set;</li>
     *   <li>otherwise: an {@link IllegalStateException} is thrown.</li>
     * </ul>
     *
     * @param records the batch returned by a consumer poll
     * @return the number of produced messages still waiting to be consumed after this batch
     * @throws IllegalStateException if a record is unknown, or if consuming it would push
     *                               a remaining-message counter below zero
     */
    public synchronized long recordConsumedMessageBatch(ConsumerRecords<String, String> records) {
        records.forEach(record -> {
            String msg = String.format("Key=%s,Value=%s", record.key(), record.value());
            Boolean seen = this.tracker.get(msg);

            if (seen == null) {
                // Not acknowledged by the producer; acceptable only if a send was at least attempted.
                if (!this.outstandingMessages.remove(msg)) {
                    throw new IllegalStateException(String.format("Unknown message %s consumed, which was never produced. ConsumerRecord: %s", msg, record));
                }
                return;
            }
            if (seen) {
                // Duplicate delivery of an already-consumed message: nothing to account for.
                return;
            }

            this.tracker.put(msg, true);
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (this.numProducedMessagesYetToBeConsumed == 0L) {
                // tp must be a format argument; concatenating it onto the format string
                // left the second %s without a value and made String.format itself throw.
                throw new IllegalStateException(String.format("Messages yet to be consumed can not drop below zero for TopicPartition:%s while processing ConsumerRecord:%s", tp, record));
            }
            Long remainingForPartition = this.perPartitionMsgCount.get(tp);
            // A partition with no recorded count is treated like a zero count, so the caller
            // gets the diagnostic exception instead of a NullPointerException from unboxing.
            if (remainingForPartition == null || remainingForPartition == 0L) {
                throw new IllegalStateException(String.format("Messages yet to be consumed can not drop below zero for TopicPartition: %s while processing ConsumerRecord: %s", tp, record));
            }

            --this.numProducedMessagesYetToBeConsumed;
            this.perPartitionMsgCount.put(tp, remainingForPartition - 1L);
            PerPartitionConsumptionInfo info = this.consumptionInfo.get(tp);
            if (info == null) {
                this.consumptionInfo.put(tp, new PerPartitionConsumptionInfo(record.timestamp(), record.offset()));
            } else {
                info.update(record.timestamp(), record.offset());
            }
            ++this.numConsumedMessages;
        });
        return this.numProducedMessagesYetToBeConsumed;
    }

    /**
     * Registers a message that is about to be sent.
     *
     * @param msg the message text
     * @throws IllegalStateException if the message is already outstanding or already produced
     */
    public synchronized void recordOutstandingMessage(String msg) {
        if (this.outstandingMessages.contains(msg) || this.tracker.get(msg) != null) {
            throw new IllegalStateException(String.format("Message %s already exists", msg));
        }
        this.outstandingMessages.add(msg);
    }

    /**
     * Forgets an outstanding message; does nothing if it is not outstanding.
     *
     * @param msg the message text
     */
    public synchronized void removeOutstandingMessage(String msg) {
        this.outstandingMessages.remove(msg);
    }

    /** @return the number of messages recorded as produced (consumed or not) */
    public synchronized long getNumberOfProducedMessages() {
        return this.tracker.size();
    }

    /** @return the number of produced messages that have not been consumed yet */
    public synchronized long getNumberOfMessagesToBeConsumed() {
        return this.numProducedMessagesYetToBeConsumed;
    }

    /** @return the number of distinct produced messages that have been consumed */
    public synchronized long getNumberOfConsumedMessages() {
        return this.numConsumedMessages;
    }

    /**
     * @return the live internal map of partition -> (producer timestamp -> offset);
     *         it is not a copy, so later tracker updates are visible through it
     */
    public synchronized Map<TopicPartition, Map<Long, Long>> getProducerTimestampOffsetMap() {
        return this.producerTimestampOffsetMap;
    }

    /**
     * @return the live internal map of partition -> consumption info;
     *         it is not a copy, so later tracker updates are visible through it
     */
    public synchronized Map<TopicPartition, PerPartitionConsumptionInfo> getConsumptionInfo() {
        return this.consumptionInfo;
    }

    /**
     * Builds a snapshot of the per-partition remaining counts ordered from the largest
     * count to the smallest. The sort is stable, so partitions with equal counts keep
     * their topic/partition order.
     */
    private Map<TopicPartition, Long> getMapSortedByDescendingMsgCount() {
        ArrayList<Map.Entry<TopicPartition, Long>> entries = new ArrayList<Map.Entry<TopicPartition, Long>>(this.perPartitionMsgCount.entrySet());
        // Arguments swapped on purpose to get descending order.
        entries.sort((e1, e2) -> Long.compare(e2.getValue(), e1.getValue()));
        LinkedHashMap<TopicPartition, Long> result = new LinkedHashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> entry : entries) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    /**
     * Renders a one-line progress summary, optionally followed by a per-partition breakdown.
     *
     * @param printPartitionCounts whether to append, for every partition, the number of
     *                             remaining messages and the highest consumed offset
     *                             ({@code -1} if nothing was consumed from it)
     * @return the human-readable status text
     */
    public synchronized String status(boolean printPartitionCounts) {
        long yetToBeConsumed = this.getNumberOfMessagesToBeConsumed();
        long consumedMessages = this.getNumberOfConsumedMessages();
        long total = consumedMessages + yetToBeConsumed;
        // With nothing produced yet the ratio would be 0/0 (NaN); report 0 instead.
        double percent = total == 0L ? 0.0 : (double)consumedMessages / (double)total * 100.0;
        DecimalFormat df = new DecimalFormat("##.##");
        // Truncate rather than round so the figure never reads 100 before everything is consumed.
        df.setRoundingMode(RoundingMode.DOWN);
        String percentConsumed = df.format(percent);

        StringBuilder debugInfo = new StringBuilder();
        if (printPartitionCounts) {
            debugInfo.append("Breakdown of partition -> {remainingMessages, maxOffset} sorted by # of remaining messages (descending):\n");
            boolean first = true;
            for (Map.Entry<TopicPartition, Long> entry : this.getMapSortedByDescendingMsgCount().entrySet()) {
                // Newline between entries only, none after the last one.
                if (!first) {
                    debugInfo.append("\n");
                }
                first = false;
                TopicPartition partition = entry.getKey();
                PerPartitionConsumptionInfo info = this.consumptionInfo.get(partition);
                long maxConsumedOffset = info == null ? -1L : info.maxOffset();
                debugInfo.append(partition);
                debugInfo.append(": {remainingMessages: ");
                debugInfo.append(entry.getValue().longValue());
                debugInfo.append(", maxConsumedOffset: ");
                debugInfo.append(maxConsumedOffset);
                debugInfo.append("}");
            }
        }
        return String.format("Consumed %d messages, remaining: %d, %s%% consumed. %s", consumedMessages, yetToBeConsumed, percentConsumed, debugInfo.toString());
    }

    /**
     * What has been consumed from a single partition: a timestamp -> offset map of the
     * consumed records and the highest offset seen. Instances are not synchronized.
     */
    public static class PerPartitionConsumptionInfo {
        private final Map<Long, Long> timestampToOffsetMap = new HashMap<Long, Long>();
        /** Highest consumed offset, or -1 while nothing has been recorded. */
        private long maxConsumedOffset = -1L;

        /**
         * Creates the info seeded with one consumed record.
         *
         * @param timestamp the record timestamp
         * @param offset    the record offset
         */
        public PerPartitionConsumptionInfo(long timestamp, long offset) {
            this();
            this.timestampToOffsetMap.put(timestamp, offset);
            this.maxConsumedOffset = Math.max(offset, this.maxConsumedOffset);
        }

        /** Creates an empty info with no consumed records. */
        public PerPartitionConsumptionInfo() {
        }

        /**
         * Records one more consumed record. A record with a timestamp that was already
         * recorded replaces the offset previously stored for that timestamp.
         *
         * @param timestamp the record timestamp
         * @param offset    the record offset
         */
        public void update(long timestamp, long offset) {
            this.timestampToOffsetMap.put(timestamp, offset);
            this.maxConsumedOffset = Math.max(offset, this.maxConsumedOffset);
        }

        /** @return the highest consumed offset, or {@code -1} if nothing was recorded */
        public long maxOffset() {
            return this.maxConsumedOffset;
        }

        /** @return the live internal timestamp -> offset map (not a copy) */
        public Map<Long, Long> timestampToOffsetMap() {
            return this.timestampToOffsetMap;
        }
    }
}

