/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nemo.compiler.frontend.spark.transform;

import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.spark.api.java.function.Function2;

public final class ReduceTransform<T>
implements Transform<T, T> {
    private final Function2<T, T, T> func;
    private OutputCollector<T> outputCollector;
    private T result;

    public ReduceTransform(Function2<T, T, T> func) {
        this.func = func;
    }

    public void prepare(Transform.Context context, OutputCollector<T> oc) {
        this.outputCollector = oc;
        this.result = null;
    }

    public void onData(T element) {
        if (element == null) {
            return;
        }
        try {
            if (this.result == null) {
                this.result = element;
            }
            this.result = this.func.call(this.result, element);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.outputCollector.emit(this.result);
    }

    public void onWatermark(Watermark watermark) {
        this.outputCollector.emitWatermark(watermark);
    }

    @Nullable
    public static <T> T reduceIterator(Iterator<T> elements, Function2<T, T, T> func) {
        if (!elements.hasNext()) {
            return null;
        }
        Object res = elements.next();
        while (elements.hasNext()) {
            try {
                res = func.call(res, elements.next());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return res;
    }

    public void close() {
    }
}

