/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AbstractRecommitAwareCommitPolicy;
import cn.leancloud.kafka.consumer.ProcessRecordsProgress;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit policy that prefers asynchronous offset commits and falls back to a
 * synchronous commit (with retry) when either a previous asynchronous commit
 * reported a failure or too many asynchronous commits are still outstanding.
 *
 * <p>State is kept in plain, non-volatile fields that are also mutated from the
 * asynchronous commit callback. NOTE(review): this presumably relies on the
 * callback being invoked on the same thread that drives the consumer — confirm
 * against the consumer's threading contract.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
final class AsyncCommitPolicy<K, V>
extends AbstractRecommitAwareCommitPolicy<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);

    /** Once this many asynchronous commits are outstanding, the next commit is synchronous. */
    private final int maxPendingAsyncCommits;
    /** Asynchronous commits issued whose callback has not been invoked yet. */
    private int pendingAsyncCommitCounter;
    /** Set by a failed asynchronous commit; forces the next commit to be synchronous. */
    private boolean forceSync;

    /**
     * Creates the policy.
     *
     * @param consumer                     consumer whose offsets are committed
     * @param syncCommitRetryInterval      forwarded to the superclass
     * @param maxAttemptsForEachSyncCommit forwarded to the superclass
     * @param recommitInterval             forwarded to the superclass
     * @param maxPendingAsyncCommits       number of outstanding asynchronous commits at which
     *                                     the next commit switches to the synchronous path
     */
    AsyncCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit, Duration recommitInterval, int maxPendingAsyncCommits) {
        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, recommitInterval);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
    }

    /**
     * Commits when a synchronous commit has been forced, or when there are no
     * pending records and {@code progress} reports offsets to commit.
     *
     * @param noPendingRecords whether the caller has no records still pending
     * @param progress         the processing progress to commit and clear
     * @return the partitions reported by {@code progress.allPartitions()} before
     *         the commit was issued, or an empty set when no commit was attempted
     */
    @Override
    Set<TopicPartition> tryCommit0(boolean noPendingRecords, ProcessRecordsProgress progress) {
        // Short-circuit order matters: progress is only queried when no sync commit is
        // forced and there are no pending records.
        boolean shouldCommit = forceSync || (noPendingRecords && !progress.noOffsetsToCommit());
        if (!shouldCommit) {
            return Collections.emptySet();
        }

        // Capture the partitions before committing: the synchronous path clears the progress.
        Set<TopicPartition> partitions = progress.allPartitions();
        commit(progress);
        updateNextRecommitTime();
        return partitions;
    }

    /**
     * @return the number of asynchronous commits whose callback has not run yet
     */
    @VisibleForTesting
    int pendingAsyncCommitCount() {
        return pendingAsyncCommitCounter;
    }

    /**
     * @return whether the next commit is forced to be synchronous
     */
    @VisibleForTesting
    boolean forceSync() {
        return forceSync;
    }

    /**
     * Issues one commit, choosing the synchronous path when it has been forced or
     * when the number of outstanding asynchronous commits has reached the limit,
     * and the asynchronous path otherwise.
     *
     * @param progress the processing progress to clear once offsets are committed
     */
    private void commit(ProcessRecordsProgress progress) {
        if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
            commitSyncWithRetry();
            // Only reached when the synchronous commit returned normally, so the
            // bookkeeping is reset and all tracked progress is dropped.
            pendingAsyncCommitCounter = 0;
            forceSync = false;
            progress.clearAll();
            return;
        }

        // Count the commit before issuing it, so a callback that runs immediately
        // still sees a positive counter when it decrements.
        ++pendingAsyncCommitCounter;
        consumer.commitAsync((offsets, exception) -> {
            --pendingAsyncCommitCounter;
            assert (pendingAsyncCommitCounter >= 0) : "actual: " + pendingAsyncCommitCounter;
            if (exception != null) {
                // Parameterized message; the trailing Throwable is logged with its stack trace.
                logger.warn("Failed to commit offsets: {} asynchronously", offsets, exception);
                // Recover from the failure with a synchronous commit on the next attempt.
                forceSync = true;
            } else {
                progress.clearCompletedPartitions(offsets);
            }
        });
    }
}

