package org.apache.storm.kafka;

import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.kafka.KafkaUtils;
import org.apache.storm.kafka.PartitionManager;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
/* loaded from: input_file:org/apache/storm/kafka/KafkaSpout.class */
public class KafkaSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    PartitionCoordinator _coordinator;
    DynamicPartitionConnections _connections;
    ZkState _state;
    long _lastUpdateMs = 0;
    int _currPartitionIndex = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/kafka/KafkaSpout$EmitState.class */
    public enum EmitState {
        EMITTED_MORE_LEFT,
        EMITTED_END,
        NO_EMITTED
    }

    public KafkaSpout(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        String stormId = topologyContext.getStormId();
        HashMap hashMap = new HashMap(map);
        List<String> list = this._spoutConfig.zkServers;
        if (list == null) {
            list = (List) map.get("storm.zookeeper.servers");
        }
        Integer num = this._spoutConfig.zkPort;
        if (num == null) {
            num = Integer.valueOf(((Number) map.get("storm.zookeeper.port")).intValue());
        }
        hashMap.put("transactional.zookeeper.servers", list);
        hashMap.put("transactional.zookeeper.port", num);
        hashMap.put("transactional.zookeeper.root", this._spoutConfig.zkRoot);
        this._state = new ZkState(hashMap);
        this._connections = new DynamicPartitionConnections(this._spoutConfig, KafkaUtils.makeBrokerReader(map, this._spoutConfig));
        int size = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        if (this._spoutConfig.hosts instanceof StaticHosts) {
            this._coordinator = new StaticCoordinator(this._connections, map, this._spoutConfig, this._state, topologyContext.getThisTaskIndex(), size, stormId);
        } else {
            this._coordinator = new ZkCoordinator(this._connections, map, this._spoutConfig, this._state, topologyContext.getThisTaskIndex(), size, stormId);
        }
        topologyContext.registerMetric("kafkaOffset", new IMetric() { // from class: org.apache.storm.kafka.KafkaSpout.1
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;

            {
                this._kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(KafkaSpout.this._connections);
            }

            public Object getValueAndReset() {
                List<PartitionManager> myManagedPartitions = KafkaSpout.this._coordinator.getMyManagedPartitions();
                HashSet hashSet = new HashSet();
                Iterator<PartitionManager> it = myManagedPartitions.iterator();
                while (it.hasNext()) {
                    hashSet.add(it.next().getPartition());
                }
                this._kafkaOffsetMetric.refreshPartitions(hashSet);
                for (PartitionManager partitionManager : myManagedPartitions) {
                    this._kafkaOffsetMetric.setOffsetData(partitionManager.getPartition(), partitionManager.getOffsetData());
                }
                return this._kafkaOffsetMetric.getValueAndReset();
            }
        }, this._spoutConfig.metricsTimeBucketSizeInSecs);
        topologyContext.registerMetric("kafkaPartition", new IMetric() { // from class: org.apache.storm.kafka.KafkaSpout.2
            public Object getValueAndReset() {
                List<PartitionManager> myManagedPartitions = KafkaSpout.this._coordinator.getMyManagedPartitions();
                HashMap hashMap2 = new HashMap();
                Iterator<PartitionManager> it = myManagedPartitions.iterator();
                while (it.hasNext()) {
                    hashMap2.putAll(it.next().getMetricsDataMap());
                }
                return hashMap2;
            }
        }, this._spoutConfig.metricsTimeBucketSizeInSecs);
    }

    public void close() {
        this._state.close();
    }

    public void nextTuple() {
        EmitState next;
        List<PartitionManager> myManagedPartitions = this._coordinator.getMyManagedPartitions();
        for (int i = 0; i < myManagedPartitions.size(); i++) {
            try {
                this._currPartitionIndex %= myManagedPartitions.size();
                next = myManagedPartitions.get(this._currPartitionIndex).next(this._collector);
                if (next != EmitState.EMITTED_MORE_LEFT) {
                    this._currPartitionIndex = (this._currPartitionIndex + 1) % myManagedPartitions.size();
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                this._coordinator.refresh();
            }
            if (next != EmitState.NO_EMITTED) {
                break;
            }
        }
        long currentTimeMillis = System.currentTimeMillis() - this._lastUpdateMs;
        if (currentTimeMillis > this._spoutConfig.stateUpdateIntervalMs || currentTimeMillis < 0) {
            commit();
        }
    }

    private PartitionManager getManagerForPartition(int i) {
        for (PartitionManager partitionManager : this._coordinator.getMyManagedPartitions()) {
            if (partitionManager.getPartition().partition == i) {
                return partitionManager;
            }
        }
        return null;
    }

    public void ack(Object obj) {
        PartitionManager.KafkaMessageId kafkaMessageId = (PartitionManager.KafkaMessageId) obj;
        PartitionManager manager = this._coordinator.getManager(kafkaMessageId.partition);
        if (manager != null) {
            manager.ack(Long.valueOf(kafkaMessageId.offset));
            return;
        }
        PartitionManager managerForPartition = getManagerForPartition(kafkaMessageId.partition.partition);
        if (managerForPartition != null) {
            managerForPartition.ack(Long.valueOf(kafkaMessageId.offset));
        }
    }

    public void fail(Object obj) {
        PartitionManager.KafkaMessageId kafkaMessageId = (PartitionManager.KafkaMessageId) obj;
        PartitionManager manager = this._coordinator.getManager(kafkaMessageId.partition);
        if (manager != null) {
            manager.fail(Long.valueOf(kafkaMessageId.offset));
            return;
        }
        PartitionManager managerForPartition = getManagerForPartition(kafkaMessageId.partition.partition);
        if (managerForPartition != null) {
            managerForPartition.fail(Long.valueOf(kafkaMessageId.offset));
        }
    }

    public void deactivate() {
        commit();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        if (Strings.isNullOrEmpty(this._spoutConfig.outputStreamId)) {
            outputFieldsDeclarer.declare(this._spoutConfig.scheme.getOutputFields());
        } else {
            outputFieldsDeclarer.declareStream(this._spoutConfig.outputStreamId, this._spoutConfig.scheme.getOutputFields());
        }
    }

    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> componentConfiguration = super.getComponentConfiguration();
        if (componentConfiguration == null) {
            componentConfiguration = new HashMap();
        }
        componentConfiguration.put("config.topics", this._spoutConfig.topic);
        StringBuilder sb = new StringBuilder();
        if (this._spoutConfig.zkServers != null && this._spoutConfig.zkServers.size() > 0) {
            Iterator<String> it = this._spoutConfig.zkServers.iterator();
            while (it.hasNext()) {
                sb.append(it.next() + ":" + this._spoutConfig.zkPort + ",");
            }
            componentConfiguration.put("config.zkServers", sb.toString());
        }
        BrokerHosts brokerHosts = this._spoutConfig.hosts;
        String str = this._spoutConfig.zkRoot + "/" + this._spoutConfig.id;
        if (brokerHosts instanceof ZkHosts) {
            componentConfiguration.put("config.zkNodeBrokers", ((ZkHosts) brokerHosts).brokerZkPath);
        } else if (brokerHosts instanceof StaticHosts) {
            GlobalPartitionInformation partitionInformation = ((StaticHosts) brokerHosts).getPartitionInformation();
            if (partitionInformation.getbUseTopicNameForPartitionPathId().booleanValue()) {
                str = str + "/" + this._spoutConfig.topic;
            }
            List<Partition> orderedPartitions = partitionInformation.getOrderedPartitions();
            StringBuilder sb2 = new StringBuilder();
            StringBuilder sb3 = new StringBuilder();
            for (Partition partition : orderedPartitions) {
                sb2.append(partition.partition + ",");
                sb3.append(partition.host.host + ":" + partition.host.port).append(",");
            }
            componentConfiguration.put("config.partitions", sb2.toString());
            componentConfiguration.put("config.leaders", sb3.toString());
        }
        componentConfiguration.put("config.zkRoot", str);
        return componentConfiguration;
    }

    private void commit() {
        this._lastUpdateMs = System.currentTimeMillis();
        Iterator<PartitionManager> it = this._coordinator.getMyManagedPartitions().iterator();
        while (it.hasNext()) {
            it.next().commit();
        }
    }
}
