package org.apache.flink.table.runtime.operators.aggregate;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.class */
public class GroupAggFunction extends KeyedProcessFunctionWithCleanupState<BaseRow, BaseRow, BaseRow> {
    private static final long serialVersionUID = -4767158666069797704L;
    private final GeneratedAggsHandleFunction genAggsHandler;
    private final GeneratedRecordEqualiser genRecordEqualiser;
    private final LogicalType[] accTypes;
    private final RecordCounter recordCounter;
    private final boolean generateRetraction;
    private transient JoinedRow resultRow;
    private transient AggsHandleFunction function;
    private transient RecordEqualiser equaliser;
    private transient ValueState<BaseRow> accState;

    public GroupAggFunction(long j, long j2, GeneratedAggsHandleFunction generatedAggsHandleFunction, GeneratedRecordEqualiser generatedRecordEqualiser, LogicalType[] logicalTypeArr, int i, boolean z) {
        super(j, j2);
        this.resultRow = null;
        this.function = null;
        this.equaliser = null;
        this.accState = null;
        this.genAggsHandler = generatedAggsHandleFunction;
        this.genRecordEqualiser = generatedRecordEqualiser;
        this.accTypes = logicalTypeArr;
        this.recordCounter = RecordCounter.of(i);
        this.generateRetraction = z;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.function = this.genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader());
        this.function.open(new PerKeyStateDataViewStore(getRuntimeContext()));
        this.equaliser = this.genRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
        this.accState = getRuntimeContext().getState(new ValueStateDescriptor("accState", new BaseRowTypeInfo(this.accTypes)));
        initCleanupTimeState("GroupAggregateCleanupTime");
        this.resultRow = new JoinedRow();
    }

    public void processElement(BaseRow baseRow, KeyedProcessFunction<BaseRow, BaseRow, BaseRow>.Context context, Collector<BaseRow> collector) throws Exception {
        boolean z;
        registerProcessingCleanupTimer(context, context.timerService().currentProcessingTime());
        BaseRow baseRow2 = (BaseRow) context.getCurrentKey();
        BaseRow baseRow3 = (BaseRow) this.accState.value();
        if (null != baseRow3) {
            z = false;
        } else {
            if (BaseRowUtil.isRetractMsg(baseRow)) {
                return;
            }
            z = true;
            baseRow3 = this.function.createAccumulators();
        }
        this.function.setAccumulators(baseRow3);
        BaseRow value = this.function.getValue();
        if (BaseRowUtil.isAccumulateMsg(baseRow)) {
            this.function.accumulate(baseRow);
        } else {
            this.function.retract(baseRow);
        }
        BaseRow value2 = this.function.getValue();
        BaseRow accumulators = this.function.getAccumulators();
        if (this.recordCounter.recordCountIsZero(accumulators)) {
            if (!z) {
                this.resultRow.replace(baseRow2, value).setHeader((byte) 1);
                collector.collect(this.resultRow);
            }
            this.accState.clear();
            this.function.cleanup();
            return;
        }
        this.accState.update(accumulators);
        if (!z) {
            if (!this.stateCleaningEnabled && this.equaliser.equalsWithoutHeader(value, value2)) {
                return;
            }
            if (this.generateRetraction) {
                this.resultRow.replace(baseRow2, value).setHeader((byte) 1);
                collector.collect(this.resultRow);
            }
        }
        this.resultRow.replace(baseRow2, value2).setHeader((byte) 0);
        collector.collect(this.resultRow);
    }

    public void onTimer(long j, KeyedProcessFunction<BaseRow, BaseRow, BaseRow>.OnTimerContext onTimerContext, Collector<BaseRow> collector) throws Exception {
        if (this.stateCleaningEnabled) {
            cleanupState(this.accState);
            this.function.cleanup();
        }
    }

    public void close() throws Exception {
        if (this.function != null) {
            this.function.close();
        }
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((BaseRow) obj, (KeyedProcessFunction<BaseRow, BaseRow, BaseRow>.Context) context, (Collector<BaseRow>) collector);
    }
}
