/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AbstractRecommitAwareCommitPolicy;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PartialAsyncCommitPolicy<K, V>
extends AbstractRecommitAwareCommitPolicy<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(PartialAsyncCommitPolicy.class);
    private final int maxPendingAsyncCommits;
    private final OffsetCommitCallback callback;
    private final Map<TopicPartition, OffsetAndMetadata> pendingAsyncCommitOffset;
    private int pendingAsyncCommitCounter;
    private boolean forceSync;

    PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) {
        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, forceWholeCommitInterval);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
        this.callback = new AsyncCommitCallback();
        this.pendingAsyncCommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
    }

    @Override
    public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
        boolean syncCommit = this.forceSync || this.pendingAsyncCommitCounter >= this.maxPendingAsyncCommits;
        Map<TopicPartition, OffsetAndMetadata> offsets = this.offsetsForPartialCommit(syncCommit);
        if (offsets.isEmpty()) {
            return Collections.emptySet();
        }
        Set<TopicPartition> partitions = this.getCompletedPartitions(noPendingRecords);
        if (syncCommit) {
            this.commitSync(offsets);
            this.pendingAsyncCommitOffset.clear();
            this.pendingAsyncCommitCounter = 0;
            this.forceSync = false;
            this.clearCachedCompletedPartitionsRecords(partitions, noPendingRecords);
        } else {
            ++this.pendingAsyncCommitCounter;
            this.consumer.commitAsync(offsets, this.callback);
            this.pendingAsyncCommitOffset.putAll(offsets);
        }
        return partitions;
    }

    @VisibleForTesting
    int pendingAsyncCommitCount() {
        return this.pendingAsyncCommitCounter;
    }

    @VisibleForTesting
    boolean forceSync() {
        return this.forceSync;
    }

    private Map<TopicPartition, OffsetAndMetadata> offsetsForPartialCommit(boolean syncCommit) {
        HashMap offsets;
        if (this.needRecommit()) {
            offsets = this.offsetsForRecommit();
            this.updateNextRecommitTime();
        } else if (syncCommit) {
            offsets = this.completedTopicOffsets;
        } else {
            offsets = new HashMap(this.completedTopicOffsets);
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.pendingAsyncCommitOffset.entrySet()) {
                offsets.remove(entry.getKey(), entry.getValue());
            }
        }
        return offsets;
    }

    private class AsyncCommitCallback
    implements OffsetCommitCallback {
        private AsyncCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            --PartialAsyncCommitPolicy.this.pendingAsyncCommitCounter;
            assert (PartialAsyncCommitPolicy.this.pendingAsyncCommitCounter >= 0) : "actual: " + PartialAsyncCommitPolicy.access$100(PartialAsyncCommitPolicy.this);
            if (exception != null) {
                logger.warn("Failed to commit offset: " + offsets + " asynchronously", (Throwable)exception);
                PartialAsyncCommitPolicy.this.forceSync = true;
            } else {
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                    PartialAsyncCommitPolicy.this.topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset());
                    PartialAsyncCommitPolicy.this.completedTopicOffsets.remove(entry.getKey(), entry.getValue());
                }
            }
        }
    }
}

