package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferHeader;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.class */
/**
 * {@link PartitionFileReader} that reads buffers back from a single data file.
 *
 * <p>The byte range to read is resolved through a {@link ProducerMergedPartitionFileIndex}
 * (looked up by subpartition id and an integer index) or, when a previous read stopped in the
 * middle of a region, through the {@link ProducerMergedReadProgress} handed back by that read.
 * The file channel is opened lazily by {@link #readBuffer} / {@link #getPriority}, and
 * {@link #release()} closes the channel and deletes the data file.
 */
public class ProducerMergedPartitionFileReader implements PartitionFileReader {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class);

    /**
     * Number of bytes accounted for each parsed buffer header while slicing; a header is only
     * parsed when at least this many bytes remain.
     *
     * <p>NOTE(review): presumably mirrors the header length used by {@code
     * BufferReaderWriterUtil} - confirm, and reference that constant directly if it is accessible.
     */
    private static final int HEADER_LENGTH = 8;

    private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
    private final Path dataFilePath;
    private final ProducerMergedPartitionFileIndex dataIndex;

    /** Opened on first use by {@link #lazyInitializeFileChannel()}; {@code null} until then. */
    private volatile FileChannel fileChannel;

    /**
     * Read progress of this reader: the file offset at which the next read has to resume and the
     * offset at which the region being read ends.
     */
    public static class ProducerMergedReadProgress implements PartitionFileReader.ReadProgress {
        private final long currentBufferOffset;
        private final long endOfRegionOffset;

        /**
         * @param j the current buffer offset
         * @param j2 the end-of-region offset
         */
        public ProducerMergedReadProgress(long j, long j2) {
            this.currentBufferOffset = j;
            this.endOfRegionOffset = j2;
        }

        public long getCurrentBufferOffset() {
            return this.currentBufferOffset;
        }

        public long getEndOfRegionOffset() {
            return this.endOfRegionOffset;
        }
    }

    /**
     * Creates a reader for the given data file.
     *
     * <p>NOTE(review): the decompiler reported that the original access modifier of this
     * constructor was package-private.
     *
     * @param path path of the data file to read; the file is deleted by {@link #release()}
     * @param producerMergedPartitionFileIndex index used to look up the region to read
     */
    @VisibleForTesting
    public ProducerMergedPartitionFileReader(Path path, ProducerMergedPartitionFileIndex producerMergedPartitionFileIndex) {
        this.dataFilePath = path;
        this.dataIndex = producerMergedPartitionFileIndex;
    }

    /**
     * Reads up to one memory segment of data from the data file and slices it into buffers.
     *
     * @param tieredStoragePartitionId not used by this implementation
     * @param tieredStorageSubpartitionId subpartition whose region is looked up in the index
     * @param i not used by this implementation
     * @param i2 index passed to the region lookup together with the subpartition id
     * @param memorySegment segment the file data is read into
     * @param bufferRecycler recycler attached to the buffer wrapping {@code memorySegment}
     * @param readProgress progress returned by the previous read, or {@code null}
     * @param compositeBuffer incomplete buffer left over from the previous read, or {@code null}
     * @return the sliced buffers together with the new read progress, or {@code null} when no
     *     region is found or no bytes are left to read
     * @throws IOException if positioning or reading the file channel fails
     */
    @Override
    public PartitionFileReader.ReadBufferResult readBuffer(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i, int i2, MemorySegment memorySegment, BufferRecycler bufferRecycler, @Nullable PartitionFileReader.ReadProgress readProgress, @Nullable CompositeBuffer compositeBuffer) throws IOException {
        lazyInitializeFileChannel();

        Tuple2<Long, Long> startAndEndOffset = getReadStartAndEndOffset(tieredStorageSubpartitionId, i2, readProgress, compositeBuffer);
        if (startAndEndOffset == null) {
            return null;
        }
        long readStartOffset = startAndEndOffset.f0;
        long readEndOffset = startAndEndOffset.f1;

        // Clamp while still in long arithmetic: narrowing the remaining length to int first would
        // wrap around for regions with more than Integer.MAX_VALUE bytes left.
        int numBytesToRead = (int) Math.min((long) memorySegment.size(), readEndOffset - readStartOffset);
        if (numBytesToRead == 0) {
            return null;
        }

        List<Buffer> readBuffers = new LinkedList<>();
        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
        this.fileChannel.position(readStartOffset);
        readFileDataToBuffer(memorySegment, bufferRecycler, byteBuffer);

        Tuple2<Integer, Integer> slicedAndPartialBytes = sliceBuffer(byteBuffer, memorySegment, compositeBuffer, bufferRecycler, readBuffers);
        return getReadBufferResult(readBuffers, readStartOffset, readEndOffset, numBytesToRead, slicedAndPartialBytes.f0, slicedAndPartialBytes.f1);
    }

    /**
     * Returns the file offset the next read for the given subpartition would start from.
     *
     * @param tieredStoragePartitionId not used by this implementation
     * @param tieredStorageSubpartitionId subpartition whose region is looked up in the index
     * @param i not used by this implementation
     * @param i2 index passed to the region lookup together with the subpartition id
     * @param readProgress progress returned by the previous read, or {@code null}
     * @return the current buffer offset of an unfinished progress; otherwise the start offset of
     *     the region found in the index, or {@link Long#MAX_VALUE} when there is no such region
     */
    @Override
    public long getPriority(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i, int i2, @Nullable PartitionFileReader.ReadProgress readProgress) {
        lazyInitializeFileChannel();

        ProducerMergedReadProgress progress = convertToCurrentReadProgress(readProgress);
        if (progress != null && progress.getCurrentBufferOffset() != progress.getEndOfRegionOffset()) {
            return progress.getCurrentBufferOffset();
        }
        return this.dataIndex
                .getRegion(tieredStorageSubpartitionId, i2)
                .map(ProducerMergedPartitionFileIndex.FixedSizeRegion::getRegionStartOffset)
                .orElse(Long.MAX_VALUE);
    }

    /**
     * Closes the file channel if it was opened and deletes the data file.
     *
     * <p>The file is deleted even when closing the channel fails; the close failure is still
     * rethrown afterwards.
     */
    @Override
    public void release() {
        FileChannel channel = this.fileChannel;
        try {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    ExceptionUtils.rethrow(e, "Failed to close file channel.");
                }
            }
        } finally {
            IOUtils.deleteFileQuietly(this.dataFilePath);
        }
    }

    /**
     * Opens the read-only channel on the data file if it has not been opened yet.
     *
     * <p>NOTE(review): the null check and the assignment are not atomic; if this can be reached
     * from several threads at once, more than one channel may be opened - confirm the threading
     * model of the callers.
     */
    private void lazyInitializeFileChannel() {
        if (this.fileChannel != null) {
            return;
        }
        try {
            this.fileChannel = FileChannel.open(this.dataFilePath, StandardOpenOption.READ);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e, "Failed to open file channel.");
        }
    }

    /**
     * Slices the data that was read into {@code memorySegment} into individual buffers.
     *
     * <p>If {@code previousPartial} is given, its missing bytes are taken from the front of the
     * data first. Afterwards headers are parsed one by one: a buffer that fits completely becomes
     * a read-only slice, a buffer that does not fit becomes a new partial {@link CompositeBuffer}
     * and ends the slicing. Trailing bytes too short to hold a header are not sliced.
     *
     * @param byteBuffer view on the bytes that were read, positioned at the first unread byte
     * @param memorySegment segment backing {@code byteBuffer}
     * @param previousPartial incomplete buffer from the previous read, or {@code null}
     * @param bufferRecycler recycler attached to the buffer wrapping {@code memorySegment}
     * @param readBuffers output list the sliced buffers are appended to
     * @return the number of bytes sliced in this call, and the number of bytes (header included)
     *     that belong to a newly created trailing partial buffer, {@code 0} if there is none
     */
    private Tuple2<Integer, Integer> sliceBuffer(ByteBuffer byteBuffer, MemorySegment memorySegment, @Nullable CompositeBuffer previousPartial, BufferRecycler bufferRecycler, List<Buffer> readBuffers) {
        Preconditions.checkState(this.reusedHeaderBuffer.position() == 0);
        Preconditions.checkState(previousPartial == null || previousPartial.missingLength() > 0);

        NetworkBuffer wholeBuffer = new NetworkBuffer(memorySegment, bufferRecycler);
        wholeBuffer.setSize(byteBuffer.remaining());

        try {
            int numSlicedBytes = 0;

            if (previousPartial != null) {
                // Complete the partial buffer of the previous read with the leading bytes.
                wholeBuffer.retainBuffer();
                int numMissingBytes = previousPartial.missingLength();
                int positionAfterPartial = byteBuffer.position() + numMissingBytes;
                previousPartial.addPartialBuffer(wholeBuffer.readOnlySlice(byteBuffer.position(), numMissingBytes));
                numSlicedBytes += numMissingBytes;
                byteBuffer.position(positionAfterPartial);
                readBuffers.add(previousPartial);
            }

            CompositeBuffer trailingPartial = null;
            while (byteBuffer.hasRemaining()) {
                BufferHeader header = parseBufferHeader(byteBuffer);
                if (header == null) {
                    // Not enough bytes left for a header; they are left for the next read.
                    break;
                }
                numSlicedBytes += HEADER_LENGTH;

                if (header.getLength() > byteBuffer.remaining()) {
                    // Only part of this buffer was read: start a new partial buffer and stop.
                    wholeBuffer.retainBuffer();
                    int numPartialBytes = byteBuffer.remaining();
                    numSlicedBytes += numPartialBytes;
                    trailingPartial = new CompositeBuffer(header);
                    trailingPartial.addPartialBuffer(wholeBuffer.readOnlySlice(byteBuffer.position(), numPartialBytes));
                    readBuffers.add(trailingPartial);
                    break;
                }

                // The whole buffer is available: expose it as a read-only slice.
                wholeBuffer.retainBuffer();
                Buffer slicedBuffer = wholeBuffer.readOnlySlice(byteBuffer.position(), header.getLength());
                slicedBuffer.setDataType(header.getDataType());
                slicedBuffer.setCompressed(header.isCompressed());
                byteBuffer.position(byteBuffer.position() + header.getLength());
                numSlicedBytes += header.getLength();
                readBuffers.add(slicedBuffer);
            }

            return Tuple2.of(numSlicedBytes, getPartialBufferReadBytes(trailingPartial));
        } catch (Throwable throwable) {
            LOG.error("Failed to slice the read buffer {}.", byteBuffer, throwable);
            throw throwable;
        } finally {
            // Drop the reference taken by creating wholeBuffer; each slice retained its own.
            wholeBuffer.recycleBuffer();
        }
    }

    /**
     * Determines the file range the next read has to cover.
     *
     * @param subpartitionId subpartition whose region is looked up in the index
     * @param regionLookupIndex index passed to the region lookup
     * @param readProgress progress returned by the previous read, or {@code null}
     * @param partialBuffer incomplete buffer left over from the previous read, or {@code null}
     * @return start and end offset of the range, or {@code null} if a region has to be looked up
     *     and the index does not contain one
     */
    @Nullable
    private Tuple2<Long, Long> getReadStartAndEndOffset(TieredStorageSubpartitionId subpartitionId, int regionLookupIndex, @Nullable PartitionFileReader.ReadProgress readProgress, @Nullable CompositeBuffer partialBuffer) {
        ProducerMergedReadProgress progress = convertToCurrentReadProgress(readProgress);
        boolean regionUnfinished = progress != null && progress.getCurrentBufferOffset() != progress.getEndOfRegionOffset();

        long readStartOffset;
        long readEndOffset;
        if (regionUnfinished) {
            // Resume inside the current region, skipping what the partial buffer already holds.
            readStartOffset = progress.getCurrentBufferOffset() + getPartialBufferReadBytes(partialBuffer);
            readEndOffset = progress.getEndOfRegionOffset();
        } else {
            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> region = this.dataIndex.getRegion(subpartitionId, regionLookupIndex);
            if (!region.isPresent()) {
                return null;
            }
            readStartOffset = region.get().getRegionStartOffset();
            readEndOffset = region.get().getRegionEndOffset();
        }

        Preconditions.checkState(readStartOffset <= readEndOffset);
        return Tuple2.of(readStartOffset, readEndOffset);
    }

    /**
     * Builds the result of a read.
     *
     * @param readBuffers buffers produced by slicing
     * @param readStartOffset file offset the read started at
     * @param readEndOffset file offset at which the region ends
     * @param numBytesToRead number of bytes requested from the file
     * @param numBytesSliced number of bytes that ended up in {@code readBuffers}
     * @param numBytesOfPartialBuffer bytes (header included) of a trailing partial buffer
     * @return the result; its continue flag is set while the sliced bytes have not reached the
     *     end of the region
     */
    private static PartitionFileReader.ReadBufferResult getReadBufferResult(List<Buffer> readBuffers, long readStartOffset, long readEndOffset, int numBytesToRead, int numBytesSliced, int numBytesOfPartialBuffer) {
        // At most an incomplete header may be left unsliced at the end of the read data.
        Preconditions.checkState(numBytesSliced <= numBytesToRead && numBytesToRead - numBytesSliced < HEADER_LENGTH);

        boolean continueReading = readStartOffset + numBytesSliced < readEndOffset;
        // The progress points at the header of the trailing partial buffer, if there is one.
        ProducerMergedReadProgress newProgress = new ProducerMergedReadProgress(readStartOffset + numBytesSliced - numBytesOfPartialBuffer, readEndOffset);
        return new PartitionFileReader.ReadBufferResult(readBuffers, continueReading, newProgress);
    }

    /**
     * Fills {@code target} from the file channel and flips it for reading. On any failure the
     * memory segment is handed to the recycler before the failure is rethrown.
     */
    private void readFileDataToBuffer(MemorySegment memorySegment, BufferRecycler bufferRecycler, ByteBuffer target) throws IOException {
        try {
            BufferReaderWriterUtil.readByteBufferFully(this.fileChannel, target);
            target.flip();
        } catch (Throwable throwable) {
            bufferRecycler.recycle(memorySegment);
            throw throwable;
        }
    }

    /**
     * Returns the number of bytes a partial buffer accounts for: its readable bytes plus one
     * header, or {@code 0} when there is no partial buffer.
     */
    private static int getPartialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) {
        return partialBuffer == null ? 0 : partialBuffer.readableBytes() + HEADER_LENGTH;
    }

    /**
     * Casts the generic progress to this reader's progress type; {@code null} stays {@code null}.
     *
     * @throws IllegalStateException if the progress is of a different type
     */
    @Nullable
    private static ProducerMergedReadProgress convertToCurrentReadProgress(@Nullable PartitionFileReader.ReadProgress readProgress) {
        if (readProgress == null) {
            return null;
        }
        Preconditions.checkState(readProgress instanceof ProducerMergedReadProgress);
        return (ProducerMergedReadProgress) readProgress;
    }

    /**
     * Parses the next buffer header from {@code byteBuffer}.
     *
     * @return the parsed header, or {@code null} if fewer than {@link #HEADER_LENGTH} bytes remain
     */
    @Nullable
    private BufferHeader parseBufferHeader(ByteBuffer byteBuffer) {
        Preconditions.checkArgument(this.reusedHeaderBuffer.position() == 0);
        try {
            if (byteBuffer.remaining() < HEADER_LENGTH) {
                return null;
            }
            return BufferReaderWriterUtil.parseBufferHeader(byteBuffer);
        } catch (Throwable throwable) {
            LOG.error("Failed to parse buffer header.", throwable);
            throw throwable;
        } finally {
            // Keep the reusable header buffer reset for the next call.
            this.reusedHeaderBuffer.clear();
        }
    }
}
