/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.Map;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.shade.com.google.common.collect.ArrayListMultimap;
import org.apache.pulsar.shade.com.google.common.collect.Multimap;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UniformLoadShedder
implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(UniformLoadShedder.class);
    private static final int MB = 0x100000;
    private static final double MAX_UNLOAD_PERCENTAGE = 0.2;
    private static final int MIN_UNLOAD_MESSAGE = 1000;
    private static final int MIN_UNLOAD_THROUGHPUT = 0x100000;
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
    private static final double EPS = 1.0E-6;

    @Override
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) {
        boolean isMsgThroughputThresholdExceeded;
        this.selectedBundlesCache.clear();
        Map<String, BrokerData> brokersData = loadData.getBrokerData();
        Map<String, BundleData> loadBundleData = loadData.getBundleDataForLoadShedding();
        Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
        MutableObject overloadedBroker = new MutableObject();
        MutableObject underloadedBroker = new MutableObject();
        MutableDouble maxMsgRate = new MutableDouble(-1.0);
        MutableDouble maxThroughputRate = new MutableDouble(-1.0);
        MutableDouble minMsgRate = new MutableDouble(2.147483647E9);
        MutableDouble minThroughputRate = new MutableDouble(2.147483647E9);
        brokersData.forEach((broker, data) -> {
            if (data.getLocalData().getBundles().size() <= 1) {
                return;
            }
            double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut();
            double throughputRate = data.getLocalData().getMsgThroughputIn() + data.getLocalData().getMsgThroughputOut();
            if (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue()) {
                overloadedBroker.setValue(broker);
                maxMsgRate.setValue(msgRate);
                maxThroughputRate.setValue(throughputRate);
            }
            if (msgRate < minMsgRate.getValue() || throughputRate < minThroughputRate.getValue()) {
                underloadedBroker.setValue(broker);
                minMsgRate.setValue(msgRate);
                minThroughputRate.setValue(throughputRate);
            }
        });
        if (minMsgRate.getValue() <= 1.0E-6 && minMsgRate.getValue() >= -1.0E-6) {
            minMsgRate.setValue(1.0);
        }
        if (minThroughputRate.getValue() <= 1.0E-6 && minThroughputRate.getValue() >= -1.0E-6) {
            minThroughputRate.setValue(1.0);
        }
        double msgRateDifferencePercentage = (maxMsgRate.getValue() - minMsgRate.getValue()) * 100.0 / minMsgRate.getValue();
        double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputRate.getValue();
        boolean isMsgRateThresholdExceeded = conf.getLoadBalancerMsgRateDifferenceShedderThreshold() > 0.0 && msgRateDifferencePercentage > conf.getLoadBalancerMsgRateDifferenceShedderThreshold();
        boolean bl = isMsgThroughputThresholdExceeded = conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0.0 && msgThroughputDifferenceRate > conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
        if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
            if (log.isDebugEnabled()) {
                log.debug("Found bundles for uniform load balancing. overloaded broker {} with (msgRate,throughput)= ({},{}) and underloaded broker {} with (msgRate,throughput)= ({},{})", new Object[]{overloadedBroker.getValue(), maxMsgRate.getValue(), maxThroughputRate.getValue(), underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue()});
            }
            MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt((int)((maxMsgRate.getValue() - minMsgRate.getValue()) * 0.2));
            MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt((int)((maxThroughputRate.getValue() - minThroughputRate.getValue()) * 0.2));
            LocalBrokerData overloadedBrokerData = brokersData.get(overloadedBroker.getValue()).getLocalData();
            if (overloadedBrokerData.getBundles().size() > 1 && (msgRateRequiredFromUnloadedBundles.getValue() >= 1000 || msgThroughputRequiredFromUnloadedBundles.getValue() >= 0x100000)) {
                loadBundleData.entrySet().stream().filter(e -> overloadedBrokerData.getBundles().contains(e.getKey())).map(e -> {
                    String bundle = (String)e.getKey();
                    BundleData bundleData = (BundleData)e.getValue();
                    TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                    double throughput = isMsgRateThresholdExceeded ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut() : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    return Triple.of(bundle, bundleData, throughput);
                }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft())).filter(e -> overloadedBrokerData.getBundles().contains(e.getLeft())).sorted((e1, e2) -> Double.compare((Double)e2.getRight(), (Double)e1.getRight())).forEach(e -> {
                    String bundle = (String)e.getLeft();
                    BundleData bundleData = (BundleData)e.getMiddle();
                    TimeAverageMessageData shortTermData = bundleData.getShortTermData();
                    double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
                    double bundleMsgRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
                    if (isMsgRateThresholdExceeded) {
                        if (bundleMsgRate <= (double)(msgRateRequiredFromUnloadedBundles.getValue() + 1000)) {
                            log.info("Found bundle to unload with msgRate {}", (Object)bundleMsgRate);
                            msgRateRequiredFromUnloadedBundles.add(-bundleMsgRate);
                            this.selectedBundlesCache.put((String)overloadedBroker.getValue(), bundle);
                        }
                    } else if (throughput <= (double)msgThroughputRequiredFromUnloadedBundles.getValue().intValue()) {
                        log.info("Found bundle to unload with throughput {}", (Object)throughput);
                        msgThroughputRequiredFromUnloadedBundles.add(-throughput);
                        this.selectedBundlesCache.put((String)overloadedBroker.getValue(), bundle);
                    }
                });
            }
        }
        return this.selectedBundlesCache;
    }
}

