package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.ConsumerWorkerReadCallback;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerReadTask.class */
/**
 * A single consumer read request that is serviced incrementally through repeated calls to
 * {@link #doPartialRead()}.
 *
 * <p>Each call drains whatever records the parent {@code KafkaConsumerState} currently has
 * available, up to a byte budget. Once the request times out, the byte budget is reached, or an
 * exception occurs, the task finishes: the parent is told the read is over, the callback is
 * invoked, and the internal latch backing the {@link Future} methods is released.
 *
 * <p>Cancellation is not supported: {@link #cancel(boolean)} and {@link #isCancelled()} always
 * return {@code false}.
 */
class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> implements Future<List<ConsumerRecord<ClientKeyT, ClientValueT>>> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);

    /** Consumer state the records are read from. */
    private final KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
    /** Effective request timeout in milliseconds (caller value capped by configuration). */
    private final long requestTimeoutMs;
    /** Effective response size budget in bytes (caller value capped by configuration). */
    private final long maxResponseBytes;
    /** Invoked exactly once per successful call to {@link #finish(Exception)}. */
    private final ConsumerWorkerReadCallback<ClientKeyT, ClientValueT> callback;
    /** Released when the task finishes; backs {@link #isDone()} and both {@code get} variants. */
    private final CountDownLatch finished;
    // Not referenced anywhere in this class; retained so the declaration set stays unchanged.
    private Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<ClientKeyT, ClientValueT>> iter;
    /** Records accumulated so far; {@code null} until the first partial read starts. */
    private List<ConsumerRecord<ClientKeyT, ClientValueT>> messages;
    /** Sum of the sizes of all records added to {@link #messages}. */
    private long bytesConsumed = 0;
    /** Timestamp (ms, from the configured time source) at which this task was created. */
    private final long started;
    /** Timestamp (ms) computed by the latest {@link #doPartialRead()}: backoff end or request deadline, whichever is earlier. */
    long waitExpiration;

    /**
     * Creates a read task bound to the given consumer state.
     *
     * @param kafkaConsumerState consumer state to read from; also supplies configuration and time
     * @param j requested timeout in milliseconds; a value {@code <= 0} selects the configured
     *     request timeout, otherwise the smaller of the two is used
     * @param j2 requested maximum response size in bytes; capped by the configured maximum
     * @param consumerWorkerReadCallback callback invoked when the task finishes
     */
    public KafkaConsumerReadTask(KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> kafkaConsumerState, long j, long j2, ConsumerWorkerReadCallback<ClientKeyT, ClientValueT> consumerWorkerReadCallback) {
        this.parent = kafkaConsumerState;
        this.maxResponseBytes = Math.min(j2, kafkaConsumerState.getConfig().getLong(KafkaRestConfig.CONSUMER_REQUEST_MAX_BYTES_CONFIG));
        long configuredTimeoutMs = kafkaConsumerState.getConfig().getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG);
        this.requestTimeoutMs = j <= 0 ? configuredTimeoutMs : Math.min(j, configuredTimeoutMs);
        this.callback = consumerWorkerReadCallback;
        this.finished = new CountDownLatch(1);
        this.started = kafkaConsumerState.getConfig().m7getTime().milliseconds();
    }

    /**
     * Performs one pass over the records currently available from the parent and finishes the
     * task if the request has timed out or the byte budget has been reached.
     *
     * @return {@code true} if the pass completed without an exception (whether or not the task
     *     finished); {@code false} if an exception was caught, in which case the task has been
     *     finished with that exception
     */
    public boolean doPartialRead() {
        try {
            // First pass: notify the parent that a read is starting and create the result list.
            if (this.messages == null) {
                this.parent.startRead();
                this.messages = new Vector<>();
            }
            long lastRecordSize = 0;
            long iterationStart = this.parent.getConfig().m7getTime().milliseconds();
            while (this.parent.hasNext()) {
                // Peek first so a record that would blow the budget is left unconsumed.
                ConsumerRecordAndSize<ClientKeyT, ClientValueT> recordAndSize = this.parent.createConsumerRecord(this.parent.peek());
                lastRecordSize = recordAndSize.getSize();
                if (this.bytesConsumed + lastRecordSize >= this.maxResponseBytes) {
                    break;
                }
                this.messages.add(recordAndSize.getRecord());
                this.parent.next();
                this.bytesConsumed += lastRecordSize;
            }
            log.trace("KafkaConsumerReadTask exiting read with id={} messages={} bytes={}, backing off if not complete", new Object[] {this, this.messages.size(), this.bytesConsumed});
            long elapsedMs = this.parent.getConfig().m7getTime().milliseconds() - this.started;
            // Wake up at the end of the backoff window, but never later than the request deadline.
            this.waitExpiration = Math.min(iterationStart + this.parent.getConfig().getInt(KafkaRestConfig.CONSUMER_ITERATOR_BACKOFF_MS_CONFIG), this.started + this.requestTimeoutMs);
            boolean requestTimedOut = elapsedMs >= this.requestTimeoutMs;
            // The size of the last record seen is included on purpose: it covers the record that
            // caused the loop to break, and otherwise acts as an estimate for the next record.
            boolean exceededMaxResponseBytes = this.bytesConsumed + lastRecordSize >= this.maxResponseBytes;
            if (requestTimedOut || exceededMaxResponseBytes) {
                log.trace("Finishing KafkaConsumerReadTask id={} requestTimedOut={} exceededMaxResponseBytes={}", new Object[] {this, requestTimedOut, exceededMaxResponseBytes});
                finish();
            }
            return true;
        } catch (Exception e) {
            finish(e);
            log.error("Unexpected exception in consumer read task id={} ", this, e);
            return false;
        }
    }

    /** Finishes the task successfully, handing the accumulated records to the callback. */
    void finish() {
        finish(null);
    }

    /**
     * Finishes the task: ends the read on the parent, invokes the callback and releases the latch.
     *
     * @param exc the failure that ended the task, or {@code null} on success; when non-null the
     *     callback receives {@code null} instead of the accumulated records
     */
    void finish(Exception exc) {
        log.trace("Finishing KafkaConsumerReadTask id={} exception={}", this, exc);
        try {
            this.parent.finishRead();
            try {
                this.callback.onCompletion(exc == null ? this.messages : null, exc);
            } catch (Throwable th) {
                // Log the throwable raised by the callback itself, not the task's own exception.
                log.error("Consumer read callback threw an unhandled exception id={}", this, th);
            }
        } finally {
            // Always release waiters, even if finishRead() throws, so get() cannot block forever.
            this.finished.countDown();
        }
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override // java.util.concurrent.Future
    public boolean cancel(boolean z) {
        return false;
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override // java.util.concurrent.Future
    public boolean isCancelled() {
        return false;
    }

    /** Returns {@code true} once the task has finished, successfully or not. */
    @Override // java.util.concurrent.Future
    public boolean isDone() {
        return this.finished.getCount() == 0;
    }

    /**
     * Blocks until the task has finished and returns the accumulated records.
     *
     * <p>Note: a task that finished with an exception does not raise {@link ExecutionException}
     * here; the list accumulated up to that point is returned, which is {@code null} if the
     * failure happened before the list was created.
     */
    @Override // java.util.concurrent.Future
    public List<ConsumerRecord<ClientKeyT, ClientValueT>> get() throws InterruptedException, ExecutionException {
        this.finished.await();
        return this.messages;
    }

    /**
     * Waits up to the given time for the task to finish and returns the accumulated records.
     *
     * @throws TimeoutException if the task has not finished when the wait elapses
     */
    @Override // java.util.concurrent.Future
    public List<ConsumerRecord<ClientKeyT, ClientValueT>> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.finished.await(j, timeUnit)) {
            throw new TimeoutException();
        }
        return this.messages;
    }
}
