package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import java.util.List;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerReadTask.class */
public class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);
    private final KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
    private final long requestTimeoutMs;
    private final int responseMinBytes;
    private final long maxResponseBytes;
    private final ConsumerReadCallback<ClientKeyT, ClientValueT> callback;
    private boolean finished;
    private List<ConsumerRecord<ClientKeyT, ClientValueT>> messages;
    private long bytesConsumed = 0;
    private boolean exceededMinResponseBytes = false;
    private boolean exceededMaxResponseBytes = false;
    private final long started;

    public KafkaConsumerReadTask(KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> kafkaConsumerState, long j, long j2, ConsumerReadCallback<ClientKeyT, ClientValueT> consumerReadCallback) {
        this.parent = kafkaConsumerState;
        this.maxResponseBytes = Math.min(j2, kafkaConsumerState.getConfig().getLong(KafkaRestConfig.CONSUMER_REQUEST_MAX_BYTES_CONFIG).longValue());
        long intValue = kafkaConsumerState.getConfig().getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG).intValue();
        this.requestTimeoutMs = j <= 0 ? intValue : Math.min(j, intValue);
        int intValue2 = kafkaConsumerState.getConfig().getInt(KafkaRestConfig.PROXY_FETCH_MIN_BYTES_CONFIG).intValue();
        this.responseMinBytes = intValue2 < 0 ? Integer.MAX_VALUE : intValue2;
        this.callback = consumerReadCallback;
        this.finished = false;
        this.started = kafkaConsumerState.getConfig().m1getTime().milliseconds();
    }

    public void doPartialRead() {
        try {
            if (this.messages == null) {
                this.messages = new Vector();
            }
            addRecords();
            log.trace("KafkaConsumerReadTask exiting read with id={} messages={} bytes={}, backing off if not complete", new Object[]{this, Integer.valueOf(this.messages.size()), Long.valueOf(this.bytesConsumed)});
            boolean z = this.parent.getConfig().m1getTime().milliseconds() - this.started >= this.requestTimeoutMs;
            if (z || this.exceededMaxResponseBytes || this.exceededMinResponseBytes) {
                log.trace("Finishing KafkaConsumerReadTask id={} requestTimedOut={} exceededMaxResponseBytes={} exceededMinResponseBytes={}", new Object[]{this, Boolean.valueOf(z), Boolean.valueOf(this.exceededMaxResponseBytes), Boolean.valueOf(this.exceededMinResponseBytes)});
                finish();
            }
        } catch (Exception e) {
            finish(e);
            log.error("Unexpected exception in consumer read task id={} ", this, e);
        }
    }

    public boolean isDone() {
        return this.finished;
    }

    private void addRecords() {
        while (!this.exceededMinResponseBytes && !this.exceededMaxResponseBytes && this.parent.hasNext()) {
            synchronized (this.parent) {
                if (this.parent.hasNext()) {
                    maybeAddRecord();
                }
            }
        }
        while (!this.exceededMaxResponseBytes && this.parent.hasNextCached()) {
            synchronized (this.parent) {
                if (this.parent.hasNextCached()) {
                    maybeAddRecord();
                }
            }
        }
    }

    private void maybeAddRecord() {
        ConsumerRecordAndSize<ClientKeyT, ClientValueT> createConsumerRecord = this.parent.createConsumerRecord(this.parent.peek());
        long size = createConsumerRecord.getSize();
        if (this.bytesConsumed + size >= this.maxResponseBytes) {
            this.exceededMaxResponseBytes = true;
            return;
        }
        this.messages.add(createConsumerRecord.getRecord());
        this.parent.next();
        this.bytesConsumed += size;
        if (this.exceededMinResponseBytes || this.bytesConsumed <= this.responseMinBytes) {
            return;
        }
        this.exceededMinResponseBytes = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void finish() {
        finish(null);
    }

    private void finish(Exception exc) {
        log.trace("Finishing KafkaConsumerReadTask id={}", this, exc);
        try {
            this.callback.onCompletion(exc == null ? this.messages : null, exc);
        } catch (Throwable th) {
            log.error("Consumer read callback threw an unhandled exception id={}", this, exc);
        }
        this.finished = true;
    }
}
