package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.tools.RelBuilder;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.ProcTimeRowsBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.RowTimeRowsUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExecNodeMetadata(name = "stream-exec-over-aggregate", version = 1, producedTransformations = {StreamExecOverAggregate.OVER_AGGREGATE_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:flink-table-planner.jar:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.class */
public class StreamExecOverAggregate extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamExecOverAggregate.class);
    public static final String OVER_AGGREGATE_TRANSFORMATION = "over-aggregate";
    public static final String FIELD_NAME_OVER_SPEC = "overSpec";

    @JsonProperty("overSpec")
    private final OverSpec overSpec;

    public StreamExecOverAggregate(ReadableConfig readableConfig, OverSpec overSpec, InputProperty inputProperty, RowType rowType, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecOverAggregate.class), ExecNodeContext.newPersistedConfig(StreamExecOverAggregate.class, readableConfig), overSpec, Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecOverAggregate(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("overSpec") OverSpec overSpec, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
        this.overSpec = (OverSpec) Preconditions.checkNotNull(overSpec);
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        int i;
        KeyedProcessFunction<RowData, RowData, RowData> createBoundedOverProcessFunction;
        if (this.overSpec.getGroups().size() > 1) {
            throw new TableException("All aggregates must be computed on the same window.");
        }
        OverSpec.GroupSpec groupSpec = this.overSpec.getGroups().get(0);
        int[] fieldIndices = groupSpec.getSort().getFieldIndices();
        boolean[] ascendingOrders = groupSpec.getSort().getAscendingOrders();
        if (fieldIndices.length != 1 || ascendingOrders.length != 1) {
            throw new TableException("The window can only be ordered by a single time column.");
        }
        if (!ascendingOrders[0]) {
            throw new TableException("The window can only be ordered in ASCENDING mode.");
        }
        int[] fieldIndices2 = this.overSpec.getPartition().getFieldIndices();
        if (fieldIndices2.length > 0 && execNodeConfig.getStateRetentionTime() < 0) {
            LOG.warn("No state retention interval configured for a query which accumulates state. Please provide a query configuration with valid retention interval to prevent excessive state size. You may specify a retention time of 0 to not clean up the state.");
        }
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        RowType rowType = (RowType) execEdge.getOutputType();
        int i2 = fieldIndices[0];
        LogicalType type = ((RowType.RowField) rowType.getFields().get(i2)).getType();
        if (LogicalTypeChecks.isRowtimeAttribute(type)) {
            i = i2;
        } else {
            if (!LogicalTypeChecks.isProctimeAttribute(type)) {
                throw new TableException("OVER windows' ordering in stream mode must be defined on a time attribute.");
            }
            i = -1;
        }
        List<RexLiteral> constants = this.overSpec.getConstants();
        ArrayList arrayList = new ArrayList(rowType.getFieldNames());
        ArrayList arrayList2 = new ArrayList(rowType.getChildren());
        IntStream.range(0, constants.size()).forEach(i3 -> {
            arrayList.add("TMP" + i3);
        });
        for (int i4 = 0; i4 < constants.size(); i4++) {
            arrayList.add("TMP" + i4);
            arrayList2.add(FlinkTypeFactory.toLogicalType(constants.get(i4).getType()));
        }
        RowType of = RowType.of((LogicalType[]) arrayList2.toArray(new LogicalType[0]), (String[]) arrayList.toArray(new String[0]));
        CodeGeneratorContext codeGeneratorContext = new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader());
        if (groupSpec.getLowerBound().isPreceding() && groupSpec.getLowerBound().isUnbounded() && groupSpec.getUpperBound().isCurrentRow()) {
            createBoundedOverProcessFunction = createUnboundedOverProcessFunction(codeGeneratorContext, groupSpec.getAggCalls(), constants, of, rowType, i, groupSpec.isRows(), execNodeConfig, plannerBase.createRelBuilder(), plannerBase.getTypeFactory());
        } else {
            if (!groupSpec.getLowerBound().isPreceding() || groupSpec.getLowerBound().isUnbounded() || !groupSpec.getUpperBound().isCurrentRow()) {
                throw new TableException("OVER RANGE FOLLOWING windows are not supported yet.");
            }
            Object boundary = OverAggregateUtil.getBoundary(this.overSpec, groupSpec.getLowerBound());
            if (boundary instanceof BigDecimal) {
                throw new TableException("the specific value is decimal which haven not supported yet.");
            }
            createBoundedOverProcessFunction = createBoundedOverProcessFunction(codeGeneratorContext, groupSpec.getAggCalls(), constants, of, rowType, i, groupSpec.isRows(), ((-1) * ((Long) boundary).longValue()) + (groupSpec.isRows() ? 1 : 0), execNodeConfig, plannerBase.createRelBuilder(), plannerBase.getTypeFactory());
        }
        OneInputTransformation createOneInputTransformation = ExecNodeUtil.createOneInputTransformation((Transformation) translateToPlan, createTransformationMeta(OVER_AGGREGATE_TRANSFORMATION, execNodeConfig), (StreamOperator) new KeyedProcessOperator(createBoundedOverProcessFunction), (TypeInformation) InternalTypeInfo.of(getOutputType()), translateToPlan.getParallelism(), false);
        RowDataKeySelector rowDataSelector = KeySelectorUtil.getRowDataSelector(plannerBase.getFlinkContext().getClassLoader(), fieldIndices2, InternalTypeInfo.of(rowType));
        createOneInputTransformation.setStateKeySelector(rowDataSelector);
        createOneInputTransformation.setStateKeyType(rowDataSelector.getProducedType());
        return createOneInputTransformation;
    }

    private KeyedProcessFunction<RowData, RowData, RowData> createUnboundedOverProcessFunction(CodeGeneratorContext codeGeneratorContext, List<AggregateCall> list, List<RexLiteral> list2, RowType rowType, RowType rowType2, int i, boolean z, ExecNodeConfig execNodeConfig, RelBuilder relBuilder, FlinkTypeFactory flinkTypeFactory) {
        AggregateInfoList transformToStreamAggregateInfoList = AggregateUtil.transformToStreamAggregateInfoList(flinkTypeFactory, rowType, JavaScalaConversionUtil.toScala(list), new boolean[list.size()], false, true, true);
        LogicalType[] logicalTypeArr = (LogicalType[]) rowType2.getChildren().toArray(new LogicalType[0]);
        GeneratedAggsHandleFunction generateAggsHandler = new AggsHandlerCodeGenerator(codeGeneratorContext, relBuilder, JavaScalaConversionUtil.toScala(Arrays.asList(logicalTypeArr)), false).needAccumulate().withConstants(JavaScalaConversionUtil.toScala(list2)).generateAggsHandler("UnboundedOverAggregateHelper", transformToStreamAggregateInfoList);
        LogicalType[] logicalTypeArr2 = (LogicalType[]) Arrays.stream(transformToStreamAggregateInfoList.getAccTypes()).map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType).toArray(i2 -> {
            return new LogicalType[i2];
        });
        return i >= 0 ? z ? new RowTimeRowsUnboundedPrecedingFunction(execNodeConfig.getStateRetentionTime(), TableConfigUtils.getMaxIdleStateRetentionTime(execNodeConfig), generateAggsHandler, logicalTypeArr2, logicalTypeArr, i) : new RowTimeRangeUnboundedPrecedingFunction(execNodeConfig.getStateRetentionTime(), TableConfigUtils.getMaxIdleStateRetentionTime(execNodeConfig), generateAggsHandler, logicalTypeArr2, logicalTypeArr, i) : new ProcTimeUnboundedPrecedingFunction(StateConfigUtil.createTtlConfig(execNodeConfig.getStateRetentionTime()), generateAggsHandler, logicalTypeArr2);
    }

    private KeyedProcessFunction<RowData, RowData, RowData> createBoundedOverProcessFunction(CodeGeneratorContext codeGeneratorContext, List<AggregateCall> list, List<RexLiteral> list2, RowType rowType, RowType rowType2, int i, boolean z, long j, ExecNodeConfig execNodeConfig, RelBuilder relBuilder, FlinkTypeFactory flinkTypeFactory) {
        boolean[] zArr = new boolean[list.size()];
        Arrays.fill(zArr, true);
        AggregateInfoList transformToStreamAggregateInfoList = AggregateUtil.transformToStreamAggregateInfoList(flinkTypeFactory, rowType, JavaScalaConversionUtil.toScala(list), zArr, true, true, true);
        LogicalType[] logicalTypeArr = (LogicalType[]) rowType2.getChildren().toArray(new LogicalType[0]);
        GeneratedAggsHandleFunction generateAggsHandler = new AggsHandlerCodeGenerator(codeGeneratorContext, relBuilder, JavaScalaConversionUtil.toScala(Arrays.asList(logicalTypeArr)), false).needRetract().needAccumulate().withConstants(JavaScalaConversionUtil.toScala(list2)).generateAggsHandler("BoundedOverAggregateHelper", transformToStreamAggregateInfoList);
        LogicalType[] logicalTypeArr2 = (LogicalType[]) Arrays.stream(transformToStreamAggregateInfoList.getAccTypes()).map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType).toArray(i2 -> {
            return new LogicalType[i2];
        });
        return i >= 0 ? z ? new RowTimeRowsBoundedPrecedingFunction(execNodeConfig.getStateRetentionTime(), TableConfigUtils.getMaxIdleStateRetentionTime(execNodeConfig), generateAggsHandler, logicalTypeArr2, logicalTypeArr, j, i) : new RowTimeRangeBoundedPrecedingFunction(generateAggsHandler, logicalTypeArr2, logicalTypeArr, j, i) : z ? new ProcTimeRowsBoundedPrecedingFunction(execNodeConfig.getStateRetentionTime(), TableConfigUtils.getMaxIdleStateRetentionTime(execNodeConfig), generateAggsHandler, logicalTypeArr2, logicalTypeArr, j) : new ProcTimeRangeBoundedPrecedingFunction(generateAggsHandler, logicalTypeArr2, logicalTypeArr, j);
    }
}
