package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.HashMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/sort/StreamSortOperator.class */
public class StreamSortOperator extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 9042068324817807379L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamSortOperator.class);
    private final RowDataTypeInfo inputRowType;
    private GeneratedRecordComparator gComparator;
    private transient RecordComparator comparator;
    private transient RowDataSerializer rowDataSerializer;
    private transient StreamRecordCollector<RowData> collector;
    private transient ListState<Tuple2<RowData, Long>> bufferState;
    private transient HashMap<RowData, Long> inputBuffer;

    public StreamSortOperator(RowDataTypeInfo rowDataTypeInfo, GeneratedRecordComparator generatedRecordComparator) {
        this.inputRowType = rowDataTypeInfo;
        this.gComparator = generatedRecordComparator;
    }

    public void open() throws Exception {
        super.open();
        LOG.info("Opening StreamSortOperator");
        this.rowDataSerializer = this.inputRowType.m5288createSerializer(getExecutionConfig());
        this.comparator = this.gComparator.newInstance(getContainingTask().getUserCodeClassLoader());
        this.gComparator = null;
        this.collector = new StreamRecordCollector<>(this.output);
        this.inputBuffer = new HashMap<>();
        if (this.bufferState != null) {
            ((Iterable) this.bufferState.get()).forEach(tuple2 -> {
            });
        }
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        BinaryRowData copy = this.rowDataSerializer.toBinaryRow(rowData).copy();
        copy.setRowKind(RowKind.INSERT);
        long longValue = this.inputBuffer.getOrDefault(copy, 0L).longValue();
        if (RowDataUtil.isAccumulateMsg(rowData)) {
            this.inputBuffer.put(copy, Long.valueOf(longValue + 1));
        } else {
            if (longValue == 0) {
                throw new RuntimeException("RowData not exist!");
            }
            if (longValue == 1) {
                this.inputBuffer.remove(copy);
            } else {
                this.inputBuffer.put(copy, Long.valueOf(longValue - 1));
            }
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.bufferState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("localBufferState", new TupleTypeInfo(new TypeInformation[]{this.inputRowType, Types.LONG})));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.bufferState.clear();
        ArrayList arrayList = new ArrayList(this.inputBuffer.size());
        this.inputBuffer.forEach((rowData, l) -> {
            arrayList.add(Tuple2.of(rowData, l));
        });
        this.bufferState.addAll(arrayList);
    }

    @Override // org.apache.flink.table.runtime.operators.TableStreamOperator
    public void close() throws Exception {
        LOG.info("Closing StreamSortOperator");
        if (!this.inputBuffer.isEmpty()) {
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(this.inputBuffer.keySet());
            arrayList.sort(this.comparator);
            arrayList.forEach(rowData -> {
                long longValue = this.inputBuffer.get(rowData).longValue();
                for (int i = 1; i <= longValue; i++) {
                    this.collector.collect(rowData);
                }
            });
        }
        super.close();
    }
}
