package com.twitter.heron.dsl.impl.operators;

import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.windowing.TupleWindow;
import com.twitter.heron.dsl.KeyValue;
import com.twitter.heron.dsl.SerializableBinaryOperator;
import com.twitter.heron.dsl.Window;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:com/twitter/heron/dsl/impl/operators/ReduceByWindowOperator.class */
/**
 * Windowed operator that folds the first field of every tuple in a window into a single value
 * using a user-supplied binary reduce function, then emits that value keyed by a {@link Window}
 * describing the window it came from.
 *
 * @param <I> type of the values being reduced; field 0 of every incoming tuple is cast to it
 */
public class ReduceByWindowOperator<I> extends DslWindowOperator {
    private static final long serialVersionUID = 6513775685209414130L;

    /** Function used to combine the running result with the next value of the window. */
    private final SerializableBinaryOperator<I> reduceFn;

    /** Collector handed over in {@link #prepare}; used by {@link #execute} to emit the result. */
    private OutputCollector collector;

    /**
     * Creates an operator that reduces window contents with the given function.
     *
     * @param serializableBinaryOperator function applied as {@code apply(accumulated, next)}
     */
    public ReduceByWindowOperator(SerializableBinaryOperator<I> serializableBinaryOperator) {
        this.reduceFn = serializableBinaryOperator;
    }

    /**
     * Stores the output collector for later use by {@link #execute}. The configuration map and
     * the topology context are not used by this operator.
     *
     * @param map topology configuration (ignored)
     * @param topologyContext topology context (ignored)
     * @param outputCollector collector the reduced result is emitted on
     */
    // The raw Map is kept so the signature stays identical to the one this method overrides.
    @SuppressWarnings("rawtypes")
    @Override // com.twitter.heron.api.bolt.BaseWindowedBolt, com.twitter.heron.api.bolt.IWindowedBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Reduces field 0 of all tuples in the window and emits a single
     * {@code KeyValue<Window, I>} on the collector.
     *
     * <p>The first value seeds the accumulator; every following value is combined with it via
     * the reduce function. {@code null} doubles as the "no value yet" marker: whenever the
     * accumulator is {@code null} (nothing seen so far, a {@code null} field, or a {@code null}
     * result of the reduce function) the next value replaces it instead of being combined.
     * For an empty window the emitted value is {@code null}.
     *
     * @param tupleWindow the window of tuples to reduce
     */
    @Override // com.twitter.heron.api.bolt.IWindowedBolt
    public void execute(TupleWindow tupleWindow) {
        I reduced = null;
        for (Tuple tuple : tupleWindow.get()) {
            // Tuples are untyped; the stream feeding this operator is expected to carry I in field 0.
            @SuppressWarnings("unchecked")
            I value = (I) tuple.getValue(0);
            reduced = (reduced == null) ? value : this.reduceFn.apply(reduced, value);
        }

        Window window = new Window(
                timestampOrZero(tupleWindow.getStartTimestamp()),
                timestampOrZero(tupleWindow.getEndTimestamp()),
                tupleWindow.get().size());
        this.collector.emit(new Values(new KeyValue<Window, I>(window, reduced)));
    }

    /** Returns the timestamp as a primitive, substituting 0 when the window does not supply one. */
    private static long timestampOrZero(Long timestamp) {
        return timestamp == null ? 0L : timestamp.longValue();
    }
}
