package org.apache.pulsar.storm;

import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import p000pulsarstormshade.com.google.common.base.Preconditions;
import p000pulsarstormshade.com.google.common.collect.Maps;
import p000pulsarstormshade.com.google.common.collect.Queues;

/* loaded from: input_file:org/apache/pulsar/storm/PulsarSpout.class */
/**
 * Storm spout that reads messages from a Pulsar topic and emits them as tuples.
 *
 * <p>{@link #nextTuple()} pulls a message from the Pulsar {@link Consumer}, converts it to tuple
 * values with the mapper supplied by the {@link PulsarSpoutConfiguration}, and emits it using the
 * {@link Message} itself as the tuple id. Acked tuples are acknowledged on the consumer; failed
 * tuples are queued and re-emitted until either the configured maximum number of retries or the
 * configured retry timeout is reached, at which point the message is acknowledged and dropped.
 *
 * <p>The spout registers itself as a Storm {@link IMetric} in
 * {@link #open(Map, TopologyContext, SpoutOutputCollector)} and reports the counters listed in the
 * {@code NO_OF_*} / {@code CONSUMER_*} constants.
 *
 * <p>The counters are {@code volatile} but updated with plain read-modify-write operations, so
 * updates are only safe when performed from a single thread.
 * NOTE(review): presumably Storm invokes all spout callbacks on one executor thread - confirm.
 */
