package org.apache.flink.table.runtime.operators.aggregate;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.class */
public class MiniBatchIncrementalGroupAggFunction extends MapBundleFunction<BaseRow, BaseRow, BaseRow, BaseRow> {
    private static final long serialVersionUID = 1;
    private final GeneratedAggsHandleFunction genPartialAggsHandler;
    private final GeneratedAggsHandleFunction genFinalAggsHandler;
    private final KeySelector<BaseRow, BaseRow> finalKeySelector;
    private transient JoinedRow resultRow = new JoinedRow();
    private transient AggsHandleFunction partialAgg = null;
    private transient AggsHandleFunction finalAgg = null;

    public MiniBatchIncrementalGroupAggFunction(GeneratedAggsHandleFunction generatedAggsHandleFunction, GeneratedAggsHandleFunction generatedAggsHandleFunction2, KeySelector<BaseRow, BaseRow> keySelector) {
        this.genPartialAggsHandler = generatedAggsHandleFunction;
        this.genFinalAggsHandler = generatedAggsHandleFunction2;
        this.finalKeySelector = keySelector;
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void open(ExecutionContext executionContext) throws Exception {
        super.open(executionContext);
        ClassLoader userCodeClassLoader = executionContext.getRuntimeContext().getUserCodeClassLoader();
        this.partialAgg = this.genPartialAggsHandler.newInstance(userCodeClassLoader);
        this.partialAgg.open(new PerKeyStateDataViewStore(executionContext.getRuntimeContext()));
        this.finalAgg = this.genFinalAggsHandler.newInstance(userCodeClassLoader);
        this.finalAgg.open(new PerKeyStateDataViewStore(executionContext.getRuntimeContext()));
        this.resultRow = new JoinedRow();
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public BaseRow addInput(@Nullable BaseRow baseRow, BaseRow baseRow2) throws Exception {
        this.partialAgg.setAccumulators(baseRow == null ? this.partialAgg.createAccumulators() : baseRow);
        this.partialAgg.merge(baseRow2);
        return this.partialAgg.getAccumulators();
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void finishBundle(Map<BaseRow, BaseRow> map, Collector<BaseRow> collector) throws Exception {
        HashMap hashMap = new HashMap();
        for (Map.Entry<BaseRow, BaseRow> entry : map.entrySet()) {
            BaseRow key = entry.getKey();
            BaseRow baseRow = (BaseRow) this.finalKeySelector.getKey(key);
            ((Map) hashMap.computeIfAbsent(baseRow, baseRow2 -> {
                return new HashMap();
            })).put(key, entry.getValue());
        }
        for (Map.Entry entry2 : hashMap.entrySet()) {
            BaseRow baseRow3 = (BaseRow) entry2.getKey();
            Map map2 = (Map) entry2.getValue();
            this.finalAgg.resetAccumulators();
            for (Map.Entry entry3 : map2.entrySet()) {
                BaseRow baseRow4 = (BaseRow) entry3.getKey();
                BaseRow baseRow5 = (BaseRow) entry3.getValue();
                this.ctx.setCurrentKey(baseRow4);
                this.finalAgg.merge(baseRow5);
            }
            this.resultRow.replace(baseRow3, this.finalAgg.getAccumulators());
            collector.collect(this.resultRow);
        }
        hashMap.clear();
    }

    @Override // org.apache.flink.table.runtime.operators.bundle.MapBundleFunction
    public void close() throws Exception {
        if (this.partialAgg != null) {
            this.partialAgg.close();
        }
        if (this.finalAgg != null) {
            this.finalAgg.close();
        }
    }
}
