package org.apache.flink.streaming.api.invokable.operator;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.class */
/**
 * Reduce invokable that keeps one running value per key.
 *
 * <p>For every incoming record the key is obtained through the configured
 * {@link KeySelector}. The first record seen for a key is stored and emitted
 * unchanged; each later record for that key is combined with the stored value
 * by the reducer, and the result replaces the stored value and is emitted.
 *
 * @param <IN> type of the records being reduced
 */
public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
    private static final long serialVersionUID = 1L;

    /** Extracts the grouping key from each incoming record. */
    private final KeySelector<IN, ?> keySelector;

    /** Latest stored value for every key seen so far. */
    private final Map<Object, IN> values;

    /** Result of the most recent reducer call; written by {@link #callUserFunction()}. */
    private IN reduced;

    /**
     * Creates the invokable.
     *
     * @param reduceFunction function used to combine the stored value with the incoming one
     * @param keySelector    selector used to obtain the grouping key of a record
     */
    public GroupedReduceInvokable(ReduceFunction<IN> reduceFunction, KeySelector<IN, ?> keySelector) {
        super(reduceFunction);
        this.keySelector = keySelector;
        // Explicit type arguments instead of a raw HashMap, so the assignment is type-checked.
        this.values = new HashMap<Object, IN>();
    }

    /**
     * Processes the current record: looks up the stored value for its key and
     * either seeds the key with the record's value or reduces the two values,
     * then stores and emits the outcome.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void reduce() throws Exception {
        Object key = this.nextRecord.getKey(this.keySelector);
        this.currentValue = this.values.get(key);
        this.nextValue = this.nextRecord.getObject();
        // A null lookup result is treated as "no value yet" for this key; a key that was
        // explicitly mapped to null is therefore handled the same way as an unseen key.
        if (this.currentValue == null) {
            this.values.put(key, this.nextValue);
            this.collector.collect(this.nextValue);
        } else {
            // NOTE(review): callUserFunctionAndLogException() is defined outside this class.
            // If it logs and swallows a failure of the user function, `reduced` would still
            // hold the result of an earlier call (possibly for a different key) and would be
            // stored and emitted below -- confirm against the parent implementation.
            callUserFunctionAndLogException();
            this.values.put(key, this.reduced);
            this.collector.collect(this.reduced);
        }
    }

    /**
     * Applies the reducer to the stored value and the incoming value and keeps
     * the result in {@code reduced} for {@link #reduce()} to store and emit.
     *
     * @throws Exception if the reducer throws
     */
    @Override
    protected void callUserFunction() throws Exception {
        this.reduced = (IN) this.reducer.reduce(this.currentValue, this.nextValue);
    }
}
