package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.LRUMap;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.class */
public class UpdatableTopNFunction extends AbstractTopNFunction implements CheckpointedFunction {
    private static final long serialVersionUID = 6786508184355952780L;
    private static final Logger LOG = LoggerFactory.getLogger(UpdatableTopNFunction.class);
    private final InternalTypeInfo<RowData> rowKeyType;
    private final long cacheSize;
    private transient MapState<RowData, Tuple2<RowData, Integer>> dataState;
    private transient TopNBuffer buffer;
    private transient Map<RowData, TopNBuffer> kvSortedMap;
    private transient Map<RowData, RankRow> rowKeyMap;
    private transient LRUMap<RowData, Map<RowData, RankRow>> kvRowKeyMap;
    private final TypeSerializer<RowData> inputRowSer;
    private final KeySelector<RowData, RowData> rowKeySelector;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction$CacheRemovalListener.class */
    private class CacheRemovalListener implements LRUMap.RemovalListener<RowData, Map<RowData, RankRow>> {
        private CacheRemovalListener() {
        }

        @Override // org.apache.flink.table.runtime.util.LRUMap.RemovalListener
        public void onRemoval(Map.Entry<RowData, Map<RowData, RankRow>> entry) {
            RowData rowData = (RowData) UpdatableTopNFunction.this.keyContext.getCurrentKey();
            RowData key = entry.getKey();
            Map<RowData, RankRow> value = entry.getValue();
            UpdatableTopNFunction.this.keyContext.setCurrentKey(key);
            UpdatableTopNFunction.this.kvSortedMap.remove(key);
            try {
                try {
                    UpdatableTopNFunction.this.flushBufferToState(value);
                    UpdatableTopNFunction.this.keyContext.setCurrentKey(rowData);
                } catch (Throwable th) {
                    UpdatableTopNFunction.LOG.error("Fail to synchronize state!", th);
                    throw new RuntimeException(th);
                }
            } catch (Throwable th2) {
                UpdatableTopNFunction.this.keyContext.setCurrentKey(rowData);
                throw th2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction$RankRow.class */
    public class RankRow {
        private final RowData row;
        private int innerRank;
        private boolean dirty;

        private RankRow(RowData rowData, int i, boolean z) {
            this.row = rowData;
            this.innerRank = i;
            this.dirty = z;
        }
    }

    public UpdatableTopNFunction(long j, long j2, InternalTypeInfo<RowData> internalTypeInfo, RowDataKeySelector rowDataKeySelector, GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector2, RankType rankType, RankRange rankRange, boolean z, boolean z2, long j3) {
        super(j, j2, internalTypeInfo, generatedRecordComparator, rowDataKeySelector2, rankType, rankRange, z, z2);
        this.rowKeyType = rowDataKeySelector.mo5466getProducedType();
        this.cacheSize = j3;
        this.inputRowSer = internalTypeInfo.createSerializer(new ExecutionConfig());
        this.rowKeySelector = rowDataKeySelector;
    }

    @Override // org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        int max = Math.max(1, (int) (this.cacheSize / getDefaultTopNSize()));
        this.kvSortedMap = new HashMap(max);
        this.kvRowKeyMap = new LRUMap<>(max, new CacheRemovalListener());
        LOG.info("Top{} operator is using LRU caches key-size: {}", Long.valueOf(getDefaultTopNSize()), Integer.valueOf(max));
        this.dataState = getRuntimeContext().getMapState(new MapStateDescriptor("data-state-with-update", this.rowKeyType, new TupleTypeInfo(new TypeInformation[]{this.inputRowType, Types.INT})));
        registerMetric(this.kvSortedMap.size() * getDefaultTopNSize());
    }

    public void onTimer(long j, KeyedProcessFunction<RowData, RowData, RowData>.OnTimerContext onTimerContext, Collector<RowData> collector) throws Exception {
        if (this.stateCleaningEnabled) {
            RowData rowData = (RowData) this.keyContext.getCurrentKey();
            this.kvRowKeyMap.remove(rowData);
            this.kvSortedMap.remove(rowData);
            cleanupState(this.dataState);
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public void processElement(RowData rowData, KeyedProcessFunction<RowData, RowData, RowData>.Context context, Collector<RowData> collector) throws Exception {
        registerProcessingCleanupTimer(context, context.timerService().currentProcessingTime());
        initHeapStates();
        initRankEnd(rowData);
        if (this.outputRankNumber || hasOffset()) {
            processElementWithRowNumber(rowData, collector);
        } else {
            processElementWithoutRowNumber(rowData, collector);
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        for (Map.Entry<RowData, Map<RowData, RankRow>> entry : this.kvRowKeyMap.entrySet()) {
            RowData key = entry.getKey();
            Map<RowData, RankRow> value = entry.getValue();
            this.keyContext.setCurrentKey(key);
            flushBufferToState(value);
        }
    }

    private void initHeapStates() throws Exception {
        this.requestCount++;
        RowData rowData = (RowData) this.keyContext.getCurrentKey();
        this.buffer = this.kvSortedMap.get(rowData);
        this.rowKeyMap = this.kvRowKeyMap.get(rowData);
        if (this.buffer != null) {
            this.hitCount++;
            return;
        }
        this.buffer = new TopNBuffer(this.sortKeyComparator, LinkedHashSet::new);
        this.rowKeyMap = new HashMap();
        this.kvSortedMap.put(rowData, this.buffer);
        this.kvRowKeyMap.put(rowData, this.rowKeyMap);
        Iterator it = this.dataState.iterator();
        if (it != null) {
            HashMap hashMap = new HashMap();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                RowData rowData2 = (RowData) entry.getKey();
                Tuple2 tuple2 = (Tuple2) entry.getValue();
                RowData rowData3 = (RowData) tuple2.f0;
                Integer num = (Integer) tuple2.f1;
                this.rowKeyMap.put(rowData2, new RankRow(rowData3, num.intValue(), false));
                RowData rowData4 = (RowData) this.sortKeySelector.getKey(rowData3);
                TreeMap treeMap = (TreeMap) hashMap.get(rowData4);
                if (treeMap == null) {
                    treeMap = new TreeMap();
                    hashMap.put(rowData4, treeMap);
                }
                treeMap.put(num, rowData2);
            }
            for (Map.Entry entry2 : hashMap.entrySet()) {
                RowData rowData5 = (RowData) entry2.getKey();
                TreeMap treeMap2 = (TreeMap) entry2.getValue();
                for (Map.Entry entry3 : treeMap2.entrySet()) {
                    Integer num2 = (Integer) entry3.getKey();
                    int put = this.buffer.put(rowData5, (RowData) entry3.getValue());
                    if (num2.intValue() != put) {
                        LOG.warn("Failed to build sorted map from state, this may result in wrong result. The sort key is {}, partition key is {}, treeMap is {}. The expected inner rank is {}, but current size is {}.", new Object[]{rowData5, rowData, treeMap2, num2, Integer.valueOf(put)});
                    }
                }
            }
        }
    }

    private void processElementWithRowNumber(RowData rowData, Collector<RowData> collector) throws Exception {
        RowData rowData2 = (RowData) this.sortKeySelector.getKey(rowData);
        RowData rowData3 = (RowData) this.rowKeySelector.getKey(rowData);
        if (!this.rowKeyMap.containsKey(rowData3)) {
            if (checkSortKeyInBufferRange(rowData2, this.buffer)) {
                this.rowKeyMap.put(rowData3, new RankRow((RowData) this.inputRowSer.copy(rowData), this.buffer.put(rowData2, rowData3), true));
                emitRecordsWithRowNumber(rowData2, rowData, collector);
                return;
            }
            return;
        }
        RankRow rankRow = this.rowKeyMap.get(rowData3);
        RowData rowData4 = (RowData) this.sortKeySelector.getKey(rankRow.row);
        if (rowData4.equals(rowData2)) {
            Tuple2<Integer, Integer> rowNumber = rowNumber(rowData2, rowData3, this.buffer);
            int intValue = ((Integer) rowNumber.f0).intValue();
            this.rowKeyMap.put(rowData3, new RankRow((RowData) this.inputRowSer.copy(rowData), ((Integer) rowNumber.f1).intValue(), true));
            collectUpdateBefore(collector, rankRow.row, intValue);
            collectUpdateAfter(collector, rowData, intValue);
            return;
        }
        int intValue2 = ((Integer) rowNumber(rowData4, rowData3, this.buffer).f0).intValue();
        this.buffer.remove(rowData4, rowData3);
        this.rowKeyMap.put(rowData3, new RankRow((RowData) this.inputRowSer.copy(rowData), this.buffer.put(rowData2, rowData3), true));
        updateInnerRank(rowData4);
        emitRecordsWithRowNumber(rowData2, rowData, collector, rowData4, rankRow, intValue2);
    }

    private Tuple2<Integer, Integer> rowNumber(RowData rowData, RowData rowData2, TopNBuffer topNBuffer) {
        int i = 1;
        for (Map.Entry<RowData, Collection<RowData>> entry : topNBuffer.entrySet()) {
            RowData key = entry.getKey();
            Collection<RowData> value = entry.getValue();
            if (key.equals(rowData)) {
                Iterator<RowData> it = value.iterator();
                int i2 = 1;
                while (it.hasNext()) {
                    if (rowData2.equals(it.next())) {
                        return Tuple2.of(Integer.valueOf(i), Integer.valueOf(i2));
                    }
                    i2++;
                    i++;
                }
            } else {
                i += value.size();
            }
        }
        LOG.error("Failed to find the sortKey: {}, rowkey: {} in the buffer. This should never happen", rowData, rowData2);
        throw new RuntimeException("Failed to find the sortKey, rowkey in the buffer. This should never happen");
    }

    private void emitRecordsWithRowNumber(RowData rowData, RowData rowData2, Collector<RowData> collector) throws Exception {
        emitRecordsWithRowNumber(rowData, rowData2, collector, null, null, -1);
    }

    private void emitRecordsWithRowNumber(RowData rowData, RowData rowData2, Collector<RowData> collector, RowData rowData3, RankRow rankRow, int i) throws Exception {
        Iterator<Map.Entry<RowData, Collection<RowData>>> it = this.buffer.entrySet().iterator();
        int i2 = 0;
        RowData rowData4 = null;
        boolean z = false;
        while (it.hasNext() && isInRankEnd(i2)) {
            Map.Entry<RowData, Collection<RowData>> next = it.next();
            RowData key = next.getKey();
            Collection<RowData> value = next.getValue();
            if (!z && key.equals(rowData)) {
                i2 += value.size();
                rowData4 = rowData2;
                z = true;
            } else if (!z) {
                i2 += value.size();
            } else if (rowData3 != null) {
                if (this.sortKeyComparator.compare(key, rowData3) > 0) {
                    break;
                }
                Iterator<RowData> it2 = value.iterator();
                while (it2.hasNext() && i2 < i) {
                    RankRow rankRow2 = this.rowKeyMap.get(it2.next());
                    collectUpdateBefore(collector, rankRow2.row, i2);
                    collectUpdateAfter(collector, rowData4, i2);
                    rowData4 = rankRow2.row;
                    i2++;
                }
            } else {
                Iterator<RowData> it3 = value.iterator();
                while (it3.hasNext() && isInRankEnd(i2)) {
                    RankRow rankRow3 = this.rowKeyMap.get(it3.next());
                    collectUpdateBefore(collector, rankRow3.row, i2);
                    collectUpdateAfter(collector, rowData4, i2);
                    rowData4 = rankRow3.row;
                    i2++;
                }
            }
        }
        if (isInRankEnd(i2)) {
            if (rankRow == null) {
                collectInsert(collector, rowData4, i2);
                return;
            }
            Preconditions.checkArgument(i2 == i);
            collectUpdateBefore(collector, rankRow.row, i);
            collectUpdateAfter(collector, rowData4, i2);
            return;
        }
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            Map.Entry<RowData, Collection<RowData>> next2 = it.next();
            for (RowData rowData5 : next2.getValue()) {
                this.rowKeyMap.remove(rowData5);
                this.dataState.remove(rowData5);
            }
            arrayList.add(next2.getKey());
        }
        Iterator it4 = arrayList.iterator();
        while (it4.hasNext()) {
            this.buffer.removeAll((RowData) it4.next());
        }
    }

    private void processElementWithoutRowNumber(RowData rowData, Collector<RowData> collector) throws Exception {
        RowData removeLast;
        RowData rowData2 = (RowData) this.sortKeySelector.getKey(rowData);
        RowData rowData3 = (RowData) this.rowKeySelector.getKey(rowData);
        if (this.rowKeyMap.containsKey(rowData3)) {
            RankRow rankRow = this.rowKeyMap.get(rowData3);
            RowData rowData4 = (RowData) this.sortKeySelector.getKey(rankRow.row);
            if (rowData4.equals(rowData2)) {
                this.rowKeyMap.put(rowData3, new RankRow((RowData) this.inputRowSer.copy(rowData), rankRow.innerRank, true));
            } else {
                this.buffer.remove(rowData4, rowData3);
                this.rowKeyMap.put(rowData3, new RankRow((RowData) this.inputRowSer.copy(rowData), this.buffer.put(rowData2, rowData3), true));
                updateInnerRank(rowData4);
            }
            collectUpdateBefore(collector, rankRow.row);
            collectUpdateAfter(collector, rowData);
            return;
        }
        if (checkSortKeyInBufferRange(rowData2, this.buffer)) {
            this.rowKeyMap.put(rowData3, new RankRow((RowData) this.inputRowSer.copy(rowData), this.buffer.put(rowData2, rowData3), true));
            if (this.buffer.getCurrentTopNum() > this.rankEnd && (removeLast = this.buffer.removeLast()) != null) {
                RankRow remove = this.rowKeyMap.remove(removeLast);
                this.dataState.remove(removeLast);
                collectDelete(collector, remove.row);
            }
            collectInsert(collector, rowData);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flushBufferToState(Map<RowData, RankRow> map) throws Exception {
        for (Map.Entry<RowData, RankRow> entry : map.entrySet()) {
            RowData key = entry.getKey();
            RankRow value = entry.getValue();
            if (value.dirty) {
                this.dataState.put(key, Tuple2.of(value.row, Integer.valueOf(value.innerRank)));
                value.dirty = false;
            }
        }
    }

    private void updateInnerRank(RowData rowData) {
        Collection<RowData> collection = this.buffer.get(rowData);
        if (collection != null) {
            Iterator<RowData> it = collection.iterator();
            int i = 1;
            while (it.hasNext()) {
                RankRow rankRow = this.rowKeyMap.get(it.next());
                if (rankRow.innerRank != i) {
                    rankRow.innerRank = i;
                    rankRow.dirty = true;
                }
                i++;
            }
        }
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (KeyedProcessFunction<RowData, RowData, RowData>.Context) context, (Collector<RowData>) collector);
    }
}
