/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerStatsRecorder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.pulsar.shade.com.yahoo.sketches.quantiles.DoublesSketch;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerStatsRecorderImpl
implements ProducerStatsRecorder {
    private static final long serialVersionUID = 1L;
    private transient TimerTask stat;
    private transient Timeout statTimeout;
    private transient ProducerImpl<?> producer;
    private transient PulsarClientImpl pulsarClient;
    private long oldTime;
    private long statsIntervalSeconds;
    private final LongAdder numMsgsSent;
    private final LongAdder numBytesSent;
    private final LongAdder numSendFailed;
    private final LongAdder numAcksReceived;
    private final LongAdder totalMsgsSent;
    private final LongAdder totalBytesSent;
    private final LongAdder totalSendFailed;
    private final LongAdder totalAcksReceived;
    private static final DecimalFormat DEC = new DecimalFormat("0.000");
    private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
    private final transient DoublesSketch ds;
    private volatile double sendMsgsRate;
    private volatile double sendBytesRate;
    private volatile double[] latencyPctValues = new double[PERCENTILES.length];
    private static final double[] PERCENTILES = new double[]{0.5, 0.75, 0.95, 0.99, 0.999, 1.0};
    private static final Logger log = LoggerFactory.getLogger(ProducerStatsRecorderImpl.class);

    public ProducerStatsRecorderImpl() {
        this.numMsgsSent = new LongAdder();
        this.numBytesSent = new LongAdder();
        this.numSendFailed = new LongAdder();
        this.numAcksReceived = new LongAdder();
        this.totalMsgsSent = new LongAdder();
        this.totalBytesSent = new LongAdder();
        this.totalSendFailed = new LongAdder();
        this.totalAcksReceived = new LongAdder();
        this.ds = DoublesSketch.builder().build(256);
    }

    public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl<?> producer) {
        this.pulsarClient = pulsarClient;
        this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
        this.producer = producer;
        this.numMsgsSent = new LongAdder();
        this.numBytesSent = new LongAdder();
        this.numSendFailed = new LongAdder();
        this.numAcksReceived = new LongAdder();
        this.totalMsgsSent = new LongAdder();
        this.totalBytesSent = new LongAdder();
        this.totalSendFailed = new LongAdder();
        this.totalAcksReceived = new LongAdder();
        this.ds = DoublesSketch.builder().build(256);
        this.init(conf);
    }

    private void init(ProducerConfigurationData conf) {
        ObjectMapper m = new ObjectMapper();
        m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        ObjectWriter w = m.writer();
        try {
            log.info("Starting Pulsar producer perf with config: {}", (Object)w.writeValueAsString(conf));
            log.info("Pulsar client config: {}", (Object)w.withoutAttribute("authentication").writeValueAsString(this.pulsarClient.getConfiguration()));
        }
        catch (IOException e) {
            log.error("Failed to dump config info", (Throwable)e);
        }
        this.stat = timeout -> {
            if (timeout.isCancelled()) {
                return;
            }
            try {
                this.updateStats();
            }
            catch (Exception e) {
                log.error("[{}] [{}]: {}", new Object[]{this.producer.getTopic(), this.producer.getProducerName(), e.getMessage()});
            }
            finally {
                this.statTimeout = this.pulsarClient.timer().newTimeout(this.stat, this.statsIntervalSeconds, TimeUnit.SECONDS);
            }
        };
        this.oldTime = System.nanoTime();
        this.statTimeout = this.pulsarClient.timer().newTimeout(this.stat, this.statsIntervalSeconds, TimeUnit.SECONDS);
    }

    Timeout getStatTimeout() {
        return this.statTimeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void updateStats() {
        long now = System.nanoTime();
        double elapsed = (double)(now - this.oldTime) / 1.0E9;
        this.oldTime = now;
        long currentNumMsgsSent = this.numMsgsSent.sumThenReset();
        long currentNumBytesSent = this.numBytesSent.sumThenReset();
        long currentNumSendFailedMsgs = this.numSendFailed.sumThenReset();
        long currentNumAcksReceived = this.numAcksReceived.sumThenReset();
        this.totalMsgsSent.add(currentNumMsgsSent);
        this.totalBytesSent.add(currentNumBytesSent);
        this.totalSendFailed.add(currentNumSendFailedMsgs);
        this.totalAcksReceived.add(currentNumAcksReceived);
        DoublesSketch doublesSketch = this.ds;
        synchronized (doublesSketch) {
            this.latencyPctValues = this.ds.getQuantiles(PERCENTILES);
            this.ds.reset();
        }
        this.sendMsgsRate = (double)currentNumMsgsSent / elapsed;
        this.sendBytesRate = (double)currentNumBytesSent / elapsed;
        if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived | currentNumMsgsSent) != 0L) {
            for (int i = 0; i < this.latencyPctValues.length; ++i) {
                if (!Double.isNaN(this.latencyPctValues[i])) continue;
                this.latencyPctValues[i] = 0.0;
            }
            log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- Ack received rate: {} ack/s --- Failed messages: {}", new Object[]{this.producer.getTopic(), this.producer.getProducerName(), this.producer.getPendingQueueSize(), THROUGHPUT_FORMAT.format(this.sendMsgsRate), THROUGHPUT_FORMAT.format(this.sendBytesRate / 1024.0 / 1024.0 * 8.0), DEC.format(this.latencyPctValues[0]), DEC.format(this.latencyPctValues[2]), DEC.format(this.latencyPctValues[3]), DEC.format(this.latencyPctValues[4]), DEC.format(this.latencyPctValues[5]), THROUGHPUT_FORMAT.format((double)currentNumAcksReceived / elapsed), currentNumSendFailedMsgs});
        }
    }

    @Override
    public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
        this.numMsgsSent.add(numMsgs);
        this.numBytesSent.add(totalMsgsSize);
    }

    @Override
    public void incrementSendFailed() {
        this.numSendFailed.increment();
    }

    @Override
    public void incrementSendFailed(long numMsgs) {
        this.numSendFailed.add(numMsgs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void incrementNumAcksReceived(long latencyNs) {
        this.numAcksReceived.increment();
        DoublesSketch doublesSketch = this.ds;
        synchronized (doublesSketch) {
            this.ds.update(TimeUnit.NANOSECONDS.toMillis(latencyNs));
        }
    }

    void reset() {
        this.numMsgsSent.reset();
        this.numBytesSent.reset();
        this.numSendFailed.reset();
        this.numAcksReceived.reset();
        this.totalMsgsSent.reset();
        this.totalBytesSent.reset();
        this.totalSendFailed.reset();
        this.totalAcksReceived.reset();
    }

    void updateCumulativeStats(ProducerStats stats) {
        if (stats == null) {
            return;
        }
        this.numMsgsSent.add(stats.getNumMsgsSent());
        this.numBytesSent.add(stats.getNumBytesSent());
        this.numSendFailed.add(stats.getNumSendFailed());
        this.numAcksReceived.add(stats.getNumAcksReceived());
        this.totalMsgsSent.add(stats.getTotalMsgsSent());
        this.totalBytesSent.add(stats.getTotalBytesSent());
        this.totalSendFailed.add(stats.getTotalSendFailed());
        this.totalAcksReceived.add(stats.getTotalAcksReceived());
    }

    public long getNumMsgsSent() {
        return this.numMsgsSent.longValue();
    }

    public long getNumBytesSent() {
        return this.numBytesSent.longValue();
    }

    public long getNumSendFailed() {
        return this.numSendFailed.longValue();
    }

    public long getNumAcksReceived() {
        return this.numAcksReceived.longValue();
    }

    public long getTotalMsgsSent() {
        return this.totalMsgsSent.longValue();
    }

    public long getTotalBytesSent() {
        return this.totalBytesSent.longValue();
    }

    public long getTotalSendFailed() {
        return this.totalSendFailed.longValue();
    }

    public long getTotalAcksReceived() {
        return this.totalAcksReceived.longValue();
    }

    public double getSendMsgsRate() {
        return this.sendMsgsRate;
    }

    public double getSendBytesRate() {
        return this.sendBytesRate;
    }

    public double getSendLatencyMillis50pct() {
        return this.latencyPctValues[0];
    }

    public double getSendLatencyMillis75pct() {
        return this.latencyPctValues[1];
    }

    public double getSendLatencyMillis95pct() {
        return this.latencyPctValues[2];
    }

    public double getSendLatencyMillis99pct() {
        return this.latencyPctValues[3];
    }

    public double getSendLatencyMillis999pct() {
        return this.latencyPctValues[4];
    }

    public double getSendLatencyMillisMax() {
        return this.latencyPctValues[5];
    }

    public int getPendingQueueSize() {
        return this.producer.getPendingQueueSize();
    }

    @Override
    public void cancelStatsTimeout() {
        this.updateStats();
        if (this.statTimeout != null) {
            this.statTimeout.cancel();
            this.statTimeout = null;
        }
    }
}

