package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.consumer.AcknowledgeType;
import datahub.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import datahub.shaded.org.apache.kafka.common.KafkaException;
import datahub.shaded.org.apache.kafka.common.TopicIdPartition;
import datahub.shaded.org.apache.kafka.common.TopicPartition;
import datahub.shaded.org.apache.kafka.common.errors.CorruptRecordException;
import datahub.shaded.org.apache.kafka.common.errors.RecordDeserializationException;
import datahub.shaded.org.apache.kafka.common.header.Headers;
import datahub.shaded.org.apache.kafka.common.header.internals.RecordHeaders;
import datahub.shaded.org.apache.kafka.common.message.ShareFetchResponseData;
import datahub.shaded.org.apache.kafka.common.record.Record;
import datahub.shaded.org.apache.kafka.common.record.RecordBatch;
import datahub.shaded.org.apache.kafka.common.record.TimestampType;
import datahub.shaded.org.apache.kafka.common.requests.ShareFetchResponse;
import datahub.shaded.org.apache.kafka.common.utils.BufferSupplier;
import datahub.shaded.org.apache.kafka.common.utils.CloseableIterator;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.class */
public class ShareCompletedFetch {
    final TopicIdPartition partition;
    final ShareFetchResponseData.PartitionData partitionData;
    final short requestVersion;
    private final Logger log;
    private final BufferSupplier decompressionBufferSupplier;
    private final Iterator<? extends RecordBatch> batches;
    private int recordsRead;
    private int bytesRead;
    private RecordBatch currentBatch;
    private Record lastRecord;
    private CloseableIterator<Record> records;
    private final List<OffsetAndDeliveryCount> acquiredRecordList;
    private ListIterator<OffsetAndDeliveryCount> acquiredRecordIterator;
    private final ShareFetchMetricsAggregator metricAggregator;
    private KafkaException cachedBatchException = null;
    private KafkaException cachedRecordException = null;
    private boolean isConsumed = false;
    private boolean initialized = false;
    private OffsetAndDeliveryCount nextAcquired = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch$OffsetAndDeliveryCount.class */
    public static class OffsetAndDeliveryCount {
        final long offset;
        final short deliveryCount;

        OffsetAndDeliveryCount(long j, short s) {
            this.offset = j;
            this.deliveryCount = s;
        }

