/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.iterable.shade.com.fasterxml.jackson.databind.ObjectWriter;
import com.iterable.shade.com.fasterxml.jackson.databind.SerializationFeature;
import com.iterable.shade.io.netty.util.Timeout;
import com.iterable.shade.io.netty.util.TimerTask;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerStatsRecorder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerStatsRecorderImpl
implements ConsumerStatsRecorder {
    private static final long serialVersionUID = 1L;
    private TimerTask stat;
    private Timeout statTimeout;
    private final Consumer<?> consumer;
    private PulsarClientImpl pulsarClient;
    private long oldTime;
    private long statsIntervalSeconds;
    private final LongAdder numMsgsReceived;
    private final LongAdder numBytesReceived;
    private final LongAdder numReceiveFailed;
    private final LongAdder numBatchReceiveFailed;
    private final LongAdder numAcksSent;
    private final LongAdder numAcksFailed;
    private final LongAdder totalMsgsReceived;
    private final LongAdder totalBytesReceived;
    private final LongAdder totalReceiveFailed;
    private final LongAdder totalBatchReceiveFailed;
    private final LongAdder totalAcksSent;
    private final LongAdder totalAcksFailed;
    private volatile double receivedMsgsRate;
    private volatile double receivedBytesRate;
    private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");
    private static final Logger log = LoggerFactory.getLogger(ConsumerStatsRecorderImpl.class);

    public ConsumerStatsRecorderImpl() {
        this(null);
    }

    public ConsumerStatsRecorderImpl(Consumer<?> consumer) {
        this.consumer = consumer;
        this.numMsgsReceived = new LongAdder();
        this.numBytesReceived = new LongAdder();
        this.numReceiveFailed = new LongAdder();
        this.numBatchReceiveFailed = new LongAdder();
        this.numAcksSent = new LongAdder();
        this.numAcksFailed = new LongAdder();
        this.totalMsgsReceived = new LongAdder();
        this.totalBytesReceived = new LongAdder();
        this.totalReceiveFailed = new LongAdder();
        this.totalBatchReceiveFailed = new LongAdder();
        this.totalAcksSent = new LongAdder();
        this.totalAcksFailed = new LongAdder();
    }

    public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, ConsumerConfigurationData<?> conf, Consumer<?> consumer) {
        this.pulsarClient = pulsarClient;
        this.consumer = consumer;
        this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
        this.numMsgsReceived = new LongAdder();
        this.numBytesReceived = new LongAdder();
        this.numReceiveFailed = new LongAdder();
        this.numBatchReceiveFailed = new LongAdder();
        this.numAcksSent = new LongAdder();
        this.numAcksFailed = new LongAdder();
        this.totalMsgsReceived = new LongAdder();
        this.totalBytesReceived = new LongAdder();
        this.totalReceiveFailed = new LongAdder();
        this.totalBatchReceiveFailed = new LongAdder();
        this.totalAcksSent = new LongAdder();
        this.totalAcksFailed = new LongAdder();
        this.init(conf);
    }

    private void init(ConsumerConfigurationData<?> conf) {
        ObjectWriter w = ObjectMapperFactory.getMapperWithIncludeAlways().writer().without(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        try {
            log.info("Starting Pulsar consumer status recorder with config: {}", (Object)w.writeValueAsString(conf));
            log.info("Pulsar client config: {}", (Object)w.writeValueAsString(this.pulsarClient.getConfiguration()));
        }
        catch (IOException e) {
            log.error("Failed to dump config info", (Throwable)e);
        }
        this.stat = timeout -> {
            if (timeout.isCancelled() || !(this.consumer instanceof ConsumerImpl)) {
                return;
            }
            ConsumerImpl consumerImpl = (ConsumerImpl)this.consumer;
            try {
                long now = System.nanoTime();
                double elapsed = (double)(now - this.oldTime) / 1.0E9;
                this.oldTime = now;
                long currentNumMsgsReceived = this.numMsgsReceived.sumThenReset();
                long currentNumBytesReceived = this.numBytesReceived.sumThenReset();
                long currentNumReceiveFailed = this.numReceiveFailed.sumThenReset();
                long currentNumBatchReceiveFailed = this.numBatchReceiveFailed.sumThenReset();
                long currentNumAcksSent = this.numAcksSent.sumThenReset();
                long currentNumAcksFailed = this.numAcksFailed.sumThenReset();
                this.totalMsgsReceived.add(currentNumMsgsReceived);
                this.totalBytesReceived.add(currentNumBytesReceived);
                this.totalReceiveFailed.add(currentNumReceiveFailed);
                this.totalBatchReceiveFailed.add(currentNumBatchReceiveFailed);
                this.totalAcksSent.add(currentNumAcksSent);
                this.totalAcksFailed.add(currentNumAcksFailed);
                this.receivedMsgsRate = (double)currentNumMsgsReceived / elapsed;
                this.receivedBytesRate = (double)currentNumBytesReceived / elapsed;
                if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent | currentNumAcksFailed) != 0L) {
                    log.info("[{}] [{}] [{}] Prefetched messages: {} --- Consume throughput received: {} msgs/s --- {} Mbit/s --- Ack sent rate: {} ack/s --- Failed messages: {} --- batch messages: {} ---Failed acks: {}", new Object[]{consumerImpl.getTopic(), consumerImpl.getSubscription(), consumerImpl.consumerName, consumerImpl.incomingMessages.size(), THROUGHPUT_FORMAT.format(this.receivedMsgsRate), THROUGHPUT_FORMAT.format(this.receivedBytesRate * 8.0 / 1024.0 / 1024.0), THROUGHPUT_FORMAT.format((double)currentNumAcksSent / elapsed), currentNumReceiveFailed, currentNumBatchReceiveFailed, currentNumAcksFailed});
                }
            }
            catch (Exception e) {
                log.error("[{}] [{}] [{}]: {}", new Object[]{consumerImpl.getTopic(), consumerImpl.subscription, consumerImpl.consumerName, e.getMessage()});
            }
            finally {
                this.statTimeout = this.pulsarClient.timer().newTimeout(this.stat, this.statsIntervalSeconds, TimeUnit.SECONDS);
            }
        };
        this.oldTime = System.nanoTime();
        this.statTimeout = this.pulsarClient.timer().newTimeout(this.stat, this.statsIntervalSeconds, TimeUnit.SECONDS);
    }

    @Override
    public void updateNumMsgsReceived(Message<?> message) {
        if (message != null) {
            this.numMsgsReceived.increment();
            this.numBytesReceived.add(message.size());
        }
    }

    @Override
    public void incrementNumAcksSent(long numAcks) {
        this.numAcksSent.add(numAcks);
    }

    @Override
    public void incrementNumAcksFailed() {
        this.numAcksFailed.increment();
    }

    @Override
    public void incrementNumReceiveFailed() {
        this.numReceiveFailed.increment();
    }

    @Override
    public void incrementNumBatchReceiveFailed() {
        this.numBatchReceiveFailed.increment();
    }

    @Override
    public Optional<Timeout> getStatTimeout() {
        return Optional.ofNullable(this.statTimeout);
    }

    @Override
    public void reset() {
        this.numMsgsReceived.reset();
        this.numBytesReceived.reset();
        this.numReceiveFailed.reset();
        this.numBatchReceiveFailed.reset();
        this.numAcksSent.reset();
        this.numAcksFailed.reset();
        this.totalMsgsReceived.reset();
        this.totalBytesReceived.reset();
        this.totalReceiveFailed.reset();
        this.totalBatchReceiveFailed.reset();
        this.totalAcksSent.reset();
        this.totalAcksFailed.reset();
    }

    @Override
    public void updateCumulativeStats(ConsumerStats stats) {
        if (stats == null) {
            return;
        }
        this.numMsgsReceived.add(stats.getNumMsgsReceived());
        this.numBytesReceived.add(stats.getNumBytesReceived());
        this.numReceiveFailed.add(stats.getNumReceiveFailed());
        this.numBatchReceiveFailed.add(stats.getNumBatchReceiveFailed());
        this.numAcksSent.add(stats.getNumAcksSent());
        this.numAcksFailed.add(stats.getNumAcksFailed());
        this.totalMsgsReceived.add(stats.getTotalMsgsReceived());
        this.totalBytesReceived.add(stats.getTotalBytesReceived());
        this.totalReceiveFailed.add(stats.getTotalReceivedFailed());
        this.totalBatchReceiveFailed.add(stats.getTotaBatchReceivedFailed());
        this.totalAcksSent.add(stats.getTotalAcksSent());
        this.totalAcksFailed.add(stats.getTotalAcksFailed());
    }

    @Override
    public Integer getMsgNumInReceiverQueue() {
        if (this.consumer instanceof ConsumerBase) {
            ConsumerBase consumerBase = (ConsumerBase)this.consumer;
            if (consumerBase.listener != null) {
                return ConsumerBase.MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.get(consumerBase);
            }
            return consumerBase.incomingMessages.size();
        }
        return null;
    }

    @Override
    public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
        if (this.consumer instanceof MultiTopicsConsumerImpl) {
            List consumerList = ((MultiTopicsConsumerImpl)this.consumer).getConsumers();
            return consumerList.stream().collect(Collectors.toMap(consumerImpl -> consumerImpl.consumerId, consumerImpl -> consumerImpl.incomingMessages.size()));
        }
        return null;
    }

    @Override
    public long getNumMsgsReceived() {
        return this.numMsgsReceived.longValue();
    }

    @Override
    public long getNumBytesReceived() {
        return this.numBytesReceived.longValue();
    }

    @Override
    public long getNumAcksSent() {
        return this.numAcksSent.longValue();
    }

    @Override
    public long getNumAcksFailed() {
        return this.numAcksFailed.longValue();
    }

    @Override
    public long getNumReceiveFailed() {
        return this.numReceiveFailed.longValue();
    }

    @Override
    public long getNumBatchReceiveFailed() {
        return this.numBatchReceiveFailed.longValue();
    }

    @Override
    public long getTotalMsgsReceived() {
        return this.totalMsgsReceived.longValue();
    }

    @Override
    public long getTotalBytesReceived() {
        return this.totalBytesReceived.longValue();
    }

    @Override
    public long getTotalReceivedFailed() {
        return this.totalReceiveFailed.longValue();
    }

    @Override
    public long getTotaBatchReceivedFailed() {
        return this.totalBatchReceiveFailed.longValue();
    }

    @Override
    public long getTotalAcksSent() {
        return this.totalAcksSent.longValue();
    }

    @Override
    public long getTotalAcksFailed() {
        return this.totalAcksFailed.longValue();
    }

    @Override
    public double getRateMsgsReceived() {
        return this.receivedMsgsRate;
    }

    @Override
    public double getRateBytesReceived() {
        return this.receivedBytesRate;
    }
}

