/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.PriorityQueue;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;

public class SortLimitOperator
extends TableStreamOperator<BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow>,
BoundedOneInput {
    private final boolean isGlobal;
    private final long limitStart;
    private final long limitEnd;
    private GeneratedRecordComparator genComparator;
    private transient PriorityQueue<BaseRow> heap;
    private transient Collector<BaseRow> collector;
    private transient RecordComparator comparator;
    private transient AbstractRowSerializer<BaseRow> inputSer;

    public SortLimitOperator(boolean isGlobal, long limitStart, long limitEnd, GeneratedRecordComparator genComparator) {
        this.isGlobal = isGlobal;
        this.limitStart = limitStart;
        this.limitEnd = limitEnd;
        this.genComparator = genComparator;
    }

    public void open() throws Exception {
        super.open();
        this.inputSer = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader());
        this.comparator = (RecordComparator)this.genComparator.newInstance(this.getUserCodeClassloader());
        this.genComparator = null;
        this.heap = new PriorityQueue((int)this.limitEnd, (o1, o2) -> this.comparator.compare((BaseRow)o2, (BaseRow)o1));
        this.collector = new StreamRecordCollector<BaseRow>(this.output);
    }

    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        BaseRow record = (BaseRow)element.getValue();
        if ((long)this.heap.size() >= this.limitEnd) {
            BaseRow peek = this.heap.peek();
            if (this.comparator.compare(peek, record) > 0) {
                this.heap.poll();
                this.heap.add((BaseRow)this.inputSer.copy(record));
            }
        } else {
            this.heap.add((BaseRow)this.inputSer.copy(record));
        }
    }

    public void endInput() throws Exception {
        if (this.isGlobal) {
            ArrayList<BaseRow> list = new ArrayList<BaseRow>(this.heap);
            list.sort((o1, o2) -> this.comparator.compare((BaseRow)o1, (BaseRow)o2));
            int maxIndex = (int)Math.min(this.limitEnd, (long)list.size());
            for (int i = (int)this.limitStart; i < maxIndex; ++i) {
                this.collector.collect(list.get(i));
            }
        } else {
            for (BaseRow row : this.heap) {
                this.collector.collect((Object)row);
            }
        }
    }
}

