/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.storm;

import backtype.storm.metric.api.IMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.storm.PulsarBoltConfiguration;
import org.apache.pulsar.storm.SharedPulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pulsar-storm-shade.com.google.common.base.Preconditions;
import pulsar-storm-shade.com.google.common.collect.Maps;

/**
 * Storm bolt that converts each incoming tuple into a Pulsar {@link Message} (via the
 * tuple-to-message mapper held by the {@link PulsarBoltConfiguration}) and publishes it
 * asynchronously to the configured topic.
 *
 * <p>A tuple is acked once the asynchronous send completes successfully and failed if the
 * send completes with an error. Tick tuples, and tuples the mapper turns into {@code null},
 * are acked without sending anything.
 *
 * <p>The bolt registers itself as a Storm {@link IMetric}; see {@link #getValueAndReset()}.
 */
public class PulsarBolt extends BaseRichBolt implements IMetric {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(PulsarBolt.class);

    /** Metrics key: number of messages successfully sent since the last reset. */
    public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
    /** Metrics key: messages sent divided by the configured metrics interval (seconds). */
    public static final String PRODUCER_RATE = "producerRate";
    /** Metrics key: payload bytes sent divided by the configured metrics interval (seconds). */
    public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";

    private final ClientConfiguration clientConf;
    private final ProducerConfiguration producerConf;
    private final PulsarBoltConfiguration pulsarBoltConf;
    private final ConcurrentMap<String, Object> metricsMap = Maps.newConcurrentMap();

    // Runtime state, populated in prepare().
    private SharedPulsarClient sharedPulsarClient;
    private String componentId;
    private String boltId;
    private OutputCollector collector;
    private Producer producer;

    // Written by the send-completion callback while holding the collector's monitor;
    // read and reset by the metrics methods without that monitor (hence volatile).
    private volatile long messagesSent = 0L;
    private volatile long messageSizeSent = 0L;

