/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.util.LRUMap;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keyed Top-N function that keeps, for every key, a sorted in-memory {@link TopNBuffer} of
 * candidate rows and mirrors that buffer into keyed {@link MapState}.
 *
 * <p>Per-key buffers are cached in an {@link LRUMap}. On a cache miss the buffer is rebuilt from
 * the map state. Result rows are emitted through the {@code collect}/{@code retract}/{@code delete}
 * helpers inherited from {@link AbstractTopNFunction}.
 *
 * <p>Lists handed to the map state are always copies that are independent of the lists held by the
 * heap buffer, so in-place changes to a buffer never alter an object that the state backend may
 * still be referencing.
 *
 * <p>NOTE(review): as the class name suggests, this presumably expects insert-only input; nothing
 * in this class handles retraction of input rows - confirm against the planner.
 */
public class AppendOnlyTopNFunction extends AbstractTopNFunction {
    private static final long serialVersionUID = -4708453213104128010L;
    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyTopNFunction.class);

    /** Type of the sort key as reported by the sort-key selector; key type of {@link #dataState}. */
    private final BaseRowTypeInfo sortKeyType;

    /** Serializer used to copy every accepted input row before it is placed into the buffer. */
    private final TypeSerializer<BaseRow> inputRowSer;

    /** Cache budget; divided by the default Top-N size to obtain the number of cached keys. */
    private final long cacheSize;

    /** Keyed map state: sort key to the rows sharing that sort key. */
    private transient MapState<BaseRow, List<BaseRow>> dataState;

    /** Buffer of the key currently being processed; assigned by {@link #initHeapStates()}. */
    private transient TopNBuffer buffer;

    /** Cache from the current key to its buffer. */
    private transient Map<BaseRow, TopNBuffer> kvSortedMap;

    /**
     * Creates the function.
     *
     * <p>All arguments except {@code cacheSize} are forwarded unchanged to
     * {@link AbstractTopNFunction}; their exact semantics are defined there.
     *
     * @param minRetentionTime forwarded to the superclass
     * @param maxRetentionTime forwarded to the superclass
     * @param inputRowType type of the input rows; also used to create the row-copy serializer
     * @param sortKeyGeneratedRecordComparator forwarded to the superclass
     * @param sortKeySelector extracts the sort key from an input row; also supplies the sort-key type
     * @param rankType forwarded to the superclass
     * @param rankRange forwarded to the superclass
     * @param generateRetraction forwarded to the superclass
     * @param outputRankNumber forwarded to the superclass
     * @param cacheSize budget from which the number of cached per-key buffers is derived
     */
    public AppendOnlyTopNFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo inputRowType, GeneratedRecordComparator sortKeyGeneratedRecordComparator, BaseRowKeySelector sortKeySelector, RankType rankType, RankRange rankRange, boolean generateRetraction, boolean outputRankNumber, long cacheSize) {
        super(minRetentionTime, maxRetentionTime, inputRowType, sortKeyGeneratedRecordComparator, sortKeySelector, rankType, rankRange, generateRetraction, outputRankNumber);
        this.sortKeyType = sortKeySelector.getProducedType();
        this.inputRowSer = inputRowType.createSerializer(new ExecutionConfig());
        this.cacheSize = cacheSize;
    }

    /**
     * Creates the LRU cache of per-key buffers, registers the map state and registers the metric.
     *
     * @param parameters configuration passed on to the superclass
     * @throws Exception if the superclass or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // At least one key is always cached, even when the budget is smaller than one Top-N.
        int lruCacheSize = Math.max(1, (int) (this.cacheSize / this.getDefaultTopNSize()));
        this.kvSortedMap = new LRUMap<>(lruCacheSize);
        LOG.info("Top{} operator is using LRU caches key-size: {}", this.getDefaultTopNSize(), lruCacheSize);

        ListTypeInfo<BaseRow> valueTypeInfo = new ListTypeInfo<>((TypeInformation<BaseRow>) this.inputRowType);
        MapStateDescriptor<BaseRow, List<BaseRow>> mapStateDescriptor =
                new MapStateDescriptor<>("data-state-with-append", this.sortKeyType, valueTypeInfo);
        this.dataState = this.getRuntimeContext().getMapState(mapStateDescriptor);

        this.registerMetric((long) this.kvSortedMap.size() * this.getDefaultTopNSize());
    }

    /**
     * Processes one input row: loads the buffer of the current key, and if the row's sort key is
     * accepted by {@code checkSortKeyInBufferRange}, stores a copy of the row in buffer and state
     * and emits the resulting changes.
     *
     * <p>NOTE(review): the raw {@code KeyedProcessFunction.Context} parameter type is kept exactly
     * as found; presumably this is the element callback of the keyed process function that the
     * superclass extends - confirm against {@link AbstractTopNFunction}.
     *
     * @param input the incoming row
     * @param context context providing the timer service
     * @param out collector receiving the emitted rows
     * @throws Exception if state access fails
     */
    public void processElement(BaseRow input, KeyedProcessFunction.Context context, Collector<BaseRow> out) throws Exception {
        long currentTime = context.timerService().currentProcessingTime();
        this.registerProcessingCleanupTimer(context, currentTime);

        this.initHeapStates();
        this.initRankEnd(input);

        BaseRow sortKey = (BaseRow) this.sortKeySelector.getKey(input);
        if (!this.checkSortKeyInBufferRange(sortKey, this.buffer)) {
            return;
        }

        // Buffer a copy of the row rather than the caller's instance.
        this.buffer.put(sortKey, this.inputRowSer.copy(input));
        Collection<BaseRow> inputs = this.buffer.get(sortKey);
        // Write a snapshot of the bucket: the buffer's own list keeps being modified afterwards,
        // and the state must not share that list. A shallow copy suffices because the rows
        // themselves are not modified here.
        this.dataState.put(sortKey, new ArrayList<>(inputs));

        if (this.outputRankNumber || this.hasOffset()) {
            this.processElementWithRowNumber(sortKey, input, out);
        } else {
            this.processElementWithoutRowNumber(input, out);
        }
    }

    /**
     * Timer callback: when state cleaning is enabled, drops the cached buffer of the current key
     * and clears the map state through {@code cleanupState}.
     *
     * @param timestamp timestamp of the firing timer (unused)
     * @param ctx timer context (unused)
     * @param out collector (unused)
     * @throws Exception if state cleanup fails
     */
    public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<BaseRow> out) throws Exception {
        if (this.stateCleaningEnabled) {
            this.kvSortedMap.remove(this.keyContext.getCurrentKey());
            this.cleanupState(new State[]{this.dataState});
        }
    }

    /**
     * Points {@link #buffer} at the buffer of the current key, rebuilding it from
     * {@link #dataState} when it is not cached. Also maintains the request/hit counters.
     *
     * @throws Exception if reading the map state fails
     */
    private void initHeapStates() throws Exception {
        ++this.requestCount;
        BaseRow currentKey = (BaseRow) this.keyContext.getCurrentKey();
        this.buffer = this.kvSortedMap.get(currentKey);
        if (this.buffer != null) {
            ++this.hitCount;
            return;
        }

        this.buffer = new TopNBuffer(this.sortKeyComparator, ArrayList::new);
        this.kvSortedMap.put(currentKey, this.buffer);

        Iterator<Map.Entry<BaseRow, List<BaseRow>>> iter = this.dataState.iterator();
        if (iter == null) {
            return;
        }
        while (iter.hasNext()) {
            Map.Entry<BaseRow, List<BaseRow>> entry = iter.next();
            // Give the buffer its own list (element order preserved) so that later changes to
            // the buffer do not reach the list object returned by the state.
            this.buffer.putAll(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
    }

    /**
     * Emits the changes caused by the new row when rank numbers are needed.
     *
     * <p>Walks the buffer entries while {@code isInRankEnd(curRank)} holds, tracking the running
     * rank. Buckets before the new row's sort key only advance the rank. At the bucket of the new
     * row, the row is emitted with the rank of the last position of that bucket. Every row in the
     * following buckets moves down by one: it is retracted at its previous rank and emitted at its
     * new rank. Sort keys left over once the walk stops are removed from state and buffer.
     *
     * @param sortKey sort key of the new row
     * @param input the new row
     * @param out collector receiving the emitted rows
     * @throws Exception if state access fails
     */
    private void processElementWithRowNumber(BaseRow sortKey, BaseRow input, Collector<BaseRow> out) throws Exception {
        Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iterator = this.buffer.entrySet().iterator();
        long curRank = 0L;
        boolean findsSortKey = false;

        while (iterator.hasNext() && this.isInRankEnd(curRank)) {
            Map.Entry<BaseRow, Collection<BaseRow>> entry = iterator.next();
            Collection<BaseRow> records = entry.getValue();
            if (!findsSortKey && entry.getKey().equals(sortKey)) {
                // The new row was appended to this bucket, so it takes the bucket's last rank.
                curRank += records.size();
                this.collect(out, input, curRank);
                findsSortKey = true;
            } else if (findsSortKey) {
                // Rows behind the new row are pushed down by exactly one position.
                Iterator<BaseRow> recordsIter = records.iterator();
                while (recordsIter.hasNext() && this.isInRankEnd(curRank)) {
                    BaseRow prevRow = recordsIter.next();
                    curRank += 1L;
                    this.retract(out, prevRow, curRank - 1L);
                    this.collect(out, prevRow, curRank);
                }
            } else {
                curRank += records.size();
            }
        }

        // Keys are collected first and removed from the buffer afterwards, so the buffer is not
        // modified while its entry-set iterator is still in use.
        List<BaseRow> toDeleteSortKeys = new ArrayList<>();
        while (iterator.hasNext()) {
            BaseRow key = iterator.next().getKey();
            this.dataState.remove(key);
            toDeleteSortKeys.add(key);
        }
        for (BaseRow toDeleteKey : toDeleteSortKeys) {
            this.buffer.removeAll(toDeleteKey);
        }
    }

    /**
     * Emits the changes caused by the new row when no rank numbers are needed.
     *
     * <p>If the buffer holds more rows than {@code rankEnd}, the last row of the last bucket is
     * evicted from buffer and state. When the evicted row equals the new row, nothing is emitted;
     * otherwise the evicted row is deleted downstream and the new row is emitted.
     *
     * @param input the new row
     * @param out collector receiving the emitted rows
     * @throws Exception if state access fails
     */
    private void processElementWithoutRowNumber(BaseRow input, Collector<BaseRow> out) throws Exception {
        if ((long) this.buffer.getCurrentTopNum() > this.rankEnd) {
            Map.Entry<BaseRow, Collection<BaseRow>> lastEntry = this.buffer.lastEntry();
            BaseRow lastKey = lastEntry.getKey();
            List<BaseRow> lastList = (List<BaseRow>) lastEntry.getValue();
            int lastIndex = lastList.size() - 1;
            BaseRow lastElement = lastList.get(lastIndex);
            List<BaseRow> survivors = new ArrayList<>(lastList.subList(0, lastIndex));

            // Detach the whole bucket and re-attach the survivors through the buffer's own
            // methods instead of shrinking the bucket's list in place behind the buffer's back.
            // NOTE(review): TopNBuffer's size accounting is not visible here; presumably
            // getCurrentTopNum() is maintained by put/putAll/removeAll - confirm.
            this.buffer.removeAll(lastKey);
            if (survivors.isEmpty()) {
                this.dataState.remove(lastKey);
            } else {
                this.buffer.putAll(lastKey, survivors);
                // The state receives its own copy, independent of the buffer's list.
                this.dataState.put(lastKey, new ArrayList<>(survivors));
            }

            if (input.equals(lastElement)) {
                // The new row itself was the one evicted, so the visible Top-N did not change.
                return;
            }
            this.delete(out, lastElement);
        }
        this.collect(out, input);
    }
}