public class PulsarSpout extends BaseRichSpout implements IMetric {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSpout.class);

    /** Metric key: number of messages that currently have retry state tracked. */
    public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
    /** Metric key: number of messages received from the consumer since the last reset. */
    public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
    /** Metric key: number of tuples emitted to the collector since the last reset. */
    public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
    /** Metric key: number of emitted tuples that have not yet been acked or failed. */
    public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
    /** Metric key: messages received per second over the metrics interval. */
    public static final String CONSUMER_RATE = "consumerRate";
    /** Metric key: payload bytes received per second over the metrics interval. */
    public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";

    private final ClientConfiguration clientConf;
    private final ConsumerConfiguration consumerConf;
    private final PulsarSpoutConfiguration pulsarSpoutConf;
    /** Retry timeout in nanoseconds; a negative value disables the timeout check. */
    private final long failedRetriesTimeoutNano;
    /** Maximum number of retries per message; a negative value disables the limit. */
    private final int maxFailedRetries;
    /** Retry state for every message that has failed at least once and is not yet acked. */
    private final ConcurrentMap<MessageId, MessageRetries> pendingMessageRetries = Maps.newConcurrentMap();
    /** Failed messages waiting to be re-emitted, in the order they failed. */
    private final Queue<Message> failedMessages = Queues.newConcurrentLinkedQueue();
    /** Reused map instance that {@link #getMetrics()} refreshes and returns. */
    private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();
    private SharedPulsarClient sharedPulsarClient;
    private String componentId;
    private String spoutId;
    private SpoutOutputCollector collector;
    private Consumer consumer;
    private volatile long messagesReceived;
    private volatile long messagesEmitted;
    private volatile long pendingAcks;
    private volatile long messageSizeReceived;

    /**
     * Retry bookkeeping for one failed message: the {@link System#nanoTime()} reading taken when
     * the state was created and the number of retries scheduled so far.
     */
    public class MessageRetries {
        private final long timestampInNano = System.nanoTime();
        private int numRetries = 0;

        public MessageRetries() {
        }

        /** @return the {@link System#nanoTime()} value captured when this state was created */
        public long getTimeStamp() {
            return this.timestampInNano;
        }

        /** Increments the retry count and returns the new value. */
        public int incrementAndGet() {
            this.numRetries++;
            return this.numRetries;
        }

        /** @return the number of retries recorded so far */
        public int getNumRetries() {
            return this.numRetries;
        }
    }

    /**
     * Creates a spout that uses a default {@link ConsumerConfiguration}.
     *
     * @param pulsarSpoutConfiguration spout settings (service URL, topic, subscription, mapper)
     * @param clientConfiguration      configuration handed to the shared Pulsar client
     */
    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConfiguration, ClientConfiguration clientConfiguration) {
        this(pulsarSpoutConfiguration, clientConfiguration, new ConsumerConfiguration());
    }

    /**
     * Creates a spout with an explicit consumer configuration.
     *
     * @param pulsarSpoutConfiguration spout settings; service URL, topic, subscription name and
     *                                 message-to-values mapper must all be non-null
     * @param clientConfiguration      configuration handed to the shared Pulsar client
     * @param consumerConfiguration    configuration used when subscribing the consumer
     * @throws NullPointerException if any of the required spout settings is {@code null}
     */
    public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConfiguration, ClientConfiguration clientConfiguration, ConsumerConfiguration consumerConfiguration) {
        this.clientConf = clientConfiguration;
        this.consumerConf = consumerConfiguration;
        Preconditions.checkNotNull(pulsarSpoutConfiguration.getServiceUrl(), "serviceUrl must not be null");
        Preconditions.checkNotNull(pulsarSpoutConfiguration.getTopic(), "topic must not be null");
        Preconditions.checkNotNull(pulsarSpoutConfiguration.getSubscriptionName(), "subscriptionName must not be null");
        Preconditions.checkNotNull(pulsarSpoutConfiguration.getMessageToValuesMapper(), "messageToValuesMapper must not be null");
        this.pulsarSpoutConf = pulsarSpoutConfiguration;
        this.failedRetriesTimeoutNano = pulsarSpoutConfiguration.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
        this.maxFailedRetries = pulsarSpoutConfiguration.getMaxFailedRetries();
    }

    /**
     * Releases the Pulsar resources held by this spout and discards all retry state.
     *
     * <p>The consumer is closed only when it is not a shared consumer. The shared client is
     * released and the retry state is cleared even if closing the consumer fails; a
     * {@link PulsarClientException} is logged rather than propagated.
     */
    public void close() {
        try {
            LOG.info("[{}] Closing Pulsar consumer for topic {}", this.spoutId, this.pulsarSpoutConf.getTopic());
            try {
                if (!this.pulsarSpoutConf.isSharedConsumerEnabled() && this.consumer != null) {
                    this.consumer.close();
                }
            } finally {
                // Release the shared client even when closing the consumer threw.
                if (this.sharedPulsarClient != null) {
                    this.sharedPulsarClient.close();
                }
            }
        } catch (PulsarClientException e) {
            LOG.error("[{}] Error closing Pulsar consumer for topic {}", new Object[]{this.spoutId, this.pulsarSpoutConf.getTopic(), e});
        } finally {
            this.pendingMessageRetries.clear();
            this.failedMessages.clear();
        }
    }

    /**
     * Handles a tuple ack: acknowledges the message on the consumer, forgets its retry state and
     * decrements the pending-ack counter. Ids that are not a {@link Message} are ignored.
     *
     * @param obj the tuple id passed to the collector on emit
     */
    public void ack(Object obj) {
        if (!(obj instanceof Message)) {
            return;
        }
        Message message = (Message) obj;
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Received ack for message {}", this.spoutId, message.getMessageId());
        }
        this.consumer.acknowledgeAsync(message);
        this.pendingMessageRetries.remove(message.getMessageId());
        this.pendingAcks--;
    }

    /**
     * Handles a tuple failure: queues the message for re-emission, or acknowledges and drops it
     * once the retry timeout or the maximum retry count has been reached. Ids that are not a
     * {@link Message} are ignored.
     *
     * @param obj the tuple id passed to the collector on emit
     */
    public void fail(Object obj) {
        if (!(obj instanceof Message)) {
            return;
        }
        Message message = (Message) obj;
        MessageId messageId = message.getMessageId();
        LOG.warn("[{}] Error processing message {}", this.spoutId, messageId);

        // The mapping is guaranteed to exist after this call, so no further put is needed.
        MessageRetries retries = this.pendingMessageRetries.computeIfAbsent(messageId, key -> new MessageRetries());

        // Elapsed time is computed by subtraction so the comparison stays correct even if
        // System.nanoTime() values wrap around.
        boolean timedOut = this.failedRetriesTimeoutNano >= 0
                && System.nanoTime() - retries.getTimeStamp() >= this.failedRetriesTimeoutNano;
        boolean retriesExhausted = this.maxFailedRetries >= 0
                && retries.getNumRetries() >= this.maxFailedRetries;

        if (timedOut || retriesExhausted) {
            LOG.warn("[{}] Number of retries limit reached, dropping the message {}", this.spoutId, messageId);
            // ack() removes the retry state and decrements pendingAcks.
            ack(message);
            return;
        }

        LOG.info("[{}] Putting message {} in the retry queue", this.spoutId, messageId);
        retries.incrementAndGet();
        this.failedMessages.add(message);
        this.pendingAcks--;
    }

    /**
     * Emits at most one tuple.
     *
     * <p>A queued failed message takes priority: it is re-emitted unless {@link Backoff} says the
     * retry should still wait, in which case the method sleeps for 100 ms and returns. When no
     * failed message is queued, the method waits up to one second for a new message from the
     * consumer and emits it. Nothing is done if no consumer is available.
     */
    public void nextTuple() {
        Message failed = this.failedMessages.peek();
        if (failed != null) {
            MessageRetries retries = this.pendingMessageRetries.get(failed.getMessageId());
            // Retry state is expected to exist for every queued message; if it is missing the
            // message is re-emitted immediately instead of failing with a NullPointerException.
            if (retries != null
                    && Backoff.shouldBackoff(retries.getTimeStamp(), TimeUnit.NANOSECONDS, retries.getNumRetries())) {
                Utils.sleep(100L);
                return;
            }
            LOG.info("[{}] Retrying failed message {}", this.spoutId, failed.getMessageId());
            this.failedMessages.remove();
            mapToValueAndEmit(failed);
            return;
        }

        if (this.consumer == null) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", this.spoutId);
        }
        try {
            Message received = this.consumer.receive(1, TimeUnit.SECONDS);
            if (received != null) {
                this.messagesReceived++;
                this.messageSizeReceived += received.getData().length;
                mapToValueAndEmit(received);
            }
        } catch (PulsarClientException e) {
            LOG.error("[{}] Error receiving message from pulsar consumer", this.spoutId, e);
        }
    }

    /**
     * Initializes the spout for a task: records the collector, resets retry state, obtains the
     * shared Pulsar client, creates (or looks up) the consumer and registers this spout as a
     * metric.
     *
     * <p>A {@link PulsarClientException} while creating the consumer is logged and not
     * propagated; the consumer is then left unset and {@link #nextTuple()} will not receive
     * new messages.
     *
     * @param map                  the Storm configuration (unused here)
     * @param topologyContext      supplies the component id, task id/index and metric registry
     * @param spoutOutputCollector collector used to emit tuples
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.componentId = topologyContext.getThisComponentId();
        this.spoutId = String.format("%s-%s", this.componentId, topologyContext.getThisTaskId());
        this.collector = spoutOutputCollector;
        this.pendingMessageRetries.clear();
        this.failedMessages.clear();
        try {
            this.sharedPulsarClient = SharedPulsarClient.get(this.componentId, this.pulsarSpoutConf.getServiceUrl(), this.clientConf);
            if (this.pulsarSpoutConf.isSharedConsumerEnabled()) {
                this.consumer = this.sharedPulsarClient.getSharedConsumer(this.pulsarSpoutConf.getTopic(), this.pulsarSpoutConf.getSubscriptionName(), this.consumerConf);
            } else {
                this.consumer = this.sharedPulsarClient.getClient().subscribe(this.pulsarSpoutConf.getTopic(), this.pulsarSpoutConf.getSubscriptionName(), this.consumerConf);
            }
            LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", new Object[]{this.spoutId, this.pulsarSpoutConf.getTopic(), this.pulsarSpoutConf.getSubscriptionName()});
        } catch (PulsarClientException e) {
            LOG.error("[{}] Error creating pulsar consumer on topic {}", new Object[]{this.spoutId, this.pulsarSpoutConf.getTopic(), e});
        }
        topologyContext.registerMetric(String.format("PulsarSpoutMetrics-%s-%s", this.componentId, topologyContext.getThisTaskIndex()), this, this.pulsarSpoutConf.getMetricsTimeIntervalInSecs());
    }

    /**
     * Declares the output fields by delegating to the configured message-to-values mapper.
     *
     * @param outputFieldsDeclarer declarer supplied by Storm
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this.pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(outputFieldsDeclarer);
    }

    /**
     * Converts a message to tuple values and emits it, using the message as the tuple id.
     * If the mapper returns {@code null} the message is acknowledged immediately and not emitted.
     * A {@code null} message is ignored.
     */
    private void mapToValueAndEmit(Message message) {
        if (message == null) {
            return;
        }
        Values values = this.pulsarSpoutConf.getMessageToValuesMapper().toValues(message);
        // Counted before the null check because ack() below decrements the counter again.
        this.pendingAcks++;
        if (values == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Dropping message {}", this.spoutId, message.getMessageId());
            }
            ack(message);
            return;
        }
        this.collector.emit(values, message);
        this.messagesEmitted++;
        if (LOG.isDebugEnabled()) {
            LOG.debug("[{}] Emitted message {} to the collector", this.spoutId, message.getMessageId());
        }
    }

    /**
     * Refreshes and returns the metrics map with the current counter values.
     *
     * <p>The same map instance is returned on every call. The two rate entries divide the
     * counters by the configured metrics interval.
     *
     * @return map from metric key to its current value ({@code Long} counters, {@code Double} rates)
     */
    ConcurrentMap getMetrics() {
        this.metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) this.pendingMessageRetries.size());
        this.metricsMap.put(NO_OF_MESSAGES_RECEIVED, this.messagesReceived);
        this.metricsMap.put(NO_OF_MESSAGES_EMITTED, this.messagesEmitted);
        this.metricsMap.put(NO_OF_PENDING_ACKS, this.pendingAcks);
        // Cast to double before dividing so a fractional rate is not truncated by integer
        // division (presumably the interval is an integral number of seconds).
        this.metricsMap.put(CONSUMER_RATE, (double) this.messagesReceived / this.pulsarSpoutConf.getMetricsTimeIntervalInSecs());
        this.metricsMap.put(CONSUMER_THROUGHPUT_BYTES, (double) this.messageSizeReceived / this.pulsarSpoutConf.getMetricsTimeIntervalInSecs());
        return this.metricsMap;
    }

    /**
     * Resets the per-interval counters (received, emitted, bytes received).
     * The pending-ack counter is intentionally left untouched.
     */
    void resetMetrics() {
        this.messagesReceived = 0L;
        this.messagesEmitted = 0L;
        this.messageSizeReceived = 0L;
    }

    /**
     * Returns the current metrics and then resets the per-interval counters.
     *
     * @return the metrics map produced by {@link #getMetrics()}
     */
    public Object getValueAndReset() {
        ConcurrentMap metrics = getMetrics();
        resetMetrics();
        return metrics;
    }
}
