/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AbstractPartialCommitPolicy;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit policy that commits the offsets returned by {@code offsetsForPartialCommit()}
 * asynchronously, and falls back to a synchronous commit when either a previous
 * asynchronous commit has failed or the number of in-flight asynchronous commits has
 * reached {@code maxPendingAsyncCommits}.
 *
 * <p>NOTE(review): the mutable state here ({@code pendingAsyncCommitCounter},
 * {@code forceSync}) is not synchronized. This presumably relies on the Kafka consumer
 * invoking {@link OffsetCommitCallback} on the same thread that drives the consumer —
 * confirm against the Kafka client documentation.
 *
 * @param <K> the key type of the consumed records
 * @param <V> the value type of the consumed records
 */
final class PartialAsyncCommitPolicy<K, V> extends AbstractPartialCommitPolicy<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(PartialAsyncCommitPolicy.class);

    /** Once this many asynchronous commits are in flight, the next commit is synchronous. */
    private final int maxPendingAsyncCommits;
    /** Single callback instance shared by every asynchronous commit issued by this policy. */
    private final OffsetCommitCallback callback;
    /** Number of asynchronous commits issued whose callback has not been invoked yet. */
    private int pendingAsyncCommitCounter;
    /** Set when an asynchronous commit fails; makes the next commit synchronous. */
    private boolean forceSync;

    /**
     * Creates a new policy.
     *
     * @param consumer                 the consumer used to commit offsets
     * @param forceWholeCommitInterval passed through to the superclass unchanged
     * @param maxPendingAsyncCommits   the in-flight asynchronous commit count at which the
     *                                 policy switches to a synchronous commit
     */
    PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) {
        super(consumer, forceWholeCommitInterval);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
        this.callback = new AsyncCommitCallback();
    }

    /**
     * Commits the offsets selected by {@code offsetsForPartialCommit()}, if there are any.
     *
     * <p>The commit is synchronous when {@code forceSync} is set or when the pending
     * asynchronous commit count has reached {@code maxPendingAsyncCommits}; otherwise it is
     * asynchronous. After a synchronous commit the pending counter and the force-sync flag
     * are reset and the cached records of the completed partitions are cleared. For an
     * asynchronous commit that cleanup is left to the commit callback.
     *
     * @param noPendingRecords forwarded to {@code getCompletedPartitions} and
     *                         {@code clearCachedCompletedPartitionsRecords}
     * @return the partitions reported by {@code getCompletedPartitions}, or an empty set
     *         when there was nothing to commit
     */
    @Override
    public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
        final Map<TopicPartition, OffsetAndMetadata> offsets = offsetsForPartialCommit();
        if (offsets.isEmpty()) {
            return Collections.emptySet();
        }

        final Set<TopicPartition> partitions = getCompletedPartitions(noPendingRecords);
        if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
            consumer.commitSync(offsets);
            pendingAsyncCommitCounter = 0;
            forceSync = false;
            clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
        } else {
            ++pendingAsyncCommitCounter;
            consumer.commitAsync(offsets, callback);
        }

        // The recommit time is refreshed on both paths, i.e. before the outcome of an
        // asynchronous commit is known.
        updateNextRecommitTime();
        return partitions;
    }

    /**
     * @return the number of asynchronous commits currently counted as in flight
     */
    @VisibleForTesting
    int pendingAsyncCommitCount() {
        return pendingAsyncCommitCounter;
    }

    /**
     * @return {@code true} if the next commit will be forced to be synchronous
     */
    @VisibleForTesting
    boolean forceSync() {
        return forceSync;
    }

    /**
     * Callback attached to every asynchronous commit. It decrements the pending counter,
     * and then either schedules a synchronous commit (on failure) or drops the committed
     * entries from the policy's bookkeeping maps (on success).
     */
    private class AsyncCommitCallback implements OffsetCommitCallback {
        /**
         * @param offsets   the offsets the finished commit was issued for
         * @param exception the failure cause, or {@code null} when the commit succeeded
         */
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            --pendingAsyncCommitCounter;
            assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;

            if (exception != null) {
                // The trailing Throwable is logged as the exception, not as a format argument.
                logger.warn("Failed to commit offset: {} asynchronously", offsets, exception);
                forceSync = true;
                return;
            }

            // If the callback was handed the very map we are about to remove entries from,
            // iterate over a snapshot so the removals do not invalidate the iteration.
            final Map<TopicPartition, OffsetAndMetadata> completeOffsets =
                    offsets == completedTopicOffsets ? new HashMap<>(offsets) : offsets;
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : completeOffsets.entrySet()) {
                // Two-argument remove: an entry is only dropped if it still holds exactly the
                // committed value, so entries updated since the commit was issued are kept.
                completedTopicOffsets.remove(entry.getKey(), entry.getValue());
                topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset());
            }
        }
    }
}

