/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CompletedOffsets;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Tracks, per {@link TopicPartition}, which records have been handed out for processing and
 * which of them have completed, so that callers can work out which offsets are ready to be
 * committed and which partitions have nothing outstanding.
 *
 * <p>Two maps are maintained side by side:
 * <ul>
 *   <li>a "high water mark" per partition: the largest {@code record.offset() + 1} seen by
 *       {@link #markPendingRecord(ConsumerRecord)};</li>
 *   <li>a {@link CompletedOffsets} tracker per partition, created lazily when the first pending
 *       record of that partition is seen.</li>
 * </ul>
 *
 * <p>This class performs no synchronization of its own.
 */
class ProcessRecordsProgress {
    /** Per partition: highest pending record offset plus one. */
    private final Map<TopicPartition, Long> topicOffsetHighWaterMark = new HashMap<>();
    /** Per partition: tracker of completed / committed offsets. */
    private final Map<TopicPartition, CompletedOffsets> completedOffsets = new HashMap<>();

    /**
     * Registers a record as pending.
     *
     * <p>Raises the partition's high water mark to {@code record.offset() + 1} if that is larger
     * than the current value, and creates the partition's {@link CompletedOffsets} tracker if it
     * does not exist yet.
     *
     * @param record the record that is about to be processed
     */
    void markPendingRecord(ConsumerRecord<?, ?> record) {
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        this.topicOffsetHighWaterMark.merge(topicPartition, record.offset() + 1L, Math::max);
        // The tracker is seeded with the offset just before the first pending record.
        // NOTE(review): presumably CompletedOffsets treats that value as the last committed
        // offset -- confirm against CompletedOffsets.
        this.completedOffsets.computeIfAbsent(
                topicPartition, ignored -> new CompletedOffsets(record.offset() - 1L));
    }

    /**
     * Registers a record as completed. Records whose partition is not currently tracked
     * (never marked pending, or already cleared) are ignored.
     *
     * @param record the record whose processing has finished
     */
    void markCompletedRecord(ConsumerRecord<?, ?> record) {
        CompletedOffsets offset = this.completedOffsets.get(new TopicPartition(record.topic(), record.partition()));
        if (offset != null) {
            offset.addCompleteOffset(record.offset());
        }
    }

    /** Drops all tracked state for every partition. */
    void clearAll() {
        this.topicOffsetHighWaterMark.clear();
        this.completedOffsets.clear();
    }

    /**
     * Drops all tracked state for the given partitions. Partitions that are not tracked are
     * silently skipped.
     *
     * @param partitions the partitions to forget
     */
    void clearFor(Collection<TopicPartition> partitions) {
        for (TopicPartition p : partitions) {
            this.topicOffsetHighWaterMark.remove(p);
            this.completedOffsets.remove(p);
        }
    }

    /**
     * Exposes the high water mark map for tests.
     *
     * @return the live internal map (not a copy); changes made through it affect this object
     */
    @VisibleForTesting
    Map<TopicPartition, Long> pendingRecordOffsets() {
        return this.topicOffsetHighWaterMark;
    }

    /**
     * @return {@code true} if no partition currently has a high water mark recorded
     */
    boolean noPendingRecords() {
        return this.topicOffsetHighWaterMark.isEmpty();
    }

    /**
     * @return a new, independent set holding every partition that has a high water mark
     */
    Set<TopicPartition> allPartitions() {
        return new HashSet<>(this.topicOffsetHighWaterMark.keySet());
    }

    /**
     * @return {@code true} if no partition currently has a {@link CompletedOffsets} tracker
     */
    boolean noCompletedRecords() {
        return this.completedOffsets.isEmpty();
    }

    /**
     * Collects the offsets that are ready to be committed.
     *
     * @return {@link Collections#emptyMap()} when no partition is tracked; otherwise a new map
     *         holding {@code getOffsetToCommit()} for every partition whose tracker reports
     *         {@code hasOffsetToCommit()} (the map may be empty)
     */
    Map<TopicPartition, OffsetAndMetadata> completedOffsetsToCommit() {
        if (this.noCompletedRecords()) {
            return Collections.emptyMap();
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, CompletedOffsets> entry : this.completedOffsets.entrySet()) {
            CompletedOffsets offset = entry.getValue();
            if (offset.hasOffsetToCommit()) {
                offsets.put(entry.getKey(), offset.getOffsetToCommit());
            }
        }
        return offsets;
    }

    /**
     * @return {@code true} if no tracked partition reports an offset to commit
     */
    boolean noOffsetsToCommit() {
        // noneMatch on an empty map is true, which also covers the "nothing tracked" case.
        return this.completedOffsets.values().stream().noneMatch(CompletedOffsets::hasOffsetToCommit);
    }

    /**
     * Forwards committed offsets to the matching per-partition trackers.
     *
     * <p>Entries for partitions that are no longer tracked (for example because they were
     * removed through {@link #clearFor(Collection)} or {@link #clearAll()} in the meantime)
     * are skipped instead of failing with a {@link NullPointerException}.
     *
     * @param offsets the offsets that have been committed, keyed by partition
     */
    void updateCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            CompletedOffsets offset = this.completedOffsets.get(entry.getKey());
            if (offset != null) {
                offset.updateCommittedOffset(entry.getValue().offset());
            }
        }
    }

    /**
     * Finds the partitions in {@code committedOffsets} that have reached their high water mark
     * and removes all tracked state for them.
     *
     * @param committedOffsets committed offsets keyed by partition
     * @return the partitions that were considered completed and have been cleared
     */
    Set<TopicPartition> clearCompletedPartitions(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        Set<TopicPartition> partitions = this.completedPartitions(committedOffsets);
        this.clearFor(partitions);
        return partitions;
    }

    /**
     * Same as {@link #completedPartitions(Map)}, evaluated against the result of
     * {@link #completedOffsetsToCommit()}.
     *
     * @return partitions whose offset to commit has reached the high water mark
     */
    Set<TopicPartition> completedPartitions() {
        return this.completedPartitions(this.completedOffsetsToCommit());
    }

    /**
     * Selects the partitions whose given offset has reached the partition's high water mark.
     * Partitions without a recorded high water mark are always selected.
     *
     * @param committedOffsets offsets to test, keyed by partition
     * @return the keys of {@code committedOffsets} that pass the check
     */
    Set<TopicPartition> completedPartitions(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        return committedOffsets.entrySet().stream()
                .filter(entry -> this.topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    /**
     * @return {@code true} if {@code offset} is at or beyond the partition's high water mark,
     *         or if the partition has no high water mark recorded
     */
    private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
        Long offsetHighWaterMark = this.topicOffsetHighWaterMark.get(topicPartition);
        if (offsetHighWaterMark != null) {
            return offset.offset() >= offsetHighWaterMark;
        }
        return true;
    }
}

