/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.ProcessRecordsProgress;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

/**
 * Base class for commit policies that need to commit offsets synchronously,
 * retrying the commit when the consumer throws a {@link RetriableException}.
 *
 * @param <K> key type of the wrapped {@link Consumer}
 * @param <V> value type of the wrapped {@link Consumer}
 */
abstract class AbstractCommitPolicy<K, V> implements CommitPolicy {
    /**
     * Function used to pause between commit attempts. Defaults to
     * {@link Thread#sleep(long)}; it is a non-final static so it can be
     * swapped out (presumably by tests, to avoid real sleeping).
     */
    static SleepFunction sleepFunction = Thread::sleep;

    protected final Consumer<K, V> consumer;
    private final long syncCommitRetryIntervalMs;
    private final int maxAttemptsForEachSyncCommit;

    /**
     * @param consumer                     the consumer whose offsets are committed
     * @param syncCommitRetryInterval      pause between two attempts of a synchronous commit
     * @param maxAttemptsForEachSyncCommit maximum number of times a single synchronous commit
     *                                     is attempted (the first try included) before the
     *                                     last {@link RetriableException} is propagated
     */
    AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
        this.consumer = consumer;
        this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();
        this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
    }

    /**
     * Synchronously commits the offsets reported by
     * {@link ProcessRecordsProgress#completedOffsetsToCommit()} and then records
     * them on {@code progress}.
     *
     * @param progress the progress tracker to read offsets from and to update
     * @return an empty set when there was nothing to commit; otherwise whatever
     *         {@code progress.clearCompletedPartitions(...)} returns for the committed offsets
     * @throws RetriableException if the commit still fails after the configured number of attempts
     */
    @Override
    public Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = progress.completedOffsetsToCommit();
        if (offsetsToCommit.isEmpty()) {
            return Collections.emptySet();
        }
        commitSyncWithRetry(offsetsToCommit);
        // Only touch the progress tracker once the commit has actually succeeded.
        progress.updateCommittedOffsets(offsetsToCommit);
        return progress.clearCompletedPartitions(offsetsToCommit);
    }

    /**
     * Synchronously commits with the consumer's no-argument {@code commitSync()},
     * then clears all state held by {@code progress}.
     *
     * @param progress the progress tracker to reset after a successful commit
     * @return the partitions reported by {@code progress.allPartitions()} just before it was cleared
     * @throws RetriableException if the commit still fails after the configured number of attempts
     */
    Set<TopicPartition> fullCommitSync(ProcessRecordsProgress progress) {
        commitSyncWithRetry();
        // Snapshot the partitions before clearAll() resets the tracker.
        Set<TopicPartition> completePartitions = progress.allPartitions();
        progress.clearAll();
        return completePartitions;
    }

    /**
     * Calls {@code consumer.commitSync()} and retries on {@link RetriableException}.
     *
     * @throws RetriableException if the commit still fails after the configured number of attempts
     */
    void commitSyncWithRetry() {
        retryOnRetriableException(() -> consumer.commitSync());
    }

    /**
     * Calls {@code consumer.commitSync(offsets)} and retries on {@link RetriableException}.
     *
     * @param offsets the offsets to commit
     * @throws RetriableException if the commit still fails after the configured number of attempts
     */
    void commitSyncWithRetry(Map<TopicPartition, OffsetAndMetadata> offsets) {
        retryOnRetriableException(() -> consumer.commitSync(offsets));
    }

    /**
     * Runs {@code commitAction} until it returns normally. Every
     * {@link RetriableException} is handed to a fresh {@link RetryContext}, which
     * either sleeps before the next attempt or rethrows to end the loop. Any other
     * exception propagates immediately.
     */
    private void retryOnRetriableException(Runnable commitAction) {
        RetryContext context = context();
        while (true) {
            try {
                commitAction.run();
                return;
            } catch (RetriableException e) {
                // Throws when attempts are exhausted or the sleep is interrupted;
                // otherwise returns and the loop tries again.
                context.onError(e);
            }
        }
    }

    /** Creates a retry context with this policy's interval and attempt limit. */
    private RetryContext context() {
        return new RetryContext(syncCommitRetryIntervalMs, maxAttemptsForEachSyncCommit);
    }

    /** Tracks the number of failed attempts of one synchronous commit. */
    private static class RetryContext {
        private final long retryInterval;
        private final int maxAttempts;
        private int numOfAttempts;

        private RetryContext(long retryInterval, int maxAttempts) {
            this.retryInterval = retryInterval;
            this.maxAttempts = maxAttempts;
            this.numOfAttempts = 0;
        }

        /**
         * Records a failed attempt. Returns normally (after sleeping for the retry
         * interval) when another attempt is allowed.
         *
         * @param e the exception thrown by the failed attempt
         * @throws RetriableException {@code e} itself, when the number of failed attempts
         *         has reached {@code maxAttempts}, or when the sleep is interrupted (in which
         *         case the {@link InterruptedException} is attached as suppressed and the
         *         thread's interrupt flag is set again)
         */
        void onError(RetriableException e) {
            if (++numOfAttempts >= maxAttempts) {
                throw e;
            }
            try {
                sleepFunction.sleep(retryInterval);
            } catch (InterruptedException ex) {
                e.addSuppressed(ex);
                // Restore the interrupt status so callers up the stack can observe it.
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }

    /** Abstraction over {@link Thread#sleep(long)} so the pause can be replaced. */
    @FunctionalInterface
    interface SleepFunction {
        /**
         * @param millis the value passed through from the retry interval, in milliseconds
         * @throws InterruptedException if the sleep is interrupted
         */
        void sleep(long millis) throws InterruptedException;
    }
}

