package stream.monitor;

import java.text.DecimalFormat;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.AbstractProcessor;
import stream.Data;
import stream.ProcessContext;
import stream.annotations.Parameter;
import stream.data.Statistics;
import stream.monitor.DataRateGroup;
import stream.statistics.StatisticsService;

/* loaded from: input_file:stream/monitor/DataRate.class */
/**
 * Processor that counts the items passing through {@link #process(Data)} and
 * derives a throughput figure (items per second) from wall-clock time.
 *
 * <p>The rate is made available in three ways:
 * <ul>
 *   <li>every {@code every}-th item it is logged and, if {@code key} is not
 *       {@code null}, written into the processed item under {@code key},
 *       {@code key + ":time"} and {@code key + ":items"};</li>
 *   <li>every 10th item the value returned by {@link #getStatistics()} is
 *       refreshed;</li>
 *   <li>{@link #finish()} logs the overall average and, when a {@code group}
 *       was configured, hands the totals to the {@link DataRateGroup}.</li>
 * </ul>
 *
 * <p>Only the {@code rate} field is guarded by a lock (it is written in
 * {@code process} and read in {@code getStatistics}); all other state is
 * accessed without synchronization.
 */
public class DataRate extends AbstractProcessor implements StatisticsService {
    static final Logger log = LoggerFactory.getLogger(DataRate.class);

    /** Display name used in log messages; may be {@code null}. */
    String id;

    /** Name of the data-rate group to report to; {@code null} disables group reporting. */
    @Parameter
    String group;

    /** Group resolved from {@link #group} in {@link #init(ProcessContext)}; {@code null} if no group is set. */
    DataRateGroup dataRateGroup;

    /** Formatter for rates printed to the log. */
    final DecimalFormat fmt = new DecimalFormat("0.000");

    /** Exposed via getter/setter only; not read anywhere in this class. */
    String clock = null;

    /** Number of items seen since construction or the last {@link #reset()}. */
    Long count = 0L;

    /** Wall-clock time (ms) of the first processed item; {@code null} until one arrives. */
    Long start = null;

    /** Not read anywhere in this class. */
    Long windowCount = 0L;

    /** Wall-clock time (ms) of the most recently processed item. */
    Long last = 0L;

    /** Not read anywhere in this class. */
    Double elapsed = Double.valueOf(0.0d);

    /** Most recently computed rate in items/second; guarded by {@link #rateLock}. */
    Double rate = Double.valueOf(0.0d);

    /** Reporting interval in items; {@code null} (or a non-positive value) disables periodic reporting. */
    Integer every = null;

    /** Key under which the rate is written into processed items; {@code null} disables that. */
    String key = "dataRate";

    /** Unique identifier of this instance within a {@link DataRateGroup}. */
    final String internalId = UUID.randomUUID().toString();

    /*
     * Dedicated monitor for 'rate'. Locking on the boxed Double itself does not
     * work because the field is reassigned inside the critical section, so
     * reader and writer could end up holding different monitors.
     */
    private final Object rateLock = new Object();

    /** @return the display name used in log messages, possibly {@code null} */
    public String getId() {
        return this.id;
    }

    /** @param id the display name to use in log messages */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the configured clock value, possibly {@code null} */
    public String getClock() {
        return this.clock;
    }

    /** @param clock the clock value to store */
    public void setClock(String clock) {
        this.clock = clock;
    }

    /** @return the key under which the rate is written into items, possibly {@code null} */
    public String getKey() {
        return this.key;
    }

    /** @param key the key under which to write the rate; {@code null} disables writing */
    public void setKey(String key) {
        this.key = key;
    }

