/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.ConsumerRecordHandler;
import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls records from a Kafka {@link Consumer}, hands every record to a
 * {@link ConsumerRecordHandler} on a worker pool, and asks a {@link CommitPolicy}
 * to commit offsets as the handlers finish.
 *
 * <p>{@link #run()} is the fetch loop. {@link #close()} only sets a volatile flag and
 * calls {@link Consumer#wakeup()}; all other state ({@code pendingFutures}, the policy)
 * is touched only from the thread executing {@link #run()}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
final class Fetcher<K, V> implements Runnable, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(Fetcher.class);

    private final long pollTimeout;
    private final Consumer<K, V> consumer;
    private final ConsumerRecordHandler<K, V> handler;
    private final ExecutorCompletionService<ConsumerRecord<K, V>> service;
    /** Records that were dispatched to the worker pool and whose completion has not been processed yet. */
    private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures = new HashMap<>();
    private final CommitPolicy<K, V> policy;
    private final long gracefulShutdownMillis;
    private volatile boolean closed;

    /**
     * Creates a fetcher from the settings held by the given builder.
     *
     * @param consumerBuilder supplies the consumer, poll timeout, record handler, worker pool,
     *                        commit policy and graceful shutdown timeout
     */
    Fetcher(LcKafkaConsumerBuilder<K, V> consumerBuilder) {
        this(consumerBuilder.getConsumer(),
                consumerBuilder.getPollTimeout(),
                consumerBuilder.getConsumerRecordHandler(),
                consumerBuilder.getWorkerPool(),
                consumerBuilder.getPolicy(),
                consumerBuilder.gracefulShutdownMillis());
    }

    /**
     * Creates a fetcher.
     *
     * @param consumer               the consumer to poll, pause and resume
     * @param pollTimeout            the timeout handed to every {@code consumer.poll} call
     * @param handler                invoked on the worker pool once per fetched record
     * @param workerPool             the executor that runs the handler
     * @param policy                 tracks pending/completed records and performs the commits
     * @param gracefulShutdownMillis the total time, in milliseconds, to wait for pending
     *                               handlers when the fetch loop exits
     */
    Fetcher(Consumer<K, V> consumer,
            long pollTimeout,
            ConsumerRecordHandler<K, V> handler,
            ExecutorService workerPool,
            CommitPolicy<K, V> policy,
            long gracefulShutdownMillis) {
        this.consumer = consumer;
        this.pollTimeout = pollTimeout;
        this.handler = handler;
        this.service = new ExecutorCompletionService<>(workerPool);
        this.policy = policy;
        this.gracefulShutdownMillis = gracefulShutdownMillis;
    }

    /**
     * Runs the fetch loop until the fetcher is closed or an unexpected exception occurs,
     * then performs a graceful shutdown.
     *
     * <p>Each iteration polls the consumer, dispatches the fetched records, processes the
     * handlers that have completed, pauses the partitions that just delivered records while
     * work is still pending, and finally gives the policy a chance to commit.
     */
    @Override
    public void run() {
        logger.debug("Fetcher thread started.");
        final long pollTimeout = this.pollTimeout;
        final Consumer<K, V> consumer = this.consumer;
        while (true) {
            try {
                final ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
                if (logger.isDebugEnabled()) {
                    logger.debug("Fetched {} records from: {}", records.count(), records.partitions());
                }
                this.dispatchFetchedRecords(records);
                this.processCompletedRecords();
                if (!this.pendingFutures.isEmpty() && !records.isEmpty()) {
                    consumer.pause(records.partitions());
                }
                this.tryCommitRecordOffsets();
            } catch (WakeupException ex) {
                // A wakeup that did not come from close() is ignored and polling goes on.
                if (this.closed()) {
                    break;
                }
            } catch (Exception ex) {
                // Only mark the fetcher closed. Calling consumer.wakeup() here, on the polling
                // thread itself, would leave a wakeup pending that the next wakeup-aware consumer
                // call raises as a WakeupException -- presumably the commit issued by
                // policy.partialCommit() during the graceful shutdown below.
                this.closed = true;
                logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", ex);
                break;
            }
        }
        this.gracefulShutdown();
    }

    /**
     * Marks this fetcher as closed and wakes up the consumer so that a poll in progress
     * (or the next one) throws {@link WakeupException} and the fetch loop exits.
     */
    @Override
    public void close() {
        this.closed = true;
        this.consumer.wakeup();
    }

    /**
     * Returns the live internal map of dispatched-but-unprocessed records, not a copy.
     *
     * @return the pending record to future map
     */
    Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures() {
        return this.pendingFutures;
    }

    private boolean closed() {
        return this.closed;
    }

    /**
     * Submits one handler task per fetched record to the worker pool and registers the record
     * as pending, both locally and with the commit policy.
     *
     * @param records the records returned by the latest poll; may be empty
     */
    private void dispatchFetchedRecords(ConsumerRecords<K, V> records) {
        final ConsumerRecordHandler<K, V> handler = this.handler;
        for (final ConsumerRecord<K, V> fetched : records) {
            // The task returns its own record so the completion side can find the map entry.
            final Future<ConsumerRecord<K, V>> future = this.service.submit(() -> {
                handler.handleRecord(fetched);
                return fetched;
            });
            this.pendingFutures.put(fetched, future);
            this.policy.addPendingRecord(fetched);
        }
    }

    /**
     * Drains the completion queue without blocking: every finished record is removed from the
     * pending map and reported to the commit policy as completed.
     *
     * @throws InterruptedException if interrupted while reading a task result
     * @throws ExecutionException   if a handler task failed; the cause is the handler's exception
     */
    private void processCompletedRecords() throws InterruptedException, ExecutionException {
        Future<ConsumerRecord<K, V>> done;
        while ((done = this.service.poll()) != null) {
            assert done.isDone();
            final ConsumerRecord<K, V> completed = done.get();
            final Future<ConsumerRecord<K, V>> removed = this.pendingFutures.remove(completed);
            assert removed != null;
            this.policy.completeRecord(completed);
        }
    }

    /**
     * Drains the completion queue like {@link #processCompletedRecords()} but skips futures
     * that were cancelled. Used only during shutdown, which is the only place this class
     * cancels futures.
     *
     * @throws InterruptedException if interrupted while reading a task result
     * @throws ExecutionException   if a handler task failed
     */
    private void processCompletedRecordsSkippingCancelled() throws InterruptedException, ExecutionException {
        while (true) {
            try {
                this.processCompletedRecords();
                return;
            } catch (CancellationException ignored) {
                // The cancelled future was already taken off the completion queue by poll(),
                // so retrying makes progress and continues with the remaining entries.
            }
        }
    }

    /**
     * Lets the policy try to commit and resumes the partitions it returns.
     */
    private void tryCommitRecordOffsets() {
        // Presumably the returned partitions are those that may be resumed after the commit;
        // confirm against CommitPolicy.tryCommit.
        final Set<TopicPartition> partitions = this.policy.tryCommit(this.pendingFutures.isEmpty());
        if (partitions.isEmpty()) {
            return;
        }
        try {
            this.consumer.resume(partitions);
        } catch (IllegalStateException ex) {
            // resume rejects partitions that are no longer assigned to this consumer (e.g. after
            // a rebalance); retry with the ones that still are.
            // NOTE(review): retainAll mutates the set returned by the policy -- confirm that set
            // is modifiable and not shared.
            partitions.retainAll(this.consumer.assignment());
            this.consumer.resume(partitions);
        }
    }

    /**
     * Waits up to {@code gracefulShutdownMillis} in total for the pending handlers, cancels the
     * ones not waited for once the budget is used up, processes whatever completed, and asks
     * the policy for a final partial commit.
     */
    private void gracefulShutdown() {
        final long start = System.currentTimeMillis();
        long remain = this.gracefulShutdownMillis;
        boolean interrupted = false;
        try {
            for (final Future<ConsumerRecord<K, V>> future : this.pendingFutures.values()) {
                if (remain > 0L) {
                    try {
                        future.get(remain, TimeUnit.MILLISECONDS);
                        remain = this.gracefulShutdownMillis - (System.currentTimeMillis() - start);
                    } catch (TimeoutException ex) {
                        // Budget exhausted: stop waiting, the remaining futures get cancelled.
                        remain = 0L;
                    }
                } else {
                    // cancel(false): do not interrupt a handler that is already running.
                    future.cancel(false);
                }
            }
            // Cancelled futures can show up in the completion queue and their get() throws the
            // unchecked CancellationException, which must not abort the final commit below.
            this.processCompletedRecordsSkippingCancelled();
        } catch (InterruptedException ex) {
            interrupted = true;
            logger.warn("Graceful shutdown was interrupted.");
        } catch (ExecutionException ex) {
            logger.error("Handle message got unexpected exception. Continue shutdown without wait handling message done.", ex);
        }
        try {
            this.policy.partialCommit();
            this.pendingFutures.clear();
            logger.debug("Fetcher thread exit.");
        } finally {
            // Restore the interrupt status only after the final commit: Kafka consumer calls
            // made with the interrupt flag set fail with InterruptException.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

