package org.apache.flink.table.runtime.operators.rank;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.class */
/**
 * Base class for streaming Top-N functions.
 *
 * <p>Only {@code ROW_NUMBER} ranking is accepted; every other {@link RankType} is rejected in the
 * constructor. The rank end is either a constant taken from a {@link ConstantRankRange}, or it is
 * read from an input column designated by a {@link VariableRankRange} and kept in a
 * {@link ValueState} named {@code "rankEnd"}.
 *
 * <p>Runtime helpers (comparator, state handle, metrics, reusable output row) are created in
 * {@link #open(Configuration)}, so the emitting and rank-end methods must not be used before it.
 */
public abstract class AbstractTopNFunction extends KeyedProcessFunctionWithCleanupState<BaseRow, BaseRow, BaseRow> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTopNFunction.class);
    private static final String RANK_UNSUPPORTED_MSG = "RANK() on streaming table is not supported currently";
    private static final String DENSE_RANK_UNSUPPORTED_MSG = "DENSE_RANK() on streaming table is not supported currently";
    private static final String WITHOUT_RANK_END_UNSUPPORTED_MSG = "Rank end is not specified. Currently rank only support TopN, which means the rank end must be specified.";

    /** Top-N size reported by {@link #getDefaultTopNSize()} when the rank end is not constant. */
    private static final long DEFAULT_TOPN_SIZE = 100;

    // Header bytes written on ranked output rows. NOTE(review): presumably these mirror the
    // values applied by BaseRowUtil.setAccumulate / BaseRowUtil.setRetract - confirm.
    private static final byte ACCUMULATE_HEADER = 0;
    private static final byte RETRACT_HEADER = 1;

    /** Generated comparator code; instantiated and then released in {@link #open(Configuration)}. */
    private GeneratedRecordComparator generatedSortKeyComparator;

    /** Comparator for sort keys; available after {@link #open(Configuration)}. */
    protected Comparator<BaseRow> sortKeyComparator;

    private final boolean generateRetraction;
    protected final boolean outputRankNumber;
    protected final BaseRowTypeInfo inputRowType;
    protected final KeySelector<BaseRow, BaseRow> sortKeySelector;
    protected KeyContext keyContext;
    private final boolean isConstantRankEnd;

    /** Inclusive lower rank bound; {@code -1} when the rank end is variable. */
    private final long rankStart;

    /** Index of the input column holding the rank end; {@code -1} when the rank end is constant. */
    private final int rankEndIndex;

    /** Inclusive upper rank bound; {@code -1} until initialised when the rank end is variable. */
    protected long rankEnd;

    private transient Function<BaseRow, Long> rankEndFetcher;
    private ValueState<Long> rankEndState;
    private Counter invalidCounter;

    /** Reused for every emitted row that carries a rank number. */
    private JoinedRow outputRow;

    // Counters read by the "topn.cache.hitRate" gauge; both start at zero.
    protected long hitCount;
    protected long requestCount;

    /**
     * Creates the function and validates the requested ranking.
     *
     * @param j forwarded unchanged to the superclass constructor as its first argument
     *     (presumably the minimum state retention time - confirm against the superclass)
     * @param j2 forwarded unchanged to the superclass constructor as its second argument
     *     (presumably the maximum state retention time - confirm against the superclass)
     * @param baseRowTypeInfo type information of the input rows
     * @param generatedRecordComparator generated comparator used for sort keys
     * @param baseRowKeySelector selector stored as {@link #sortKeySelector}
     * @param rankType requested rank function; only {@code ROW_NUMBER} is accepted
     * @param rankRange a {@link ConstantRankRange} or a {@link VariableRankRange}
     * @param z whether {@link #retract(Collector, BaseRow, long)} emits rows
     * @param z2 whether ranked output rows are extended with the rank number
     * @throws UnsupportedOperationException if {@code rankType} is not {@code ROW_NUMBER} or
     *     {@code rankRange} is neither constant nor variable
     */
    public AbstractTopNFunction(long j, long j2, BaseRowTypeInfo baseRowTypeInfo, GeneratedRecordComparator generatedRecordComparator, BaseRowKeySelector baseRowKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2) {
        super(j, j2);

        // Reject unsupported rank functions before touching any other argument.
        switch (rankType) {
            case ROW_NUMBER:
                break;
            case RANK:
                LOG.error(RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(RANK_UNSUPPORTED_MSG);
            case DENSE_RANK:
                LOG.error(DENSE_RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(DENSE_RANK_UNSUPPORTED_MSG);
            default:
                LOG.error("Streaming tables do not support {}", rankType.name());
                throw new UnsupportedOperationException("Streaming tables do not support " + rankType.toString());
        }

        if (rankRange instanceof ConstantRankRange) {
            ConstantRankRange constantRange = (ConstantRankRange) rankRange;
            this.isConstantRankEnd = true;
            this.rankStart = constantRange.getRankStart();
            this.rankEnd = constantRange.getRankEnd();
            this.rankEndIndex = -1;
        } else if (rankRange instanceof VariableRankRange) {
            this.isConstantRankEnd = false;
            this.rankStart = -1L;
            this.rankEnd = -1L;
            this.rankEndIndex = ((VariableRankRange) rankRange).getRankEndIndex();
        } else {
            LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG);
            throw new UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG);
        }

        this.generatedSortKeyComparator = generatedRecordComparator;
        this.generateRetraction = z;
        this.inputRowType = baseRowTypeInfo;
        this.outputRankNumber = z2;
        this.sortKeySelector = baseRowKeySelector;
    }

    /**
     * Initialises cleanup-time state, the reusable output row, the sort key comparator, the
     * {@code "topn.invalidTopSize"} counter and, for a variable rank end, the {@code "rankEnd"}
     * state together with the accessor that reads the rank end from an input row.
     *
     * @param configuration passed on to the superclass
     * @throws UnsupportedOperationException if the rank end column is not of type
     *     {@code BIGINT}, {@code INTEGER} or {@code SMALLINT}
     * @throws Exception if the superclass fails to open
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        initCleanupTimeState("RankFunctionCleanupTime");
        this.outputRow = new JoinedRow();
        if (!this.isConstantRankEnd) {
            this.rankEndState = getRuntimeContext().getState(new ValueStateDescriptor<>("rankEnd", Types.LONG));
        }
        this.sortKeyComparator = this.generatedSortKeyComparator.newInstance(getRuntimeContext().getUserCodeClassLoader());
        // The generated code is only needed to build the comparator; drop the reference.
        this.generatedSortKeyComparator = null;
        this.invalidCounter = getRuntimeContext().getMetricGroup().counter("topn.invalidTopSize");
        if (!this.isConstantRankEnd) {
            this.rankEndFetcher = createRankEndFetcher();
        }
    }

    /** Builds the accessor that reads the rank end column of an input row as a {@code long}. */
    private Function<BaseRow, Long> createRankEndFetcher() {
        final int index = this.rankEndIndex;
        LogicalType rankEndType = this.inputRowType.getLogicalTypes()[index];
        switch (rankEndType.getTypeRoot()) {
            case BIGINT:
                return row -> row.getLong(index);
            case INTEGER:
                return row -> (long) row.getInt(index);
            case SMALLINT:
                return row -> (long) row.getShort(index);
            default:
                String typeName = rankEndType.getClass().getName();
                LOG.error("variable rank index column must be long, short or int type, while input type is {}", typeName);
                // The thrown message names all three supported types, matching the log line.
                throw new UnsupportedOperationException("variable rank index column must be long, short or int type, while input type is " + typeName);
        }
    }

    /**
     * Returns the Top-N size to assume when no per-key rank end is available.
     *
     * @return the constant rank end if there is one, otherwise {@code 100}
     */
    public long getDefaultTopNSize() {
        return this.isConstantRankEnd ? this.rankEnd : DEFAULT_TOPN_SIZE;
    }

    /**
     * Determines the rank end to use for the given row.
     *
     * <p>With a constant rank end that value is returned directly. Otherwise the value held in
     * the {@code "rankEnd"} state wins; if the state is empty, the value read from the row is
     * stored and used. When the row's value differs from the stored one, the
     * {@code "topn.invalidTopSize"} counter is incremented and the stored value is kept.
     *
     * @param baseRow input row carrying the rank end column
     * @return the rank end, which is also written to {@link #rankEnd}
     * @throws Exception if accessing the state fails
     */
    public long initRankEnd(BaseRow baseRow) throws Exception {
        if (this.isConstantRankEnd) {
            return this.rankEnd;
        }
        Long storedRankEnd = this.rankEndState.value();
        long rowRankEnd = this.rankEndFetcher.apply(baseRow);
        if (storedRankEnd == null) {
            this.rankEnd = rowRankEnd;
            this.rankEndState.update(this.rankEnd);
        } else {
            this.rankEnd = storedRankEnd;
            if (this.rankEnd != rowRankEnd) {
                this.invalidCounter.inc();
            }
        }
        return this.rankEnd;
    }

    /**
     * Checks whether a sort key may belong to the buffer.
     *
     * @param baseRow the sort key to test
     * @param topNBuffer the buffer to test against
     * @return {@code true} if the buffer has no last entry, if the key compares lower than the
     *     key of the buffer's last entry, or if the buffer currently holds fewer rows than
     *     {@link #getDefaultTopNSize()}
     */
    public boolean checkSortKeyInBufferRange(BaseRow baseRow, TopNBuffer topNBuffer) {
        Comparator<BaseRow> comparator = topNBuffer.getSortKeyComparator();
        Map.Entry<BaseRow, Collection<BaseRow>> lastEntry = topNBuffer.lastEntry();
        if (lastEntry == null) {
            return true;
        }
        if (comparator.compare(baseRow, lastEntry.getKey()) < 0) {
            return true;
        }
        return topNBuffer.getCurrentTopNum() < getDefaultTopNSize();
    }

    /**
     * Registers the {@code "topn.cache.hitRate"} and {@code "topn.cache.size"} gauges.
     *
     * <p>The hit rate is {@code hitCount / requestCount}, or {@code 1.0} while no request has
     * been counted.
     *
     * @param j the value reported by the {@code "topn.cache.size"} gauge
     */
    public void registerMetric(long j) {
        getRuntimeContext().getMetricGroup().gauge("topn.cache.hitRate",
                () -> this.requestCount == 0 ? 1.0d : (double) this.hitCount / this.requestCount);
        getRuntimeContext().getMetricGroup().gauge("topn.cache.size", () -> j);
    }

    /**
     * Marks the row with {@link BaseRowUtil#setAccumulate} and emits it.
     *
     * @param collector receives the row
     * @param baseRow the row to emit; its header is modified
     */
    public void collect(Collector<BaseRow> collector, BaseRow baseRow) {
        BaseRowUtil.setAccumulate(baseRow);
        collector.collect(baseRow);
    }

    /**
     * Marks the row with {@link BaseRowUtil#setRetract} and emits it.
     *
     * @param collector receives the row
     * @param baseRow the row to emit; its header is modified
     */
    public void delete(Collector<BaseRow> collector, BaseRow baseRow) {
        BaseRowUtil.setRetract(baseRow);
        collector.collect(baseRow);
    }

    /**
     * Emits a retract-headed output row if the rank lies inside the rank range.
     *
     * @param collector receives the row
     * @param baseRow the input row
     * @param j the rank of the row
     */
    public void delete(Collector<BaseRow> collector, BaseRow baseRow, long j) {
        if (isInRankRange(j)) {
            collector.collect(createOutputRow(baseRow, j, RETRACT_HEADER));
        }
    }

    /**
     * Emits an accumulate-headed output row if the rank lies inside the rank range.
     *
     * @param collector receives the row
     * @param baseRow the input row
     * @param j the rank of the row
     */
    public void collect(Collector<BaseRow> collector, BaseRow baseRow, long j) {
        if (isInRankRange(j)) {
            collector.collect(createOutputRow(baseRow, j, ACCUMULATE_HEADER));
        }
    }

    /**
     * Emits a retract-headed output row if retraction generation is enabled and the rank lies
     * inside the rank range.
     *
     * @param collector receives the row
     * @param baseRow the input row
     * @param j the rank of the row
     */
    public void retract(Collector<BaseRow> collector, BaseRow baseRow, long j) {
        if (this.generateRetraction && isInRankRange(j)) {
            collector.collect(createOutputRow(baseRow, j, RETRACT_HEADER));
        }
    }

    /**
     * @param j the rank to test
     * @return whether {@code j} does not exceed the current rank end
     */
    public boolean isInRankEnd(long j) {
        return j <= this.rankEnd;
    }

    /**
     * @param j the rank to test
     * @return whether {@code j} lies between the rank start and the current rank end, inclusive
     */
    public boolean isInRankRange(long j) {
        return j <= this.rankEnd && j >= this.rankStart;
    }

    /**
     * @return whether the rank start is greater than one
     */
    public boolean hasOffset() {
        return this.rankStart > 1;
    }

    /**
     * Prepares the row to emit for a ranked result.
     *
     * <p>Without rank number output the input row itself is returned with its header replaced.
     * Otherwise the shared {@link JoinedRow} is pointed at the input row plus a one-field row
     * holding the rank; callers therefore must not hold on to a previously returned instance.
     */
    private BaseRow createOutputRow(BaseRow baseRow, long j, byte b) {
        if (!this.outputRankNumber) {
            baseRow.setHeader(b);
            return baseRow;
        }
        GenericRow rankRow = new GenericRow(1);
        rankRow.setField(0, j);
        this.outputRow.replace(baseRow, rankRow);
        this.outputRow.setHeader(b);
        return this.outputRow;
    }

    /**
     * Stores the key context in {@link #keyContext}.
     *
     * @param keyContext the key context to store
     */
    public void setKeyContext(KeyContext keyContext) {
        this.keyContext = keyContext;
    }
}
