package org.apache.paimon.flink.sorter;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.NormalizedKeyComputer;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;

/* loaded from: input_file:org/apache/paimon/flink/sorter/SortOperator.class */
public class SortOperator extends TableStreamOperator<InternalRow> implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
    private final RowType keyRowType;
    private final RowType valueRowType;
    private final long maxMemory;
    private final int pageSize;
    private final int arity;
    private transient BinaryExternalSortBuffer buffer;

    public SortOperator(RowType rowType, RowType rowType2, long j, int i) {
        this.keyRowType = rowType;
        this.valueRowType = rowType2;
        this.maxMemory = j;
        this.pageSize = i;
        this.arity = rowType.getFieldCount() + rowType2.getFieldCount();
    }

    public void open() throws Exception {
        super.open();
        List<DataField> fields = this.keyRowType.getFields();
        List<DataField> fields2 = this.valueRowType.getFields();
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(fields);
        arrayList.addAll(fields2);
        RowType rowType = new RowType(arrayList);
        InternalRowSerializer create = InternalSerializers.create(rowType);
        NormalizedKeyComputer newNormalizedKeyComputer = CodeGenUtils.newNormalizedKeyComputer(rowType.getFieldTypes(), "MemTableKeyComputer");
        RecordComparator newRecordComparator = CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator");
        HeapMemorySegmentPool heapMemorySegmentPool = new HeapMemorySegmentPool(this.maxMemory, this.pageSize);
        BinaryInMemorySortBuffer createBuffer = BinaryInMemorySortBuffer.createBuffer(newNormalizedKeyComputer, create, newRecordComparator, heapMemorySegmentPool);
        Configuration jobConfiguration = getContainingTask().getJobConfiguration();
        this.buffer = new BinaryExternalSortBuffer(new BinaryRowSerializer(create.getArity()), newRecordComparator, heapMemorySegmentPool.pageSize(), createBuffer, new IOManagerImpl(IOManagerImpl.splitPaths((String) jobConfiguration.get(CoreOptions.TMP_DIRS))), jobConfiguration.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES));
    }

    public void endInput() throws Exception {
        if (this.buffer.size() <= 0) {
            return;
        }
        MutableObjectIterator<BinaryRow> sortedIterator = this.buffer.sortedIterator();
        BinaryRow binaryRow = new BinaryRow(this.arity);
        while (true) {
            BinaryRow next = sortedIterator.next(binaryRow);
            binaryRow = next;
            if (next == null) {
                return;
            } else {
                this.output.collect(new StreamRecord(binaryRow));
            }
        }
    }

    public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception {
        this.buffer.write((InternalRow) streamRecord.getValue());
    }
}
