package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node that translates a deduplicate operation into a keyed {@link
 * OneInputTransformation}.
 *
 * <p>The node is keyed by {@link #uniqueKeys}. Depending on {@link #isRowtime} it delegates to a
 * rowtime or a proctime {@link DeduplicateOperatorTranslator}, each of which picks a mini-batch
 * or a per-record operator based on the table configuration.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecDeduplicate extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

    public static final String FIELD_NAME_UNIQUE_KEYS = "uniqueKeys";
    public static final String FIELD_NAME_IS_ROWTIME = "isRowtime";
    public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
    public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = "generateUpdateBefore";

    /**
     * Option read by {@link DeduplicateOperatorTranslator#generateInsert()} and forwarded to the
     * deduplicate functions as their "generate insert" flag.
     */
    @Experimental
    public static final ConfigOption<Boolean> TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE =
            ConfigOptions.key("table.exec.insert-and-updateafter-sensitive")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Set whether the job (especially the sinks) is sensitive to "
                                    + "INSERT messages and UPDATE_AFTER messages. "
                                    + "If false, Flink may send UPDATE_AFTER instead of INSERT for the first row "
                                    + "at some times (e.g. deduplication for last row). "
                                    + "If true, Flink will guarantee to send INSERT for the first row, "
                                    + "but there will be additional overhead. "
                                    + "Default is true.");

    /** Indices of the input fields used as the state key. */
    @JsonProperty(FIELD_NAME_UNIQUE_KEYS)
    private final int[] uniqueKeys;

    /** Selects the rowtime translator when {@code true}, the proctime translator otherwise. */
    @JsonProperty(FIELD_NAME_IS_ROWTIME)
    private final boolean isRowtime;

    /** Forwarded to the translators to choose between keep-last-row and keep-first-row. */
    @JsonProperty(FIELD_NAME_KEEP_LAST_ROW)
    private final boolean keepLastRow;

    /** Forwarded unchanged to the deduplicate functions. */
    @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
    private final boolean generateUpdateBefore;

    /**
     * Base class for the helpers that build the deduplicate operator.
     *
     * <p>It bundles the table configuration lookups (mini-batch settings, state retention, insert
     * sensitivity) shared by the proctime and rowtime variants.
     */
    public abstract static class DeduplicateOperatorTranslator {
        private final TableConfig tableConfig;
        protected final InternalTypeInfo<RowData> rowTypeInfo;
        protected final TypeSerializer<RowData> typeSerializer;
        protected final boolean keepLastRow;
        protected final boolean generateUpdateBefore;

        /**
         * @param tableConfig configuration the translator reads its options from
         * @param rowTypeInfo type information of the input rows
         * @param typeSerializer serializer for the input rows
         * @param keepLastRow whether the last row (rather than the first) is kept
         * @param generateUpdateBefore flag forwarded to the deduplicate functions
         */
        protected DeduplicateOperatorTranslator(
                TableConfig tableConfig,
                InternalTypeInfo<RowData> rowTypeInfo,
                TypeSerializer<RowData> typeSerializer,
                boolean keepLastRow,
                boolean generateUpdateBefore) {
            this.tableConfig = tableConfig;
            this.rowTypeInfo = rowTypeInfo;
            this.typeSerializer = typeSerializer;
            this.keepLastRow = keepLastRow;
            this.generateUpdateBefore = generateUpdateBefore;
        }

        /** Returns the value of {@link #TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE}. */
        protected boolean generateInsert() {
            return tableConfig
                    .getConfiguration()
                    .getBoolean(StreamExecDeduplicate.TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE);
        }

        /** Returns the value of {@code ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED}. */
        protected boolean isMiniBatchEnabled() {
            return tableConfig
                    .getConfiguration()
                    .getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
        }

        /** Returns the minimum idle state retention time configured on the table config. */
        protected long getMinRetentionTime() {
            return tableConfig.getMinIdleStateRetentionTime();
        }

        /**
         * Returns the configured mini-batch size, or {@code -1} when mini-batch is disabled.
         *
         * @throws IllegalArgumentException if mini-batch is enabled and the size is not positive
         */
        protected long getMiniBatchSize() {
            if (!isMiniBatchEnabled()) {
                return -1L;
            }
            final long size =
                    tableConfig
                            .getConfiguration()
                            .getLong(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
            Preconditions.checkArgument(
                    size > 0,
                    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE.key()
                            + " should be greater than 0.");
            return size;
        }

        /** Builds the operator that performs the deduplication. */
        abstract OneInputStreamOperator<RowData, RowData> createDeduplicateOperator();
    }

    /** Translator used when the node is not a rowtime deduplicate. */
    private static class ProcTimeDeduplicateOperatorTranslator
            extends DeduplicateOperatorTranslator {
        private final GeneratedRecordEqualiser generatedEqualiser;

        protected ProcTimeDeduplicateOperatorTranslator(
                TableConfig tableConfig,
                InternalTypeInfo<RowData> rowTypeInfo,
                TypeSerializer<RowData> typeSerializer,
                RowType inputRowType,
                boolean keepLastRow,
                boolean generateUpdateBefore) {
            super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, generateUpdateBefore);
            this.generatedEqualiser =
                    new EqualiserCodeGenerator(inputRowType)
                            .generateRecordEqualiser("DeduplicateRowEqualiser");
        }

        /**
         * Chooses one of four operators from the combination of mini-batch enabled/disabled and
         * keep-last-row/keep-first-row.
         */
        @Override
        OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
            if (isMiniBatchEnabled()) {
                final CountBundleTrigger<RowData> trigger =
                        new CountBundleTrigger<>(getMiniBatchSize());
                if (keepLastRow) {
                    // NOTE(review): the literal `true` is presumably the function's
                    // "input is insert-only" flag - confirm against the function's constructor.
                    final ProcTimeMiniBatchDeduplicateKeepLastRowFunction lastRowFunction =
                            new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
                                    rowTypeInfo,
                                    typeSerializer,
                                    getMinRetentionTime(),
                                    generateUpdateBefore,
                                    generateInsert(),
                                    true,
                                    generatedEqualiser);
                    return new KeyedMapBundleOperator<>(lastRowFunction, trigger);
                }
                final ProcTimeMiniBatchDeduplicateKeepFirstRowFunction firstRowFunction =
                        new ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(
                                typeSerializer, getMinRetentionTime());
                return new KeyedMapBundleOperator<>(firstRowFunction, trigger);
            }

            if (keepLastRow) {
                // NOTE(review): same literal `true` as in the mini-batch branch - confirm meaning.
                final ProcTimeDeduplicateKeepLastRowFunction lastRowFunction =
                        new ProcTimeDeduplicateKeepLastRowFunction(
                                rowTypeInfo,
                                getMinRetentionTime(),
                                generateUpdateBefore,
                                generateInsert(),
                                true,
                                generatedEqualiser);
                return new KeyedProcessOperator<>(lastRowFunction);
            }
            final ProcTimeDeduplicateKeepFirstRowFunction firstRowFunction =
                    new ProcTimeDeduplicateKeepFirstRowFunction(getMinRetentionTime());
            return new KeyedProcessOperator<>(firstRowFunction);
        }
    }

    /** Translator used when the node is a rowtime deduplicate. */
    private static class RowtimeDeduplicateOperatorTranslator
            extends DeduplicateOperatorTranslator {
        private final RowType inputRowType;

        protected RowtimeDeduplicateOperatorTranslator(
                TableConfig tableConfig,
                InternalTypeInfo<RowData> rowTypeInfo,
                TypeSerializer<RowData> typeSerializer,
                RowType inputRowType,
                boolean keepLastRow,
                boolean generateUpdateBefore) {
            super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, generateUpdateBefore);
            this.inputRowType = inputRowType;
        }

        /**
         * Locates the first rowtime field of the input row type and builds the mini-batch or
         * per-record rowtime deduplicate operator around it.
         *
         * @throws IllegalArgumentException if the input row type has no rowtime field
         */
        @Override
        OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
            int rowtimeIndex = -1;
            for (int field = 0; field < inputRowType.getFieldCount(); field++) {
                if (TypeCheckUtils.isRowTime(inputRowType.getTypeAt(field))) {
                    // Only the first rowtime field is used.
                    rowtimeIndex = field;
                    break;
                }
            }
            Preconditions.checkArgument(
                    rowtimeIndex >= 0,
                    "Rowtime deduplicate requires a rowtime field in the input row type.");

            if (isMiniBatchEnabled()) {
                final RowTimeMiniBatchDeduplicateFunction miniBatchFunction =
                        new RowTimeMiniBatchDeduplicateFunction(
                                rowTypeInfo,
                                typeSerializer,
                                getMinRetentionTime(),
                                rowtimeIndex,
                                generateUpdateBefore,
                                generateInsert(),
                                keepLastRow);
                final CountBundleTrigger<RowData> trigger =
                        new CountBundleTrigger<>(getMiniBatchSize());
                return new KeyedMapBundleOperator<>(miniBatchFunction, trigger);
            }

            final RowTimeDeduplicateFunction function =
                    new RowTimeDeduplicateFunction(
                            rowTypeInfo,
                            getMinRetentionTime(),
                            rowtimeIndex,
                            generateUpdateBefore,
                            generateInsert(),
                            keepLastRow);
            return new KeyedProcessOperator<>(function);
        }
    }

    /**
     * Creates a node with a freshly allocated node id and a single input.
     *
     * @param uniqueKeys indices of the key fields; must not be {@code null}
     * @param isRowtime whether to use the rowtime translator
     * @param keepLastRow whether the last row (rather than the first) is kept
     * @param generateUpdateBefore flag forwarded to the deduplicate functions
     * @param inputProperty property of the single input
     * @param outputType output row type of this node
     * @param description description used as the transformation name
     */
    public StreamExecDeduplicate(
            int[] uniqueKeys,
            boolean isRowtime,
            boolean keepLastRow,
            boolean generateUpdateBefore,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                uniqueKeys,
                isRowtime,
                keepLastRow,
                generateUpdateBefore,
                getNewNodeId(),
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Constructor used for JSON deserialization.
     *
     * @throws IllegalArgumentException if {@code inputProperties} does not contain exactly one
     *     element
     * @throws NullPointerException if {@code uniqueKeys} is {@code null}
     */
    @JsonCreator
    public StreamExecDeduplicate(
            @JsonProperty(FIELD_NAME_UNIQUE_KEYS) int[] uniqueKeys,
            @JsonProperty(FIELD_NAME_IS_ROWTIME) boolean isRowtime,
            @JsonProperty(FIELD_NAME_KEEP_LAST_ROW) boolean keepLastRow,
            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
            @JsonProperty("id") int id,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 1);
        this.uniqueKeys = Preconditions.checkNotNull(uniqueKeys);
        this.isRowtime = isRowtime;
        this.keepLastRow = keepLastRow;
        this.generateUpdateBefore = generateUpdateBefore;
    }

    /**
     * Translates the single input, wraps it with the deduplicate operator and keys the resulting
     * transformation by {@link #uniqueKeys}.
     *
     * <p>The output type information and parallelism are taken from the input transformation.
     */
    @SuppressWarnings("unchecked") // the input edge always produces RowData transformations here
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final ExecEdge inputEdge = getInputEdges().get(0);
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);

        final RowType inputRowType = (RowType) inputEdge.getOutputType();
        final InternalTypeInfo<RowData> rowTypeInfo =
                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
        final TypeSerializer<RowData> rowSerializer =
                rowTypeInfo.createSerializer(planner.getExecEnv().getConfig());

        final DeduplicateOperatorTranslator translator;
        if (isRowtime) {
            translator =
                    new RowtimeDeduplicateOperatorTranslator(
                            planner.getTableConfig(),
                            rowTypeInfo,
                            rowSerializer,
                            inputRowType,
                            keepLastRow,
                            generateUpdateBefore);
        } else {
            translator =
                    new ProcTimeDeduplicateOperatorTranslator(
                            planner.getTableConfig(),
                            rowTypeInfo,
                            rowSerializer,
                            inputRowType,
                            keepLastRow,
                            generateUpdateBefore);
        }
        final OneInputStreamOperator<RowData, RowData> operator =
                translator.createDeduplicateOperator();

        final OneInputTransformation<RowData, RowData> transform =
                new OneInputTransformation<>(
                        inputTransform,
                        getDescription(),
                        operator,
                        rowTypeInfo,
                        inputTransform.getParallelism());

        final RowDataKeySelector selector =
                KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo);
        transform.setStateKeySelector(selector);
        transform.setStateKeyType(selector.getProducedType());
        return transform;
    }
}
