package org.apache.storm.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/ZkCoordinator.class */
/**
 * {@link PartitionCoordinator} that keeps one {@link PartitionManager} per partition assigned to
 * this task, based on the broker information returned by a {@link DynamicBrokersReader}.
 *
 * <p>The set of managed partitions is recomputed lazily: {@link #getMyManagedPartitions()} calls
 * {@link #refresh()} on first use and again whenever more than {@code _refreshFreqMs}
 * milliseconds have elapsed since the previous lazy refresh.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state, so it appears to be
 * intended for use from a single thread — confirm against callers.
 */
public class ZkCoordinator implements PartitionCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
    SpoutConfig _spoutConfig;
    int _taskIndex;
    int _totalTasks;
    int _taskId;
    String _topologyInstanceId;
    // Managers currently owned by this task, keyed by the partition they serve.
    Map<Partition, PartitionManager> _managers;
    // Snapshot of _managers.values() taken at the end of the last successful refresh().
    List<PartitionManager> _cachedList;
    // Wall-clock time (ms) of the last refresh triggered by getMyManagedPartitions(); null until then.
    Long _lastRefreshTime;
    int _refreshFreqMs;
    DynamicPartitionConnections _connections;
    DynamicBrokersReader _reader;
    ZkState _state;
    Map _stormConf;

    /**
     * Creates a coordinator whose {@link DynamicBrokersReader} is built from the
     * {@link ZkHosts} settings and topic found in {@code spoutConfig}.
     *
     * @param dynamicPartitionConnections connections handed to every created {@link PartitionManager}
     * @param map                         configuration map passed to the reader and to every manager
     * @param spoutConfig                 spout configuration; {@code spoutConfig.hosts} must be a {@link ZkHosts}
     * @param zkState                     state object handed to every created {@link PartitionManager}
     * @param i                           task index (stored in {@code _taskIndex})
     * @param i2                          total number of tasks (stored in {@code _totalTasks})
     * @param i3                          task id (stored in {@code _taskId})
     * @param str                         topology instance id (stored in {@code _topologyInstanceId})
     */
    public ZkCoordinator(DynamicPartitionConnections dynamicPartitionConnections, Map map, SpoutConfig spoutConfig, ZkState zkState, int i, int i2, int i3, String str) {
        this(dynamicPartitionConnections, map, spoutConfig, zkState, i, i2, i3, str, buildReader(map, spoutConfig));
    }

    /**
     * Creates a coordinator that uses the supplied {@link DynamicBrokersReader}.
     *
     * @param dynamicPartitionConnections connections handed to every created {@link PartitionManager}
     * @param map                         configuration map handed to every created {@link PartitionManager}
     * @param spoutConfig                 spout configuration; {@code spoutConfig.hosts} must be a {@link ZkHosts}
     * @param zkState                     state object handed to every created {@link PartitionManager}
     * @param i                           task index (stored in {@code _taskIndex})
     * @param i2                          total number of tasks (stored in {@code _totalTasks})
     * @param i3                          task id (stored in {@code _taskId})
     * @param str                         topology instance id (stored in {@code _topologyInstanceId})
     * @param dynamicBrokersReader        reader queried for broker information on every refresh
     * @throws ClassCastException if {@code spoutConfig.hosts} is not a {@link ZkHosts}
     */
    public ZkCoordinator(DynamicPartitionConnections dynamicPartitionConnections, Map map, SpoutConfig spoutConfig, ZkState zkState, int i, int i2, int i3, String str, DynamicBrokersReader dynamicBrokersReader) {
        this._managers = new HashMap<Partition, PartitionManager>();
        this._cachedList = new ArrayList<PartitionManager>();
        this._lastRefreshTime = null;
        this._spoutConfig = spoutConfig;
        this._connections = dynamicPartitionConnections;
        this._taskIndex = i;
        this._totalTasks = i2;
        this._taskId = i3;
        this._topologyInstanceId = str;
        this._stormConf = map;
        this._state = zkState;
        // The refresh interval is configured in seconds; keep it in milliseconds for comparison
        // against System.currentTimeMillis() deltas.
        this._refreshFreqMs = ((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000;
        this._reader = dynamicBrokersReader;
    }

    /** Builds the default reader from the ZooKeeper connection string, path and topic in {@code spoutConfig}. */
    private static DynamicBrokersReader buildReader(Map map, SpoutConfig spoutConfig) {
        ZkHosts zkHosts = (ZkHosts) spoutConfig.hosts;
        return new DynamicBrokersReader(map, zkHosts.brokerZkStr, zkHosts.brokerZkPath, spoutConfig.topic);
    }

    /**
     * Returns the managers for the partitions currently assigned to this task, refreshing the
     * assignment first if it has never been computed or the refresh interval has elapsed.
     *
     * @return the list cached by the most recent {@link #refresh()}
     */
    @Override // org.apache.storm.kafka.PartitionCoordinator
    public List<PartitionManager> getMyManagedPartitions() {
        if (this._lastRefreshTime == null || System.currentTimeMillis() - this._lastRefreshTime.longValue() > this._refreshFreqMs) {
            refresh();
            this._lastRefreshTime = Long.valueOf(System.currentTimeMillis());
        }
        return this._cachedList;
    }

    /**
     * Recomputes which partitions belong to this task and reconciles the manager map with it:
     * managers for partitions no longer assigned are removed and closed, and new managers are
     * created for newly assigned partitions.
     *
     * @throws RuntimeException wrapping any exception raised while refreshing
     */
    @Override // org.apache.storm.kafka.PartitionCoordinator
    public void refresh() {
        try {
            final String prefix = KafkaUtils.taskPrefix(this._taskIndex, this._totalTasks, this._taskId);
            LOG.info("{} Refreshing partition manager connections", prefix);
            List<Partition> assigned = KafkaUtils.calculatePartitionsForTask(this._reader.getBrokerInfo(), this._totalTasks, this._taskIndex, this._taskId);

            // Live view of the currently managed partitions; the two sets below are copies, so
            // removing from _managers later does not disturb them.
            Set<Partition> current = this._managers.keySet();
            Set<Partition> added = new HashSet<Partition>(assigned);
            added.removeAll(current);
            Set<Partition> removed = new HashSet<Partition>(current);
            removed.removeAll(assigned);

            LOG.info("{} Deleted partition managers: {}", prefix, removed);
            // Removed managers are indexed by partition number so that a newly assigned partition
            // with the same number can be handed its predecessor below.
            Map<Integer, PartitionManager> removedByNumber = new HashMap<Integer, PartitionManager>();
            for (Partition partition : removed) {
                removedByNumber.put(Integer.valueOf(partition.partition), this._managers.remove(partition));
            }
            for (PartitionManager manager : removedByNumber.values()) {
                if (manager != null) {
                    manager.close();
                }
            }

            LOG.info("{} New partition managers: {}", prefix, added);
            for (Partition partition : added) {
                // The last argument is the just-closed manager for the same partition number, or
                // null if there was none. NOTE(review): presumably lets the new manager carry
                // over state from the old one — confirm against PartitionManager.
                PartitionManager previous = removedByNumber.get(Integer.valueOf(partition.partition));
                this._managers.put(partition, new PartitionManager(this._connections, this._topologyInstanceId, this._state, this._stormConf, this._spoutConfig, partition, previous));
            }
            this._cachedList = new ArrayList<PartitionManager>(this._managers.values());
            LOG.info("{} Finished refreshing", prefix);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up the manager for a partition.
     *
     * @param partition the partition to look up
     * @return the manager currently registered for {@code partition}, or {@code null} if none is
     */
    @Override // org.apache.storm.kafka.PartitionCoordinator
    public PartitionManager getManager(Partition partition) {
        return this._managers.get(partition);
    }
}
