package io.confluent.kafkarest;

import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.rest.exceptions.RestException;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/kafkarest/ConsumerReadTask.class */
public class ConsumerReadTask<KafkaK, KafkaV, ClientK, ClientV> implements Future<List<ConsumerRecord<ClientK, ClientV>>> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerReadTask.class);
    private ConsumerState parent;
    private final long maxResponseBytes;
    private final ConsumerWorkerReadCallback<ClientK, ClientV> callback;
    private CountDownLatch finished = new CountDownLatch(1);
    private ConsumerTopicState topicState;
    private ConsumerIterator<KafkaK, KafkaV> iter;
    private List<ConsumerRecord<ClientK, ClientV>> messages;
    private long bytesConsumed;
    private final long started;
    long waitExpiration;

    public ConsumerReadTask(ConsumerState consumerState, String str, long j, ConsumerWorkerReadCallback<ClientK, ClientV> consumerWorkerReadCallback) {
        this.bytesConsumed = 0L;
        this.parent = consumerState;
        this.maxResponseBytes = Math.min(j, consumerState.getConfig().getLong(KafkaRestConfig.CONSUMER_REQUEST_MAX_BYTES_CONFIG));
        this.callback = consumerWorkerReadCallback;
        this.started = consumerState.getConfig().m7getTime().milliseconds();
        try {
            this.topicState = consumerState.getOrCreateTopicState(str);
            ConsumerReadTask clearFailedTask = this.topicState.clearFailedTask();
            if (clearFailedTask != null) {
                this.messages = clearFailedTask.messages;
                this.bytesConsumed = clearFailedTask.bytesConsumed;
            }
        } catch (RestException e) {
            finish(e);
        }
    }

    public boolean doPartialRead() {
        try {
            if (this.iter == null) {
                this.parent.startRead(this.topicState);
                this.iter = this.topicState.getIterator();
                this.messages = new Vector();
                this.waitExpiration = 0L;
            }
            boolean z = false;
            long j = 0;
            long milliseconds = this.parent.getConfig().m7getTime().milliseconds();
            int i = this.parent.getConfig().getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG);
            while (this.iter.hasNext()) {
                try {
                    ConsumerRecordAndSize<ClientK, ClientV> createConsumerRecord = this.parent.createConsumerRecord((MessageAndMetadata) this.iter.peek());
                    j = createConsumerRecord.getSize();
                    if (this.bytesConsumed + j >= this.maxResponseBytes) {
                        break;
                    }
                    this.iter.next();
                    this.messages.add(createConsumerRecord.getRecord());
                    this.bytesConsumed += j;
                } catch (ConsumerTimeoutException e) {
                    z = true;
                }
            }
            long milliseconds2 = this.parent.getConfig().m7getTime().milliseconds() - this.started;
            this.waitExpiration = Math.min(milliseconds + this.parent.getConfig().getInt(KafkaRestConfig.CONSUMER_ITERATOR_BACKOFF_MS_CONFIG), this.started + this.parent.getConfig().getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG));
            if (milliseconds2 >= i || this.bytesConsumed + j >= this.maxResponseBytes) {
                finish();
            }
            return z;
        } catch (Exception e2) {
            finish(e2);
            log.error("Unexpected exception in consumer read thread: ", e2);
            return false;
        }
    }

    public void finish() {
        finish(null);
    }

    public void finish(Exception exc) {
        if (exc == null) {
            Map<Integer, Long> consumedOffsets = this.topicState.getConsumedOffsets();
            for (ConsumerRecord<ClientK, ClientV> consumerRecord : this.messages) {
                consumedOffsets.put(Integer.valueOf(consumerRecord.getPartition()), Long.valueOf(consumerRecord.getOffset()));
            }
        } else if (this.topicState != null && this.messages != null && this.messages.size() > 0) {
            this.topicState.setFailedTask(this);
        }
        if (this.topicState != null) {
            this.parent.finishRead(this.topicState);
        }
        try {
            this.callback.onCompletion(exc == null ? this.messages : null, exc);
        } catch (Throwable th) {
            log.error("Consumer read callback threw an unhandled exception", exc);
        }
        this.finished.countDown();
    }

    @Override // java.util.concurrent.Future
    public boolean cancel(boolean z) {
        return false;
    }

    @Override // java.util.concurrent.Future
    public boolean isCancelled() {
        return false;
    }

    @Override // java.util.concurrent.Future
    public boolean isDone() {
        return this.finished.getCount() == 0;
    }

    @Override // java.util.concurrent.Future
    public List<ConsumerRecord<ClientK, ClientV>> get() throws InterruptedException, ExecutionException {
        this.finished.await();
        return this.messages;
    }

    @Override // java.util.concurrent.Future
    public List<ConsumerRecord<ClientK, ClientV>> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        this.finished.await(j, timeUnit);
        if (this.finished.getCount() > 0) {
            throw new TimeoutException();
        }
        return this.messages;
    }
}
