package org.apache.pulsar.broker.loadbalance.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageBrokerData;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.class */
public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
    public static final String BUNDLE_DATA_ZPATH = "/loadbalance/bundle-data";
    public static final double DEFAULT_MESSAGE_RATE = 50.0d;
    public static final double DEFAULT_MESSAGE_THROUGHPUT = 50000.0d;
    public static final int NUM_LONG_SAMPLES = 1000;
    public static final int NUM_SHORT_SAMPLES = 10;
    public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
    public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
    private ZooKeeperChildrenCache availableActiveBrokers;
    private final Set<String> brokerCandidateCache;
    private ZooKeeperDataCache<LocalBrokerData> brokerDataCache;
    private BrokerHostUsage brokerHostUsage;
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;
    private String brokerZnodePath;
    private BundleSplitStrategy bundleSplitStrategy;
    private ServiceConfiguration conf;
    private final NamespaceBundleStats defaultStats;
    private final List<BrokerFilter> filterPipeline;
    private long lastBundleDataUpdate;
    private LocalBrokerData lastData;
    private final List<LoadSheddingStrategy> loadSheddingPipeline;
    private LocalBrokerData localData;
    private final LoadData loadData;
    private final Map<String, String> preallocatedBundleToBroker;
    private ModularLoadManagerStrategy placementStrategy;
    private SimpleResourceAllocationPolicies policies;
    private PulsarService pulsar;
    private final ScheduledExecutorService scheduler;
    private ZooKeeper zkClient;
    private final LoadManagerShared.BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
    private Map<String, String> brokerToFailureDomainMap;
    private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
    private static final ZooKeeperCache.Deserializer<LocalBrokerData> loadReportDeserializer = (str, bArr) -> {
        return (LocalBrokerData) AdminResource.jsonMapper().readValue(bArr, LocalBrokerData.class);
    };

    public ModularLoadManagerImpl() {
        this.brokerCandidateCache = new HashSet();
        this.brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
        this.defaultStats = new NamespaceBundleStats();
        this.filterPipeline = new ArrayList();
        this.loadData = new LoadData();
        this.loadSheddingPipeline = new ArrayList();
        this.loadSheddingPipeline.add(new OverloadShedder());
        this.preallocatedBundleToBroker = new ConcurrentHashMap();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
        this.brokerToFailureDomainMap = Maps.newHashMap();
        this.brokerTopicLoadingPredicate = new LoadManagerShared.BrokerTopicLoadingPredicate() { // from class: org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.1
            @Override // org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate
            public boolean isEnablePersistentTopics(String str) {
                BrokerData brokerData = ModularLoadManagerImpl.this.loadData.getBrokerData().get(str.replace("http://", ""));
                return (brokerData == null || brokerData.getLocalData() == null || !brokerData.getLocalData().isPersistentTopicsEnabled()) ? false : true;
            }

            @Override // org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate
            public boolean isEnableNonPersistentTopics(String str) {
                BrokerData brokerData = ModularLoadManagerImpl.this.loadData.getBrokerData().get(str.replace("http://", ""));
                return (brokerData == null || brokerData.getLocalData() == null || !brokerData.getLocalData().isNonPersistentTopicsEnabled()) ? false : true;
            }
        };
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void initialize(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.availableActiveBrokers = new ZooKeeperChildrenCache(pulsarService.getLocalZkCache(), LoadManager.LOADBALANCE_BROKERS_ROOT);
        this.availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() { // from class: org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.2
            @Override // org.apache.pulsar.zookeeper.ZooKeeperCacheListener
            public void onUpdate(String str, Set<String> set, Stat stat) {
                if (ModularLoadManagerImpl.log.isDebugEnabled()) {
                    ModularLoadManagerImpl.log.debug("Update Received for path {}", str);
                }
                ModularLoadManagerImpl.this.reapDeadBrokerPreallocations(set);
                ScheduledExecutorService scheduledExecutorService = ModularLoadManagerImpl.this.scheduler;
                ModularLoadManagerImpl modularLoadManagerImpl = ModularLoadManagerImpl.this;
                scheduledExecutorService.submit(modularLoadManagerImpl::updateAll);
            }
        });
        this.brokerDataCache = new ZooKeeperDataCache<LocalBrokerData>(pulsarService.getLocalZkCache()) { // from class: org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.3
            @Override // org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer
            public LocalBrokerData deserialize(String str, byte[] bArr) throws Exception {
                return (LocalBrokerData) ObjectMapperFactory.getThreadLocal().readValue(bArr, LocalBrokerData.class);
            }
        };
        this.brokerDataCache.registerListener(this);
        if (SystemUtils.IS_OS_LINUX) {
            this.brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsarService);
        } else {
            this.brokerHostUsage = new GenericBrokerHostUsageImpl(pulsarService);
        }
        this.bundleSplitStrategy = new BundleSplitterTask(pulsarService);
        this.conf = pulsarService.getConfiguration();
        this.defaultStats.msgThroughputIn = 50000.0d;
        this.defaultStats.msgThroughputOut = 50000.0d;
        this.defaultStats.msgRateIn = 50.0d;
        this.defaultStats.msgRateOut = 50.0d;
        this.lastData = new LocalBrokerData(pulsarService.getSafeWebServiceAddress(), pulsarService.getWebServiceAddressTls(), pulsarService.getSafeBrokerServiceUrl(), pulsarService.getBrokerServiceUrlTls());
        this.localData = new LocalBrokerData(pulsarService.getSafeWebServiceAddress(), pulsarService.getWebServiceAddressTls(), pulsarService.getSafeBrokerServiceUrl(), pulsarService.getBrokerServiceUrlTls());
        this.localData.setBrokerVersionString(pulsarService.getBrokerVersion());
        this.lastData.setPersistentTopicsEnabled(pulsarService.getConfiguration().isEnablePersistentTopics());
        this.lastData.setNonPersistentTopicsEnabled(pulsarService.getConfiguration().isEnableNonPersistentTopics());
        this.localData.setPersistentTopicsEnabled(pulsarService.getConfiguration().isEnablePersistentTopics());
        this.localData.setNonPersistentTopicsEnabled(pulsarService.getConfiguration().isEnableNonPersistentTopics());
        this.placementStrategy = ModularLoadManagerStrategy.create(this.conf);
        this.policies = new SimpleResourceAllocationPolicies(pulsarService);
        this.zkClient = pulsarService.getZkClient();
        this.filterPipeline.add(new BrokerVersionFilter());
        refreshBrokerToFailureDomainMap();
        pulsarService.getConfigurationCache().failureDomainListCache().registerListener((str, set, stat) -> {
            this.scheduler.execute(() -> {
                refreshBrokerToFailureDomainMap();
            });
        });
        pulsarService.getConfigurationCache().failureDomainCache().registerListener((str2, failureDomain, stat2) -> {
            this.scheduler.execute(() -> {
                refreshBrokerToFailureDomainMap();
            });
        });
    }

    public ModularLoadManagerImpl(PulsarService pulsarService) {
        this();
        initialize(pulsarService);
    }

    private static void createZPathIfNotExists(ZooKeeper zooKeeper, String str) throws Exception {
        if (zooKeeper.exists(str, false) == null) {
            try {
                ZkUtils.createFullPathOptimistic(zooKeeper, str, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reapDeadBrokerPreallocations(Set<String> set) {
        for (String str : this.loadData.getBrokerData().keySet()) {
            if (!set.contains(str)) {
                if (log.isDebugEnabled()) {
                    log.debug("Broker {} appears to have stopped; now reclaiming any preallocations", str);
                }
                Iterator<Map.Entry<String, String>> it = this.preallocatedBundleToBroker.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, String> next = it.next();
                    String key = next.getKey();
                    String value = next.getValue();
                    if (str.equals(value)) {
                        if (log.isDebugEnabled()) {
                            log.debug("Removing old preallocation on dead broker {} for bundle {}", value, key);
                        }
                        it.remove();
                    }
                }
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public Set<String> getAvailableBrokers() {
        try {
            return this.availableActiveBrokers.get();
        } catch (Exception e) {
            log.warn("Error when trying to get active brokers", e);
            return this.loadData.getBrokerData().keySet();
        }
    }

    private BundleData getBundleDataOrDefault(String str) {
        BundleData bundleData = null;
        try {
            String bundleDataZooKeeperPath = getBundleDataZooKeeperPath(str);
            String format = String.format("%s/%s", RESOURCE_QUOTA_ZPATH, str);
            if (this.zkClient.exists(bundleDataZooKeeperPath, (Watcher) null) != null) {
                bundleData = (BundleData) readJson(this.zkClient.getData(bundleDataZooKeeperPath, (Watcher) null, (Stat) null), BundleData.class);
            } else if (this.zkClient.exists(format, (Watcher) null) != null) {
                ResourceQuota resourceQuota = (ResourceQuota) readJson(this.zkClient.getData(format, (Watcher) null, (Stat) null), ResourceQuota.class);
                bundleData = new BundleData(10, 1000);
                TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                TimeAverageMessageData longTermData = bundleData.getLongTermData();
                shortTermData.setMsgRateIn(resourceQuota.getMsgRateIn());
                shortTermData.setMsgRateOut(resourceQuota.getMsgRateOut());
                shortTermData.setMsgThroughputIn(resourceQuota.getBandwidthIn());
                shortTermData.setMsgThroughputOut(resourceQuota.getBandwidthOut());
                longTermData.setMsgRateIn(resourceQuota.getMsgRateIn());
                longTermData.setMsgRateOut(resourceQuota.getMsgRateOut());
                longTermData.setMsgThroughputIn(resourceQuota.getBandwidthIn());
                longTermData.setMsgThroughputOut(resourceQuota.getBandwidthOut());
                shortTermData.setNumSamples(10);
                longTermData.setNumSamples(1000);
            }
        } catch (Exception e) {
            log.warn("Error when trying to find bundle {} on zookeeper: {}", str, e);
        }
        if (bundleData == null) {
            bundleData = new BundleData(10, 1000, this.defaultStats);
        }
        return bundleData;
    }

    private static String getBundleDataZooKeeperPath(String str) {
        return "/loadbalance/bundle-data/" + str;
    }

    private Map<String, NamespaceBundleStats> getBundleStats() {
        return this.pulsar.getBrokerService().getBundleStats();
    }

    private static <T> T readJson(byte[] bArr, Class<T> cls) throws IOException {
        return (T) ObjectMapperFactory.getThreadLocal().readValue(bArr, cls);
    }

    private double percentChange(double d, double d2) {
        return d == 0.0d ? d2 == 0.0d ? 0.0d : Double.POSITIVE_INFINITY : 100.0d * Math.abs((d - d2) / d);
    }

    private boolean needBrokerDataUpdate() {
        long millis = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
        long currentTimeMillis = System.currentTimeMillis() - this.localData.getLastUpdate();
        if (currentTimeMillis > millis) {
            log.info("Writing local data to ZooKeeper because time since last update exceeded threshold of {} minutes", Integer.valueOf(this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes()));
            return true;
        }
        double max = Math.max(100.0d * Math.abs(this.lastData.getMaxResourceUsage() - this.localData.getMaxResourceUsage()), Math.max(percentChange(this.lastData.getMsgRateIn() + this.lastData.getMsgRateOut(), this.localData.getMsgRateIn() + this.localData.getMsgRateOut()), Math.max(percentChange(this.lastData.getMsgThroughputIn() + this.lastData.getMsgThroughputOut(), this.localData.getMsgThroughputIn() + this.localData.getMsgThroughputOut()), percentChange(this.lastData.getNumBundles(), this.localData.getNumBundles()))));
        if (max <= this.conf.getLoadBalancerReportUpdateThresholdPercentage()) {
            return false;
        }
        log.info("Writing local data to ZooKeeper because maximum change {}% exceeded threshold {}%; time since last report written is {} seconds", new Object[]{Double.valueOf(max), Integer.valueOf(this.conf.getLoadBalancerReportUpdateThresholdPercentage()), Double.valueOf(currentTimeMillis / 1000.0d)});
        return true;
    }

    public void updateAll() {
        if (log.isDebugEnabled()) {
            log.debug("Updating broker and bundle data for loadreport");
        }
        updateAllBrokerData();
        updateBundleData();
        checkNamespaceBundleSplit();
    }

    private void updateAllBrokerData() {
        Set<String> availableBrokers = getAvailableBrokers();
        Map<String, BrokerData> brokerData = this.loadData.getBrokerData();
        for (String str : availableBrokers) {
            try {
                LocalBrokerData orElseThrow = this.brokerDataCache.get(String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, str)).orElseThrow(KeeperException.NoNodeException::new);
                if (brokerData.containsKey(str)) {
                    brokerData.get(str).setLocalData(orElseThrow);
                } else {
                    brokerData.put(str, new BrokerData(orElseThrow));
                }
            } catch (KeeperException.NoNodeException e) {
                brokerData.remove(str);
                log.warn("[{}] broker load-report znode not present", str, e);
            } catch (Exception e2) {
                log.warn("Error reading broker data from cache for broker - [{}], [{}]", str, e2.getMessage());
            }
        }
        for (String str2 : brokerData.keySet()) {
            if (!availableBrokers.contains(str2)) {
                brokerData.remove(str2);
            }
        }
    }

    private void updateBundleData() {
        Map<String, BundleData> bundleData = this.loadData.getBundleData();
        for (Map.Entry<String, BrokerData> entry : this.loadData.getBrokerData().entrySet()) {
            String key = entry.getKey();
            BrokerData value = entry.getValue();
            Map<String, NamespaceBundleStats> lastStats = value.getLocalData().getLastStats();
            for (Map.Entry<String, NamespaceBundleStats> entry2 : lastStats.entrySet()) {
                String key2 = entry2.getKey();
                NamespaceBundleStats value2 = entry2.getValue();
                if (bundleData.containsKey(key2)) {
                    bundleData.get(key2).update(value2);
                } else {
                    BundleData bundleDataOrDefault = getBundleDataOrDefault(key2);
                    bundleDataOrDefault.update(value2);
                    bundleData.put(key2, bundleDataOrDefault);
                }
            }
            Map<String, BundleData> preallocatedBundleData = value.getPreallocatedBundleData();
            synchronized (preallocatedBundleData) {
                for (String str : value.getPreallocatedBundleData().keySet()) {
                    if (value.getLocalData().getBundles().contains(str)) {
                        Iterator<Map.Entry<String, BundleData>> it = preallocatedBundleData.entrySet().iterator();
                        while (it.hasNext()) {
                            String key3 = it.next().getKey();
                            if (bundleData.containsKey(key3)) {
                                it.remove();
                                this.preallocatedBundleToBroker.remove(key3);
                            }
                        }
                    }
                    if (this.preallocatedBundleToBroker.containsKey(str)) {
                        this.preallocatedBundleToBroker.remove(str);
                    }
                }
            }
            value.getTimeAverageData().reset(lastStats.keySet(), bundleData, this.defaultStats);
            ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> computeIfAbsent = this.brokerToNamespaceToBundleRange.computeIfAbsent(key, str2 -> {
                return new ConcurrentOpenHashMap();
            });
            synchronized (computeIfAbsent) {
                computeIfAbsent.clear();
                LoadManagerShared.fillNamespaceToBundlesMap(lastStats.keySet(), computeIfAbsent);
                LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), computeIfAbsent);
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void disableBroker() throws PulsarServerException {
        if (StringUtils.isNotEmpty(this.brokerZnodePath)) {
            try {
                this.pulsar.getZkClient().delete(this.brokerZnodePath, -1);
            } catch (KeeperException.NoNodeException e) {
                throw new PulsarServerException.NotFoundException(e);
            } catch (Exception e2) {
                throw new PulsarServerException(e2);
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public synchronized void doLoadShedding() {
        if (LoadManagerShared.isLoadSheddingEnabled(this.pulsar)) {
            if (getAvailableBrokers().size() <= 1) {
                log.info("Only 1 broker available: no load shedding will be performed");
                return;
            }
            long currentTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerSheddingGracePeriodMinutes());
            Map<String, Long> recentlyUnloadedBundles = this.loadData.getRecentlyUnloadedBundles();
            recentlyUnloadedBundles.keySet().removeIf(str -> {
                return ((Long) recentlyUnloadedBundles.get(str)).longValue() < currentTimeMillis;
            });
            Iterator<LoadSheddingStrategy> it = this.loadSheddingPipeline.iterator();
            while (it.hasNext()) {
                it.next().findBundlesForUnloading(this.loadData, this.conf).asMap().forEach((str2, collection) -> {
                    collection.forEach(str2 -> {
                        String namespaceNameFromBundleName = LoadManagerShared.getNamespaceNameFromBundleName(str2);
                        String bundleRangeFromBundleName = LoadManagerShared.getBundleRangeFromBundleName(str2);
                        if (shouldAntiAffinityNamespaceUnload(namespaceNameFromBundleName, bundleRangeFromBundleName, str2)) {
                            log.info("[Overload shedder] Unloading bundle: {} from broker {}", str2, str2);
                            try {
                                this.pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceNameFromBundleName, bundleRangeFromBundleName);
                                this.loadData.getRecentlyUnloadedBundles().put(str2, Long.valueOf(System.currentTimeMillis()));
                            } catch (PulsarServerException | PulsarAdminException e) {
                                log.warn("Error when trying to perform load shedding on {} for broker {}", new Object[]{str2, str2, e});
                            }
                        }
                    });
                });
            }
        }
    }

    public boolean shouldAntiAffinityNamespaceUnload(String str, String str2, String str3) {
        boolean shouldAntiAffinityNamespaceUnload;
        try {
            Optional<Policies> optional = this.pulsar.getConfigurationCache().policiesCache().get(PulsarWebResource.path("policies", str));
            if (!optional.isPresent() || StringUtils.isBlank(optional.get().antiAffinityGroup)) {
                return true;
            }
            synchronized (this.brokerCandidateCache) {
                this.brokerCandidateCache.clear();
                LoadManagerShared.applyNamespacePolicies(this.pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(str, str2), this.policies, this.brokerCandidateCache, getAvailableBrokers(), this.brokerTopicLoadingPredicate);
                shouldAntiAffinityNamespaceUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(str, str2, str3, this.pulsar, this.brokerToNamespaceToBundleRange, this.brokerCandidateCache);
            }
            return shouldAntiAffinityNamespaceUnload;
        } catch (Exception e) {
            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", new Object[]{str, str2, str3, e.getMessage()});
            return true;
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void checkNamespaceBundleSplit() {
        String namespaceNameFromBundleName;
        String bundleRangeFromBundleName;
        if (this.conf.isLoadBalancerAutoBundleSplitEnabled() && this.pulsar.getLeaderElectionService() != null && this.pulsar.getLeaderElectionService().isLeader()) {
            boolean isLoadBalancerAutoUnloadSplitBundlesEnabled = this.pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
            synchronized (this.bundleSplitStrategy) {
                Set<String> findBundlesToSplit = this.bundleSplitStrategy.findBundlesToSplit(this.loadData, this.pulsar);
                NamespaceBundleFactory namespaceBundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
                for (String str : findBundlesToSplit) {
                    try {
                        namespaceNameFromBundleName = LoadManagerShared.getNamespaceNameFromBundleName(str);
                        bundleRangeFromBundleName = LoadManagerShared.getBundleRangeFromBundleName(str);
                    } catch (Exception e) {
                        log.error("Failed to split namespace bundle {}", str, e);
                    }
                    if (namespaceBundleFactory.canSplitBundle(namespaceBundleFactory.getBundle(namespaceNameFromBundleName, bundleRangeFromBundleName))) {
                        log.info("Load-manager splitting bundle {} and unloading {}", str, Boolean.valueOf(isLoadBalancerAutoUnloadSplitBundlesEnabled));
                        this.pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceNameFromBundleName, bundleRangeFromBundleName, isLoadBalancerAutoUnloadSplitBundlesEnabled);
                        this.loadData.getBundleData().remove(str);
                        this.localData.getLastStats().remove(str);
                        this.pulsar.getNamespaceService().getNamespaceBundleFactory().invalidateBundleCache(NamespaceName.get(namespaceNameFromBundleName));
                        deleteBundleDataFromZookeeper(str);
                        log.info("Successfully split namespace bundle {}", str);
                    }
                }
            }
        }
    }

    @Override // org.apache.pulsar.zookeeper.ZooKeeperCacheListener
    public void onUpdate(String str, LocalBrokerData localBrokerData, Stat stat) {
        this.scheduler.submit(this::updateAll);
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnitId) {
        synchronized (this.brokerCandidateCache) {
            String serviceUnitId2 = serviceUnitId.toString();
            if (this.preallocatedBundleToBroker.containsKey(serviceUnitId2)) {
                return Optional.of(this.preallocatedBundleToBroker.get(serviceUnitId2));
            }
            BundleData computeIfAbsent = this.loadData.getBundleData().computeIfAbsent(serviceUnitId2, str -> {
                return getBundleDataOrDefault(serviceUnitId2);
            });
            this.brokerCandidateCache.clear();
            LoadManagerShared.applyNamespacePolicies(serviceUnitId, this.policies, this.brokerCandidateCache, getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            LoadManagerShared.filterBrokersWithLargeTopicCount(this.brokerCandidateCache, this.loadData, this.conf.getLoadBalancerBrokerMaxTopics());
            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(this.pulsar, serviceUnitId.toString(), this.brokerCandidateCache, this.brokerToNamespaceToBundleRange, this.brokerToFailureDomainMap);
            LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnitId.toString(), this.brokerCandidateCache, this.brokerToNamespaceToBundleRange);
            log.info("{} brokers being considered for assignment of {}", Integer.valueOf(this.brokerCandidateCache.size()), serviceUnitId2);
            try {
                Iterator<BrokerFilter> it = this.filterPipeline.iterator();
                while (it.hasNext()) {
                    it.next().filter(this.brokerCandidateCache, computeIfAbsent, this.loadData, this.conf);
                }
            } catch (BrokerFilterException e) {
                LoadManagerShared.applyNamespacePolicies(serviceUnitId, this.policies, this.brokerCandidateCache, getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            }
            if (this.brokerCandidateCache.isEmpty()) {
                LoadManagerShared.applyNamespacePolicies(serviceUnitId, this.policies, this.brokerCandidateCache, getAvailableBrokers(), this.brokerTopicLoadingPredicate);
            }
            Optional<String> selectBroker = this.placementStrategy.selectBroker(this.brokerCandidateCache, computeIfAbsent, this.loadData, this.conf);
            if (log.isDebugEnabled()) {
                log.debug("Selected broker {} from candidate brokers {}", selectBroker, this.brokerCandidateCache);
            }
            if (!selectBroker.isPresent()) {
                return selectBroker;
            }
            if (this.loadData.getBrokerData().get(selectBroker.get()).getLocalData().getMaxResourceUsage() > this.conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0d) {
                LoadManagerShared.applyNamespacePolicies(serviceUnitId, this.policies, this.brokerCandidateCache, getAvailableBrokers(), this.brokerTopicLoadingPredicate);
                selectBroker = this.placementStrategy.selectBroker(this.brokerCandidateCache, computeIfAbsent, this.loadData, this.conf);
            }
            this.loadData.getBrokerData().get(selectBroker.get()).getPreallocatedBundleData().put(serviceUnitId2, computeIfAbsent);
            this.preallocatedBundleToBroker.put(serviceUnitId2, selectBroker.get());
            String namespaceNameFromBundleName = LoadManagerShared.getNamespaceNameFromBundleName(serviceUnitId2);
            String bundleRangeFromBundleName = LoadManagerShared.getBundleRangeFromBundleName(serviceUnitId2);
            ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> computeIfAbsent2 = this.brokerToNamespaceToBundleRange.computeIfAbsent(selectBroker.get(), str2 -> {
                return new ConcurrentOpenHashMap();
            });
            synchronized (computeIfAbsent2) {
                computeIfAbsent2.computeIfAbsent(namespaceNameFromBundleName, str3 -> {
                    return new ConcurrentOpenHashSet();
                }).add(bundleRangeFromBundleName);
            }
            return selectBroker;
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void start() throws PulsarServerException {
        try {
            createZPathIfNotExists(this.zkClient, LoadManager.LOADBALANCE_BROKERS_ROOT);
            String str = this.pulsar.getAdvertisedAddress() + BookKeeperConstants.COLON + (this.conf.getWebServicePort().isPresent() ? this.conf.getWebServicePort().get() : this.conf.getWebServicePortTls().get());
            this.brokerZnodePath = "/loadbalance/brokers/" + str;
            String str2 = "/loadbalance/broker-time-average/" + str;
            updateLocalBrokerData();
            try {
                try {
                    ZkUtils.createFullPathOptimistic(this.zkClient, this.brokerZnodePath, this.localData.getJsonBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (KeeperException.NodeExistsException e) {
                    long brokerZnodeOwner = getBrokerZnodeOwner();
                    if (brokerZnodeOwner != 0 && brokerZnodeOwner != this.zkClient.getSessionId()) {
                        log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", this.brokerZnodePath, Long.valueOf(brokerZnodeOwner));
                        throw new PulsarServerException("Broker-znode owned by different zk-session " + brokerZnodeOwner);
                    }
                    this.zkClient.setData(this.brokerZnodePath, this.localData.getJsonBytes(), -1);
                }
                createZPathIfNotExists(this.zkClient, str2);
                this.zkClient.setData(str2, new TimeAverageBrokerData().getJsonBytes(), -1);
                updateAll();
                this.lastBundleDataUpdate = System.currentTimeMillis();
            } catch (Exception e2) {
                log.error("Unable to create znode - [{}] for load balance on zookeeper ", this.brokerZnodePath, e2);
                throw e2;
            }
        } catch (Exception e3) {
            log.error("Unable to create znode - [{}] for load balance on zookeeper ", this.brokerZnodePath, e3);
            throw new PulsarServerException(e3);
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void stop() throws PulsarServerException {
        if (this.availableActiveBrokers != null) {
            this.availableActiveBrokers.close();
        }
        if (this.brokerDataCache != null) {
            this.brokerDataCache.close();
            this.brokerDataCache.clear();
        }
        this.scheduler.shutdown();
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public LocalBrokerData updateLocalBrokerData() {
        try {
            this.localData.update(LoadManagerShared.getSystemResourceUsage(this.brokerHostUsage), getBundleStats());
        } catch (Exception e) {
            log.warn("Error when attempting to update local broker data: {}", e);
        }
        return this.localData;
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void writeBrokerDataOnZooKeeper() {
        try {
            updateLocalBrokerData();
            if (needBrokerDataUpdate()) {
                this.localData.setLastUpdate(System.currentTimeMillis());
                this.zkClient.setData(this.brokerZnodePath, this.localData.getJsonBytes(), -1);
                this.localData.getLastBundleGains().clear();
                this.localData.getLastBundleLosses().clear();
                this.lastData.update(this.localData);
            }
        } catch (Exception e) {
            log.warn("Error writing broker data on ZooKeeper: {}", e);
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public ZooKeeperCache.Deserializer<LocalBrokerData> getLoadReportDeserializer() {
        return loadReportDeserializer;
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public void writeBundleDataOnZooKeeper() {
        updateBundleData();
        for (Map.Entry<String, BundleData> entry : this.loadData.getBundleData().entrySet()) {
            String key = entry.getKey();
            BundleData value = entry.getValue();
            try {
                String bundleDataZooKeeperPath = getBundleDataZooKeeperPath(key);
                createZPathIfNotExists(this.zkClient, bundleDataZooKeeperPath);
                this.zkClient.setData(bundleDataZooKeeperPath, value.getJsonBytes(), -1);
            } catch (Exception e) {
                log.warn("Error when writing data for bundle {} to ZooKeeper: {}", key, e);
            }
        }
        for (Map.Entry<String, BrokerData> entry2 : this.loadData.getBrokerData().entrySet()) {
            String key2 = entry2.getKey();
            TimeAverageBrokerData timeAverageData = entry2.getValue().getTimeAverageData();
            try {
                String str = "/loadbalance/broker-time-average/" + key2;
                createZPathIfNotExists(this.zkClient, str);
                this.zkClient.setData(str, timeAverageData.getJsonBytes(), -1);
                if (log.isDebugEnabled()) {
                    log.debug("Writing zookeeper report {}", timeAverageData);
                }
            } catch (Exception e2) {
                log.warn("Error when writing time average broker data for {} to ZooKeeper: {}", key2, e2);
            }
        }
    }

    private void deleteBundleDataFromZookeeper(String str) {
        String bundleDataZooKeeperPath = getBundleDataZooKeeperPath(str);
        try {
            if (this.zkClient.exists(bundleDataZooKeeperPath, (Watcher) null) != null) {
                this.zkClient.delete(bundleDataZooKeeperPath, -1);
            }
        } catch (Exception e) {
            log.warn("Failed to delete bundle-data {} from zookeeper", str, e);
        }
    }

    private long getBrokerZnodeOwner() {
        try {
            Stat stat = new Stat();
            this.zkClient.getData(this.brokerZnodePath, false, stat);
            return stat.getEphemeralOwner();
        } catch (Exception e) {
            log.warn("Failed to get stat of {}", this.brokerZnodePath, e);
            return 0L;
        }
    }

    private void refreshBrokerToFailureDomainMap() {
        if (this.pulsar.getConfiguration().isFailureDomainsEnabled()) {
            String str = this.pulsar.getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
            try {
                synchronized (this.brokerToFailureDomainMap) {
                    HashMap newHashMap = Maps.newHashMap();
                    for (String str2 : this.pulsar.getConfigurationCache().failureDomainListCache().get()) {
                        try {
                            Optional<FailureDomain> optional = this.pulsar.getConfigurationCache().failureDomainCache().get(str + "/" + str2);
                            if (optional.isPresent()) {
                                Iterator<String> it = optional.get().brokers.iterator();
                                while (it.hasNext()) {
                                    newHashMap.put(it.next(), str2);
                                }
                            }
                        } catch (Exception e) {
                            log.warn("Failed to get domain {}", str2, e);
                        }
                    }
                    this.brokerToFailureDomainMap = newHashMap;
                }
                log.info("Cluster domain refreshed {}", this.brokerToFailureDomainMap);
            } catch (Exception e2) {
                log.warn("Failed to get domain-list for cluster {}", e2.getMessage());
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.ModularLoadManager
    public LocalBrokerData getBrokerLocalData(String str) {
        try {
            return this.brokerDataCache.get(String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, str)).orElse(null);
        } catch (Exception e) {
            log.warn("Failed to get local-broker data for {}", str, e);
            return null;
        }
    }
}
