package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.annotation.Nullable;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.shuffle.sort.ShuffleInMemorySorter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.annotations.VisibleForTesting;
import scala.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/spark/shuffle/sort/ShuffleExternalSorter.class */
/**
 * Memory consumer that buffers serialized shuffle records in memory pages, tracks them in a
 * {@link ShuffleInMemorySorter}, and writes them out to temporary shuffle files ordered by
 * partition id.
 *
 * <p>Records are appended with {@link #insertRecord}. A spill file is written either when
 * {@link #spill(long, MemoryConsumer)} is invoked on this consumer or when the number of buffered
 * records reaches the configured threshold. {@link #closeAndGetSpills()} writes whatever is still
 * buffered and returns a descriptor for every file that was produced.
 *
 * <p>The class contains no synchronization of its own.
 */
public final class ShuffleExternalSorter extends MemoryConsumer {
    private static final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);

    /** Size (1 MiB) of the on-heap buffer used to stage record bytes before they hit the writer. */
    @VisibleForTesting
    static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;

    /**
     * Upper bound (128 MiB) on the page size requested from the memory manager.
     * NOTE(review): presumably the largest page a packed record pointer can address - confirm.
     */
    private static final long MAX_PAGE_SIZE_BYTES = 128L * 1024 * 1024;

    private final int numPartitions;
    private final TaskMemoryManager taskMemoryManager;
    private final BlockManager blockManager;
    private final TaskContext taskContext;
    private final ShuffleWriteMetrics writeMetrics;

    /** A spill is forced once the in-memory sorter holds at least this many records. */
    private final long numElementsForSpillThreshold;

    /** Buffer size, in bytes, handed to the disk writer. */
    private final int fileBufferSizeBytes;

    /** Every page currently held by this sorter; all of them are released together. */
    private final LinkedList<MemoryBlock> allocatedPages;

    /** One entry per spill file that received at least one record. */
    private final LinkedList<SpillInfo> spills;

    private long peakMemoryUsedBytes;

    /** Set to {@code null} once the sorter has been closed or cleaned up. */
    @Nullable
    private ShuffleInMemorySorter inMemSorter;

    /** Page that new records are appended to; {@code null} until the first page is acquired. */
    @Nullable
    private MemoryBlock currentPage;

    /** Write position inside {@link #currentPage}. */
    private long pageCursor;

    /**
     * Creates a sorter bound to the given task.
     *
     * @param taskMemoryManager   memory manager used to allocate pages and decode record pointers
     * @param blockManager        supplies temp shuffle blocks and disk writers
     * @param taskContext         task whose metrics receive the spill statistics
     * @param i                   forwarded to the {@link ShuffleInMemorySorter} constructor
     *                            (presumably its initial capacity - confirm against that class)
     * @param i2                  number of partitions; sizes each {@link SpillInfo}
     * @param sparkConf           read for {@code spark.shuffle.file.buffer},
     *                            {@code spark.shuffle.spill.numElementsForceSpillThreshold} and
     *                            {@code spark.shuffle.sort.useRadixSort}
     * @param shuffleWriteMetrics metrics updated by the final file and by spill record counts
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public ShuffleExternalSorter(TaskMemoryManager taskMemoryManager, BlockManager blockManager, TaskContext taskContext, int i, int i2, SparkConf sparkConf, ShuffleWriteMetrics shuffleWriteMetrics) {
        super(taskMemoryManager, (int) Math.min(MAX_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes()), taskMemoryManager.getTungstenMemoryMode());
        this.allocatedPages = new LinkedList<>();
        this.spills = new LinkedList<>();
        this.currentPage = null;
        this.pageCursor = -1L;
        this.taskMemoryManager = taskMemoryManager;
        this.blockManager = blockManager;
        this.taskContext = taskContext;
        this.numPartitions = i2;
        // The setting is read in KiB and converted to bytes.
        this.fileBufferSizeBytes = ((int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k")) * 1024;
        this.numElementsForSpillThreshold = sparkConf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024L * 1024 * 1024);
        this.writeMetrics = shuffleWriteMetrics;
        this.inMemSorter = new ShuffleInMemorySorter(this, i, sparkConf.getBoolean("spark.shuffle.sort.useRadixSort", true));
        this.peakMemoryUsedBytes = getMemoryUsage();
    }

    /**
     * Sorts the buffered records and writes them to a new temp shuffle file, recording the byte
     * length of each partition's segment in a {@link SpillInfo}.
     *
     * @param isLastFile {@code true} when this is the final output file: bytes and records are then
     *                   accounted directly in the task's shuffle write metrics. Otherwise a scratch
     *                   metrics object is used, and afterwards only the record count is added to the
     *                   shuffle write metrics while the byte count is reported as disk bytes spilled.
     * @throws IOException if writing, committing or closing the file fails
     */
    private void writeSortedFile(boolean isLastFile) throws IOException {
        final ShuffleWriteMetrics metricsForThisFile = isLastFile ? this.writeMetrics : new ShuffleWriteMetrics();
        final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords = this.inMemSorter.getSortedIterator();
        // Staging buffer: record bytes are copied out of the page into this array, then written.
        final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];

        final Tuple2<TempShuffleBlockId, File> tempBlock = this.blockManager.diskBlockManager().createTempShuffleBlock();
        // NOTE(review): mo8827_2()/mo8828_1() look like decompiler-renamed Tuple2 accessors
        // (_2()/_1()); kept as-is because the Tuple2 declaration is not visible here - confirm.
        final File file = tempBlock.mo8827_2();
        final TempShuffleBlockId blockId = tempBlock.mo8828_1();
        final SpillInfo spillInfo = new SpillInfo(this.numPartitions, file, blockId);

        int currentPartition = -1;
        final FileSegment lastSegment;
        // try-with-resources closes the writer even when a write or commit throws; previously the
        // writer was only closed on the success path.
        try (DiskBlockObjectWriter writer = this.blockManager.getDiskWriter(blockId, file, DummySerializerInstance.INSTANCE, this.fileBufferSizeBytes, metricsForThisFile)) {
            while (sortedRecords.hasNext()) {
                sortedRecords.loadNext();
                final int partition = sortedRecords.packedRecordPointer.getPartitionId();
                // The iterator is expected to yield partitions in non-decreasing order.
                assert partition >= currentPartition;
                if (partition != currentPartition) {
                    // Partition boundary: commit the previous partition's bytes and note their length.
                    if (currentPartition != -1) {
                        spillInfo.partitionLengths[currentPartition] = writer.commitAndGet().length();
                    }
                    currentPartition = partition;
                }

                final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
                final Object recordPage = this.taskMemoryManager.getPage(recordPointer);
                final long recordOffsetInPage = this.taskMemoryManager.getOffsetInPage(recordPointer);
                // Each record is stored as a 4-byte length followed by that many payload bytes.
                int bytesRemaining = Platform.getInt(recordPage, recordOffsetInPage);
                long readPosition = recordOffsetInPage + 4;
                while (bytesRemaining > 0) {
                    final int chunk = Math.min(DISK_WRITE_BUFFER_SIZE, bytesRemaining);
                    Platform.copyMemory(recordPage, readPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, chunk);
                    writer.write(writeBuffer, 0, chunk);
                    readPosition += chunk;
                    bytesRemaining -= chunk;
                }
                writer.recordWritten();
            }
            lastSegment = writer.commitAndGet();
        }

        // Only register the file if at least one record was written to it.
        if (currentPartition != -1) {
            spillInfo.partitionLengths[currentPartition] = lastSegment.length();
            this.spills.add(spillInfo);
        }

        if (!isLastFile) {
            this.writeMetrics.incRecordsWritten(metricsForThisFile.recordsWritten());
            this.taskContext.taskMetrics().incDiskBytesSpilled(metricsForThisFile.bytesWritten());
        }
    }

    /**
     * Writes the buffered records to a spill file and releases all pages.
     *
     * @param j              not used by this implementation
     * @param memoryConsumer the consumer the spill is requested for; anything other than
     *                       {@code this} is ignored
     * @return the number of page bytes released, or 0 if nothing was spilled (different consumer,
     *         sorter already released, or no buffered records)
     * @throws IOException if writing the spill file fails
     */
    @Override // org.apache.spark.memory.MemoryConsumer
    public long spill(long j, MemoryConsumer memoryConsumer) throws IOException {
        if (memoryConsumer != this || this.inMemSorter == null || this.inMemSorter.numRecords() == 0) {
            return 0L;
        }

        final int spillsSoFar = this.spills.size();
        logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", new Object[] {
            Thread.currentThread().getId(),
            Utils.bytesToString(getMemoryUsage()),
            spillsSoFar,
            spillsSoFar > 1 ? " times" : " time"
        });

        writeSortedFile(false);
        final long releasedBytes = freeMemory();
        this.inMemSorter.reset();
        this.taskContext.taskMetrics().incMemoryBytesSpilled(releasedBytes);
        return releasedBytes;
    }

    /** Returns the combined size of all allocated pages plus the in-memory sorter's own usage. */
    private long getMemoryUsage() {
        long pageBytes = 0;
        for (MemoryBlock page : this.allocatedPages) {
            pageBytes += page.size();
        }
        final long sorterBytes = this.inMemSorter == null ? 0L : this.inMemSorter.getMemoryUsage();
        return sorterBytes + pageBytes;
    }

    /** Raises the recorded peak if the current memory usage exceeds it. */
    private void updatePeakMemoryUsed() {
        final long currentUsage = getMemoryUsage();
        if (currentUsage > this.peakMemoryUsedBytes) {
            this.peakMemoryUsedBytes = currentUsage;
        }
    }

    /** Returns the highest memory usage observed so far, after refreshing it with the current usage. */
    /* JADX INFO: Access modifiers changed from: package-private */
    public long getPeakMemoryUsedBytes() {
        updatePeakMemoryUsed();
        return this.peakMemoryUsedBytes;
    }

    /**
     * Frees every allocated page and resets the page cursor. The in-memory sorter is not touched.
     *
     * @return total size in bytes of the pages that were freed
     */
    private long freeMemory() {
        // Capture the peak before the pages disappear from the accounting.
        updatePeakMemoryUsed();
        long freedBytes = 0;
        for (MemoryBlock page : this.allocatedPages) {
            freedBytes += page.size();
            freePage(page);
        }
        this.allocatedPages.clear();
        this.currentPage = null;
        this.pageCursor = 0L;
        return freedBytes;
    }

    /**
     * Releases all pages and the in-memory sorter, and deletes every registered spill file.
     * A file that exists but cannot be deleted is logged and otherwise ignored.
     */
    public void cleanupResources() {
        freeMemory();
        if (this.inMemSorter != null) {
            this.inMemSorter.free();
            this.inMemSorter = null;
        }
        for (SpillInfo spill : this.spills) {
            if (spill.file.exists() && !spill.file.delete()) {
                logger.error("Unable to delete spill file {}", spill.file.getPath());
            }
        }
    }

    /**
     * Ensures the in-memory sorter can accept one more record, requesting a larger pointer array
     * when it cannot.
     *
     * @throws IOException declared for callers; NOTE(review): presumably raised when the
     *         allocation triggers a spill - confirm against {@code MemoryConsumer.allocateArray}
     */
    private void growPointerArrayIfNecessary() throws IOException {
        assert this.inMemSorter != null;
        if (this.inMemSorter.hasSpaceForAnotherRecord()) {
            return;
        }
        try {
            final LongArray biggerArray = allocateArray((this.inMemSorter.getMemoryUsage() / 8) * 2);
            // Re-check after allocating: if the sorter has room by now the new array is not
            // needed and is handed straight back.
            if (this.inMemSorter.hasSpaceForAnotherRecord()) {
                freeArray(biggerArray);
            } else {
                this.inMemSorter.expandPointerArray(biggerArray);
            }
        } catch (OutOfMemoryError e) {
            // The failure is only fatal if the sorter still has no room for the record.
            if (!this.inMemSorter.hasSpaceForAnotherRecord()) {
                logger.error("Unable to grow the pointer array");
                throw e;
            }
        }
    }

    /**
     * Allocates a fresh page and makes it current when there is no current page or the current
     * one cannot hold {@code required} more bytes.
     *
     * @param required number of bytes about to be written at the page cursor
     */
    private void acquireNewPageIfNecessary(int required) {
        final boolean fitsInCurrentPage = this.currentPage != null
            && this.pageCursor + required <= this.currentPage.getBaseOffset() + this.currentPage.size();
        if (!fitsInCurrentPage) {
            this.currentPage = allocatePage(required);
            this.pageCursor = this.currentPage.getBaseOffset();
            this.allocatedPages.add(this.currentPage);
        }
    }

    /**
     * Copies one record into the current page (as a 4-byte length followed by the payload) and
     * registers its address with the in-memory sorter.
     *
     * @param obj base object of the source memory holding the record
     * @param j   offset of the record relative to {@code obj}
     * @param i   length of the record in bytes
     * @param i2  partition id, forwarded to the in-memory sorter together with the record address
     * @throws IOException if a forced spill or a memory request fails
     */
    public void insertRecord(Object obj, long j, int i, int i2) throws IOException {
        assert this.inMemSorter != null;
        if (this.inMemSorter.numRecords() >= this.numElementsForSpillThreshold) {
            logger.info("Spilling data because number of spilledRecords crossed the threshold {}", this.numElementsForSpillThreshold);
            spill();
        }

        growPointerArrayIfNecessary();
        // 4 extra bytes for the length prefix.
        acquireNewPageIfNecessary(i + 4);
        assert this.currentPage != null;

        final Object pageBase = this.currentPage.getBaseObject();
        // Encode the address of the length prefix; that is where readers start decoding.
        final long recordAddress = this.taskMemoryManager.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
        Platform.putInt(pageBase, this.pageCursor, i);
        this.pageCursor += 4;
        Platform.copyMemory(obj, j, pageBase, this.pageCursor, i);
        this.pageCursor += i;
        this.inMemSorter.insertRecord(recordAddress, i2);
    }

    /**
     * Writes any still-buffered records as the final file, releases memory, and returns the
     * descriptors of all files written by this sorter.
     *
     * @return one {@link SpillInfo} per file that received records, in the order written
     * @throws IOException if writing the final file fails; all resources, including earlier spill
     *         files, are cleaned up before the exception is rethrown
     */
    public SpillInfo[] closeAndGetSpills() throws IOException {
        try {
            if (this.inMemSorter != null) {
                writeSortedFile(true);
                freeMemory();
                this.inMemSorter.free();
                this.inMemSorter = null;
            }
            return this.spills.toArray(new SpillInfo[this.spills.size()]);
        } catch (IOException e) {
            cleanupResources();
            throw e;
        }
    }
}
