/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nemo.compiler.frontend.spark.transform;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.spark.transform.ReduceTransform;
import org.apache.spark.api.java.function.Function2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Transform that buffers incoming key-value pairs grouped by key and, when closed, reduces the
 * buffered values of every key with the supplied function and emits one {@code (key, result)}
 * pair per key.
 *
 * <p>All values are held in memory until {@link #close()} is called; nothing is emitted from
 * {@link #onData(Tuple2)}.
 *
 * @param <K> type of the keys.
 * @param <V> type of the values, and of the reduced result.
 */
public final class ReduceByKeyTransform<K, V>
    extends NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());

    /** Values received so far, grouped by key in arrival order within each key. */
    private final Map<K, List<V>> keyToValues;
    /** Binary function used to fold the values of one key into a single value. */
    private final Function2<V, V, V> func;
    /** Sink for the reduced pairs; assigned in {@link #prepare}. */
    private OutputCollector<Tuple2<K, V>> outputCollector;

    /**
     * Creates a transform that reduces the values of each key with {@code func}.
     *
     * @param func the reduce function applied to the values of each key.
     */
    public ReduceByKeyTransform(Function2<V, V, V> func) {
        this.func = func;
        this.keyToValues = new HashMap<>();
    }

    /**
     * Stores the output collector used later by {@link #close()}.
     *
     * @param context the transform context; not used by this transform.
     * @param oc      the collector that reduced pairs are emitted to.
     */
    public void prepare(Transform.Context context, OutputCollector<Tuple2<K, V>> oc) {
        this.outputCollector = oc;
    }

    /**
     * Buffers the value of {@code element} under its key.
     *
     * @param element the key-value pair to buffer.
     */
    public void onData(Tuple2<K, V> element) {
        final K key = element._1;
        final V value = element._2;
        // Single lookup; a new list is only allocated the first time a key is seen.
        this.keyToValues.computeIfAbsent(key, unused -> new ArrayList<>()).add(value);
    }

    /**
     * Reduces the buffered values of every key, emits one pair per key to the output collector,
     * and then clears the buffer. Logs a warning and emits nothing if no data was received.
     */
    public void close() {
        if (this.keyToValues.isEmpty()) {
            LOG.warn("Spark ReduceByKeyTransform received no data!");
            return;
        }
        for (final Map.Entry<K, List<V>> entry : this.keyToValues.entrySet()) {
            final V reduced = ReduceTransform.reduceIterator(entry.getValue().iterator(), this.func);
            this.outputCollector.emit(new Tuple2<>(entry.getKey(), reduced));
        }
        // Release the buffered values once everything has been emitted.
        this.keyToValues.clear();
    }
}

