package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperator.class */
public class ProcTimeSortOperator extends BaseTemporalSortOperator {
    private static final long serialVersionUID = -2028983921907321193L;
    private static final Logger LOG = LoggerFactory.getLogger(ProcTimeSortOperator.class);
    private final BaseRowTypeInfo inputRowType;
    private GeneratedRecordComparator gComparator;
    private transient RecordComparator comparator;
    private transient List<BaseRow> sortBuffer;
    private transient ListState<BaseRow> dataState;

    public ProcTimeSortOperator(BaseRowTypeInfo baseRowTypeInfo, GeneratedRecordComparator generatedRecordComparator) {
        this.inputRowType = baseRowTypeInfo;
        this.gComparator = generatedRecordComparator;
    }

    @Override // org.apache.flink.table.runtime.operators.sort.BaseTemporalSortOperator
    public void open() throws Exception {
        super.open();
        LOG.info("Opening ProcTimeSortOperator");
        this.comparator = this.gComparator.newInstance(getContainingTask().getUserCodeClassLoader());
        this.gComparator = null;
        this.sortBuffer = new ArrayList();
        this.dataState = getRuntimeContext().getListState(new ListStateDescriptor("sortState", this.inputRowType));
    }

    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow baseRow = (BaseRow) streamRecord.getValue();
        long currentProcessingTime = this.timerService.currentProcessingTime();
        this.dataState.add(baseRow);
        this.timerService.registerProcessingTimeTimer(currentProcessingTime + 1);
    }

    public void onProcessingTime(InternalTimer<BaseRow, VoidNamespace> internalTimer) throws Exception {
        Iterable iterable = (Iterable) this.dataState.get();
        this.sortBuffer.clear();
        List<BaseRow> list = this.sortBuffer;
        list.getClass();
        iterable.forEach((v1) -> {
            r1.add(v1);
        });
        this.sortBuffer.sort(this.comparator);
        this.sortBuffer.forEach(baseRow -> {
            this.collector.collect(baseRow);
        });
        this.dataState.clear();
    }

    public void onEventTime(InternalTimer<BaseRow, VoidNamespace> internalTimer) throws Exception {
        throw new UnsupportedOperationException("Now Sort only is supported based processing time here!");
    }
}
