package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import datahub.shaded.org.apache.kafka.common.IsolationLevel;
import datahub.shaded.org.apache.kafka.common.KafkaException;
import datahub.shaded.org.apache.kafka.common.TopicPartition;
import datahub.shaded.org.apache.kafka.common.errors.CorruptRecordException;
import datahub.shaded.org.apache.kafka.common.errors.RecordDeserializationException;
import datahub.shaded.org.apache.kafka.common.errors.SerializationException;
import datahub.shaded.org.apache.kafka.common.header.Headers;
import datahub.shaded.org.apache.kafka.common.header.internals.RecordHeaders;
import datahub.shaded.org.apache.kafka.common.message.FetchResponseData;
import datahub.shaded.org.apache.kafka.common.record.ControlRecordType;
import datahub.shaded.org.apache.kafka.common.record.Record;
import datahub.shaded.org.apache.kafka.common.record.RecordBatch;
import datahub.shaded.org.apache.kafka.common.record.TimestampType;
import datahub.shaded.org.apache.kafka.common.requests.FetchResponse;
import datahub.shaded.org.apache.kafka.common.utils.BufferSupplier;
import datahub.shaded.org.apache.kafka.common.utils.CloseableIterator;
import datahub.shaded.org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CompletedFetch.class */
public class CompletedFetch {
    final TopicPartition partition;
    final FetchResponseData.PartitionData partitionData;
    final short requestVersion;
    private final Logger log;
    private final SubscriptionState subscriptions;
    private final BufferSupplier decompressionBufferSupplier;
    private final Iterator<? extends RecordBatch> batches;
    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
    private final FetchMetricsAggregator metricAggregator;
    private int recordsRead;
    private int bytesRead;
    private RecordBatch currentBatch;
    private Record lastRecord;
    private CloseableIterator<Record> records;
    private long nextFetchOffset;
    private Exception cachedRecordException = null;
    private boolean corruptLastRecord = false;
    private boolean isConsumed = false;
    private boolean initialized = false;
    private Optional<Integer> lastEpoch = Optional.empty();
    private final Set<Long> abortedProducerIds = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletedFetch(Logger logger, SubscriptionState subscriptionState, BufferSupplier bufferSupplier, TopicPartition topicPartition, FetchResponseData.PartitionData partitionData, FetchMetricsAggregator fetchMetricsAggregator, Long l, short s) {
        this.log = logger;
        this.subscriptions = subscriptionState;
        this.decompressionBufferSupplier = bufferSupplier;
        this.partition = topicPartition;
        this.partitionData = partitionData;
        this.metricAggregator = fetchMetricsAggregator;
        this.batches = FetchResponse.recordsOrFail(partitionData).batches().iterator();
        this.nextFetchOffset = l.longValue();
        this.requestVersion = s;
        this.abortedTransactions = abortedTransactions(partitionData);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long nextFetchOffset() {
        return this.nextFetchOffset;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<Integer> lastEpoch() {
        return this.lastEpoch;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isInitialized() {
        return this.initialized;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setInitialized() {
        this.initialized = true;
    }

    public boolean isConsumed() {
        return this.isConsumed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordAggregatedMetrics(int i, int i2) {
        this.metricAggregator.record(this.partition, i, i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void drain() {
        if (this.isConsumed) {
            return;
        }
        maybeCloseRecordStream();
        this.cachedRecordException = null;
        this.isConsumed = true;
        recordAggregatedMetrics(this.bytesRead, this.recordsRead);
        if (this.bytesRead > 0) {
            this.subscriptions.movePartitionToEnd(this.partition);
        }
    }

    private void maybeEnsureValid(FetchConfig fetchConfig, RecordBatch recordBatch) {
        if (!fetchConfig.checkCrcs || recordBatch.magic() < 2) {
            return;
        }
        try {
            recordBatch.ensureValid();
        } catch (CorruptRecordException e) {
            String valueOf = String.valueOf(this.partition);
            long baseOffset = recordBatch.baseOffset();
            e.getMessage();
            KafkaException kafkaException = new KafkaException("Record batch for partition " + valueOf + " at offset " + baseOffset + " is invalid, cause: " + kafkaException);
            throw kafkaException;
        }
    }

    private void maybeEnsureValid(FetchConfig fetchConfig, Record record) {
        if (fetchConfig.checkCrcs) {
            try {
                record.ensureValid();
            } catch (CorruptRecordException e) {
                String valueOf = String.valueOf(this.partition);
                long offset = record.offset();
                e.getMessage();
                KafkaException kafkaException = new KafkaException("Record for partition " + valueOf + " at offset " + offset + " is invalid, cause: " + kafkaException);
                throw kafkaException;
            }
        }
    }

    private void maybeCloseRecordStream() {
        if (this.records != null) {
            this.records.close();
            this.records = null;
        }
    }

    private Record nextFetchedRecord(FetchConfig fetchConfig) {
        while (true) {
            if (this.records == null || !this.records.hasNext()) {
                maybeCloseRecordStream();
                if (!this.batches.hasNext()) {
                    if (this.currentBatch != null) {
                        this.nextFetchOffset = this.currentBatch.nextOffset();
                    }
                    drain();
                    return null;
                }
                this.currentBatch = this.batches.next();
                this.lastEpoch = maybeLeaderEpoch(this.currentBatch.partitionLeaderEpoch());
                maybeEnsureValid(fetchConfig, this.currentBatch);
                if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && this.currentBatch.hasProducerId()) {
                    consumeAbortedTransactionsUpTo(this.currentBatch.lastOffset());
                    long producerId = this.currentBatch.producerId();
                    if (containsAbortMarker(this.currentBatch)) {
                        this.abortedProducerIds.remove(Long.valueOf(producerId));
                    } else if (isBatchAborted(this.currentBatch)) {
                        this.log.debug("Skipping aborted record batch from partition {} with producerId {} and offsets {} to {}", this.partition, Long.valueOf(producerId), Long.valueOf(this.currentBatch.baseOffset()), Long.valueOf(this.currentBatch.lastOffset()));
                        this.nextFetchOffset = this.currentBatch.nextOffset();
                    }
                }
                this.records = this.currentBatch.streamingIterator(this.decompressionBufferSupplier);
            } else {
                Record next = this.records.next();
                if (next.offset() >= this.nextFetchOffset) {
                    maybeEnsureValid(fetchConfig, next);
                    if (!this.currentBatch.isControlBatch()) {
                        return next;
                    }
                    this.nextFetchOffset = next.offset() + 1;
                } else {
                    continue;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <K, V> List<ConsumerRecord<K, V>> fetchRecords(FetchConfig fetchConfig, Deserializers<K, V> deserializers, int i) {
        if (this.corruptLastRecord) {
            throw new KafkaException("Received exception when fetching the next record from " + String.valueOf(this.partition) + ". If needed, please seek past the record to continue consumption.", this.cachedRecordException);
        }
        if (this.isConsumed) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            try {
                if (this.cachedRecordException == null) {
                    this.corruptLastRecord = true;
                    this.lastRecord = nextFetchedRecord(fetchConfig);
                    this.corruptLastRecord = false;
                }
                if (this.lastRecord == null) {
                    break;
                }
                arrayList.add(parseRecord(deserializers, this.partition, maybeLeaderEpoch(this.currentBatch.partitionLeaderEpoch()), this.currentBatch.timestampType(), this.lastRecord));
                this.recordsRead++;
                this.bytesRead += this.lastRecord.sizeInBytes();
                this.nextFetchOffset = this.lastRecord.offset() + 1;
                this.cachedRecordException = null;
            } catch (SerializationException e) {
                this.cachedRecordException = e;
                if (arrayList.isEmpty()) {
                    throw e;
                }
            } catch (KafkaException e2) {
                this.cachedRecordException = e2;
                if (arrayList.isEmpty()) {
                    throw new KafkaException("Received exception when fetching the next record from " + String.valueOf(this.partition) + ". If needed, please seek past the record to continue consumption.", e2);
                }
            }
        }
        return arrayList;
    }

    <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, TopicPartition topicPartition, Optional<Integer> optional, TimestampType timestampType, Record record) {
        K deserialize;
        V deserialize2;
        ByteBuffer key = record.key();
        ByteBuffer value = record.value();
        RecordHeaders recordHeaders = new RecordHeaders(record.headers());
        if (key == null) {
            deserialize = null;
        } else {
            try {
                deserialize = deserializers.keyDeserializer.deserialize(topicPartition.topic(), recordHeaders, key);
            } catch (RuntimeException e) {
                this.log.error("Key Deserializers with error: {}", deserializers);
                throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, topicPartition, timestampType, record, e, recordHeaders);
            }
        }
        K k = deserialize;
        if (value == null) {
            deserialize2 = null;
        } else {
            try {
                deserialize2 = deserializers.valueDeserializer.deserialize(topicPartition.topic(), recordHeaders, value);
            } catch (RuntimeException e2) {
                this.log.error("Value Deserializers with error: {}", deserializers);
                throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, topicPartition, timestampType, record, e2, recordHeaders);
            }
        }
        return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), record.offset(), record.timestamp(), timestampType, key == null ? -1 : key.remaining(), value == null ? -1 : value.remaining(), k, deserialize2, recordHeaders, optional);
    }

    private static RecordDeserializationException newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin deserializationExceptionOrigin, TopicPartition topicPartition, TimestampType timestampType, Record record, RuntimeException runtimeException, Headers headers) {
        return new RecordDeserializationException(deserializationExceptionOrigin, topicPartition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers, "Error deserializing " + deserializationExceptionOrigin.name() + " for partition " + String.valueOf(topicPartition) + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", runtimeException);
    }

    private Optional<Integer> maybeLeaderEpoch(int i) {
        return i == -1 ? Optional.empty() : Optional.of(Integer.valueOf(i));
    }

    private void consumeAbortedTransactionsUpTo(long j) {
        if (this.abortedTransactions == null) {
            return;
        }
        while (!this.abortedTransactions.isEmpty() && this.abortedTransactions.peek().firstOffset() <= j) {
            this.abortedProducerIds.add(Long.valueOf(this.abortedTransactions.poll().producerId()));
        }
    }

    private boolean isBatchAborted(RecordBatch recordBatch) {
        return recordBatch.isTransactional() && this.abortedProducerIds.contains(Long.valueOf(recordBatch.producerId()));
    }

    private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partitionData) {
        if (partitionData.abortedTransactions() == null || partitionData.abortedTransactions().isEmpty()) {
            return null;
        }
        PriorityQueue<FetchResponseData.AbortedTransaction> priorityQueue = new PriorityQueue<>(partitionData.abortedTransactions().size(), Comparator.comparingLong((v0) -> {
            return v0.firstOffset();
        }));
        priorityQueue.addAll(partitionData.abortedTransactions());
        return priorityQueue;
    }

    private boolean containsAbortMarker(RecordBatch recordBatch) {
        if (!recordBatch.isControlBatch()) {
            return false;
        }
        Iterator<Record> it = recordBatch.iterator();
        if (it.hasNext()) {
            return ControlRecordType.ABORT == ControlRecordType.parse(it.next().key());
        }
        return false;
    }
}