    /**
     * Initializes the processor and, if a group name is configured, looks up
     * the matching {@link DataRateGroup} and registers this instance with it.
     *
     * @param processContext the context passed on to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override // stream.AbstractProcessor, stream.StatefulProcessor
    public void init(ProcessContext processContext) throws Exception {
        super.init(processContext);
        if (this.group != null) {
            log.info("Registering to data-rate-group '{}'", this.group);
            this.dataRateGroup = DataRateGroup.get(this.group);
            this.dataRateGroup.register(this.internalId);
        }
    }

    /**
     * Counts the item, updates the timing state and periodically publishes the
     * current rate.
     *
     * @param data the item being processed; rate attributes are added to it on
     *             every {@code every}-th item when {@code key} is set
     * @return the same {@code data} instance
     */
    @Override // stream.Processor
    public Data process(Data data) {
        final long now = System.currentTimeMillis();
        if (this.start == null) {
            this.start = Long.valueOf(now);
        }
        this.count = Long.valueOf(this.count.longValue() + 1);

        final long items = this.count.longValue();
        final long elapsedMillis = now - this.start.longValue();

        // A zero or negative interval would make the modulo throw or be
        // meaningless, so it is treated as "periodic reporting disabled".
        if (this.every != null && this.every.intValue() > 0 && items % this.every.intValue() == 0) {
            printDataRate(Long.valueOf(now));
            if (this.key != null) {
                // NOTE(review): when no time has elapsed yet this is a division
                // by 0.0 and yields Infinity; kept as-is because consumers may
                // rely on the key always being present.
                data.put(this.key, Double.valueOf(items / (elapsedMillis / 1000.0d)));
                data.put(this.key + ":time", Long.valueOf(now));
                data.put(this.key + ":items", Double.valueOf((double) items));
            }
        }

        // Refresh the statistics value only every 10th item.
        if (elapsedMillis > 0 && items % 10 == 0) {
            synchronized (this.rateLock) {
                this.rate = Double.valueOf(items / (elapsedMillis / 1000.0d));
            }
        }
        this.last = Long.valueOf(now);
        return data;
    }

    /** Logs the data rate measured up to the current wall-clock time. */
    public void printDataRate() {
        printDataRate(Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Logs the number of processed items and the average rate up to the given
     * time. Nothing is logged if no item has been processed yet or if less
     * than one second has elapsed since the first item.
     *
     * @param now reference time in milliseconds
     */
    protected void printDataRate(Long now) {
        if (now == null || this.start == null) {
            return;
        }
        final long elapsedMillis = now.longValue() - this.start.longValue();
        // Keep the one-second warm-up before reporting, but divide by the
        // precise elapsed time rather than by truncated whole seconds.
        if (elapsedMillis >= 1000L) {
            final double seconds = elapsedMillis / 1000.0d;
            log.info("Data rate '{}': {} items processed, data-rate is: {}/second",
                    getId(), this.count, this.fmt.format(this.count.doubleValue() / seconds));
        }
    }

    /**
     * Logs the total running time, item count and overall average rate, and
     * reports the totals to the data-rate group if one is configured. If no
     * item was ever processed, only a notice is logged and nothing is reported.
     *
     * @throws Exception if the superclass finish step fails
     */
    @Override // stream.AbstractProcessor, stream.StatefulProcessor
    public void finish() throws Exception {
        super.finish();
        if (this.start == null) {
            log.info("Start time not available.");
            return;
        }
        final long elapsedMillis = this.last.longValue() - this.start.longValue();
        // Log the Long count directly; narrowing to int would wrap for large streams.
        log.info("DataRate processor '{}' has been running for {} ms, {} items.",
                this.id, Long.valueOf(elapsedMillis), this.count);
        final double seconds = elapsedMillis / 1000.0d;
        if (seconds > 0.0d) {
            log.info("Overall average data-rate for processor '{}' is: {}/second",
                    this.id, this.fmt.format(this.count.doubleValue() / seconds));
        }
        if (this.dataRateGroup != null) {
            this.dataRateGroup.add(this.internalId,
                    new DataRateGroup.Result(this.internalId, this.start.longValue(), this.last.longValue(), this.count.longValue()));
        }
    }

    /**
     * Clears the counters and timing state so that measurement restarts with
     * the next processed item, and resets the published rate to zero.
     *
     * @throws Exception declared by the service interface; not thrown here
     */
    @Override // stream.service.Service
    public void reset() throws Exception {
        this.count = 0L;
        // NOTE(review): the initial value of windowCount is 0, yet reset uses 1;
        // the field is not read in this class, so the value is kept unchanged.
        this.windowCount = 1L;
        this.last = 0L;
        this.start = null;
        synchronized (this.rateLock) {
            this.rate = Double.valueOf(0.0d);
        }
    }

    /**
     * Returns a snapshot containing the most recently computed rate under the
     * key {@code "dataRate"}.
     *
     * @return a new {@link Statistics} object with a single entry
     */
    @Override // stream.statistics.StatisticsService
    public Statistics getStatistics() {
        final Statistics statistics = new Statistics();
        synchronized (this.rateLock) {
            statistics.put("dataRate", Double.valueOf(this.rate.doubleValue()));
        }
        return statistics;
    }

    /** @return the reporting interval in items, or {@code null} if unset */
    public Integer getEvery() {
        return this.every;
    }

    /** @param every the reporting interval in items; {@code null} or a non-positive value disables periodic reporting */
    public void setEvery(Integer every) {
        this.every = every;
    }
}
