/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

abstract class AbstractCommitPolicy<K, V>
implements CommitPolicy<K, V> {
    static SleepFunction sleepFunction = Thread::sleep;
    final Map<TopicPartition, Long> topicOffsetHighWaterMark;
    final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets;
    protected final Consumer<K, V> consumer;
    private final long syncCommitRetryIntervalMs;
    private final int maxAttemptsForEachSyncCommit;

    AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
        this.consumer = consumer;
        this.topicOffsetHighWaterMark = new HashMap<TopicPartition, Long>();
        this.completedTopicOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();
        this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
    }

    @Override
    public void markPendingRecord(ConsumerRecord<K, V> record) {
        this.topicOffsetHighWaterMark.merge(new TopicPartition(record.topic(), record.partition()), record.offset() + 1L, Math::max);
    }

    @Override
    public void markCompletedRecord(ConsumerRecord<K, V> record) {
        this.completedTopicOffsets.merge(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1L), BinaryOperator.maxBy(Comparator.comparing(OffsetAndMetadata::offset)));
    }

    @Override
    public Set<TopicPartition> syncPartialCommit() {
        this.commitSync(this.completedTopicOffsets);
        Set<TopicPartition> partitions = this.checkCompletedPartitions();
        this.completedTopicOffsets.clear();
        for (TopicPartition p : partitions) {
            this.topicOffsetHighWaterMark.remove(p);
        }
        return partitions;
    }

    Set<TopicPartition> getCompletedPartitions(boolean noPendingRecords) {
        Set<TopicPartition> partitions;
        if (noPendingRecords) {
            assert (this.checkCompletedPartitions().equals(this.topicOffsetHighWaterMark.keySet())) : "expect: " + this.checkCompletedPartitions() + " actual: " + this.topicOffsetHighWaterMark.keySet();
            partitions = new HashSet<TopicPartition>(this.topicOffsetHighWaterMark.keySet());
        } else {
            partitions = this.checkCompletedPartitions();
        }
        return partitions;
    }

    void clearCachedCompletedPartitionsRecords(Set<TopicPartition> completedPartitions, boolean noPendingRecords) {
        this.completedTopicOffsets.clear();
        if (noPendingRecords) {
            this.topicOffsetHighWaterMark.clear();
        } else {
            for (TopicPartition p : completedPartitions) {
                this.topicOffsetHighWaterMark.remove(p);
            }
        }
    }

    @VisibleForTesting
    Map<TopicPartition, Long> topicOffsetHighWaterMark() {
        return this.topicOffsetHighWaterMark;
    }

    @VisibleForTesting
    Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
        return this.completedTopicOffsets;
    }

    void commitSync() {
        RetryContext context = this.context();
        while (true) {
            try {
                this.consumer.commitSync();
                return;
            }
            catch (RetriableException e) {
                context.onError(e);
                continue;
            }
            break;
        }
    }

    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        RetryContext context = this.context();
        while (true) {
            try {
                this.consumer.commitSync(offsets);
                return;
            }
            catch (RetriableException e) {
                context.onError(e);
                continue;
            }
            break;
        }
    }

    private Set<TopicPartition> checkCompletedPartitions() {
        return this.completedTopicOffsets.entrySet().stream().filter(entry -> this.topicOffsetMeetHighWaterMark((TopicPartition)entry.getKey(), (OffsetAndMetadata)entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
        Long offsetHighWaterMark = this.topicOffsetHighWaterMark.get(topicPartition);
        if (offsetHighWaterMark != null) {
            return offset.offset() >= offsetHighWaterMark;
        }
        return true;
    }

    private RetryContext context() {
        return new RetryContext(this.syncCommitRetryIntervalMs, this.maxAttemptsForEachSyncCommit);
    }

    private static class RetryContext {
        private final long retryInterval;
        private final int maxAttempts;
        private int numOfAttempts;

        private RetryContext(long retryInterval, int maxAttempts) {
            this.retryInterval = retryInterval;
            this.maxAttempts = maxAttempts;
            this.numOfAttempts = 0;
        }

        void onError(RetriableException e) {
            if (++this.numOfAttempts >= this.maxAttempts) {
                throw e;
            }
            try {
                sleepFunction.sleep(this.retryInterval);
            }
            catch (InterruptedException ex) {
                e.addSuppressed((Throwable)ex);
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }

    static interface SleepFunction {
        public void sleep(long var1) throws InterruptedException;
    }
}

