/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.AbstractKeyedTimePanes;
import org.apache.flink.streaming.runtime.operators.windowing.KeyMap;
import org.apache.flink.util.Collector;

@Internal
public class AggregatingKeyedTimePanes<Type, Key>
extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
    private final KeySelector<Type, Key> keySelector;
    private final ReduceFunction<Type> reducer;
    private long evaluationPass = 1L;

    public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    @Override
    public void addElementToLatestPane(Type element) throws Exception {
        Object k = this.keySelector.getKey(element);
        this.latestPane.putOrAggregate(k, element, this.reducer);
    }

    @Override
    public void evaluateWindow(Collector<Type> out, TimeWindow window, AbstractStreamOperator<Type> operator) throws Exception {
        if (this.previousPanes.isEmpty()) {
            for (KeyMap.Entry entry : this.latestPane) {
                out.collect(entry.getValue());
            }
        } else {
            AggregatingTraversal evaluator = new AggregatingTraversal(this.reducer, out, operator);
            this.traverseAllPanes(evaluator, this.evaluationPass);
        }
        ++this.evaluationPass;
    }

    static final class AggregatingTraversal<Key, Type>
    implements KeyMap.TraversalEvaluator<Key, Type> {
        private final ReduceFunction<Type> function;
        private final Collector<Type> out;
        private final AbstractStreamOperator<Type> operator;
        private Type currentValue;

        AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out, AbstractStreamOperator<Type> operator) {
            this.function = function;
            this.out = out;
            this.operator = operator;
        }

        @Override
        public void startNewKey(Key key) {
            this.currentValue = null;
            this.operator.setCurrentKey(key);
        }

        @Override
        public void nextValue(Type value) throws Exception {
            this.currentValue = this.currentValue != null ? this.function.reduce(this.currentValue, value) : value;
        }

        @Override
        public void keyDone() throws Exception {
            this.out.collect(this.currentValue);
        }
    }
}

