package org.apache.storm.eventhubs.spout;

import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/eventhubs/spout/EventHubSpout.class */
/**
 * Storm spout that pulls events from the {@link IPartitionManager}s assigned to this task and
 * emits them as tuples anchored with the event's {@link MessageId}.
 *
 * <p>Partition managers are polled round-robin, at most one tuple is emitted per
 * {@link #nextTuple()} call, and every partition manager is asked to checkpoint once the
 * configured checkpoint interval has elapsed (and whenever the spout is deactivated).
 *
 * <p>The state store, partition-manager factory and receiver factory may be injected through the
 * four-argument constructor; defaults are created for any that are {@code null}.
 */
public class EventHubSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class);

    /** Random id generated per spout instance; not read anywhere within this class. */
    private final UUID instanceId;
    private final EventHubSpoutConfig eventHubConfig;
    private final IEventDataScheme scheme;
    private final int checkpointIntervalInSeconds;
    private IStateStore stateStore;
    private IPartitionCoordinator partitionCoordinator;
    private IPartitionManagerFactory pmFactory;
    private IEventHubReceiverFactory recvFactory;
    private SpoutOutputCollector collector;
    /** Wall-clock time (ms) recorded after the most recent periodic checkpoint. */
    private long lastCheckpointTime;
    /** Index of the partition manager polled last; -1 until the first poll. */
    private int currentPartitionIndex;

    /**
     * Creates a spout from raw settings. The arguments are forwarded, in order, to the
     * five-argument {@link EventHubSpoutConfig} constructor.
     */
    public EventHubSpout(String str, String str2, String str3, String str4, int i) {
        this(new EventHubSpoutConfig(str, str2, str3, str4, i));
    }

    /**
     * Creates a spout that uses the default state store, partition-manager factory and
     * receiver factory.
     *
     * @param eventHubSpoutConfig spout configuration
     */
    public EventHubSpout(EventHubSpoutConfig eventHubSpoutConfig) {
        this(eventHubSpoutConfig, null, null, null);
    }

    /**
     * Creates a spout with optionally injected collaborators.
     *
     * @param eventHubSpoutConfig spout configuration; also supplies the event data scheme and the
     *     checkpoint interval
     * @param iStateStore state store to use, or {@code null} to create a
     *     {@link ZookeeperStateStore} later in {@link #preparePartitions}
     * @param iPartitionManagerFactory factory for partition managers, or {@code null} for a
     *     factory producing {@link PartitionManager}
     * @param iEventHubReceiverFactory factory for receivers, or {@code null} for a factory
     *     producing {@link EventHubReceiverImpl}
     */
    public EventHubSpout(EventHubSpoutConfig eventHubSpoutConfig, IStateStore iStateStore, IPartitionManagerFactory iPartitionManagerFactory, IEventHubReceiverFactory iEventHubReceiverFactory) {
        this.currentPartitionIndex = -1;
        this.eventHubConfig = eventHubSpoutConfig;
        this.scheme = eventHubSpoutConfig.getEventDataScheme();
        this.instanceId = UUID.randomUUID();
        this.checkpointIntervalInSeconds = eventHubSpoutConfig.getCheckpointIntervalInSeconds();
        this.lastCheckpointTime = System.currentTimeMillis();
        this.stateStore = iStateStore;

        // Anonymous classes (rather than lambdas) are kept on purpose so the factory objects
        // stored in the spout's fields keep the same class shape as before.
        this.pmFactory = iPartitionManagerFactory;
        if (this.pmFactory == null) {
            this.pmFactory = new IPartitionManagerFactory() {
                @Override
                public IPartitionManager create(EventHubSpoutConfig config, String partitionId, IStateStore store, IEventHubReceiver receiver) {
                    return new PartitionManager(config, partitionId, store, receiver);
                }
            };
        }
        this.recvFactory = iEventHubReceiverFactory;
        if (this.recvFactory == null) {
            this.recvFactory = new IEventHubReceiverFactory() {
                @Override
                public IEventHubReceiver create(EventHubSpoutConfig config, String partitionId) {
                    return new EventHubReceiverImpl(config, partitionId);
                }
            };
        }
    }

    /**
     * Opens the state store, builds the partition coordinator for this task and opens every
     * partition manager assigned to it.
     *
     * <p>If no state store was injected, a {@link ZookeeperStateStore} is created. Its connection
     * string comes from the spout config, or, when that is null or empty, is assembled from the
     * {@code storm.zookeeper.servers} and {@code storm.zookeeper.port} entries of {@code map}.
     *
     * @param map Storm configuration map
     * @param i total number of tasks of this spout component
     * @param i2 index of this task among those tasks
     * @param spoutOutputCollector collector used later by {@link #nextTuple()}
     * @throws Exception if opening the state store or a partition manager fails
     */
    public void preparePartitions(Map map, int i, int i2, SpoutOutputCollector spoutOutputCollector) throws Exception {
        final int totalTasks = i;
        final int taskIndex = i2;

        this.collector = spoutOutputCollector;
        if (this.stateStore == null) {
            String zkConnectionString = this.eventHubConfig.getZkConnectionString();
            if (Strings.isNullOrEmpty(zkConnectionString)) {
                zkConnectionString = buildZkConnectionString(map);
            }
            int retryTimes = Integer.parseInt(map.get("storm.zookeeper.retry.times").toString());
            int retryInterval = Integer.parseInt(map.get("storm.zookeeper.retry.interval").toString());
            this.stateStore = new ZookeeperStateStore(zkConnectionString, retryTimes, retryInterval);
        }
        this.stateStore.open();

        this.partitionCoordinator = new StaticPartitionCoordinator(
                this.eventHubConfig, taskIndex, totalTasks, this.stateStore, this.pmFactory, this.recvFactory);
        for (IPartitionManager manager : this.partitionCoordinator.getMyPartitionManagers()) {
            manager.open();
        }
    }

    /** Joins the configured ZooKeeper servers into a comma-separated {@code host:port} list. */
    private static String buildZkConnectionString(Map map) {
        @SuppressWarnings("unchecked") // the Storm config stores this entry as a list of host names
        List<String> servers = (List<String>) map.get("storm.zookeeper.servers");
        int port = ((Number) map.get("storm.zookeeper.port")).intValue();

        StringBuilder joined = new StringBuilder();
        for (String server : servers) {
            if (joined.length() > 0) {
                joined.append(',');
            }
            joined.append(server).append(':').append(port);
        }
        return joined.toString();
    }

    /**
     * Initializes the spout for this task: records the topology name in the config, prepares the
     * partitions and registers the {@code EventHubReceiver} metric.
     *
     * @throws RuntimeException if there are more spout tasks than partitions, or wrapping any
     *     exception raised while preparing partitions or registering the metric (the latter is
     *     also reported through the collector)
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        logger.info("begin: open()");
        String topologyName = (String) map.get("topology.name");
        this.eventHubConfig.setTopologyName(topologyName);

        int totalTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        int taskIndex = topologyContext.getThisTaskIndex();
        if (totalTasks > this.eventHubConfig.getPartitionCount()) {
            throw new RuntimeException("total tasks of EventHubSpout is greater than partition count.");
        }
        logger.info("topologyName: {}, totalTasks: {}, taskIndex: {}", topologyName, totalTasks, taskIndex);

        try {
            preparePartitions(map, totalTasks, taskIndex, spoutOutputCollector);

            int bucketSizeSecs = Integer.parseInt(map.get("topology.builtin.metrics.bucket.size.secs").toString());
            topologyContext.registerMetric("EventHubReceiver", new IMetric() {
                @Override
                public Object getValueAndReset() {
                    // Merge the metrics of every partition manager owned by this task.
                    Map<Object, Object> merged = new HashMap<Object, Object>();
                    for (IPartitionManager manager : EventHubSpout.this.partitionCoordinator.getMyPartitionManagers()) {
                        merged.putAll(manager.getMetricsData());
                    }
                    return merged;
                }
            }, bucketSizeSecs);
            logger.info("end open()");
        } catch (Exception e) {
            spoutOutputCollector.reportError(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the assigned partition managers round-robin, starting after the one polled last, and
     * emits at most one tuple for the first event received. Afterwards triggers a checkpoint if
     * the checkpoint interval has elapsed.
     *
     * <p>The tuple is emitted on the configured output stream when one is set, matching what
     * {@link #declareOutputFields} declares; otherwise it goes to the default stream.
     */
    @Override
    public void nextTuple() {
        EventData eventData = null;
        List<IPartitionManager> managers = this.partitionCoordinator.getMyPartitionManagers();
        final int managerCount = managers.size();
        for (int attempt = 0; attempt < managerCount && eventData == null; attempt++) {
            this.currentPartitionIndex = (this.currentPartitionIndex + 1) % managerCount;
            IPartitionManager manager = managers.get(this.currentPartitionIndex);
            if (manager == null) {
                throw new RuntimeException("partitionManager doesn't exist.");
            }
            eventData = manager.receive();
        }

        if (eventData != null) {
            MessageId messageId = eventData.getMessageId();
            List<Object> tuple = this.scheme.deserialize(eventData.getMessage());
            // NOTE(review): when the scheme returns null the event is neither emitted nor acked
            // here; confirm the partition manager does not keep it pending indefinitely.
            if (tuple != null) {
                String streamId = this.eventHubConfig.getOutputStreamId();
                if (Strings.isNullOrEmpty(streamId)) {
                    this.collector.emit(tuple, messageId);
                } else {
                    // Only the named stream is declared in this case, so emit on it.
                    this.collector.emit(streamId, tuple, messageId);
                }
            }
        }
        checkpointIfNeeded();
    }

    /**
     * Forwards the ack to the partition manager that owns the message.
     *
     * @param obj the {@link MessageId} the tuple was emitted with
     */
    @Override
    public void ack(Object obj) {
        MessageId messageId = (MessageId) obj;
        this.partitionCoordinator.getPartitionManager(messageId.getPartitionId()).ack(messageId.getOffset());
    }

    /**
     * Forwards the failure to the partition manager that owns the message.
     *
     * @param obj the {@link MessageId} the tuple was emitted with
     */
    @Override
    public void fail(Object obj) {
        MessageId messageId = (MessageId) obj;
        this.partitionCoordinator.getPartitionManager(messageId.getPartitionId()).fail(messageId.getOffset());
    }

    /** Checkpoints every assigned partition manager when the spout is deactivated. */
    @Override
    public void deactivate() {
        checkpoint();
    }

    /**
     * Closes every assigned partition manager and then the state store.
     *
     * <p>Safe to call even if {@link #open} did not complete: collaborators that were never
     * created are skipped, and the state store is closed even if closing a partition manager
     * throws.
     */
    @Override
    public void close() {
        try {
            if (this.partitionCoordinator != null) {
                for (IPartitionManager manager : this.partitionCoordinator.getMyPartitionManagers()) {
                    manager.close();
                }
            }
        } finally {
            if (this.stateStore != null) {
                this.stateStore.close();
            }
        }
    }

    /**
     * Declares the scheme's output fields on the configured output stream, or on the default
     * stream when no stream id is configured.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        String streamId = this.eventHubConfig.getOutputStreamId();
        if (Strings.isNullOrEmpty(streamId)) {
            outputFieldsDeclarer.declare(this.scheme.getOutputFields());
        } else {
            outputFieldsDeclarer.declareStream(streamId, this.scheme.getOutputFields());
        }
    }

    /** Runs a checkpoint if more than the configured interval has passed since the last one. */
    private void checkpointIfNeeded() {
        // 1000L forces long arithmetic so large intervals cannot overflow int.
        long intervalMillis = this.checkpointIntervalInSeconds * 1000L;
        if (this.lastCheckpointTime + intervalMillis < System.currentTimeMillis()) {
            checkpoint();
            this.lastCheckpointTime = System.currentTimeMillis();
        }
    }

    /** Asks every assigned partition manager to checkpoint. */
    private void checkpoint() {
        for (IPartitionManager manager : this.partitionCoordinator.getMyPartitionManagers()) {
            manager.checkpoint();
        }
    }
}
