package org.apache.storm.eventhubs.spout;

import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.IEventHubFilter;
import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/eventhubs/spout/EventHubReceiverImpl.class */
/**
 * {@link IEventHubReceiver} implementation backed by a {@link ResilientEventHubReceiver}
 * for a single partition.
 *
 * <p>Besides delegating receive calls, this class records three metrics that are exposed
 * (and reset) through {@link #getMetricsData()}: the mean latency of the receive call, the
 * number of receive calls, and the number of receive calls that returned a message.
 *
 * <p>No synchronization is performed in this class.
 */
public class EventHubReceiverImpl implements IEventHubReceiver {
    private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);

    /** Message-annotation key under which the offset is read. */
    private static final Symbol OFFSET_KEY = Symbol.valueOf("x-opt-offset");
    /** Message-annotation key under which the sequence number is read. */
    private static final Symbol SEQUENCE_NUMBER_KEY = Symbol.valueOf("x-opt-sequence-number");

    private final String connectionString;
    private final String entityName;
    private final String partitionId;
    private final int defaultCredits;
    private final String consumerGroupName;

    /** Underlying receiver; {@code null} whenever this instance is not open. */
    private ResilientEventHubReceiver receiver;

    private final ReducedMetric receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
    private final CountMetric receiveApiCallCount = new CountMetric();
    private final CountMetric receiveMessageCount = new CountMetric();

    /**
     * Captures the connection settings from the spout configuration. No connection is made
     * until {@link #open(IEventHubFilter)} is called.
     *
     * @param config      spout configuration supplying the connection string, entity path,
     *                    receiver credits and consumer group name
     * @param partitionId identifier of the partition this receiver reads from
     */
    public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
        this.connectionString = config.getConnectionString();
        this.entityName = config.getEntityPath();
        this.defaultCredits = config.getReceiverCredits();
        this.partitionId = partitionId;
        this.consumerGroupName = config.getConsumerGroupName();
    }

    /**
     * Creates and initializes the underlying receiver using the given filter, logging how
     * long the setup took.
     *
     * <p>NOTE(review): the receiver field is assigned before {@code initialize()} runs, so if
     * initialization throws, {@link #isOpen()} still reports {@code true} until
     * {@link #close()} is called. Callers should close this instance after a failed open.
     *
     * @param filter filter handed to the underlying receiver
     * @throws EventHubException if the underlying receiver fails to initialize
     */
    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public void open(IEventHubFilter filter) throws EventHubException {
        logger.info("creating eventhub receiver: partitionId={}, filterString={}",
                this.partitionId, filter.getFilterString());
        long startMillis = System.currentTimeMillis();
        this.receiver = new ResilientEventHubReceiver(
                this.connectionString,
                this.entityName,
                this.partitionId,
                this.consumerGroupName,
                this.defaultCredits,
                filter);
        this.receiver.initialize();
        logger.info("created eventhub receiver, time taken(ms): {}",
                System.currentTimeMillis() - startMillis);
    }

    /**
     * Closes the underlying receiver if one exists. Calling this on an instance that is not
     * open is a no-op.
     */
    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public void close() {
        if (this.receiver == null) {
            return;
        }
        try {
            this.receiver.close();
            logger.info("closed eventhub receiver: partitionId={}", this.partitionId);
        } finally {
            // Drop the reference even when close() fails so isOpen() never reports a
            // receiver that has already been asked to shut down.
            this.receiver = null;
        }
    }

    /**
     * @return {@code true} while an underlying receiver is held, i.e. after
     *         {@link #open(IEventHubFilter)} and before {@link #close()}
     */
    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public boolean isOpen() {
        return this.receiver != null;
    }

    /**
     * Requests one message from the underlying receiver and wraps it in an {@link EventData}.
     * The call latency and call count are recorded for every invocation; the message count
     * is incremented only when a message is returned.
     *
     * @param j value forwarded unchanged to the underlying receiver's {@code receive} call
     *          (presumably a wait timeout; confirm units against ResilientEventHubReceiver)
     * @return the received event, or {@code null} if the underlying receiver returned no message
     * @throws IllegalStateException if this receiver has not been opened
     */
    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public EventData receive(long j) {
        if (this.receiver == null) {
            throw new IllegalStateException(
                    "eventhub receiver is not open: partitionId=" + this.partitionId);
        }
        long startMillis = System.currentTimeMillis();
        Message message = this.receiver.receive(j);
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        this.receiveApiLatencyMean.update(Long.valueOf(elapsedMillis));
        this.receiveApiCallCount.incr();
        if (message == null) {
            return null;
        }
        this.receiveMessageCount.incr();
        return EventData.create(message, createMessageId(message));
    }

    /**
     * Builds the {@link MessageId} for a message from its message-annotation section(s).
     * If the offset annotation is absent the offset is {@code null}; if the sequence-number
     * annotation is absent the sequence number is {@code 0}.
     */
    private MessageId createMessageId(Message message) {
        String offset = null;
        long sequenceNumber = 0L;
        // Iterate as Object: the payload holds several section types and only the
        // MessageAnnotations sections are relevant. Typing the loop variable as
        // MessageAnnotations would fail on every other section.
        for (Object section : message.getPayload()) {
            if (!(section instanceof MessageAnnotations)) {
                continue;
            }
            Map<?, ?> annotations = (Map<?, ?>) ((MessageAnnotations) section).getValue();
            if (annotations == null) {
                continue;
            }
            Object offsetValue = annotations.get(OFFSET_KEY);
            if (offsetValue != null) {
                offset = (String) offsetValue;
            }
            Object sequenceValue = annotations.get(SEQUENCE_NUMBER_KEY);
            if (sequenceValue != null) {
                // Accept any numeric representation rather than insisting on Long.
                sequenceNumber = ((Number) sequenceValue).longValue();
            }
        }
        return MessageId.create(this.partitionId, offset, sequenceNumber);
    }

    /**
     * Returns the current metric values keyed by {@code <partitionId>/<metricName>} and
     * resets each metric as it is read.
     *
     * @return a new mutable map containing {@code receiveApiLatencyMean},
     *         {@code receiveApiCallCount} and {@code receiveMessageCount}
     */
    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public Map getMetricsData() {
        Map<String, Object> metrics = new HashMap<String, Object>();
        metrics.put(this.partitionId + "/receiveApiLatencyMean", this.receiveApiLatencyMean.getValueAndReset());
        metrics.put(this.partitionId + "/receiveApiCallCount", this.receiveApiCallCount.getValueAndReset());
        metrics.put(this.partitionId + "/receiveMessageCount", this.receiveMessageCount.getValueAndReset());
        return metrics;
    }
}
