/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AbstractKeyedTimePanes;
import org.apache.flink.streaming.runtime.operators.windowing.KeyMap;
import org.apache.flink.util.Collector;
import org.apache.flink.util.UnionIterator;

/**
 * Keyed time panes that buffer every element of a pane in a per-key {@link ArrayList} and
 * pass the buffered elements to a {@link WindowFunction} when a window is evaluated.
 *
 * @param <Type> type of the elements added to the panes
 * @param <Key> type of the key extracted from each element
 * @param <Result> type of the records emitted by the window function
 */
@Internal
public class AccumulatingKeyedTimePanes<Type, Key, Result>
        extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {

    /**
     * Single shared factory instance that creates the per-key element lists. It holds no
     * state, so one instance is handed out for every element type via {@link #getListFactory()}.
     */
    private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {

        @Override
        public ArrayList<?> create() {
            // Lists start with a small initial capacity of 4.
            return new ArrayList<Object>(4);
        }
    };

    /** Extracts the key under which an incoming element is stored. */
    private final KeySelector<Type, Key> keySelector;

    /** Factory passed to the pane map to create the element list for a key on first use. */
    private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = AccumulatingKeyedTimePanes.getListFactory();

    /** Function applied once per key with all elements collected for that key. */
    private final WindowFunction<Type, Result, Key, Window> function;

    /**
     * Counter passed to {@code traverseAllPanes} and incremented after every window evaluation.
     * It starts at 1; NOTE(review): presumably it must be larger than the pass value that
     * freshly created map entries carry -- confirm against {@code KeyMap}.
     */
    private long evaluationPass = 1L;

    /**
     * Creates panes that key elements with {@code keySelector} and evaluate windows with
     * {@code function}.
     *
     * @param keySelector extracts the key of each added element
     * @param function window function applied per key on evaluation
     */
    public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
        this.keySelector = keySelector;
        this.function = function;
    }

    /**
     * Appends {@code element} to the list stored for its key in the latest pane. The list is
     * created through the list factory if the pane has no entry for the key yet.
     *
     * @param element the element to add
     * @throws Exception if the key selector fails to extract the key
     */
    @Override
    public void addElementToLatestPane(Type element) throws Exception {
        Key key = this.keySelector.getKey(element);
        ArrayList<Type> elements = this.latestPane.putIfAbsent(key, this.listFactory);
        elements.add(element);
    }

    /**
     * Applies the window function once per key over the elements currently held in the panes.
     *
     * <p>If there are no previous panes, the entries of the latest pane are evaluated directly.
     * Otherwise all panes are traversed and the lists belonging to the same key are presented
     * to the function as one iterable. The evaluation pass counter is incremented afterwards
     * in both cases.
     *
     * @param out collector receiving the results emitted by the window function
     * @param window the window handed to the window function
     * @param operator operator whose key context is set to each key before the function runs
     * @throws Exception if the window function or the pane traversal fails
     */
    @Override
    public void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception {
        if (this.previousPanes.isEmpty()) {
            // Only the latest pane holds data: hand each key's list to the function as is.
            for (KeyMap.Entry<Key, ArrayList<Type>> entry : this.latestPane) {
                Key key = entry.getKey();
                operator.setKeyContext(key);
                this.function.apply(key, window, entry.getValue(), out);
            }
        } else {
            // Several panes: let the traversal combine the lists of each key across panes.
            WindowFunctionTraversal<Key, Type, Result> evaluator =
                    new WindowFunctionTraversal<Key, Type, Result>(this.function, window, out, operator);
            this.traverseAllPanes(evaluator, this.evaluationPass);
        }
        ++this.evaluationPass;
    }

    /**
     * Returns the shared list factory typed for the requested element type.
     *
     * @param <V> element type of the lists the caller wants the factory to produce
     * @return the shared factory instance
     */
    @SuppressWarnings("unchecked")
    private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
        // Safe: the factory only ever returns a new, empty ArrayList, which is valid for any V.
        return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
    }

    /**
     * Traversal callback that gathers the element lists reported for one key into a
     * {@link UnionIterator} and applies the window function when the key is finished.
     *
     * <p>NOTE(review): the callbacks are presumably invoked per key in the order
     * {@code startNewKey}, {@code nextValue} (repeated), {@code keyDone} -- confirm against
     * {@code KeyMap.TraversalEvaluator}.
     */
    static final class WindowFunctionTraversal<Key, Type, Result>
            implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {

        private final WindowFunction<Type, Result, Key, Window> function;

        /** Reused for every key; cleared in {@link #startNewKey(Object)}. */
        private final UnionIterator<Type> unionIterator;

        private final Collector<Result> out;

        private final TimeWindow window;

        /** Operator whose key context is set before the function is applied. */
        private final AbstractStreamOperator<Result> contextOperator;

        /** Key recorded by the most recent {@link #startNewKey(Object)} call. */
        private Key currentKey;

        /**
         * @param function window function to apply per key
         * @param window window passed to the function
         * @param out collector passed to the function
         * @param contextOperator operator whose key context is set per key
         */
        WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
            this.function = function;
            this.out = out;
            this.unionIterator = new UnionIterator<Type>();
            this.window = window;
            this.contextOperator = contextOperator;
        }

        /** Clears the lists gathered so far and records {@code key} as the current key. */
        @Override
        public void startNewKey(Key key) {
            this.unionIterator.clear();
            this.currentKey = key;
        }

        /** Adds one list of elements to the union iterator for the current key. */
        @Override
        public void nextValue(ArrayList<Type> value) {
            this.unionIterator.addList(value);
        }

        /**
         * Sets the operator's key context to the current key and applies the window function
         * to the lists gathered for it.
         *
         * @throws Exception if the window function fails
         */
        @Override
        public void keyDone() throws Exception {
            this.contextOperator.setKeyContext(this.currentKey);
            this.function.apply(this.currentKey, this.window, this.unionIterator, this.out);
        }
    }
}

