/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.comet.execution.shuffle;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.comet.CometConf$;
import org.apache.comet.Native;
import org.apache.comet.shaded.guava.annotations.VisibleForTesting;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator;
import org.apache.spark.shuffle.sort.RowPartition;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.comet.execution.shuffle.ExposedByteArrayOutputStream;
import org.apache.spark.sql.comet.execution.shuffle.SpillWriter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public final class CometDiskBlockWriter {
    private static final Logger logger = LoggerFactory.getLogger(CometDiskBlockWriter.class);
    private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
    private static final LinkedList<CometDiskBlockWriter> currentWriters = new LinkedList();
    private ConcurrentLinkedQueue<Future<Void>> asyncSpillTasks = new ConcurrentLinkedQueue();
    private final LinkedList<ArrowIPCWriter> spillingWriters = new LinkedList();
    private final TaskContext taskContext;
    @VisibleForTesting
    static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 0x100000;
    static final int MAXIMUM_PAGE_SIZE_BYTES = 0x8000000;
    private final CometShuffleMemoryAllocator allocator;
    private final SerializerInstance serializer;
    private final Native nativeLib;
    private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    private final StructType schema;
    private final ShuffleWriteMetricsReporter writeMetrics;
    private final File file;
    private long totalWritten = 0L;
    private boolean initialized = false;
    private final int columnarBatchSize;
    private final boolean isAsync;
    private final int asyncThreadNum;
    private final ExecutorService threadPool;
    private final int numElementsForSpillThreshold;
    private final double preferDictionaryRatio;
    private ArrowIPCWriter activeWriter;
    private boolean spilling = false;
    private ExposedByteArrayOutputStream serBuffer;
    private SerializationStream serOutputStream;
    private long outputRecords = 0L;
    private long insertRecords = 0L;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CometDiskBlockWriter(File file, TaskMemoryManager taskMemoryManager, TaskContext taskContext, SerializerInstance serializer, StructType schema, ShuffleWriteMetricsReporter writeMetrics, SparkConf conf, boolean isAsync, int asyncThreadNum, ExecutorService threadPool) {
        this.allocator = CometShuffleMemoryAllocator.getInstance(conf, taskMemoryManager, Math.min(0x8000000L, taskMemoryManager.pageSizeBytes()));
        this.nativeLib = new Native();
        this.taskContext = taskContext;
        this.serializer = serializer;
        this.schema = schema;
        this.writeMetrics = writeMetrics;
        this.file = file;
        this.isAsync = isAsync;
        this.asyncThreadNum = asyncThreadNum;
        this.threadPool = threadPool;
        this.columnarBatchSize = (Integer)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
        this.numElementsForSpillThreshold = (Integer)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
        this.preferDictionaryRatio = (Double)CometConf$.MODULE$.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO().get();
        this.activeWriter = new ArrowIPCWriter();
        LinkedList<CometDiskBlockWriter> linkedList = currentWriters;
        synchronized (linkedList) {
            currentWriters.add(this);
        }
    }

    public void setChecksumAlgo(String checksumAlgo) {
        this.activeWriter.setChecksumAlgo(checksumAlgo);
    }

    public void setChecksum(long checksum) {
        this.activeWriter.setChecksum(checksum);
    }

    public long getChecksum() {
        return this.activeWriter.getChecksum();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doSpill(boolean forceSync) throws IOException {
        if (this.spilling || this.activeWriter.numRecords() == 0) {
            return;
        }
        this.spilling = true;
        if (this.isAsync && !forceSync) {
            block3: while (this.asyncSpillTasks.size() == this.asyncThreadNum) {
                for (Future<Void> task : this.asyncSpillTasks) {
                    if (!task.isDone()) continue;
                    this.asyncSpillTasks.remove(task);
                    continue block3;
                }
            }
            final ArrowIPCWriter spillingWriter = this.activeWriter;
            this.activeWriter = new ArrowIPCWriter();
            this.spillingWriters.add(spillingWriter);
            this.asyncSpillTasks.add(this.threadPool.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        long written = spillingWriter.doSpilling(false);
                        CometDiskBlockWriter.this.totalWritten += written;
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    finally {
                        spillingWriter.freeMemory();
                        CometDiskBlockWriter.this.spillingWriters.remove(spillingWriter);
                    }
                }
            }, null));
        } else {
            CometDiskBlockWriter cometDiskBlockWriter = this;
            synchronized (cometDiskBlockWriter) {
                this.totalWritten += this.activeWriter.doSpilling(false);
                this.activeWriter.freeMemory();
            }
        }
        this.spilling = false;
    }

    public long getOutputRecords() {
        return this.outputRecords;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void insertRow(UnsafeRow row, int partitionId) throws IOException {
        ++this.insertRecords;
        if (!this.initialized) {
            this.serBuffer = new ExposedByteArrayOutputStream(0x100000);
            this.serOutputStream = this.serializer.serializeStream((OutputStream)this.serBuffer);
            this.initialized = true;
        }
        this.serBuffer.reset();
        this.serOutputStream.writeKey((Object)partitionId, OBJECT_CLASS_TAG);
        this.serOutputStream.writeValue((Object)row, OBJECT_CLASS_TAG);
        this.serOutputStream.flush();
        int serializedRecordSize = this.serBuffer.size();
        assert (serializedRecordSize > 0);
        CometDiskBlockWriter cometDiskBlockWriter = this;
        synchronized (cometDiskBlockWriter) {
            int required;
            if (this.activeWriter.numRecords() >= this.numElementsForSpillThreshold || this.activeWriter.numRecords() >= this.columnarBatchSize) {
                int threshold = Math.min(this.numElementsForSpillThreshold, this.columnarBatchSize);
                logger.info("Spilling data because number of spilledRecords crossed the threshold " + threshold);
                this.doSpill(false);
                if (this.activeWriter.numRecords() != 0) {
                    throw new RuntimeException("activeWriter.numRecords()(" + this.activeWriter.numRecords() + ") != 0");
                }
            }
            if (!this.activeWriter.acquireNewPageIfNecessary(required = serializedRecordSize + this.uaoSize)) {
                this.activeWriter.initialCurrentPage(required);
            }
            this.activeWriter.insertRecord(this.serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    FileSegment close() throws IOException {
        if (this.isAsync) {
            for (Future<Void> task : this.asyncSpillTasks) {
                try {
                    task.get();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
        this.totalWritten += this.activeWriter.doSpilling(true);
        if (this.outputRecords != this.insertRecords) {
            throw new RuntimeException("outputRecords(" + this.outputRecords + ") != insertRecords(" + this.insertRecords + "). Please file a bug report.");
        }
        this.serBuffer = null;
        this.serOutputStream = null;
        this.activeWriter.freeMemory();
        LinkedList<CometDiskBlockWriter> linkedList = currentWriters;
        synchronized (linkedList) {
            currentWriters.remove(this);
        }
        return new FileSegment(this.file, 0L, this.totalWritten);
    }

    File getFile() {
        return this.file;
    }

    long getActiveMemoryUsage() {
        return this.activeWriter.getMemoryUsage();
    }

    void freeMemory() {
        for (ArrowIPCWriter writer : this.spillingWriters) {
            writer.freeMemory();
        }
        this.activeWriter.freeMemory();
    }

    class ArrowIPCWriter
    extends SpillWriter {
        private final RowPartition rowPartition;

        ArrowIPCWriter() {
            this.rowPartition = new RowPartition(CometDiskBlockWriter.this.columnarBatchSize);
            this.allocatedPages = new LinkedList();
            this.allocator = CometDiskBlockWriter.this.allocator;
            this.nativeLib = CometDiskBlockWriter.this.nativeLib;
            this.dataTypes = this.serializeSchema(CometDiskBlockWriter.this.schema);
        }

        void insertRecord(Object recordBase, long recordOffset, int length) {
            Object base = this.currentPage.getBaseObject();
            long recordAddress = this.allocator.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
            this.rowPartition.addRow(this.allocator.getOffsetInPage(recordAddress) + (long)CometDiskBlockWriter.this.uaoSize + 4L, length - 4);
            UnsafeAlignedOffset.putSize((Object)base, (long)this.pageCursor, (int)length);
            this.pageCursor += (long)CometDiskBlockWriter.this.uaoSize;
            Platform.copyMemory((Object)recordBase, (long)recordOffset, (Object)base, (long)this.pageCursor, (long)length);
            this.pageCursor += (long)length;
        }

        int numRecords() {
            return this.rowPartition.getNumRows();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        long doSpilling(boolean isLast) throws IOException {
            long written;
            Object writeMetricsToUse = isLast ? CometDiskBlockWriter.this.writeMetrics : new ShuffleWriteMetrics();
            File file = CometDiskBlockWriter.this.file;
            synchronized (file) {
                CometDiskBlockWriter.this.outputRecords += (long)this.rowPartition.getNumRows();
                written = this.doSpilling(this.dataTypes, CometDiskBlockWriter.this.file, this.rowPartition, (ShuffleWriteMetricsReporter)writeMetricsToUse, CometDiskBlockWriter.this.preferDictionaryRatio);
            }
            file = CometDiskBlockWriter.this.writeMetrics;
            synchronized (file) {
                if (!isLast) {
                    CometDiskBlockWriter.this.writeMetrics.incRecordsWritten(((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
                    CometDiskBlockWriter.this.taskContext.taskMetrics().incDiskBytesSpilled(((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
                }
            }
            return written;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void spill(int required) throws IOException {
            LinkedList<CometDiskBlockWriter> linkedList = currentWriters;
            synchronized (linkedList) {
                Collections.sort(currentWriters, new Comparator<CometDiskBlockWriter>(){

                    @Override
                    public int compare(CometDiskBlockWriter lhs, CometDiskBlockWriter rhs) {
                        long lhsMemoryUsage = lhs.getActiveMemoryUsage();
                        long rhsMemoryUsage = rhs.getActiveMemoryUsage();
                        return Long.compare(rhsMemoryUsage, lhsMemoryUsage);
                    }
                });
                for (CometDiskBlockWriter writer : currentWriters) {
                    writer.doSpill(true);
                    if (this.allocator.getAvailableMemory() < (long)required) continue;
                    break;
                }
            }
        }
    }
}

