package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.LRUMap;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.class */
public class AppendOnlyTopNFunction extends AbstractTopNFunction {
    private static final long serialVersionUID = -4708453213104128010L;
    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyTopNFunction.class);
    private final BaseRowTypeInfo sortKeyType;
    private final TypeSerializer<BaseRow> inputRowSer;
    private final long cacheSize;
    private transient MapState<BaseRow, List<BaseRow>> dataState;
    private transient TopNBuffer buffer;
    private transient Map<BaseRow, TopNBuffer> kvSortedMap;

    public AppendOnlyTopNFunction(long j, long j2, BaseRowTypeInfo baseRowTypeInfo, GeneratedRecordComparator generatedRecordComparator, BaseRowKeySelector baseRowKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2, long j3) {
        super(j, j2, baseRowTypeInfo, generatedRecordComparator, baseRowKeySelector, rankType, rankRange, z, z2);
        this.sortKeyType = baseRowKeySelector.mo5010getProducedType();
        this.inputRowSer = baseRowTypeInfo.m5070createSerializer(new ExecutionConfig());
        this.cacheSize = j3;
    }

    @Override // org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        int max = Math.max(1, (int) (this.cacheSize / getDefaultTopNSize()));
        this.kvSortedMap = new LRUMap(max);
        LOG.info("Top{} operator is using LRU caches key-size: {}", Long.valueOf(getDefaultTopNSize()), Integer.valueOf(max));
        this.dataState = getRuntimeContext().getMapState(new MapStateDescriptor("data-state-with-append", this.sortKeyType, new ListTypeInfo(this.inputRowType)));
        registerMetric(this.kvSortedMap.size() * getDefaultTopNSize());
    }

    public void processElement(BaseRow baseRow, KeyedProcessFunction<BaseRow, BaseRow, BaseRow>.Context context, Collector<BaseRow> collector) throws Exception {
        registerProcessingCleanupTimer(context, context.timerService().currentProcessingTime());
        initHeapStates();
        initRankEnd(baseRow);
        BaseRow baseRow2 = (BaseRow) this.sortKeySelector.getKey(baseRow);
        if (checkSortKeyInBufferRange(baseRow2, this.buffer)) {
            this.buffer.put(baseRow2, (BaseRow) this.inputRowSer.copy(baseRow));
            this.dataState.put(baseRow2, (List) this.buffer.get(baseRow2));
            if (this.outputRankNumber || hasOffset()) {
                processElementWithRowNumber(baseRow2, baseRow, collector);
            } else {
                processElementWithoutRowNumber(baseRow, collector);
            }
        }
    }

    public void onTimer(long j, KeyedProcessFunction<BaseRow, BaseRow, BaseRow>.OnTimerContext onTimerContext, Collector<BaseRow> collector) throws Exception {
        if (this.stateCleaningEnabled) {
            this.kvSortedMap.remove(this.keyContext.getCurrentKey());
            cleanupState(this.dataState);
        }
    }

    private void initHeapStates() throws Exception {
        this.requestCount++;
        BaseRow baseRow = (BaseRow) this.keyContext.getCurrentKey();
        this.buffer = this.kvSortedMap.get(baseRow);
        if (this.buffer != null) {
            this.hitCount++;
            return;
        }
        this.buffer = new TopNBuffer(this.sortKeyComparator, ArrayList::new);
        this.kvSortedMap.put(baseRow, this.buffer);
        Iterator it = this.dataState.iterator();
        if (it != null) {
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                this.buffer.putAll((BaseRow) entry.getKey(), (List) entry.getValue());
            }
        }
    }

    private void processElementWithRowNumber(BaseRow baseRow, BaseRow baseRow2, Collector<BaseRow> collector) throws Exception {
        Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> it = this.buffer.entrySet().iterator();
        long j = 0;
        boolean z = false;
        while (it.hasNext() && isInRankEnd(j)) {
            Map.Entry<BaseRow, Collection<BaseRow>> next = it.next();
            Collection<BaseRow> value = next.getValue();
            if (!z && next.getKey().equals(baseRow)) {
                j += value.size();
                collect(collector, baseRow2, j);
                z = true;
            } else if (z) {
                Iterator<BaseRow> it2 = value.iterator();
                while (it2.hasNext() && isInRankEnd(j)) {
                    j++;
                    BaseRow next2 = it2.next();
                    retract(collector, next2, j - 1);
                    collect(collector, next2, j);
                }
            } else {
                j += value.size();
            }
        }
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            BaseRow key = it.next().getKey();
            this.dataState.remove(key);
            arrayList.add(key);
        }
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            this.buffer.removeAll((BaseRow) it3.next());
        }
    }

    private void processElementWithoutRowNumber(BaseRow baseRow, Collector<BaseRow> collector) throws Exception {
        if (this.buffer.getCurrentTopNum() > this.rankEnd) {
            Map.Entry<BaseRow, Collection<BaseRow>> lastEntry = this.buffer.lastEntry();
            BaseRow key = lastEntry.getKey();
            List list = (List) lastEntry.getValue();
            BaseRow baseRow2 = (BaseRow) list.remove(list.size() - 1);
            if (list.isEmpty()) {
                this.buffer.removeAll(key);
                this.dataState.remove(key);
            } else {
                this.dataState.put(key, list);
            }
            if (baseRow.equals(baseRow2)) {
                return;
            } else {
                delete(collector, baseRow2);
            }
        }
        collect(collector, baseRow);
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((BaseRow) obj, (KeyedProcessFunction<BaseRow, BaseRow, BaseRow>.Context) context, (Collector<BaseRow>) collector);
    }
}
