/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.AbstractRecommitAwareCommitPolicy;
import cn.leancloud.kafka.consumer.ProcessRecordsProgress;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit policy that prefers asynchronous offset commits and falls back to a
 * synchronous commit in two situations visible in this class:
 * <ul>
 *   <li>the number of async commits still awaiting their callback has reached
 *       {@code maxPendingAsyncCommits}; or</li>
 *   <li>a previous async commit reported a failure, in which case the next
 *       commit attempt is forced to be synchronous.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code pendingAsyncCommitCounter} and {@code forceSync} are plain,
 * unsynchronized fields that are also written from the commit callbacks. This
 * presumably relies on Kafka invoking commit callbacks on the thread that drives
 * the consumer -- confirm against the caller before using this class from several threads.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
final class PartialAsyncCommitPolicy<K, V>
extends AbstractRecommitAwareCommitPolicy<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(PartialAsyncCommitPolicy.class);

    /** Threshold of in-flight async commits at which the next commit is done synchronously. */
    private final int maxPendingAsyncCommits;

    /** Offsets already handed to an async commit since the last sync/full commit; used to avoid re-sending them. */
    private final Map<TopicPartition, OffsetAndMetadata> pendingAsyncCommitOffset;

    /** Number of async commits issued whose callback has not been accounted for yet. */
    private int pendingAsyncCommitCounter;

    /** Set when an async commit failed; makes the next {@link #tryCommit0} commit synchronously. */
    private boolean forceSync;

    /**
     * Creates the policy.
     *
     * @param consumer                     consumer used to commit offsets (passed to the superclass)
     * @param syncCommitRetryInterval      passed to the superclass unchanged
     * @param maxAttemptsForEachSyncCommit passed to the superclass unchanged
     * @param forceWholeCommitInterval     passed to the superclass unchanged
     * @param maxPendingAsyncCommits       number of pending async commits at which a sync commit is used instead
     */
    PartialAsyncCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) {
        super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, forceWholeCommitInterval);
        this.maxPendingAsyncCommits = maxPendingAsyncCommits;
        this.pendingAsyncCommitOffset = new HashMap<>();
    }

    /**
     * Tries to commit offsets for the given progress.
     *
     * <p>Decision order: a pending forced sync commit wins; otherwise nothing is done
     * when the progress reports no offsets to commit; otherwise a full commit is made
     * when there are no pending records; otherwise only completed offsets are
     * committed, synchronously or asynchronously depending on {@link #useSyncCommit()}.
     *
     * @param noPendingRecords whether the caller has no records still being processed
     * @param progress         processing progress that supplies the offsets to commit
     * @return the partitions reported as complete by the chosen commit path, possibly empty
     */
    @Override
    Set<TopicPartition> tryCommit0(boolean noPendingRecords, ProcessRecordsProgress progress) {
        if (this.forceSync) {
            return this.tryCommitOnForceSync(noPendingRecords, progress);
        }
        if (progress.noOffsetsToCommit()) {
            return Collections.emptySet();
        }
        if (noPendingRecords) {
            return this.fullCommit(progress);
        }
        if (this.useSyncCommit()) {
            Set<TopicPartition> completePartitions = this.partialCommitSync(progress);
            // A sync commit resets the async bookkeeping.
            this.pendingAsyncCommitOffset.clear();
            this.pendingAsyncCommitCounter = 0;
            return completePartitions;
        }
        return this.partialCommitAsync(progress);
    }

    /** @return the current number of async commits counted as pending */
    @VisibleForTesting
    int pendingAsyncCommitCount() {
        return this.pendingAsyncCommitCounter;
    }

    /** Overrides the pending async commit counter. */
    @VisibleForTesting
    void setPendingAsyncCommitCount(int count) {
        this.pendingAsyncCommitCounter = count;
    }

    /** @return the live (not copied) map of offsets already submitted asynchronously */
    @VisibleForTesting
    Map<TopicPartition, OffsetAndMetadata> pendingAsyncCommitOffset() {
        return this.pendingAsyncCommitOffset;
    }

    /** @return whether the next commit attempt is forced to be synchronous */
    @VisibleForTesting
    boolean forceSync() {
        return this.forceSync;
    }

    /** Overrides the forced-sync flag. */
    @VisibleForTesting
    void setForceSync(boolean forceSync) {
        this.forceSync = forceSync;
    }

    /**
     * Commits only the completed offsets asynchronously, skipping every offset that was
     * already submitted by an earlier async commit and is unchanged since.
     *
     * @param progress processing progress that supplies the offsets to commit
     * @return the completed partitions for the submitted offsets, or an empty set when
     *         there is nothing new to submit
     */
    private Set<TopicPartition> partialCommitAsync(ProcessRecordsProgress progress) {
        Map<TopicPartition, OffsetAndMetadata> offsets = progress.completedOffsetsToCommit();
        // Two-argument remove: drop an entry only if both partition and offset match what is already in flight.
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.pendingAsyncCommitOffset.entrySet()) {
            offsets.remove(entry.getKey(), entry.getValue());
        }
        if (offsets.isEmpty()) {
            return Collections.emptySet();
        }
        ++this.pendingAsyncCommitCounter;
        Set<TopicPartition> completePartitions = progress.completedPartitions(offsets);
        this.pendingAsyncCommitOffset.putAll(offsets);
        this.consumer.commitAsync(offsets, (committedOffsets, exception) -> {
            if (this.onAsyncCommitCompleted(committedOffsets, exception)) {
                progress.updateCommittedOffsets(committedOffsets);
                progress.clearCompletedPartitions(committedOffsets);
            }
        });
        return completePartitions;
    }

    /**
     * Shared bookkeeping for both async commit callbacks: decrements the pending counter
     * and, on failure, logs a warning and forces the next commit to be synchronous.
     *
     * @param offsets   offsets reported by the callback
     * @param exception failure reported by the callback, or {@code null} on success
     * @return {@code true} if the async commit succeeded
     */
    private boolean onAsyncCommitCompleted(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        --this.pendingAsyncCommitCounter;
        assert (this.pendingAsyncCommitCounter >= 0) : "actual: " + this.pendingAsyncCommitCounter;
        if (exception == null) {
            return true;
        }
        // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
        logger.warn("Failed to commit offset: {} asynchronously", offsets, exception);
        this.forceSync = true;
        return false;
    }

    /**
     * Performs the synchronous commit required after a failed async commit, then resets
     * all async bookkeeping and the forced-sync flag.
     *
     * @param noPendingRecords whether the caller has no records still being processed
     * @param progress         processing progress that supplies the offsets to commit
     * @return the partitions returned by the superclass sync commit helper that was used
     */
    private Set<TopicPartition> tryCommitOnForceSync(boolean noPendingRecords, ProcessRecordsProgress progress) {
        Set<TopicPartition> completedPartitions;
        if (noPendingRecords) {
            completedPartitions = this.fullCommitSync(progress);
            this.updateNextRecommitTime();
        } else {
            completedPartitions = this.partialCommitSync(progress);
        }
        this.pendingAsyncCommitOffset.clear();
        this.pendingAsyncCommitCounter = 0;
        this.forceSync = false;
        return completedPartitions;
    }

    /** @return {@code true} once the pending async commit count has reached the configured maximum */
    private boolean useSyncCommit() {
        return this.pendingAsyncCommitCounter >= this.maxPendingAsyncCommits;
    }

    /**
     * Commits using the consumer's no-argument commit calls (sync or async depending on
     * {@link #useSyncCommit()}), then updates the next recommit time, clears the progress
     * and forgets the offsets previously submitted asynchronously.
     *
     * @param progress processing progress to clear after the commit has been issued
     * @return all partitions known to {@code progress}, captured before it is cleared
     */
    private Set<TopicPartition> fullCommit(ProcessRecordsProgress progress) {
        // Capture the partitions first: progress.clearAll() below runs before this method returns.
        Set<TopicPartition> completePartitions = progress.allPartitions();
        if (this.useSyncCommit()) {
            this.commitSyncWithRetry();
            this.pendingAsyncCommitCounter = 0;
        } else {
            ++this.pendingAsyncCommitCounter;
            this.consumer.commitAsync((offsets, exception) -> {
                if (this.onAsyncCommitCompleted(offsets, exception)) {
                    progress.clearCompletedPartitions(offsets);
                }
            });
        }
        this.updateNextRecommitTime();
        progress.clearAll();
        this.pendingAsyncCommitOffset.clear();
        return completePartitions;
    }
}

