/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetractableTopNFunction
extends AbstractTopNFunction {
    private static final long serialVersionUID = 1365312180599454479L;
    private static final Logger LOG = LoggerFactory.getLogger(RetractableTopNFunction.class);
    private static final String STATE_CLEARED_WARN_MSG = "The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.";
    private final BaseRowTypeInfo sortKeyType;
    private final boolean lenient = true;
    private transient MapState<BaseRow, List<BaseRow>> dataState;
    private transient ValueState<SortedMap<BaseRow, Long>> treeMap;
    private GeneratedRecordEqualiser generatedEqualiser;
    private RecordEqualiser equaliser;
    private Comparator<BaseRow> serializableComparator;

    public RetractableTopNFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo inputRowType, GeneratedRecordComparator generatedRecordComparator, BaseRowKeySelector sortKeySelector, RankType rankType, RankRange rankRange, GeneratedRecordEqualiser generatedEqualiser, boolean generateRetraction, boolean outputRankNumber) {
        super(minRetentionTime, maxRetentionTime, inputRowType, generatedRecordComparator, sortKeySelector, rankType, rankRange, generateRetraction, outputRankNumber);
        this.sortKeyType = sortKeySelector.getProducedType();
        this.serializableComparator = new ComparatorWrapper(generatedRecordComparator);
        this.generatedEqualiser = generatedEqualiser;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.equaliser = (RecordEqualiser)this.generatedEqualiser.newInstance(this.getRuntimeContext().getUserCodeClassLoader());
        this.generatedEqualiser = null;
        ListTypeInfo valueTypeInfo = new ListTypeInfo((TypeInformation)this.inputRowType);
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("data-state", (TypeInformation)this.sortKeyType, (TypeInformation)valueTypeInfo);
        this.dataState = this.getRuntimeContext().getMapState(mapStateDescriptor);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("sorted-map", new SortedMapTypeInfo((TypeInformation<BaseRow>)this.sortKeyType, BasicTypeInfo.LONG_TYPE_INFO, this.serializableComparator));
        this.treeMap = this.getRuntimeContext().getState(valueStateDescriptor);
    }

    public void processElement(BaseRow input, KeyedProcessFunction.Context ctx, Collector<BaseRow> out) throws Exception {
        long currentTime = ctx.timerService().currentProcessingTime();
        this.registerProcessingCleanupTimer(ctx, currentTime);
        this.initRankEnd(input);
        TreeMap<BaseRow, Long> sortedMap = (TreeMap<BaseRow, Long>)this.treeMap.value();
        if (sortedMap == null) {
            sortedMap = new TreeMap<BaseRow, Long>(this.sortKeyComparator);
        }
        BaseRow sortKey = (BaseRow)this.sortKeySelector.getKey((Object)input);
        if (BaseRowUtil.isAccumulateMsg(input)) {
            if (sortedMap.containsKey(sortKey)) {
                sortedMap.put(sortKey, (Long)sortedMap.get(sortKey) + 1L);
            } else {
                sortedMap.put(sortKey, 1L);
            }
            if (this.outputRankNumber || this.hasOffset()) {
                this.emitRecordsWithRowNumber(sortedMap, sortKey, input, out);
            } else {
                this.emitRecordsWithoutRowNumber(sortedMap, sortKey, input, out);
            }
            ArrayList<BaseRow> inputs = (ArrayList<BaseRow>)this.dataState.get((Object)sortKey);
            if (inputs == null) {
                inputs = new ArrayList<BaseRow>();
            }
            inputs.add(input);
            this.dataState.put((Object)sortKey, inputs);
        } else {
            if (this.outputRankNumber || this.hasOffset()) {
                this.retractRecordWithRowNumber(sortedMap, sortKey, input, out);
            } else {
                this.retractRecordWithoutRowNumber(sortedMap, sortKey, input, out);
            }
            if (sortedMap.containsKey(sortKey)) {
                long count = (Long)sortedMap.get(sortKey) - 1L;
                if (count == 0L) {
                    sortedMap.remove(sortKey);
                } else {
                    sortedMap.put(sortKey, count);
                }
            } else if (sortedMap.isEmpty()) {
                LOG.warn(STATE_CLEARED_WARN_MSG);
            } else {
                throw new RuntimeException("Can not retract a non-existent record: ${inputBaseRow.toString}. This should never happen.");
            }
        }
        this.treeMap.update(sortedMap);
    }

    public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<BaseRow> out) throws Exception {
        if (this.stateCleaningEnabled) {
            this.cleanupState(new State[]{this.dataState, this.treeMap});
        }
    }

    private void emitRecordsWithRowNumber(SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator();
        long curRank = 0L;
        boolean findsSortKey = false;
        while (iterator.hasNext() && this.isInRankEnd(curRank)) {
            Map.Entry<BaseRow, Long> entry = iterator.next();
            BaseRow key = entry.getKey();
            if (!findsSortKey && key.equals(sortKey)) {
                this.collect(out, inputRow, curRank += entry.getValue().longValue());
                findsSortKey = true;
                continue;
            }
            if (findsSortKey) {
                List inputs = (List)this.dataState.get((Object)key);
                if (inputs == null) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                    continue;
                }
                for (int i = 0; i < inputs.size() && this.isInRankEnd(curRank); ++i) {
                    BaseRow prevRow = (BaseRow)inputs.get(i);
                    this.retract(out, prevRow, ++curRank - 1L);
                    this.collect(out, prevRow, curRank);
                }
                continue;
            }
            curRank += entry.getValue().longValue();
        }
    }

    private void emitRecordsWithoutRowNumber(SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator();
        long curRank = 0L;
        boolean findsSortKey = false;
        BaseRow toCollect = null;
        BaseRow toDelete = null;
        while (iterator.hasNext() && this.isInRankEnd(curRank)) {
            Map.Entry<BaseRow, Long> entry = iterator.next();
            BaseRow key = entry.getKey();
            if (!findsSortKey && key.equals(sortKey)) {
                if (this.isInRankRange(curRank += entry.getValue().longValue())) {
                    toCollect = inputRow;
                }
                findsSortKey = true;
                continue;
            }
            if (findsSortKey) {
                List inputs = (List)this.dataState.get((Object)key);
                if (inputs == null) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                    continue;
                }
                long count = entry.getValue();
                long rankOfLastRecord = curRank + count;
                if (this.isInRankEnd(rankOfLastRecord)) {
                    curRank = rankOfLastRecord;
                    continue;
                }
                int index = Long.valueOf(this.rankEnd - curRank).intValue();
                toDelete = (BaseRow)inputs.get(index);
                break;
            }
            curRank += entry.getValue().longValue();
        }
        if (toDelete != null) {
            this.delete(out, toDelete);
        }
        if (toCollect != null) {
            this.collect(out, inputRow);
        }
    }

    private void retractRecordWithRowNumber(SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator();
        long curRank = 0L;
        boolean findsSortKey = false;
        while (iterator.hasNext() && this.isInRankEnd(curRank)) {
            BaseRow prevRow;
            List inputs;
            Map.Entry<BaseRow, Long> entry = iterator.next();
            BaseRow key = entry.getKey();
            if (!findsSortKey && key.equals(sortKey)) {
                inputs = (List)this.dataState.get((Object)key);
                if (inputs == null) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                    continue;
                }
                Iterator inputIter = inputs.iterator();
                while (inputIter.hasNext() && this.isInRankEnd(curRank)) {
                    ++curRank;
                    prevRow = (BaseRow)inputIter.next();
                    if (!findsSortKey && this.equaliser.equalsWithoutHeader(prevRow, inputRow)) {
                        this.delete(out, prevRow, curRank);
                        --curRank;
                        findsSortKey = true;
                        inputIter.remove();
                        continue;
                    }
                    if (!findsSortKey) continue;
                    this.retract(out, prevRow, curRank + 1L);
                    this.collect(out, prevRow, curRank);
                }
                if (inputs.isEmpty()) {
                    this.dataState.remove((Object)key);
                    continue;
                }
                this.dataState.put((Object)key, (Object)inputs);
                continue;
            }
            if (findsSortKey) {
                inputs = (List)this.dataState.get((Object)key);
                for (int i = 0; i < inputs.size() && this.isInRankEnd(curRank); ++i) {
                    prevRow = (BaseRow)inputs.get(i);
                    this.retract(out, prevRow, ++curRank + 1L);
                    this.collect(out, prevRow, curRank);
                }
                continue;
            }
            curRank += entry.getValue().longValue();
        }
    }

    private void retractRecordWithoutRowNumber(SortedMap<BaseRow, Long> sortedMap, BaseRow sortKey, BaseRow inputRow, Collector<BaseRow> out) throws Exception {
        Iterator<Map.Entry<BaseRow, Long>> iterator = sortedMap.entrySet().iterator();
        long curRank = 0L;
        boolean findsSortKey = false;
        while (iterator.hasNext() && this.isInRankEnd(curRank)) {
            Map.Entry<BaseRow, Long> entry = iterator.next();
            BaseRow key = entry.getKey();
            if (!findsSortKey && key.equals(sortKey)) {
                List inputs = (List)this.dataState.get((Object)key);
                if (inputs == null) {
                    LOG.warn(STATE_CLEARED_WARN_MSG);
                    continue;
                }
                Iterator inputIter = inputs.iterator();
                while (inputIter.hasNext() && this.isInRankEnd(curRank)) {
                    ++curRank;
                    BaseRow prevRow = (BaseRow)inputIter.next();
                    if (!findsSortKey && this.equaliser.equalsWithoutHeader(prevRow, inputRow)) {
                        this.delete(out, prevRow, curRank);
                        --curRank;
                        findsSortKey = true;
                        inputIter.remove();
                        continue;
                    }
                    if (!findsSortKey || curRank != this.rankEnd) continue;
                    this.collect(out, prevRow, curRank);
                    break;
                }
                if (inputs.isEmpty()) {
                    this.dataState.remove((Object)key);
                    continue;
                }
                this.dataState.put((Object)key, (Object)inputs);
                continue;
            }
            if (findsSortKey) {
                long count = entry.getValue();
                long rankOfLastRecord = curRank + count;
                if (rankOfLastRecord < this.rankEnd) {
                    curRank = rankOfLastRecord;
                    continue;
                }
                int index = Long.valueOf(this.rankEnd - curRank - 1L).intValue();
                List inputs = (List)this.dataState.get((Object)key);
                BaseRow toAdd = (BaseRow)inputs.get(index);
                this.collect(out, toAdd);
                break;
            }
            curRank += entry.getValue().longValue();
        }
    }

    private static class ComparatorWrapper
    implements Comparator<BaseRow>,
    Serializable {
        private static final long serialVersionUID = 4386377835781068140L;
        private transient Comparator<BaseRow> comparator;
        private GeneratedRecordComparator generatedRecordComparator;

        private ComparatorWrapper(GeneratedRecordComparator generatedRecordComparator) {
            this.generatedRecordComparator = generatedRecordComparator;
        }

        @Override
        public int compare(BaseRow o1, BaseRow o2) {
            if (this.comparator == null) {
                this.comparator = (Comparator)this.generatedRecordComparator.newInstance(Thread.currentThread().getContextClassLoader());
            }
            return this.comparator.compare(o1, o2);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof ComparatorWrapper) {
                ComparatorWrapper o = (ComparatorWrapper)obj;
                GeneratedRecordComparator oGeneratedComparator = o.generatedRecordComparator;
                return this.generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName()) && this.generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) && Arrays.equals(this.generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
            }
            return false;
        }
    }
}