    /**
     * Creates a bolt that uses a default {@link ProducerConfiguration}.
     *
     * @param pulsarBoltConf bolt settings; service URL, topic and tuple mapper must be non-null
     * @param clientConf     configuration handed to the shared Pulsar client
     */
    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf) {
        this(pulsarBoltConf, clientConf, new ProducerConfiguration());
    }

    /**
     * Creates a bolt with an explicit producer configuration.
     *
     * @param pulsarBoltConf bolt settings; service URL, topic and tuple mapper must be non-null
     * @param clientConf     configuration handed to the shared Pulsar client
     * @param producerConf   configuration used when creating the producer
     * @throws NullPointerException if {@code pulsarBoltConf}, its service URL, its topic or
     *                              its tuple-to-message mapper is null
     */
    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf, ProducerConfiguration producerConf) {
        this.clientConf = clientConf;
        this.producerConf = producerConf;
        Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl());
        Preconditions.checkNotNull(pulsarBoltConf.getTopic());
        Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper());
        this.pulsarBoltConf = pulsarBoltConf;
    }

    /**
     * Obtains the shared Pulsar client and producer for this component and registers this
     * bolt as a metric with the topology context.
     *
     * <p>A failure to create the client or producer is logged but not rethrown; the producer
     * then stays {@code null} and {@link #execute(Tuple)} fails every non-tick tuple.
     *
     * @param conf      topology configuration (unused here)
     * @param context   supplies the component id, task id and task index, and the metric registry
     * @param collector collector used to ack/fail tuples and report errors
     */
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.componentId = context.getThisComponentId();
        this.boltId = String.format("%s-%s", this.componentId, context.getThisTaskId());
        this.collector = collector;
        try {
            this.sharedPulsarClient = SharedPulsarClient.get(this.componentId, this.pulsarBoltConf.getServiceUrl(), this.clientConf);
            this.producer = this.sharedPulsarClient.getSharedProducer(this.pulsarBoltConf.getTopic(), this.producerConf);
            LOG.info("[{}] Created a pulsar producer on topic {} to send messages", this.boltId, this.pulsarBoltConf.getTopic());
        } catch (PulsarClientException e) {
            // Trailing exception argument is logged with its stack trace by SLF4J.
            LOG.error("[{}] Error initializing pulsar producer on topic {}", this.boltId, this.pulsarBoltConf.getTopic(), e);
        }
        String metricName = String.format("PulsarBoltMetrics-%s-%s", this.componentId, context.getThisTaskIndex());
        context.registerMetric(metricName, this, this.pulsarBoltConf.getMetricsTimeIntervalInSecs());
    }

    /**
     * Processes one tuple: tick tuples are acked immediately; other tuples are mapped to a
     * Pulsar message and sent asynchronously, with the ack/fail issued when the send completes.
     *
     * <p>If no producer is available (initialization failed in
     * {@link #prepare(Map, TopologyContext, OutputCollector)}), the tuple is failed right away
     * rather than being dropped without an ack or a fail.
     *
     * @param input the tuple to process
     */
    public void execute(Tuple input) {
        if (isTickTuple(input)) {
            this.collector.ack(input);
            return;
        }
        if (this.producer == null) {
            // Without this the tuple would never be acked or failed.
            LOG.warn("[{}] No Pulsar producer available, failing the tuple", this.boltId);
            this.collector.fail(input);
            return;
        }
        try {
            Message msg = this.pulsarBoltConf.getTupleToMessageMapper().toMessage(input);
            if (msg == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[{}] Cannot send null message, acking the collector", this.boltId);
                }
                this.collector.ack(input);
                return;
            }
            // Capture the size up front so the callback does not touch the payload again.
            long messageSizeToBeSent = msg.getData().length;
            this.producer.sendAsync(msg).handle((r, ex) -> {
                this.onSendComplete(input, msg, messageSizeToBeSent, ex);
                return null;
            });
        } catch (Exception e) {
            LOG.error("[{}] Message processing failed", this.boltId, e);
            this.collector.reportError(e);
            this.collector.fail(input);
        }
    }

    /**
     * Completion callback for an asynchronous send: acks and counts the tuple on success,
     * reports the error and fails the tuple otherwise. Runs while holding the collector's
     * monitor.
     */
    private void onSendComplete(Tuple input, Message msg, long messageSize, Throwable ex) {
        synchronized (this.collector) {
            if (ex != null) {
                this.collector.reportError(ex);
                this.collector.fail(input);
                LOG.error("[{}] Message send failed", this.boltId, ex);
                return;
            }
            this.collector.ack(input);
            ++this.messagesSent;
            this.messageSizeSent += messageSize;
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] Message sent with id {}", this.boltId, msg.getMessageId());
            }
        }
    }

    /**
     * Closes the shared Pulsar client if one was obtained. A {@link PulsarClientException}
     * raised while closing is logged and not rethrown.
     */
    public void close() {
        try {
            LOG.info("[{}] Closing Pulsar producer on topic {}", this.boltId, this.pulsarBoltConf.getTopic());
            if (this.sharedPulsarClient != null) {
                this.sharedPulsarClient.close();
            }
        } catch (PulsarClientException e) {
            LOG.error("[{}] Error closing Pulsar producer on topic {}", this.boltId, this.pulsarBoltConf.getTopic(), e);
        }
    }

    /** Delegates to {@link #close()}. */
    public void cleanup() {
        this.close();
    }

    /**
     * Delegates output-field declaration to the configured tuple-to-message mapper.
     *
     * @param declarer the declarer passed on to the mapper
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
    }

    /**
     * Tells whether a tuple is a tick tuple, i.e. comes from component {@code "__system"}
     * on stream {@code "__tick"}.
     *
     * @param tuple the tuple to inspect; may be null
     * @return {@code true} only for a non-null tuple with that source component and stream
     */
    protected static boolean isTickTuple(Tuple tuple) {
        return tuple != null
                && "__system".equals(tuple.getSourceComponent())
                && "__tick".equals(tuple.getSourceStreamId());
    }

    /**
     * Refreshes and returns the metrics map from the current counters.
     *
     * @return the bolt's metrics map, keyed by {@link #NO_OF_MESSAGES_SENT},
     *         {@link #PRODUCER_RATE} and {@link #PRODUCER_THROUGHPUT_BYTES}; the same map
     *         instance is returned on every call
     */
    ConcurrentMap getMetrics() {
        // Read each volatile counter once so the count and rate entries agree.
        long sentCount = this.messagesSent;
        long sentBytes = this.messageSizeSent;
        double intervalSecs = (double) this.pulsarBoltConf.getMetricsTimeIntervalInSecs();
        this.metricsMap.put(NO_OF_MESSAGES_SENT, sentCount);
        this.metricsMap.put(PRODUCER_RATE, (double) sentCount / intervalSecs);
        this.metricsMap.put(PRODUCER_THROUGHPUT_BYTES, (double) sentBytes / intervalSecs);
        return this.metricsMap;
    }

    /** Zeroes the message-count and byte-count counters. */
    void resetMetrics() {
        this.messagesSent = 0L;
        this.messageSizeSent = 0L;
    }

    /**
     * Returns the current metrics map and then resets the counters.
     *
     * @return the metrics map populated by {@link #getMetrics()}
     */
    public Object getValueAndReset() {
        ConcurrentMap metrics = this.getMetrics();
        this.resetMetrics();
        return metrics;
    }
}

