package io.confluent.kafkarest;

import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.rest.exceptions.RestException;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/kafkarest/ConsumerReadTask.class */
/**
 * State of a single read request against one topic of a consumer instance, exposed as a
 * {@link Future} over the list of records that were read.
 *
 * <p>The task makes progress through repeated calls to {@link #doPartialRead()}. Once the request
 * timeout has elapsed or the response byte limit has been reached, the task finishes: consumed
 * offsets are recorded on the topic state, the callback is invoked and threads blocked in
 * {@link #get()} are released.
 *
 * @param <KafkaKeyT> key type produced by the underlying Kafka iterator
 * @param <KafkaValueT> value type produced by the underlying Kafka iterator
 * @param <ClientKeyT> key type of the records handed to the callback
 * @param <ClientValueT> value type of the records handed to the callback
 */
public class ConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> implements Future<List<ConsumerRecord<ClientKeyT, ClientValueT>>> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerReadTask.class);
    private final ConsumerState parent;
    private final long maxResponseBytes;
    private final ConsumerReadCallback<ClientKeyT, ClientValueT> callback;
    private final CountDownLatch finished = new CountDownLatch(1);
    private ConsumerTopicState topicState;
    private ConsumerIterator<KafkaKeyT, KafkaValueT> iter;
    private List<ConsumerRecord<ClientKeyT, ClientValueT>> messages;
    private long bytesConsumed;
    private final long started;

    /**
     * Creates a read task and, if the previous read on the same topic failed after consuming
     * records, takes over those records so they are not lost.
     *
     * <p>If the topic state cannot be obtained, the task is finished immediately with the
     * resulting {@link RestException}.
     *
     * @param consumerState the consumer instance this read belongs to
     * @param str name of the topic to read from
     * @param j maximum response size requested by the caller, in bytes; capped by
     *     {@link KafkaRestConfig#CONSUMER_REQUEST_MAX_BYTES_CONFIG}
     * @param consumerReadCallback invoked exactly once per {@code finish} call with either the
     *     records or the failure
     */
    public ConsumerReadTask(ConsumerState consumerState, String str, long j, ConsumerReadCallback<ClientKeyT, ClientValueT> consumerReadCallback) {
        this.bytesConsumed = 0L;
        this.parent = consumerState;
        this.maxResponseBytes = Math.min(j, consumerState.getConfig().getLong(KafkaRestConfig.CONSUMER_REQUEST_MAX_BYTES_CONFIG));
        this.callback = consumerReadCallback;
        this.started = consumerState.getConfig().m6getTime().milliseconds();
        try {
            this.topicState = consumerState.getOrCreateTopicState(str);
            // Raw type kept on purpose: the failed task is handed back without type arguments.
            ConsumerReadTask previousTask = this.topicState.clearFailedTask();
            if (previousTask != null) {
                this.messages = previousTask.messages;
                this.bytesConsumed = previousTask.bytesConsumed;
            }
        } catch (RestException e) {
            finish(e);
        }
    }

    /**
     * Performs one non-final pass of the read: drains whatever the iterator currently offers
     * without exceeding the byte limit, then finishes the task if the request has timed out or
     * the byte limit has been reached. Otherwise the task stays open for another pass.
     *
     * <p>Any exception other than a consumer timeout finishes the task with a
     * {@link RestException}.
     */
    public void doPartialRead() {
        try {
            if (this.iter == null) {
                this.parent.startRead(this.topicState);
                this.iter = this.topicState.getIterator();
                // Keep records restored from a previously failed task; their size is already
                // included in bytesConsumed, so dropping them here would lose data.
                if (this.messages == null) {
                    this.messages = new Vector<ConsumerRecord<ClientKeyT, ClientValueT>>();
                }
            }
            long roughMsgSize = 0;
            int requestTimeoutMs = this.parent.getConfig().getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG);
            try {
                // hasNext() is inside the try: it is the call that blocks on the consumer and
                // signals "nothing available yet" by throwing ConsumerTimeoutException.
                while (this.iter.hasNext()) {
                    ConsumerRecordAndSize<ClientKeyT, ClientValueT> recordAndSize = this.parent.createConsumerRecord((MessageAndMetadata) this.iter.peek());
                    roughMsgSize = recordAndSize.getSize();
                    if (this.bytesConsumed + roughMsgSize >= this.maxResponseBytes) {
                        // Leave the peeked message on the iterator for the next request.
                        break;
                    }
                    this.iter.next();
                    this.messages.add(recordAndSize.getRecord());
                    this.bytesConsumed += roughMsgSize;
                }
            } catch (ConsumerTimeoutException ignored) {
                // Expected when no message arrives in time; this pass simply ends.
                log.trace("ConsumerReadTask timed out, using backoff id={}", this);
            }
            log.trace("ConsumerReadTask exiting read with id={} messages={} bytes={}", new Object[]{this, Integer.valueOf(this.messages.size()), Long.valueOf(this.bytesConsumed)});
            long elapsedMs = this.parent.getConfig().m6getTime().milliseconds() - this.started;
            boolean requestTimedOut = elapsedMs >= ((long) requestTimeoutMs);
            boolean exceededMaxResponseBytes = this.bytesConsumed + roughMsgSize >= this.maxResponseBytes;
            if (requestTimedOut || exceededMaxResponseBytes) {
                log.trace("Finishing ConsumerReadTask id={} requestTimedOut={} exceededMaxResponseBytes={}", new Object[]{this, Boolean.valueOf(requestTimedOut), Boolean.valueOf(exceededMaxResponseBytes)});
                finish();
            }
        } catch (Exception e) {
            RestException failure;
            if (e instanceof RestException) {
                failure = (RestException) e;
            } else {
                failure = (RestException) Errors.kafkaErrorException(e);
            }
            finish(failure);
            log.error("Unexpected exception in consumer read task id={} ", this, failure);
        }
    }

    /** Finishes the task successfully. */
    public void finish() {
        finish(null);
    }

    /**
     * Finishes the task, either successfully ({@code restException == null}) or with a failure.
     *
     * <p>On success the offset of every returned record is stored in the topic state's consumed
     * offsets. On failure, if records were already read, this task is stored on the topic state
     * so that a later task can take them over. In both cases the read is released on the parent,
     * the callback is invoked and waiters on this future are released.
     *
     * @param restException the failure, or {@code null} if the read succeeded
     */
    public void finish(RestException restException) {
        log.trace("Finishing ConsumerReadTask id={} exception={}", this, restException);
        if (restException == null) {
            Map<Integer, Long> consumedOffsets = this.topicState.getConsumedOffsets();
            for (ConsumerRecord<ClientKeyT, ClientValueT> consumerRecord : this.messages) {
                consumedOffsets.put(Integer.valueOf(consumerRecord.getPartition()), Long.valueOf(consumerRecord.getOffset()));
            }
        } else if (this.topicState != null && this.messages != null && this.messages.size() > 0) {
            log.trace("Saving failed ConsumerReadTask for subsequent call id={}", this, restException);
            this.topicState.setFailedTask(this);
        }
        if (this.topicState != null) {
            this.parent.finishRead(this.topicState);
        }
        try {
            this.callback.onCompletion(restException == null ? this.messages : null, restException);
        } catch (Throwable th) {
            // Log the throwable raised by the callback itself, not the (possibly null) read failure.
            log.error("Consumer read callback threw an unhandled exception id={}", this, th);
        }
        this.finished.countDown();
    }

    /**
     * Cancellation is not supported.
     *
     * @return always {@code false}
     */
    @Override // java.util.concurrent.Future
    public boolean cancel(boolean z) {
        return false;
    }

    /**
     * @return always {@code false}, since the task cannot be cancelled
     */
    @Override // java.util.concurrent.Future
    public boolean isCancelled() {
        return false;
    }

    /**
     * @return {@code true} once {@code finish} has run to completion
     */
    @Override // java.util.concurrent.Future
    public boolean isDone() {
        return this.finished.getCount() == 0;
    }

    /**
     * Blocks until the task has finished and returns the records held by the task.
     *
     * <p>NOTE(review): a failed read is not reported as an {@link ExecutionException} here; the
     * failure is only delivered through the callback.
     */
    @Override // java.util.concurrent.Future
    public List<ConsumerRecord<ClientKeyT, ClientValueT>> get() throws InterruptedException, ExecutionException {
        this.finished.await();
        return this.messages;
    }

    /**
     * Blocks for at most the given time until the task has finished and returns the records held
     * by the task.
     *
     * @throws TimeoutException if the task has not finished within the given time
     */
    @Override // java.util.concurrent.Future
    public List<ConsumerRecord<ClientKeyT, ClientValueT>> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        // await returns false when the wait elapsed before the latch reached zero.
        if (!this.finished.await(j, timeUnit)) {
            throw new TimeoutException();
        }
        return this.messages;
    }
}
