/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

abstract class AbstractCommitPolicy<K, V>
implements CommitPolicy<K, V> {
    protected final Consumer<K, V> consumer;
    final Map<TopicPartition, Long> topicOffsetHighWaterMark;
    final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets;

    AbstractCommitPolicy(Consumer<K, V> consumer) {
        this.consumer = consumer;
        this.topicOffsetHighWaterMark = new HashMap<TopicPartition, Long>();
        this.completedTopicOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    }

    @Override
    public void addPendingRecord(ConsumerRecord<K, V> record) {
        this.topicOffsetHighWaterMark.merge(new TopicPartition(record.topic(), record.partition()), record.offset() + 1L, Math::max);
    }

    @Override
    public void completeRecord(ConsumerRecord<K, V> record) {
        this.completedTopicOffsets.merge(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L), BinaryOperator.maxBy(Comparator.comparing(OffsetAndMetadata::offset)));
    }

    @Override
    public Set<TopicPartition> partialCommit() {
        this.consumer.commitSync(this.completedTopicOffsets);
        Set<TopicPartition> partitions = this.checkCompletedPartitions();
        this.completedTopicOffsets.clear();
        for (TopicPartition p : partitions) {
            this.topicOffsetHighWaterMark.remove(p);
        }
        return partitions;
    }

    Set<TopicPartition> getCompletedPartitions(boolean noPendingRecords) {
        Set<TopicPartition> partitions;
        if (noPendingRecords) {
            assert (this.checkCompletedPartitions().equals(this.topicOffsetHighWaterMark.keySet())) : "expect: " + this.checkCompletedPartitions() + " actual: " + this.topicOffsetHighWaterMark.keySet();
            partitions = new HashSet<TopicPartition>(this.topicOffsetHighWaterMark.keySet());
        } else {
            partitions = this.checkCompletedPartitions();
        }
        return partitions;
    }

    void clearCachedCompletedPartitionsRecords(Set<TopicPartition> completedPartitions, boolean noPendingRecords) {
        this.completedTopicOffsets.clear();
        if (noPendingRecords) {
            this.topicOffsetHighWaterMark.clear();
        } else {
            for (TopicPartition p : completedPartitions) {
                this.topicOffsetHighWaterMark.remove(p);
            }
        }
    }

    @VisibleForTesting
    Map<TopicPartition, Long> topicOffsetHighWaterMark() {
        return this.topicOffsetHighWaterMark;
    }

    @VisibleForTesting
    Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
        return this.completedTopicOffsets;
    }

    private Set<TopicPartition> checkCompletedPartitions() {
        return this.completedTopicOffsets.entrySet().stream().filter(entry -> this.topicOffsetMeetHighWaterMark((TopicPartition)entry.getKey(), (OffsetAndMetadata)entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
        Long offsetHighWaterMark = this.topicOffsetHighWaterMark.get(topicPartition);
        if (offsetHighWaterMark != null) {
            return offset.offset() >= offsetHighWaterMark;
        }
        return true;
    }
}

