package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.MiniBatchIncrementalGroupAggFunction;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name = "stream-exec-incremental-group-aggregate", version = 1, consumedOptions = {"table.exec.mini-batch.enabled", "table.exec.mini-batch.size"}, producedTransformations = {StreamExecIncrementalGroupAggregate.INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:flink-table-store-codegen.jar:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.class */
public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase {
    public static final String INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION = "incremental-group-aggregate";
    public static final String FIELD_NAME_PARTIAL_AGG_GROUPING = "partialAggGrouping";
    public static final String FIELD_NAME_FINAL_AGG_GROUPING = "finalAggGrouping";
    public static final String FIELD_NAME_PARTIAL_ORIGINAL_AGG_CALLS = "partialOriginalAggCalls";
    public static final String FIELD_NAME_PARTIAL_AGG_CALL_NEED_RETRACTIONS = "partialAggCallNeedRetractions";
    public static final String FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE = "partialLocalAggInputRowType";
    public static final String FIELD_NAME_PARTIAL_AGG_NEED_RETRACTION = "partialAggNeedRetraction";

    @JsonProperty(FIELD_NAME_PARTIAL_AGG_GROUPING)
    private final int[] partialAggGrouping;

    @JsonProperty(FIELD_NAME_FINAL_AGG_GROUPING)
    private final int[] finalAggGrouping;

    @JsonProperty(FIELD_NAME_PARTIAL_ORIGINAL_AGG_CALLS)
    private final AggregateCall[] partialOriginalAggCalls;

    @JsonProperty(FIELD_NAME_PARTIAL_AGG_CALL_NEED_RETRACTIONS)
    private final boolean[] partialAggCallNeedRetractions;

    @JsonProperty(FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE)
    private final RowType partialLocalAggInputType;

    @JsonProperty(FIELD_NAME_PARTIAL_AGG_NEED_RETRACTION)
    private final boolean partialAggNeedRetraction;

    public StreamExecIncrementalGroupAggregate(ReadableConfig readableConfig, int[] iArr, int[] iArr2, AggregateCall[] aggregateCallArr, boolean[] zArr, RowType rowType, boolean z, InputProperty inputProperty, RowType rowType2, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecIncrementalGroupAggregate.class), ExecNodeContext.newPersistedConfig(StreamExecIncrementalGroupAggregate.class, readableConfig), iArr, iArr2, aggregateCallArr, zArr, rowType, z, Collections.singletonList(inputProperty), rowType2, str);
    }

    @JsonCreator
    public StreamExecIncrementalGroupAggregate(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("partialAggGrouping") int[] iArr, @JsonProperty("finalAggGrouping") int[] iArr2, @JsonProperty("partialOriginalAggCalls") AggregateCall[] aggregateCallArr, @JsonProperty("partialAggCallNeedRetractions") boolean[] zArr, @JsonProperty("partialLocalAggInputRowType") RowType rowType, @JsonProperty("partialAggNeedRetraction") boolean z, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType2, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list, rowType2, str);
        this.partialAggGrouping = (int[]) Preconditions.checkNotNull(iArr);
        this.finalAggGrouping = (int[]) Preconditions.checkNotNull(iArr2);
        this.partialOriginalAggCalls = (AggregateCall[]) Preconditions.checkNotNull(aggregateCallArr);
        this.partialAggCallNeedRetractions = (boolean[]) Preconditions.checkNotNull(zArr);
        Preconditions.checkArgument(aggregateCallArr.length == zArr.length);
        this.partialLocalAggInputType = (RowType) Preconditions.checkNotNull(rowType);
        this.partialAggNeedRetraction = z;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        ExecEdge execEdge = getInputEdges().get(0);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        AggregateInfoList createPartialAggInfoList = AggregateUtil.createPartialAggInfoList(this.partialLocalAggInputType, JavaScalaConversionUtil.toScala(Arrays.asList(this.partialOriginalAggCalls)), this.partialAggCallNeedRetractions, this.partialAggNeedRetraction, false);
        GeneratedAggsHandleFunction generateAggsHandler = generateAggsHandler("PartialGroupAggsHandler", createPartialAggInfoList, this.partialAggGrouping.length, createPartialAggInfoList.getAccTypes(), execNodeConfig, plannerBase.getRelBuilder(), true);
        GeneratedAggsHandleFunction generateAggsHandler2 = generateAggsHandler("FinalGroupAggsHandler", AggregateUtil.createIncrementalAggInfoList(this.partialLocalAggInputType, JavaScalaConversionUtil.toScala(Arrays.asList(this.partialOriginalAggCalls)), this.partialAggCallNeedRetractions, this.partialAggNeedRetraction), 0, createPartialAggInfoList.getAccTypes(), execNodeConfig, plannerBase.getRelBuilder(), false);
        RowDataKeySelector rowDataSelector = KeySelectorUtil.getRowDataSelector(this.partialAggGrouping, InternalTypeInfo.of(execEdge.getOutputType()));
        OneInputTransformation createOneInputTransformation = ExecNodeUtil.createOneInputTransformation((Transformation) translateToPlan, createTransformationMeta(INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION, execNodeConfig), (StreamOperator) new KeyedMapBundleOperator(new MiniBatchIncrementalGroupAggFunction(generateAggsHandler, generateAggsHandler2, KeySelectorUtil.getRowDataSelector(this.finalAggGrouping, rowDataSelector.getProducedType()), execNodeConfig.getStateRetentionTime()), AggregateUtil.createMiniBatchTrigger(execNodeConfig)), (TypeInformation) InternalTypeInfo.of(getOutputType()), translateToPlan.getParallelism());
        createOneInputTransformation.setStateKeySelector(rowDataSelector);
        createOneInputTransformation.setStateKeyType(rowDataSelector.getProducedType());
        return createOneInputTransformation;
    }

    private GeneratedAggsHandleFunction generateAggsHandler(String str, AggregateInfoList aggregateInfoList, int i, DataType[] dataTypeArr, ExecNodeConfig execNodeConfig, RelBuilder relBuilder, boolean z) {
        return new AggsHandlerCodeGenerator(new CodeGeneratorContext(execNodeConfig.getTableConfig()), relBuilder, JavaScalaConversionUtil.toScala(this.partialLocalAggInputType.getChildren()), z).needMerge(i, true, dataTypeArr).generateAggsHandler(str, aggregateInfoList);
    }
}
