/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window.combines;

import java.time.ZoneId;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
import org.apache.flink.table.runtime.operators.window.state.WindowState;
import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.util.WindowKey;

public final class GlobalAggAccCombiner
implements WindowCombineFunction {
    private final WindowTimerService<Long> timerService;
    private final StateKeyContext keyContext;
    private final WindowValueState<Long> accState;
    private final NamespaceAggsHandleFunction<Long> localAggregator;
    private final NamespaceAggsHandleFunction<Long> globalAggregator;
    private final boolean requiresCopy;
    private final TypeSerializer<RowData> keySerializer;

    public GlobalAggAccCombiner(WindowTimerService<Long> timerService, StateKeyContext keyContext, WindowValueState<Long> accState, NamespaceAggsHandleFunction<Long> localAggregator, NamespaceAggsHandleFunction<Long> globalAggregator, boolean requiresCopy, TypeSerializer<RowData> keySerializer) {
        this.timerService = timerService;
        this.keyContext = keyContext;
        this.accState = accState;
        this.localAggregator = localAggregator;
        this.globalAggregator = globalAggregator;
        this.requiresCopy = requiresCopy;
        this.keySerializer = keySerializer;
    }

    @Override
    public void combine(WindowKey windowKey, Iterator<RowData> localAccs) throws Exception {
        RowData key = this.requiresCopy ? (RowData)this.keySerializer.copy((Object)windowKey.getKey()) : windowKey.getKey();
        this.keyContext.setCurrentKey(key);
        Long window = windowKey.getWindow();
        RowData acc = this.localAggregator.createAccumulators();
        this.localAggregator.setAccumulators(window, acc);
        while (localAccs.hasNext()) {
            RowData localAcc = localAccs.next();
            this.localAggregator.merge(window, localAcc);
        }
        RowData mergedLocalAcc = this.localAggregator.getAccumulators();
        RowData stateAcc = this.accState.value(window);
        if (stateAcc == null) {
            stateAcc = this.globalAggregator.createAccumulators();
        }
        this.globalAggregator.setAccumulators(window, stateAcc);
        this.globalAggregator.merge(window, mergedLocalAcc);
        stateAcc = this.globalAggregator.getAccumulators();
        this.accState.update(window, stateAcc);
        long currentWatermark = this.timerService.currentWatermark();
        ZoneId shiftTimeZone = this.timerService.getShiftTimeZone();
        if (!TimeWindowUtil.isWindowFired(window, currentWatermark, shiftTimeZone)) {
            this.timerService.registerEventTimeWindowTimer(window);
        }
    }

    @Override
    public void close() throws Exception {
        this.localAggregator.close();
        this.globalAggregator.close();
    }

    public static final class Factory
    implements WindowCombineFunction.Factory {
        private static final long serialVersionUID = 1L;
        private final GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler;
        private final GeneratedNamespaceAggsHandleFunction<Long> genGlobalAggsHandler;
        private final TypeSerializer<RowData> keySerializer;

        public Factory(GeneratedNamespaceAggsHandleFunction<Long> genLocalAggsHandler, GeneratedNamespaceAggsHandleFunction<Long> genGlobalAggsHandler, TypeSerializer<RowData> keySerializer) {
            this.genLocalAggsHandler = genLocalAggsHandler;
            this.genGlobalAggsHandler = genGlobalAggsHandler;
            this.keySerializer = keySerializer;
        }

        @Override
        public WindowCombineFunction create(RuntimeContext runtimeContext, WindowTimerService<Long> timerService, KeyedStateBackend<RowData> stateBackend, WindowState<Long> windowState, boolean isEventTime) throws Exception {
            NamespaceAggsHandleFunction localAggregator = (NamespaceAggsHandleFunction)this.genLocalAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
            NamespaceAggsHandleFunction globalAggregator = (NamespaceAggsHandleFunction)this.genGlobalAggsHandler.newInstance(runtimeContext.getUserCodeClassLoader());
            localAggregator.open(new PerWindowStateDataViewStore(stateBackend, (TypeSerializer<?>)LongSerializer.INSTANCE, runtimeContext));
            globalAggregator.open(new PerWindowStateDataViewStore(stateBackend, (TypeSerializer<?>)LongSerializer.INSTANCE, runtimeContext));
            boolean requiresCopy = !StateConfigUtil.isStateImmutableInStateBackend(stateBackend);
            WindowValueState windowValueState = (WindowValueState)windowState;
            return new GlobalAggAccCombiner(timerService, arg_0 -> stateBackend.setCurrentKey(arg_0), windowValueState, localAggregator, globalAggregator, requiresCopy, this.keySerializer);
        }
    }
}