        public String toString() {
            return "OffsetAndDeliveryCount{offset=" + this.offset + ", deliveryCount=" + ((int) this.deliveryCount) + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShareCompletedFetch(LogContext logContext, BufferSupplier bufferSupplier, TopicIdPartition topicIdPartition, ShareFetchResponseData.PartitionData partitionData, ShareFetchMetricsAggregator shareFetchMetricsAggregator, short s) {
        this.log = logContext.logger(ShareCompletedFetch.class);
        this.decompressionBufferSupplier = bufferSupplier;
        this.partition = topicIdPartition;
        this.partitionData = partitionData;
        this.metricAggregator = shareFetchMetricsAggregator;
        this.requestVersion = s;
        this.batches = ShareFetchResponse.recordsOrFail(partitionData).batches().iterator();
        this.acquiredRecordList = buildAcquiredRecordList(partitionData.acquiredRecords());
    }

    private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> list) {
        LinkedList linkedList = new LinkedList();
        list.forEach(acquiredRecords -> {
            long firstOffset = acquiredRecords.firstOffset();
            while (true) {
                long j = firstOffset;
                if (j > acquiredRecords.lastOffset()) {
                    return;
                }
                linkedList.add(new OffsetAndDeliveryCount(j, acquiredRecords.deliveryCount()));
                firstOffset = j + 1;
            }
        });
        return linkedList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isInitialized() {
        return this.initialized;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setInitialized() {
        this.initialized = true;
    }

    public boolean isConsumed() {
        return this.isConsumed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void drain() {
        if (this.isConsumed) {
            return;
        }
        maybeCloseRecordStream();
        this.cachedRecordException = null;
        this.cachedBatchException = null;
        this.isConsumed = true;
        recordAggregatedMetrics(this.bytesRead, this.recordsRead);
    }

    void recordAggregatedMetrics(int i, int i2) {
        this.metricAggregator.record(this.partition.topicPartition(), i, i2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x0083, code lost:
    
        if (r8.nextAcquired == null) goto L56;
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x0086, code lost:
    
        r0.addGap(r8.nextAcquired.offset);
        r8.nextAcquired = nextAcquiredRecord();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public <K, V> datahub.shaded.org.apache.kafka.clients.consumer.internals.ShareInFlightBatch<K, V> fetchRecords(datahub.shaded.org.apache.kafka.clients.consumer.internals.Deserializers<K, V> r9, int r10, boolean r11) {
        /*
            Method dump skipped, instructions count: 439
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: datahub.shaded.org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.fetchRecords(datahub.shaded.org.apache.kafka.clients.consumer.internals.Deserializers, int, boolean):datahub.shaded.org.apache.kafka.clients.consumer.internals.ShareInFlightBatch");
    }

    private void initializeNextAcquired() {
        if (this.nextAcquired == null) {
            if (this.acquiredRecordIterator == null) {
                this.acquiredRecordIterator = this.acquiredRecordList.listIterator();
            }
            if (this.acquiredRecordIterator.hasNext()) {
                this.nextAcquired = this.acquiredRecordIterator.next();
            }
        }
    }

    private OffsetAndDeliveryCount nextAcquiredRecord() {
        if (this.acquiredRecordIterator.hasNext()) {
            return this.acquiredRecordIterator.next();
        }
        return null;
    }

    private <K, V> void rejectRecordBatch(ShareInFlightBatch<K, V> shareInFlightBatch, RecordBatch recordBatch) {
        this.acquiredRecordIterator = this.acquiredRecordList.listIterator();
        OffsetAndDeliveryCount nextAcquiredRecord = nextAcquiredRecord();
        long baseOffset = recordBatch.baseOffset();
        while (true) {
            long j = baseOffset;
            if (j > recordBatch.lastOffset() || nextAcquiredRecord == null) {
                return;
            }
            if (j == nextAcquiredRecord.offset) {
                shareInFlightBatch.addAcknowledgement(j, AcknowledgeType.REJECT);
            } else if (j < nextAcquiredRecord.offset) {
                baseOffset = j + 1;
            }
            nextAcquiredRecord = nextAcquiredRecord();
            baseOffset = j + 1;
        }
    }

    <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, TopicIdPartition topicIdPartition, Optional<Integer> optional, TimestampType timestampType, Record record, short s) {
        K deserialize;
        V deserialize2;
        RecordHeaders recordHeaders = new RecordHeaders(record.headers());
        ByteBuffer key = record.key();
        ByteBuffer value = record.value();
        if (key == null) {
            deserialize = null;
        } else {
            try {
                deserialize = deserializers.keyDeserializer.deserialize(topicIdPartition.topic(), recordHeaders, key);
            } catch (RuntimeException e) {
                this.log.error("Key Deserializers with error: {}", deserializers);
                throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, topicIdPartition.topicPartition(), timestampType, record, e, recordHeaders);
            }
        }
        K k = deserialize;
        if (value == null) {
            deserialize2 = null;
        } else {
            try {
                deserialize2 = deserializers.valueDeserializer.deserialize(topicIdPartition.topic(), recordHeaders, value);
            } catch (RuntimeException e2) {
                this.log.error("Value Deserializers with error: {}", deserializers);
                throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, topicIdPartition.topicPartition(), timestampType, record, e2, recordHeaders);
            }
        }
        return new ConsumerRecord<>(topicIdPartition.topic(), topicIdPartition.partition(), record.offset(), record.timestamp(), timestampType, key == null ? -1 : key.remaining(), value == null ? -1 : value.remaining(), k, deserialize2, recordHeaders, optional);
    }

    private static RecordDeserializationException newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin deserializationExceptionOrigin, TopicPartition topicPartition, TimestampType timestampType, Record record, RuntimeException runtimeException, Headers headers) {
        return new RecordDeserializationException(deserializationExceptionOrigin, topicPartition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers, "Error deserializing " + deserializationExceptionOrigin.name() + " for partition " + topicPartition + " at offset " + record.offset() + ". The record has been released.", runtimeException);
    }

    private Record nextFetchedRecord(boolean z) {
        while (true) {
            if (this.records == null || !this.records.hasNext()) {
                maybeCloseRecordStream();
                if (!this.batches.hasNext()) {
                    drain();
                    return null;
                }
                this.currentBatch = this.batches.next();
                maybeEnsureValid(this.currentBatch, z);
                this.records = this.currentBatch.streamingIterator(this.decompressionBufferSupplier);
            } else {
                Record next = this.records.next();
                maybeEnsureValid(next, z);
                if (!this.currentBatch.isControlBatch()) {
                    return next;
                }
            }
        }
    }

    private Optional<Integer> maybeLeaderEpoch(int i) {
        return i == -1 ? Optional.empty() : Optional.of(Integer.valueOf(i));
    }

    private void maybeEnsureValid(RecordBatch recordBatch, boolean z) {
        if (!z || recordBatch.magic() < 2) {
            return;
        }
        try {
            recordBatch.ensureValid();
        } catch (CorruptRecordException e) {
            throw new CorruptRecordException("Record batch for partition " + this.partition.topicPartition() + " at offset " + recordBatch.baseOffset() + " is invalid, cause: " + e.getMessage());
        }
    }

    private void maybeEnsureValid(Record record, boolean z) {
        if (z) {
            try {
                record.ensureValid();
            } catch (CorruptRecordException e) {
                throw new CorruptRecordException("Record for partition " + this.partition.topicPartition() + " at offset " + record.offset() + " is invalid, cause: " + e.getMessage());
            }
        }
    }

    private void maybeCloseRecordStream() {
        if (this.records != null) {
            this.records.close();
            this.records = null;
        }
    }
}
