/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AbstractRecommitAwareCommitPolicy;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit policy that prefers asynchronous offset commits and falls back to a
 * synchronous commit when either too many asynchronous commits are still
 * outstanding or a previous asynchronous commit reported a failure.
 *
 * <p>Recommit scheduling ({@code needRecommit()}, {@code offsetsForRecommit()},
 * {@code updateNextRecommitTime()}) and the offset bookkeeping collections
 * ({@code completedTopicOffsets}, {@code topicOffsetHighWaterMark}) are not
 * declared here; presumably they are inherited from
 * {@link AbstractRecommitAwareCommitPolicy} -- confirm against that class.
 *
 * <p>NOTE(review): the counter and flag below are plain, unsynchronized fields;
 * this looks like it relies on commit callbacks being run on the same thread
 * that drives the consumer -- confirm against the Kafka consumer documentation.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
final class AsyncCommitPolicy<K, V>
extends AbstractRecommitAwareCommitPolicy<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);

    /** Once this many async commits are pending, the next commit is done synchronously. */
    private final int maxPendingAsyncCommits;
    /** Single callback instance shared by every asynchronous commit issued by this policy. */
    private final OffsetCommitCallback callback;
    /** Number of asynchronous commits issued whose callback has not been invoked yet. */
    private int pendingAsyncCommitCounter;
    /** Set when an asynchronous commit failed; forces the next commit to be synchronous. */
    private boolean forceSync;

    /**
     * Creates the policy.
     *
     * @param consumer               consumer used to perform the commits
     * @param recommitInterval       passed through unchanged to the superclass constructor
     * @param maxPendingAsyncCommits threshold of outstanding asynchronous commits at which
     *                               the next commit is performed synchronously
     */
    AsyncCommitPolicy(Consumer<K, V> consumer, Duration recommitInterval, int maxPendingAsyncCommits) {
        super(consumer, recommitInterval);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
        this.callback = new AsyncCommitCallback();
    }

    /**
     * Commits offsets if there is nothing pending and some offsets have completed;
     * otherwise only performs a recommit when one is due.
     *
     * @param noPendingRecords {@code true} when the caller has no records still in flight
     * @return the partitions whose completed offsets were submitted for commit by this call,
     *         or an empty set when no such commit was attempted (a recommit may still
     *         have been issued)
     */
    @Override
    public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
        if (!noPendingRecords || this.completedTopicOffsets.isEmpty()) {
            if (this.needRecommit()) {
                this.commit(this.offsetsForRecommit());
            }
            return Collections.emptySet();
        }
        this.commit();
        // Snapshot the key set before clearing: the returned set must not be a live
        // view of the map that is emptied on the next line.
        Set<TopicPartition> partitions = new HashSet<>(this.completedTopicOffsets.keySet());
        this.completedTopicOffsets.clear();
        this.topicOffsetHighWaterMark.clear();
        return partitions;
    }

    /** @return the current number of asynchronous commits counted as outstanding */
    @VisibleForTesting
    int pendingAsyncCommitCount() {
        return this.pendingAsyncCommitCounter;
    }

    /** @return whether the next commit is currently forced to be synchronous */
    @VisibleForTesting
    boolean forceSync() {
        return this.forceSync;
    }

    /**
     * Overrides the force-sync flag.
     *
     * @param forceSync new value of the flag
     */
    @VisibleForTesting
    void setForceSync(boolean forceSync) {
        this.forceSync = forceSync;
    }

    /**
     * Commits without an explicit offset map; the empty map makes the commit helpers
     * pick the consumer's no-offsets commit variant.
     */
    private void commit() {
        this.commit(Collections.emptyMap());
    }

    /**
     * Commits the given offsets, synchronously when forced or when the pending-commit
     * threshold has been reached and asynchronously otherwise, then reschedules the
     * next recommit.
     *
     * @param offsets offsets to commit; an empty map selects the consumer's no-offsets
     *                commit variant
     */
    private void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (this.forceSync || this.pendingAsyncCommitCounter >= this.maxPendingAsyncCommits) {
            this.syncCommit(offsets);
            // Reset only after the synchronous commit returned without throwing.
            this.pendingAsyncCommitCounter = 0;
            this.forceSync = false;
        } else {
            this.asyncCommit(offsets);
        }
        this.updateNextRecommitTime();
    }

    /**
     * Issues an asynchronous commit and counts it as outstanding.
     *
     * @param offsets offsets to commit; an empty map selects the consumer's no-offsets variant
     */
    private void asyncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Count before issuing so the counter is already raised if the callback
        // is invoked during the commitAsync call itself.
        ++this.pendingAsyncCommitCounter;
        if (offsets.isEmpty()) {
            this.consumer.commitAsync(this.callback);
        } else {
            this.consumer.commitAsync(offsets, this.callback);
        }
    }

    /**
     * Issues a synchronous commit.
     *
     * @param offsets offsets to commit; an empty map selects the consumer's no-offsets variant
     */
    private void syncCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            this.consumer.commitSync();
        } else {
            this.consumer.commitSync(offsets);
        }
    }

    /**
     * Completion callback for asynchronous commits: decrements the outstanding counter
     * and, on failure, logs a warning and forces the next commit to be synchronous.
     */
    private class AsyncCommitCallback
    implements OffsetCommitCallback {
        private AsyncCommitCallback() {
        }

        /**
         * @param offsets   offsets the completed commit was issued for
         * @param exception failure cause, or {@code null} when the commit succeeded
         */
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            --AsyncCommitPolicy.this.pendingAsyncCommitCounter;
            // Read the field directly; the decompiled synthetic accessor (access$100)
            // does not exist in source form and would not compile.
            assert (AsyncCommitPolicy.this.pendingAsyncCommitCounter >= 0) : "actual: " + AsyncCommitPolicy.this.pendingAsyncCommitCounter;
            if (exception != null) {
                logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
                AsyncCommitPolicy.this.forceSync = true;
            }
        }
    }
}

