/*
 * Decompiled with CFR 0.152.
 */
package cn.leancloud.kafka.consumer;

import cn.leancloud.kafka.consumer.CommitPolicy;
import cn.leancloud.kafka.consumer.ConsumerRecordHandler;
import cn.leancloud.kafka.consumer.LcKafkaConsumerBuilder;
import cn.leancloud.kafka.consumer.ProcessRecordsProgress;
import cn.leancloud.kafka.consumer.UnsubscribedStatus;
import cn.leancloud.kafka.consumer.VisibleForTesting;
import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls records from a Kafka {@link Consumer}, submits every record to a
 * {@link ConsumerRecordHandler} on a worker pool and asks a {@link CommitPolicy}
 * when offsets should be committed.
 *
 * <p>{@link #run()} loops until {@link #close()} is called or an error occurs, then
 * performs a graceful shutdown and completes {@link #unsubscribeStatusFuture()}.
 *
 * <p>NOTE(review): {@code pendingFutures} is a plain {@link HashMap} that is only
 * touched from code reached through {@link #run()}; presumably {@code run()} is
 * executed by a single dedicated thread and only {@link #close()} is called from
 * other threads - confirm against the caller.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
final class Fetcher<K, V> implements Runnable, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(Fetcher.class);

    /** Timeout passed to every {@code consumer.poll(...)} call, in milliseconds. */
    private final long pollTimeoutMillis;
    private final Consumer<K, V> consumer;
    private final ConsumerRecordHandler<K, V> handler;
    /** Runs the handler on the worker pool and queues the futures of finished records. */
    private final ExecutorCompletionService<ConsumerRecord<K, V>> service;
    /** Records that were dispatched to the worker pool but are not known to be finished yet. */
    private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures;
    private final CommitPolicy policy;
    /** Upper bound, in nanoseconds, for the whole graceful shutdown wait. */
    private final long gracefulShutdownTimeoutNanos;
    /** Completed exactly once, at the very end of the graceful shutdown. */
    private final CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture;
    /** Maximum handling time for a single record in nanoseconds; {@code 0} means unlimited. */
    private final long handleRecordTimeoutNanos;
    private final ProcessRecordsProgress progress;
    /** Set by {@link #close()} and by {@link #run()} when it quits on an error. */
    private volatile boolean closed;

    /**
     * Creates a fetcher from the settings held by the given builder, using a fresh
     * {@link ProcessRecordsProgress}.
     *
     * @param consumerBuilder the builder providing the consumer, handler, worker pool,
     *                        commit policy and timeouts
     */
    Fetcher(LcKafkaConsumerBuilder<K, V> consumerBuilder) {
        this(consumerBuilder.getConsumer(),
                consumerBuilder.getPollTimeout(),
                consumerBuilder.getConsumerRecordHandler(),
                consumerBuilder.getWorkerPool(),
                consumerBuilder.getPolicy(),
                consumerBuilder.getGracefulShutdownTimeout(),
                consumerBuilder.getHandleRecordTimeout(),
                new ProcessRecordsProgress());
    }

    /**
     * Creates a fetcher.
     *
     * @param consumer                the Kafka consumer to poll records from
     * @param pollTimeout             timeout of a single poll
     * @param handler                 handler invoked for every fetched record
     * @param workerPool              executor on which the handler is run
     * @param policy                  policy deciding when offsets are committed
     * @param gracefulShutdownTimeout maximum time to wait for pending records on shutdown
     * @param handleRecordTimeout     maximum time to handle one record; zero means unlimited
     * @param progress                tracker of pending and completed records
     */
    Fetcher(Consumer<K, V> consumer,
            Duration pollTimeout,
            ConsumerRecordHandler<K, V> handler,
            ExecutorService workerPool,
            CommitPolicy policy,
            Duration gracefulShutdownTimeout,
            Duration handleRecordTimeout,
            ProcessRecordsProgress progress) {
        this.progress = progress;
        this.pendingFutures = new HashMap<>();
        this.consumer = consumer;
        this.pollTimeoutMillis = pollTimeout.toMillis();
        this.handler = handler;
        this.service = new ExecutorCompletionService<>(workerPool);
        this.policy = policy;
        this.gracefulShutdownTimeoutNanos = gracefulShutdownTimeout.toNanos();
        this.unsubscribeStatusFuture = new CompletableFuture<>();
        this.handleRecordTimeoutNanos = handleRecordTimeout.toNanos();
    }

    /**
     * Runs the fetch loop: poll, dispatch the fetched records, collect finished records,
     * check for handling timeouts, pause the polled partitions while records are pending
     * and let the commit policy try to commit.
     *
     * <p>The loop ends when a {@link WakeupException} arrives after {@link #close()} was
     * called, or when any other exception is thrown. In the latter case the reported
     * status is {@link UnsubscribedStatus#ERROR}. A graceful shutdown always follows.
     */
    @Override
    public void run() {
        logger.debug("Fetcher thread started.");
        final long pollTimeoutMillis = this.pollTimeoutMillis;
        final Consumer<K, V> consumer = this.consumer;
        UnsubscribedStatus unsubscribedStatus = UnsubscribedStatus.CLOSED;
        while (true) {
            try {
                final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMillis);
                if (logger.isDebugEnabled()) {
                    logger.debug("Fetched {} records from: {}", records.count(), records.partitions());
                }

                this.dispatchFetchedRecords(records);
                this.processCompletedRecords();
                this.processTimeoutRecords();

                // Stop fetching from these partitions until the pending records are handled;
                // they are resumed in tryCommitRecordOffsets().
                if (!this.pendingFutures.isEmpty() && !records.isEmpty()) {
                    consumer.pause(records.partitions());
                }

                this.tryCommitRecordOffsets();
            } catch (WakeupException ex) {
                // A wakeup that was not triggered through close() is ignored and polling goes on.
                if (this.closed()) {
                    break;
                }
            } catch (ExecutionException ex) {
                // The handler failed on a record. Log it so the reason for quitting is not lost.
                logger.error("Fetcher quit with unexpected exception on handling consumer record.", ex.getCause());
                unsubscribedStatus = UnsubscribedStatus.ERROR;
                this.markClosed();
                break;
            } catch (Throwable ex) {
                if (ex instanceof InterruptedException) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                unsubscribedStatus = UnsubscribedStatus.ERROR;
                this.markClosed();
                logger.error("Fetcher quit with unexpected exception.", ex);
                break;
            }
        }

        this.gracefulShutdown(unsubscribedStatus);
        logger.debug("Fetcher thread exit.");
    }

    /**
     * Marks this fetcher as closed and wakes up the consumer so that a poll in progress
     * aborts and {@link #run()} can leave its loop.
     */
    @Override
    public void close() {
        this.markClosed();
        this.consumer.wakeup();
    }

    /**
     * @return the future which is completed with the final status once the graceful
     *         shutdown has finished
     */
    CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture() {
        return this.unsubscribeStatusFuture;
    }

    /**
     * @return the tracker of pending and completed records used by this fetcher
     */
    ProcessRecordsProgress progress() {
        return this.progress;
    }

    /**
     * @return the live (not copied) map of records which are still being handled
     */
    @VisibleForTesting
    Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures() {
        return this.pendingFutures;
    }

    private void markClosed() {
        this.closed = true;
    }

    private boolean closed() {
        return this.closed;
    }

    /**
     * Submits every fetched record to the worker pool and registers it as pending.
     *
     * @param records the records returned by the latest poll
     */
    private void dispatchFetchedRecords(ConsumerRecords<K, V> records) {
        final ConsumerRecordHandler<K, V> handler = this.handler;
        for (ConsumerRecord<K, V> record : records) {
            final Future<ConsumerRecord<K, V>> future = this.service.submit(() -> {
                handler.handleRecord(record);
                return record;
            });
            this.pendingFutures.put(record, this.timeoutAwareFuture(future));
            this.progress.markPendingRecord(record);
        }
    }

    /**
     * Wraps the future in a {@link TimeoutFuture} unless the handling time is unlimited.
     *
     * @param future the future returned by the worker pool
     * @return {@code future} itself or a deadline-aware wrapper around it
     */
    private Future<ConsumerRecord<K, V>> timeoutAwareFuture(Future<ConsumerRecord<K, V>> future) {
        if (this.unlimitedHandleRecordTime()) {
            return future;
        }
        return new TimeoutFuture<K, V>(future, this.handleRecordTimeoutNanos);
    }

    /**
     * Drains all futures that the completion service reports as finished, without blocking.
     *
     * @throws InterruptedException if interrupted while reading a result
     * @throws ExecutionException   if the handler threw an exception for any drained record
     */
    private void processCompletedRecords() throws InterruptedException, ExecutionException {
        Future<ConsumerRecord<K, V>> completed;
        while ((completed = this.service.poll()) != null) {
            this.processCompletedRecord(completed);
        }
    }

    /**
     * Removes a finished record from the pending map and marks it as completed.
     *
     * @param future a finished future taken from the completion service
     * @throws InterruptedException if interrupted while reading the result
     * @throws ExecutionException   if the handler threw an exception for this record
     */
    private void processCompletedRecord(Future<ConsumerRecord<K, V>> future) throws InterruptedException, ExecutionException {
        assert future.isDone();
        final ConsumerRecord<K, V> record = future.get();
        assert record != null;
        assert !future.isCancelled();
        final Future<ConsumerRecord<K, V>> removed = this.pendingFutures.remove(record);
        assert removed != null;
        this.progress.markCompletedRecord(record);
    }

    /**
     * Looks for a pending record whose handling deadline has passed. The first one found
     * is cancelled, removed from the pending map and reported through the exception.
     *
     * @throws TimeoutException if any pending record exceeded the handling timeout
     */
    private void processTimeoutRecords() throws TimeoutException {
        if (this.unlimitedHandleRecordTime()) {
            return;
        }

        final Iterator<Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>>> iterator =
                this.pendingFutures.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry = iterator.next();
            // With a limited handling time every pending future was wrapped by timeoutAwareFuture().
            final TimeoutFuture<K, V> future = (TimeoutFuture<K, V>) entry.getValue();
            if (future.timeout()) {
                final ConsumerRecord<K, V> record = entry.getKey();
                future.cancel(false);
                // Drop it so that the graceful shutdown does not wait for it again.
                iterator.remove();
                throw new TimeoutException("timeout on handling record: " + record);
            }
        }
    }

    /**
     * Lets the commit policy try to commit and resumes the partitions it returns.
     */
    private void tryCommitRecordOffsets() {
        final Set<TopicPartition> partitions = this.policy.tryCommit(this.pendingFutures.isEmpty(), this.progress);
        if (partitions.isEmpty()) {
            return;
        }

        try {
            this.consumer.resume(partitions);
        } catch (IllegalStateException ex) {
            // NOTE(review): presumably some of these partitions are no longer assigned to
            // this consumer (e.g. after a rebalance); retry with the still assigned ones only.
            partitions.retainAll(this.consumer.assignment());
            this.consumer.resume(partitions);
        }
    }

    private boolean unlimitedHandleRecordTime() {
        return this.handleRecordTimeoutNanos == 0L;
    }

    /**
     * Waits for the pending records, synchronously commits what was completed, closes the
     * consumer with whatever is left of the graceful shutdown timeout and finally completes
     * the unsubscribe status future, even when one of the previous steps failed.
     *
     * @param unsubscribedStatus the status to complete {@link #unsubscribeStatusFuture()} with
     */
    private void gracefulShutdown(UnsubscribedStatus unsubscribedStatus) {
        long shutdownTimeout = 0L;
        try {
            shutdownTimeout = this.waitPendingFuturesDone();
            this.policy.partialCommitSync(this.progress);
            this.pendingFutures.clear();
        } catch (Exception ex) {
            logger.error("Graceful shutdown got unexpected exception", ex);
        } finally {
            try {
                this.consumer.close(shutdownTimeout, TimeUnit.NANOSECONDS);
            } finally {
                this.unsubscribeStatusFuture.complete(unsubscribedStatus);
            }
        }
    }

    /**
     * Waits for every pending record to finish, spending at most the graceful shutdown
     * timeout in total. Records finishing in time are marked as completed; records that
     * do not finish in time are cancelled.
     *
     * @return the part of the graceful shutdown timeout which is still left, in nanoseconds
     */
    private long waitPendingFuturesDone() {
        final long start = System.nanoTime();
        long remain = this.gracefulShutdownTimeoutNanos;

        for (Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry : this.pendingFutures.entrySet()) {
            final Future<ConsumerRecord<K, V>> future = entry.getValue();
            try {
                assert remain >= 0L;
                // `remain` is measured in nanoseconds, so the wait must use the same unit.
                final ConsumerRecord<K, V> record = future.get(remain, TimeUnit.NANOSECONDS);
                assert record != null;
                this.progress.markCompletedRecord(record);
            } catch (TimeoutException ex) {
                future.cancel(false);
            } catch (InterruptedException ex) {
                future.cancel(false);
                Thread.currentThread().interrupt();
            } catch (CancellationException ignored) {
                // The record was cancelled already; there is nothing to mark as completed.
            } catch (ExecutionException ex) {
                logger.error("Fetcher quit with unexpected exception on handling consumer record: " + entry.getKey(), ex.getCause());
            } finally {
                if (remain >= 0L) {
                    remain = Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - start));
                }
            }
        }

        return remain;
    }

    /**
     * A {@link Future} wrapper with a fixed deadline, computed at construction time as
     * "now + timeout" on the given {@link Time}. All state queries are delegated to the
     * wrapped future.
     *
     * @param <K> key type of the record
     * @param <V> value type of the record
     */
    @VisibleForTesting
    static class TimeoutFuture<K, V> implements Future<ConsumerRecord<K, V>> {
        private final Future<ConsumerRecord<K, V>> wrappedFuture;
        /** Deadline on the clock of {@link #time}, in nanoseconds. */
        private final long timeoutAtNanos;
        private final Time time;

        /**
         * Creates a wrapper whose deadline is measured with {@link Time#SYSTEM}.
         *
         * @param wrappedFuture  the future to delegate to
         * @param timeoutInNanos non-negative time from now until the deadline, in nanoseconds
         */
        TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture, long timeoutInNanos) {
            this(wrappedFuture, timeoutInNanos, Time.SYSTEM);
        }

        /**
         * Creates a wrapper whose deadline is measured with the given clock.
         *
         * @param wrappedFuture  the future to delegate to
         * @param timeoutInNanos non-negative time from now until the deadline, in nanoseconds
         * @param time           the clock used to compute and check the deadline
         */
        TimeoutFuture(Future<ConsumerRecord<K, V>> wrappedFuture, long timeoutInNanos, Time time) {
            assert timeoutInNanos >= 0L;
            this.wrappedFuture = wrappedFuture;
            long deadline = time.nanoseconds() + timeoutInNanos;
            if (deadline < 0L) {
                // A negative sum is treated as an overflow and clamped to "never".
                deadline = Long.MAX_VALUE;
            }
            this.timeoutAtNanos = deadline;
            this.time = time;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return this.wrappedFuture.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return this.wrappedFuture.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.wrappedFuture.isDone();
        }

        @Override
        public ConsumerRecord<K, V> get() throws InterruptedException, ExecutionException {
            return this.wrappedFuture.get();
        }

        /**
         * Waits for the result for at most the given time, but never beyond the deadline
         * of this future.
         *
         * @throws TimeoutException immediately when the deadline has already passed, or
         *                          when the wrapped future does not finish within the wait
         */
        @Override
        public ConsumerRecord<K, V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (this.timeout()) {
                throw new TimeoutException();
            }
            final long untilDeadlineNanos = this.timeoutAtNanos - this.time.nanoseconds();
            final long waitNanos = Math.max(0L, Math.min(unit.toNanos(timeout), untilDeadlineNanos));
            return this.wrappedFuture.get(waitNanos, TimeUnit.NANOSECONDS);
        }

        /**
         * @return {@code true} when the clock has reached or passed the deadline
         */
        boolean timeout() {
            return this.time.nanoseconds() >= this.timeoutAtNanos;
        }
    }
}

