package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
import org.apache.flink.table.runtime.operators.rank.ComparableRecordComparator;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node for a rank (TopN) operation.
 *
 * <p>The node is JSON-(de)serializable through the Jackson annotations below. When translated it
 * builds a {@link KeyedProcessOperator} around one of the TopN functions ({@link
 * AppendOnlyTopNFunction}, {@link UpdatableTopNFunction} or {@link RetractableTopNFunction}),
 * chosen by the configured {@link RankProcessStrategy}. Only {@code ROW_NUMBER} is translated;
 * {@code RANK} and {@code DENSE_RANK} are rejected with a {@link TableException}.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecRank extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

    /** Size of the TopN cache, read from the table configuration during translation. */
    @Experimental
    public static final ConfigOption<Long> TABLE_EXEC_TOPN_CACHE_SIZE = ConfigOptions.key("table.exec.topn.cache-size").longType().defaultValue(10000L).withDescription("TopN operator has a cache which caches partial state contents to reduce state access. Cache size is the number of records in each TopN task.");

    // JSON property names of the serialized plan.
    public static final String FIELD_NAME_RANK_TYPE = "rankType";
    public static final String FIELD_NAME_PARTITION_SPEC = "partition";
    public static final String FIELD_NAME_SORT_SPEC = "orderBy";
    // The constant name (without the trailing "E") is kept as-is because it is public API.
    public static final String FIELD_NAME_RANK_RANG = "rankRange";
    public static final String FIELD_NAME_RANK_STRATEGY = "rankStrategy";
    public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = "generateUpdateBefore";
    public static final String FIELD_NAME_OUTPUT_RANK_NUMBER = "outputRowNumber";

    @JsonProperty(FIELD_NAME_RANK_TYPE)
    private final RankType rankType;

    @JsonProperty(FIELD_NAME_PARTITION_SPEC)
    private final PartitionSpec partitionSpec;

    @JsonProperty(FIELD_NAME_SORT_SPEC)
    private final SortSpec sortSpec;

    @JsonProperty(FIELD_NAME_RANK_RANG)
    private final RankRange rankRange;

    @JsonProperty(FIELD_NAME_RANK_STRATEGY)
    private final RankProcessStrategy rankStrategy;

    @JsonProperty(FIELD_NAME_OUTPUT_RANK_NUMBER)
    private final boolean outputRankNumber;

    @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
    private final boolean generateUpdateBefore;

    /**
     * Creates a rank node with a freshly generated node id and a single input.
     *
     * @param rankType rank function type; must not be null
     * @param partitionSpec partition fields, used as the state key; must not be null
     * @param sortSpec sort fields and their ordering; must not be null
     * @param rankRange range of ranks handed to the TopN function; must not be null
     * @param rankProcessStrategy strategy that selects the TopN function; must not be null
     * @param outputRankNumber stored under the {@code outputRowNumber} JSON property and passed
     *     to the TopN function
     * @param generateUpdateBefore passed to the TopN function
     * @param inputProperty property of the single input
     * @param outputType output row type of this node
     * @param description description, also used as the transformation name
     */
    public StreamExecRank(RankType rankType, PartitionSpec partitionSpec, SortSpec sortSpec, RankRange rankRange, RankProcessStrategy rankProcessStrategy, boolean outputRankNumber, boolean generateUpdateBefore, InputProperty inputProperty, RowType outputType, String description) {
        this(rankType, partitionSpec, sortSpec, rankRange, rankProcessStrategy, outputRankNumber, generateUpdateBefore, getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
    }

    /**
     * Creates a rank node from explicit values; this is the constructor used by Jackson.
     *
     * @param id node id
     * @param inputProperties input properties; must contain exactly one element
     * @throws IllegalArgumentException if {@code inputProperties} does not have exactly one element
     * @throws NullPointerException if any of the spec/type/range/strategy arguments is null
     */
    @JsonCreator
    public StreamExecRank(@JsonProperty(FIELD_NAME_RANK_TYPE) RankType rankType, @JsonProperty(FIELD_NAME_PARTITION_SPEC) PartitionSpec partitionSpec, @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec, @JsonProperty(FIELD_NAME_RANK_RANG) RankRange rankRange, @JsonProperty(FIELD_NAME_RANK_STRATEGY) RankProcessStrategy rankProcessStrategy, @JsonProperty(FIELD_NAME_OUTPUT_RANK_NUMBER) boolean outputRankNumber, @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore, @JsonProperty("id") int id, @JsonProperty("inputProperties") List<InputProperty> inputProperties, @JsonProperty("outputType") RowType outputType, @JsonProperty("description") String description) {
        super(id, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 1);
        this.rankType = Preconditions.checkNotNull(rankType);
        this.rankRange = Preconditions.checkNotNull(rankRange);
        this.rankStrategy = Preconditions.checkNotNull(rankProcessStrategy);
        this.sortSpec = Preconditions.checkNotNull(sortSpec);
        this.partitionSpec = Preconditions.checkNotNull(partitionSpec);
        this.outputRankNumber = outputRankNumber;
        this.generateUpdateBefore = generateUpdateBefore;
    }

    /**
     * Translates this node into a keyed one-input transformation running a TopN function.
     *
     * @param planner planner used to translate the input edge and to obtain the table config
     * @return the transformation, keyed by the partition fields and using the input's parallelism
     * @throws TableException if the rank type is not {@code ROW_NUMBER} or the rank strategy is
     *     not one of the supported strategies
     */
    @Override
    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        // Reject unsupported rank functions before translating the input.
        switch (rankType) {
            case ROW_NUMBER:
                break;
            case RANK:
                throw new TableException("RANK() on streaming table is not supported currently");
            case DENSE_RANK:
                throw new TableException("DENSE_RANK() on streaming table is not supported currently");
            default:
                throw new TableException(String.format("Streaming tables do not support %s rank function.", rankType));
        }

        ExecEdge inputEdge = getInputEdges().get(0);
        // The edge is declared with a wildcard type; this node is parameterized on RowData.
        @SuppressWarnings("unchecked")
        Transformation<RowData> inputTransform = (Transformation<RowData>) inputEdge.translateToPlan(planner);
        RowType inputType = (RowType) inputEdge.getOutputType();
        InternalTypeInfo<RowData> inputRowTypeInfo = InternalTypeInfo.of(inputType);

        AbstractTopNFunction processFunction = createTopNFunction(planner.getTableConfig(), inputType, inputRowTypeInfo);
        KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(processFunction);
        processFunction.setKeyContext(operator);

        OneInputTransformation<RowData, RowData> transform = new OneInputTransformation<>(inputTransform, getDescription(), operator, InternalTypeInfo.of((RowType) getOutputType()), inputTransform.getParallelism());

        // State is keyed by the partition fields.
        RowDataKeySelector partitionKeySelector = KeySelectorUtil.getRowDataSelector(partitionSpec.getFieldIndices(), inputRowTypeInfo);
        transform.setStateKeySelector(partitionKeySelector);
        transform.setStateKeyType(partitionKeySelector.getProducedType());
        return transform;
    }

    /**
     * Builds the TopN function matching {@link #rankStrategy}.
     *
     * @throws TableException if the strategy is none of append-fast, update-fast or retract
     */
    private AbstractTopNFunction createTopNFunction(TableConfig tableConfig, RowType inputType, InternalTypeInfo<RowData> inputRowTypeInfo) {
        int[] sortFields = sortSpec.getFieldIndices();
        RowDataKeySelector sortKeySelector = KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo);

        // The comparator works on the row produced by the sort key selector, so the sort spec is
        // rebuilt on positions 0..n-1 while keeping each field's ordering and null placement.
        int[] sortKeyPositions = IntStream.range(0, sortFields.length).toArray();
        SortSpec.SortSpecBuilder sortKeySpecBuilder = SortSpec.builder();
        for (int idx : sortKeyPositions) {
            sortKeySpecBuilder.addField(idx, sortSpec.getFieldSpec(idx).getIsAscendingOrder(), sortSpec.getFieldSpec(idx).getNullIsLast());
        }
        SortSpec sortSpecInSortKey = sortKeySpecBuilder.build();

        GeneratedRecordComparator sortKeyComparator = ComparatorCodeGenerator.gen(tableConfig, "StreamExecSortComparator", RowType.of(sortSpec.getFieldTypes(inputType)), sortSpecInSortKey);
        long cacheSize = tableConfig.getConfiguration().getLong(TABLE_EXEC_TOPN_CACHE_SIZE);
        long minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime();
        long maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime();

        if (rankStrategy instanceof RankProcessStrategy.AppendFastStrategy) {
            return new AppendOnlyTopNFunction(minIdleStateRetentionTime, maxIdleStateRetentionTime, inputRowTypeInfo, sortKeyComparator, sortKeySelector, rankType, rankRange, generateUpdateBefore, outputRankNumber, cacheSize);
        }
        if (rankStrategy instanceof RankProcessStrategy.UpdateFastStrategy) {
            int[] primaryKeys = ((RankProcessStrategy.UpdateFastStrategy) rankStrategy).getPrimaryKeys();
            RowDataKeySelector rowKeySelector = KeySelectorUtil.getRowDataSelector(primaryKeys, inputRowTypeInfo);
            return new UpdatableTopNFunction(minIdleStateRetentionTime, maxIdleStateRetentionTime, inputRowTypeInfo, rowKeySelector, sortKeyComparator, sortKeySelector, rankType, rankRange, generateUpdateBefore, outputRankNumber, cacheSize);
        }
        if (rankStrategy instanceof RankProcessStrategy.RetractStrategy) {
            LogicalType[] inputFieldTypes = inputType.getFields().stream().map(RowType.RowField::getType).toArray(LogicalType[]::new);
            EqualiserCodeGenerator equaliserCodeGen = new EqualiserCodeGenerator(inputFieldTypes);
            ComparableRecordComparator comparator = new ComparableRecordComparator(sortKeyComparator, sortKeyPositions, sortSpec.getFieldTypes(inputType), sortSpec.getAscendingOrders(), sortSpec.getNullsIsLast());
            // The retract variant takes a record equaliser over all input fields and no cache size.
            return new RetractableTopNFunction(minIdleStateRetentionTime, maxIdleStateRetentionTime, inputRowTypeInfo, comparator, sortKeySelector, rankType, rankRange, equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser"), generateUpdateBefore, outputRankNumber);
        }
        throw new TableException(String.format("rank strategy:%s is not supported.", rankStrategy));
    }
}
