/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AbstractCommitPolicy;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

abstract class AbstractRecommitAwareCommitPolicy<K, V>
extends AbstractCommitPolicy<K, V> {
    private final Duration recommitInterval;
    private long nextRecommitNanos;

    AbstractRecommitAwareCommitPolicy(Consumer<K, V> consumer, Duration recommitInterval) {
        super(consumer);
        this.recommitInterval = recommitInterval;
        this.updateNextRecommitTime(System.nanoTime());
    }

    Map<TopicPartition, OffsetAndMetadata> offsetsForRecommit() {
        assert (this.needRecommit()) : "current nanos: " + System.nanoTime() + " nextRecommitNanos:" + this.nextRecommitNanos;
        HashMap<TopicPartition, OffsetAndMetadata> ret = new HashMap<TopicPartition, OffsetAndMetadata>(this.completedTopicOffsets);
        for (TopicPartition partition : this.consumer.assignment()) {
            OffsetAndMetadata offset = this.consumer.committed(partition);
            if (offset == null) continue;
            ret.putIfAbsent(partition, offset);
        }
        return ret;
    }

    boolean needRecommit() {
        return System.nanoTime() >= this.nextRecommitNanos;
    }

    void updateNextRecommitTime() {
        this.updateNextRecommitTime(System.nanoTime());
    }

    @VisibleForTesting
    long nextRecommitNanos() {
        return this.nextRecommitNanos;
    }

    private void updateNextRecommitTime(long currentNanos) {
        this.nextRecommitNanos = currentNanos + this.recommitInterval.toNanos();
    }
}

