/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.storm;

import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.storm.PulsarSpoutConfiguration;
import org.apache.pulsar.storm.SharedPulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pulsar-storm-shade.com.google.common.base.Preconditions;
import pulsar-storm-shade.com.google.common.collect.Maps;
import pulsar-storm-shade.com.google.common.collect.Queues;

public class PulsarSpout
extends BaseRichSpout
implements IMetric {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);
    public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
    public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
    public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
    public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
    public static final String CONSUMER_RATE = "consumerRate";
    public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
    private final ClientConfiguration clientConf;
    private final ConsumerConfiguration consumerConf;
    private final PulsarSpoutConfiguration pulsarSpoutConf;
    private final long failedRetriesTimeoutNano;
    private final int maxFailedRetries;
    private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = Maps.newConcurrentMap();
    private final Queue<Message> failedMessages = Queues.newConcurrentLinkedQueue();
    private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();
    private SharedPulsarClient sharedPulsarClient;
    private String componentId;
    private String spoutId;
    private SpoutOutputCollector collector;
    private Consumer consumer;
    private volatile long messagesReceived = 0L;
    private volatile long messagesEmitted = 0L;
    private volatile long pendingAcks = 0L;
    private volatile long messageSizeReceived = 0L;

    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf) {
        this(pulsarSpoutConf, clientConf, new ConsumerConfiguration());
    }

    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf, ConsumerConfiguration consumerConf) {
        this.clientConf = clientConf;
        this.consumerConf = consumerConf;
        Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl());
        Preconditions.checkNotNull(pulsarSpoutConf.getTopic());
        Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName());
        Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper());
        this.pulsarSpoutConf = pulsarSpoutConf;
        this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
        this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
    }

    public void close() {
        try {
            LOG.info("[{}] Closing Pulsar consumer for topic {}", (Object)this.spoutId, (Object)this.pulsarSpoutConf.getTopic());
            if (!this.pulsarSpoutConf.isSharedConsumerEnabled() && this.consumer != null) {
                this.consumer.close();
            }
            if (this.sharedPulsarClient != null) {
                this.sharedPulsarClient.close();
            }
            this.pendingMessageRetries.clear();
            this.failedMessages.clear();
        }
        catch (PulsarClientException e) {
            LOG.error("[{}] Error closing Pulsar consumer for topic {}", new Object[]{this.spoutId, this.pulsarSpoutConf.getTopic(), e});
        }
    }

    public void ack(Object msgId) {
        if (msgId instanceof Message) {
            Message msg = (Message)msgId;
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Received ack for message {}", (Object)this.spoutId, (Object)msg.getMessageId());
            }
            this.consumer.acknowledgeAsync(msg);
            this.pendingMessageRetries.remove(msg.getMessageId());
            --this.pendingAcks;
        }
    }

    public void fail(Object msgId) {
        if (msgId instanceof Message) {
            Message msg = (Message)msgId;
            MessageId id = msg.getMessageId();
            LOG.warn("[{}] Error processing message {}", (Object)this.spoutId, (Object)id);
            MessageRetries messageRetries = this.pendingMessageRetries.computeIfAbsent(id, k -> new MessageRetries());
            if (!(this.failedRetriesTimeoutNano >= 0L && messageRetries.getTimeStamp() + this.failedRetriesTimeoutNano <= System.nanoTime() || this.maxFailedRetries >= 0 && messageRetries.numRetries >= this.maxFailedRetries)) {
                LOG.info("[{}] Putting message {} in the retry queue", (Object)this.spoutId, (Object)id);
                messageRetries.incrementAndGet();
                this.pendingMessageRetries.putIfAbsent(id, messageRetries);
                this.failedMessages.add(msg);
                --this.pendingAcks;
            } else {
                LOG.warn("[{}] Number of retries limit reached, dropping the message {}", (Object)this.spoutId, (Object)id);
                this.ack(msg);
            }
        }
    }

    public void nextTuple() {
        Message msg = this.failedMessages.peek();
        if (msg != null) {
            MessageRetries messageRetries = (MessageRetries)this.pendingMessageRetries.get(msg.getMessageId());
            if (Backoff.shouldBackoff((long)messageRetries.getTimeStamp(), (TimeUnit)TimeUnit.NANOSECONDS, (int)messageRetries.getNumRetries())) {
                Utils.sleep((long)100L);
            } else {
                LOG.info("[{}] Retrying failed message {}", (Object)this.spoutId, (Object)msg.getMessageId());
                this.failedMessages.remove();
                this.mapToValueAndEmit(msg);
            }
            return;
        }
        if (this.consumer != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", (Object)this.spoutId);
            }
            try {
                msg = this.consumer.receive(1, TimeUnit.SECONDS);
                if (msg != null) {
                    ++this.messagesReceived;
                    this.messageSizeReceived += (long)msg.getData().length;
                }
                this.mapToValueAndEmit(msg);
            }
            catch (PulsarClientException e) {
                LOG.error("[{}] Error receiving message from pulsar consumer", (Object)this.spoutId, (Object)e);
            }
        }
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.componentId = context.getThisComponentId();
        this.spoutId = String.format("%s-%s", this.componentId, context.getThisTaskId());
        this.collector = collector;
        this.pendingMessageRetries.clear();
        this.failedMessages.clear();
        try {
            this.sharedPulsarClient = SharedPulsarClient.get(this.componentId, this.pulsarSpoutConf.getServiceUrl(), this.clientConf);
            this.consumer = this.pulsarSpoutConf.isSharedConsumerEnabled() ? this.sharedPulsarClient.getSharedConsumer(this.pulsarSpoutConf.getTopic(), this.pulsarSpoutConf.getSubscriptionName(), this.consumerConf) : this.sharedPulsarClient.getClient().subscribe(this.pulsarSpoutConf.getTopic(), this.pulsarSpoutConf.getSubscriptionName(), this.consumerConf);
            LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", new Object[]{this.spoutId, this.pulsarSpoutConf.getTopic(), this.pulsarSpoutConf.getSubscriptionName()});
        }
        catch (PulsarClientException e) {
            LOG.error("[{}] Error creating pulsar consumer on topic {}", new Object[]{this.spoutId, this.pulsarSpoutConf.getTopic(), e});
        }
        context.registerMetric(String.format("PulsarSpoutMetrics-%s-%s", this.componentId, context.getThisTaskIndex()), (IMetric)this, this.pulsarSpoutConf.getMetricsTimeIntervalInSecs());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
    }

    private void mapToValueAndEmit(Message msg) {
        if (msg != null) {
            Values values = this.pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
            ++this.pendingAcks;
            if (values == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Dropping message {}", (Object)this.spoutId, (Object)msg.getMessageId());
                }
                this.ack(msg);
            } else {
                this.collector.emit((List)values, (Object)msg);
                ++this.messagesEmitted;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Emitted message {} to the collector", (Object)this.spoutId, (Object)msg.getMessageId());
                }
            }
        }
    }

    ConcurrentMap getMetrics() {
        this.metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, this.pendingMessageRetries.size());
        this.metricsMap.put(NO_OF_MESSAGES_RECEIVED, this.messagesReceived);
        this.metricsMap.put(NO_OF_MESSAGES_EMITTED, this.messagesEmitted);
        this.metricsMap.put(NO_OF_PENDING_ACKS, this.pendingAcks);
        this.metricsMap.put(CONSUMER_RATE, (double)this.messagesReceived / (double)this.pulsarSpoutConf.getMetricsTimeIntervalInSecs());
        this.metricsMap.put(CONSUMER_THROUGHPUT_BYTES, (double)this.messageSizeReceived / (double)this.pulsarSpoutConf.getMetricsTimeIntervalInSecs());
        return this.metricsMap;
    }

    void resetMetrics() {
        this.messagesReceived = 0L;
        this.messagesEmitted = 0L;
        this.messageSizeReceived = 0L;
    }

    public Object getValueAndReset() {
        ConcurrentMap metrics = this.getMetrics();
        this.resetMetrics();
        return metrics;
    }

    public class MessageRetries {
        private final long timestampInNano = System.nanoTime();
        private int numRetries = 0;

        public long getTimeStamp() {
            return this.timestampInNano;
        }

        public int incrementAndGet() {
            return ++this.numRetries;
        }

        public int getNumRetries() {
            return this.numRetries;
        }
    }
}

