package org.apache.pulsar.storm;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import p000pulsarstormshade.com.google.common.base.Preconditions;
import p000pulsarstormshade.com.google.common.collect.Maps;

/* loaded from: input_file:org/apache/pulsar/storm/PulsarBolt.class */
public class PulsarBolt extends BaseRichBolt implements IMetric {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(PulsarBolt.class);
    public static final String NO_OF_MESSAGES_SENT = "numberOfMessagesSent";
    public static final String PRODUCER_RATE = "producerRate";
    public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput";
    private final ClientConfigurationData clientConf;
    private final ProducerConfigurationData producerConf;
    private final PulsarBoltConfiguration pulsarBoltConf;
    private final ConcurrentMap<String, Object> metricsMap;
    private SharedPulsarClient sharedPulsarClient;
    private String componentId;
    private String boltId;
    private OutputCollector collector;
    private Producer<byte[]> producer;
    private volatile long messagesSent;
    private volatile long messageSizeSent;

    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConfiguration, ClientBuilder clientBuilder) {
        this.metricsMap = Maps.newConcurrentMap();
        this.messagesSent = 0L;
        this.messageSizeSent = 0L;
        this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
        this.producerConf = new ProducerConfigurationData();
        Preconditions.checkNotNull(pulsarBoltConfiguration.getServiceUrl());
        Preconditions.checkNotNull(pulsarBoltConfiguration.getTopic());
        Preconditions.checkNotNull(pulsarBoltConfiguration.getTupleToMessageMapper());
        this.clientConf.setServiceUrl(pulsarBoltConfiguration.getServiceUrl());
        this.producerConf.setTopicName(pulsarBoltConfiguration.getTopic());
        this.pulsarBoltConf = pulsarBoltConfiguration;
    }

    @Deprecated
    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConfiguration, ClientConfiguration clientConfiguration) {
        this(pulsarBoltConfiguration, clientConfiguration, new ProducerConfiguration());
    }

    @Deprecated
    public PulsarBolt(PulsarBoltConfiguration pulsarBoltConfiguration, ClientConfiguration clientConfiguration, ProducerConfiguration producerConfiguration) {
        this.metricsMap = Maps.newConcurrentMap();
        this.messagesSent = 0L;
        this.messageSizeSent = 0L;
        this.clientConf = clientConfiguration.getConfigurationData().clone();
        this.producerConf = producerConfiguration.getProducerConfigurationData().clone();
        Preconditions.checkNotNull(pulsarBoltConfiguration.getServiceUrl());
        Preconditions.checkNotNull(pulsarBoltConfiguration.getTopic());
        Preconditions.checkNotNull(pulsarBoltConfiguration.getTupleToMessageMapper());
        this.clientConf.setServiceUrl(pulsarBoltConfiguration.getServiceUrl());
        this.producerConf.setTopicName(pulsarBoltConfiguration.getTopic());
        this.pulsarBoltConf = pulsarBoltConfiguration;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.componentId = topologyContext.getThisComponentId();
        this.boltId = String.format("%s-%s", this.componentId, Integer.valueOf(topologyContext.getThisTaskId()));
        this.collector = outputCollector;
        try {
            this.sharedPulsarClient = SharedPulsarClient.get(this.componentId, this.clientConf);
            this.producer = this.sharedPulsarClient.getSharedProducer(this.producerConf);
            LOG.info("[{}] Created a pulsar producer on topic {} to send messages", this.boltId, this.pulsarBoltConf.getTopic());
            topologyContext.registerMetric(String.format("PulsarBoltMetrics-%s-%s", this.componentId, Integer.valueOf(topologyContext.getThisTaskIndex())), this, this.pulsarBoltConf.getMetricsTimeIntervalInSecs());
        } catch (PulsarClientException e) {
            LOG.error("[{}] Error initializing pulsar producer on topic {}", new Object[]{this.boltId, this.pulsarBoltConf.getTopic(), e});
            throw new IllegalStateException(String.format("Failed to initialize producer for %s : %s", this.pulsarBoltConf.getTopic(), e.getMessage()), e);
        }
    }

    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            this.collector.ack(tuple);
            return;
        }
        try {
            if (this.producer != null) {
                Message<byte[]> message = this.pulsarBoltConf.getTupleToMessageMapper().toMessage(tuple);
                if (message == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("[{}] Cannot send null message, acking the collector", this.boltId);
                    }
                    this.collector.ack(tuple);
                } else {
                    long length = message.getData().length;
                    this.producer.sendAsync(message).handle((messageId, th) -> {
                        synchronized (this.collector) {
                            if (th != null) {
                                this.collector.reportError(th);
                                this.collector.fail(tuple);
                                LOG.error("[{}] Message send failed", this.boltId, th);
                            } else {
                                this.collector.ack(tuple);
                                this.messagesSent += serialVersionUID;
                                this.messageSizeSent += length;
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("[{}] Message sent with id {}", this.boltId, message.getMessageId());
                                }
                            }
                        }
                        return null;
                    });
                }
            }
        } catch (Exception e) {
            LOG.error("[{}] Message processing failed", this.boltId, e);
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }

    public void close() {
        try {
            LOG.info("[{}] Closing Pulsar producer on topic {}", this.boltId, this.pulsarBoltConf.getTopic());
            if (this.sharedPulsarClient != null) {
                this.sharedPulsarClient.close();
            }
        } catch (PulsarClientException e) {
            LOG.error("[{}] Error closing Pulsar producer on topic {}", new Object[]{this.boltId, this.pulsarBoltConf.getTopic(), e});
        }
    }

    public void cleanup() {
        close();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this.pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(outputFieldsDeclarer);
    }

    ConcurrentMap getMetrics() {
        this.metricsMap.put(NO_OF_MESSAGES_SENT, Long.valueOf(this.messagesSent));
        this.metricsMap.put(PRODUCER_RATE, Double.valueOf(this.messagesSent / this.pulsarBoltConf.getMetricsTimeIntervalInSecs()));
        this.metricsMap.put(PRODUCER_THROUGHPUT_BYTES, Double.valueOf(this.messageSizeSent / this.pulsarBoltConf.getMetricsTimeIntervalInSecs()));
        return this.metricsMap;
    }

    void resetMetrics() {
        this.messagesSent = 0L;
        this.messageSizeSent = 0L;
    }

    public Object getValueAndReset() {
        ConcurrentMap metrics = getMetrics();
        resetMetrics();
        return metrics;
    }
}
