package org.apache.spark.shuffle.reader;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.com.google.common.annotations.VisibleForTesting;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.io.netty.buffer.ByteBufInputStream;
import org.apache.uniffle.io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Product2;
import scala.Tuple2;
import scala.collection.AbstractIterator;
import scala.collection.Iterator;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/spark/shuffle/reader/RssShuffleDataIterator.class */
/**
 * Iterator over the key/value records of a shuffle partition read through a
 * {@link ShuffleReadClient}.
 *
 * <p>Each call to {@link #hasNext()} that finds the current record iterator exhausted pulls the
 * next block from the client, decompresses it when a {@link Codec} is configured, and opens a
 * deserialization stream over the result. Read, decompress and deserialize timings are
 * accumulated and logged once the client reports no more data.
 *
 * <p>The class keeps mutable state (current buffer, streams, counters) without any
 * synchronization, so an instance is expected to be driven by a single thread.
 * {@link #cleanup()} must be called to release the streams, the decompression buffer and the
 * client.
 *
 * @param <K> key type of the records
 * @param <C> value (combiner) type of the records
 */
public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {
    private static final Logger LOG = LoggerFactory.getLogger(RssShuffleDataIterator.class);

    private final SerializerInstance serializerInstance;
    private final ShuffleReadMetrics shuffleReadMetrics;
    /** Decompressor for fetched blocks, or {@code null} when shuffle compression is disabled. */
    private final Codec codec;
    /** Source of shuffle blocks; set to {@code null} by {@link #cleanup()}. */
    private ShuffleReadClient shuffleReadClient;
    /**
     * Data handed to the deserializer. With a codec this is a buffer owned (and reused) by this
     * iterator; without one it is the buffer returned by the read client.
     */
    private ByteBuffer uncompressedData;
    private Iterator<Tuple2<Object, Object>> recordsIterator;
    private DeserializationStream deserializationStream;
    private ByteBufInputStream byteBufInputStream;
    private long readTime;
    private long serializeTime;
    private long decompressTime;
    private long totalRawBytesLength;
    private long unCompressedBytesLength;

    /**
     * Creates an iterator that reads blocks from {@code shuffleReadClient}.
     *
     * @param serializer serializer used to create the instance that decodes records
     * @param shuffleReadClient client that supplies the shuffle blocks
     * @param shuffleReadMetrics Spark metrics updated while reading
     * @param rssConf configuration; decides whether blocks are decompressed and with which codec
     */
    public RssShuffleDataIterator(Serializer serializer, ShuffleReadClient shuffleReadClient, ShuffleReadMetrics shuffleReadMetrics, RssConf rssConf) {
        this.serializerInstance = serializer.newInstance();
        this.shuffleReadClient = shuffleReadClient;
        this.shuffleReadMetrics = shuffleReadMetrics;
        // The compress flag is looked up under the Spark key with the RSS config prefix removed.
        String compressKey = RssSparkConfig.SPARK_SHUFFLE_COMPRESS_KEY.substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length());
        boolean compressEnabled = rssConf.getBoolean(compressKey, true);
        this.codec = compressEnabled ? Codec.newInstance(rssConf) : null;
    }

    /**
     * Opens a key/value iterator over {@code byteBuffer}, closing any stream left over from the
     * previous block first.
     *
     * @param byteBuffer serialized records of one block
     * @return iterator over the deserialized key/value pairs
     */
    public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer byteBuffer) {
        clearDeserializationStream();
        // "true": the wrapping netty buffer is released when the input stream is closed.
        this.byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(byteBuffer), true);
        this.deserializationStream = this.serializerInstance.deserializeStream(this.byteBufInputStream);
        return this.deserializationStream.asKeyValueIterator();
    }

    /**
     * Closes the input stream and the deserialization stream of the current block, if any.
     * The fields are cleared before closing so a failing {@code close()} cannot leave a stale
     * reference that would be closed a second time.
     */
    private void clearDeserializationStream() {
        ByteBufInputStream inputStream = this.byteBufInputStream;
        DeserializationStream stream = this.deserializationStream;
        this.byteBufInputStream = null;
        this.deserializationStream = null;

        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                // Best effort: keep going, but record the cause so the leak can be diagnosed.
                LOG.warn("Can't close ByteBufInputStream, memory may be leaked.", e);
            }
        }
        if (stream != null) {
            stream.close();
        }
    }

    /**
     * Reports whether another record is available, fetching further blocks as needed.
     *
     * <p>Blocks are fetched in a loop so that a block which deserializes to zero records does
     * not end the iteration while the client still has data. When the client returns no data
     * the processed block ids are checked, statistics are logged and {@code false} is returned.
     *
     * @return {@code true} if {@link #next()} can return a record
     */
    public boolean hasNext() {
        while (this.recordsIterator == null || !this.recordsIterator.hasNext()) {
            long fetchStart = System.currentTimeMillis();
            CompressedShuffleBlock block = this.shuffleReadClient.readShuffleBlockData();
            ByteBuffer rawData = block != null ? block.getByteBuffer() : null;
            long fetchDuration = System.currentTimeMillis() - fetchStart;
            this.shuffleReadMetrics.incFetchWaitTime(fetchDuration);

            if (rawData == null) {
                // No more data: let the client verify every expected block was consumed.
                this.shuffleReadClient.checkProcessedBlockIds();
                this.shuffleReadClient.logStatics();
                String decompressInfo = this.codec == null
                        ? "."
                        : ", " + this.decompressTime + " ms to decompress with unCompressionLength[" + this.unCompressedBytesLength + "]";
                LOG.info("Fetch {} bytes cost {} ms and {} ms to serialize{}", this.totalRawBytesLength, this.readTime, this.serializeTime, decompressInfo);
                return false;
            }

            uncompress(block, rawData);
            long deserializeStart = System.currentTimeMillis();
            this.recordsIterator = createKVIterator(this.uncompressedData);
            long deserializeDuration = System.currentTimeMillis() - deserializeStart;
            this.readTime += fetchDuration;
            this.serializeTime += deserializeDuration;
        }
        return true;
    }

    /**
     * Makes the payload of a fetched block available in {@link #uncompressedData}.
     *
     * <p>With a codec the payload is decompressed into a reusable buffer that is (re)allocated
     * only when missing or too small; without a codec the fetched buffer is used directly.
     * Byte counters and the remote-bytes-read metric are updated in both cases.
     *
     * @param compressedShuffleBlock block descriptor providing the uncompressed length
     * @param byteBuffer raw payload of the block
     * @return the uncompressed length reported by the block
     */
    private int uncompress(CompressedShuffleBlock compressedShuffleBlock, ByteBuffer byteBuffer) {
        long rawLength = byteBuffer.remaining();
        this.totalRawBytesLength += rawLength;
        this.shuffleReadMetrics.incRemoteBytesRead(rawLength);

        int uncompressLength = compressedShuffleBlock.getUncompressLength();
        if (this.codec == null) {
            this.uncompressedData = byteBuffer;
            return uncompressLength;
        }

        if (this.uncompressedData == null || this.uncompressedData.capacity() < uncompressLength) {
            if (this.uncompressedData != null) {
                RssUtils.releaseByteBuffer(this.uncompressedData);
            }
            // Match the kind (direct/heap) of the incoming buffer.
            this.uncompressedData = byteBuffer.isDirect() ? ByteBuffer.allocateDirect(uncompressLength) : ByteBuffer.allocate(uncompressLength);
        }
        this.uncompressedData.clear();
        long decompressStart = System.currentTimeMillis();
        this.codec.decompress(byteBuffer, uncompressLength, this.uncompressedData, 0);
        this.unCompressedBytesLength += uncompressLength;
        this.decompressTime += System.currentTimeMillis() - decompressStart;
        // The buffer may be larger than this block; expose only the decompressed bytes.
        this.uncompressedData.limit(this.uncompressedData.position() + uncompressLength);
        return uncompressLength;
    }

    /**
     * Returns the next record and counts it in the read metrics. Callers are expected to have
     * obtained {@code true} from {@link #hasNext()} first.
     *
     * @return the next key/value pair
     */
    @SuppressWarnings("unchecked") // records are deserialized as Tuple2<Object, Object>; K and C are erased
    public Product2<K, C> next() {
        this.shuffleReadMetrics.incRecordsRead(1L);
        return (Product2<K, C>) this.recordsIterator.next();
    }

    /**
     * Decompiler-generated alias of {@link #next()}, retained so existing references to this
     * name keep working.
     *
     * @return the next key/value pair
     * @deprecated use {@link #next()}
     */
    @Deprecated
    public Product2<K, C> m1083next() {
        return next();
    }

    /**
     * Releases everything held by this iterator: the current streams, the decompression buffer
     * (only when this iterator allocated it, i.e. a codec is configured) and the read client.
     * Safe to call more than once and before any block was read.
     *
     * @return {@link BoxedUnit#UNIT}
     */
    public BoxedUnit cleanup() {
        clearDeserializationStream();
        ByteBuffer buffer = this.uncompressedData;
        // Drop the reference first so a second cleanup() cannot release the same buffer again.
        this.uncompressedData = null;
        if (this.codec != null && buffer != null) {
            RssUtils.releaseByteBuffer(buffer);
        }
        ShuffleReadClient client = this.shuffleReadClient;
        this.shuffleReadClient = null;
        if (client != null) {
            client.close();
        }
        return BoxedUnit.UNIT;
    }

    /**
     * Exposes the metrics object for tests.
     *
     * @return the metrics instance passed to the constructor
     */
    @VisibleForTesting
    protected ShuffleReadMetrics getShuffleReadMetrics() {
        return this.shuffleReadMetrics;
    }
}
