package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

@ExecNodeMetadata(name = "stream-exec-changelog-normalize", version = 1, consumedOptions = {"table.exec.mini-batch.enabled", "table.exec.mini-batch.size"}, producedTransformations = {StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:flink-table-store-codegen.jar:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.class */
public class StreamExecChangelogNormalize extends ExecNodeBase<RowData> implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
    public static final String CHANGELOG_NORMALIZE_TRANSFORMATION = "changelog-normalize";
    public static final String FIELD_NAME_UNIQUE_KEYS = "uniqueKeys";
    public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = "generateUpdateBefore";

    @JsonProperty("uniqueKeys")
    private final int[] uniqueKeys;

    @JsonProperty("generateUpdateBefore")
    private final boolean generateUpdateBefore;

    public StreamExecChangelogNormalize(ReadableConfig readableConfig, int[] iArr, boolean z, InputProperty inputProperty, RowType rowType, String str) {
        this(Integer.valueOf(ExecNodeContext.newNodeId()), ExecNodeContext.newContext(StreamExecChangelogNormalize.class), ExecNodeContext.newPersistedConfig(StreamExecChangelogNormalize.class, readableConfig), iArr, z, Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecChangelogNormalize(@JsonProperty("id") Integer num, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("uniqueKeys") int[] iArr, @JsonProperty("generateUpdateBefore") boolean z, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(num.intValue(), execNodeContext, readableConfig, list, rowType, str);
        this.uniqueKeys = iArr;
        this.generateUpdateBefore = z;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        Transformation<?> translateToPlan = getInputEdges().get(0).translateToPlan(plannerBase);
        InternalTypeInfo outputType = translateToPlan.getOutputType();
        long stateRetentionTime = execNodeConfig.getStateRetentionTime();
        boolean booleanValue = ((Boolean) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)).booleanValue();
        GeneratedRecordEqualiser generateRecordEqualiser = new EqualiserCodeGenerator(outputType.toRowType()).generateRecordEqualiser("DeduplicateRowEqualiser");
        OneInputTransformation createOneInputTransformation = ExecNodeUtil.createOneInputTransformation((Transformation) translateToPlan, createTransformationMeta(CHANGELOG_NORMALIZE_TRANSFORMATION, execNodeConfig), (StreamOperator) (booleanValue ? new KeyedMapBundleOperator(new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(outputType, outputType.createSerializer(plannerBase.getExecEnv().getConfig()), stateRetentionTime, this.generateUpdateBefore, true, false, generateRecordEqualiser), AggregateUtil.createMiniBatchTrigger(execNodeConfig)) : new KeyedProcessOperator(new ProcTimeDeduplicateKeepLastRowFunction(outputType, stateRetentionTime, this.generateUpdateBefore, true, false, generateRecordEqualiser))), (TypeInformation) outputType, translateToPlan.getParallelism());
        RowDataKeySelector rowDataSelector = KeySelectorUtil.getRowDataSelector(this.uniqueKeys, outputType);
        createOneInputTransformation.setStateKeySelector(rowDataSelector);
        createOneInputTransformation.setStateKeyType(rowDataSelector.getProducedType());
        return createOneInputTransformation;
    }
}
