package com.datatorrent.lib.math;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.lang.Number;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;

/* loaded from: input_file:com/datatorrent/lib/math/SumKeyVal.class */
public class SumKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> {
    protected HashMap<K, SumEntry> sums = new HashMap<>();
    protected boolean cumulative = false;
    public final transient DefaultInputPort<KeyValPair<K, V>> data = (DefaultInputPort<KeyValPair<K, V>>) new DefaultInputPort<KeyValPair<K, V>>() { // from class: com.datatorrent.lib.math.SumKeyVal.1
        public void process(KeyValPair<K, V> keyValPair) {
            K key = keyValPair.getKey();
            if (SumKeyVal.this.doprocessKey(key)) {
                SumEntry sumEntry = SumKeyVal.this.sums.get(key);
                if (sumEntry == null) {
                    sumEntry = new SumEntry(new MutableDouble(keyValPair.getValue().doubleValue()), true);
                } else {
                    sumEntry.sum.add(keyValPair.getValue().doubleValue());
                    sumEntry.changed = true;
                }
                SumKeyVal.this.sums.put(SumKeyVal.this.cloneKey(key), sumEntry);
            }
        }

        public StreamCodec<KeyValPair<K, V>> getStreamCodec() {
            return (StreamCodec<KeyValPair<K, V>>) SumKeyVal.this.getKeyValPairStreamCodec();
        }
    };

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, V>> sum = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Double>> sumDouble = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Integer>> sumInteger = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Long>> sumLong = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Short>> sumShort = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Float>> sumFloat = new DefaultOutputPort<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/lib/math/SumKeyVal$SumEntry.class */
    public static class SumEntry {
        public MutableDouble sum;
        public boolean changed;

        SumEntry() {
            this.changed = true;
        }

        SumEntry(MutableDouble mutableDouble, boolean z) {
            this.changed = true;
            this.sum = mutableDouble;
            this.changed = z;
        }
    }

    public boolean isCumulative() {
        return this.cumulative;
    }

    public void setCumulative(boolean z) {
        this.cumulative = z;
    }

    public void endWindow() {
        for (Map.Entry<K, SumEntry> entry : this.sums.entrySet()) {
            K key = entry.getKey();
            SumEntry value = entry.getValue();
            if (value.changed) {
                this.sum.emit(new KeyValPair(key, getValue(Double.valueOf(value.sum.doubleValue()))));
                this.sumDouble.emit(new KeyValPair(key, Double.valueOf(value.sum.doubleValue())));
                this.sumInteger.emit(new KeyValPair(key, Integer.valueOf(value.sum.intValue())));
                this.sumFloat.emit(new KeyValPair(key, Float.valueOf(value.sum.floatValue())));
                this.sumShort.emit(new KeyValPair(key, Short.valueOf(value.sum.shortValue())));
                this.sumLong.emit(new KeyValPair(key, Long.valueOf(value.sum.longValue())));
            }
        }
        clearCache();
    }

    public void clearCache() {
        if (!this.cumulative) {
            this.sums.clear();
            return;
        }
        Iterator<Map.Entry<K, SumEntry>> it = this.sums.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().changed = false;
        }
    }
}
