package org.apache.storm.grouping;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/grouping/LoadAwareShuffleGrouping.class */
/**
 * Stream grouping that hands out target tasks from a pre-built, shuffled ring of choices.
 *
 * <p>The ring is rebuilt by {@code updateRing} during {@code prepare} and on every
 * {@code refreshLoad} call. Each rebuild regroups the target tasks by locality
 * ({@link Scope}), possibly moves to a wider or narrower scope based on the average reported
 * load, adjusts per-task weights and refills the ring. {@code chooseTasks} only walks the ring.
 */
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
    // Not referenced by name in this file; the use sites (IndexAndWeights, updateRing) use the
    // literal 100 -- presumably this constant inlined by the compiler.
    private static final int MAX_WEIGHT = 100;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) LoadAwareShuffleGrouping.class);
    // Target task id -> its index into rets plus its current weight; filled in prepare().
    private final Map<Integer, IndexAndWeights> orig = new HashMap();

    // One single-element list per target task, in the order the tasks were passed to prepare();
    // chooseTasks() returns these lists directly.
    @VisibleForTesting
    List<Integer>[] rets;

    // The active ring: every slot holds an index into rets. Read by chooseTasks(), replaced by
    // updateRing().
    @VisibleForTesting
    volatile int[] choices;
    // Length of both choices and prepareChoices: 1 for a single target task, otherwise
    // max(1000, 5 * number of target tasks).
    private int capacity;
    // Source of randomness for padding and shuffling the ring.
    private Random random;
    // Scratch ring that updateRing() fills and then swaps with choices.
    private volatile int[] prepareChoices;
    // Cursor into choices; advanced by chooseTasks() and set to -1 by updateRing().
    private AtomicInteger current;
    // Locality scope currently used for picking targets; starts at WORKER_LOCAL in prepare().
    private Scope currentScope;
    // Host and port of the worker this grouping was prepared on.
    private NodeInfo sourceNodeInfo;
    // Ids of the target tasks as passed to prepare().
    private List<Integer> targetTasks;
    // Task id -> node/port assignment, obtained from the worker topology context.
    private AtomicReference<Map<Integer, NodeInfo>> taskToNodePort;
    // Topology configuration obtained from the worker topology context.
    private Map<String, Object> conf;
    // Plugin instance used to resolve host names to racks (see getHostToRackMapping()).
    private DNSToSwitchMapping dnsToSwitchMapping;
    // Target task ids bucketed by the scope computed for each one; rebuilt in refreshLocalityGroup().
    private Map<Scope, List<Integer>> localityGroup;
    // Average load above which transition() moves to the next wider scope.
    private double higherBound;
    // Average load below which transition() tries to move to the next narrower scope.
    private double lowerBound;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/grouping/LoadAwareShuffleGrouping$IndexAndWeights.class */
    /**
     * Pairs a fixed index with a mutable weight.
     *
     * <p>The weight starts at 100 and can be put back to that value with {@link #resetWeight()}.
     */
    public static class IndexAndWeights {
        /** Index assigned at construction; never changes. */
        final int index;
        /** Current weight; mutated directly by the enclosing class. */
        int weight;

        /**
         * Creates an entry with the given index and the initial weight of 100.
         *
         * @param i the index to store
         */
        IndexAndWeights(int i) {
            index = i;
            weight = 100;
        }

        /** Restores the weight to its initial value of 100. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public void resetWeight() {
            weight = 100;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/grouping/LoadAwareShuffleGrouping$Scope.class */
    /**
     * Locality levels, declared from the narrowest ({@code WORKER_LOCAL}) to the widest
     * ({@code EVERYTHING}).
     */
    public enum Scope {
        WORKER_LOCAL,
        HOST_LOCAL,
        RACK_LOCAL,
        EVERYTHING;

        /**
         * Returns the next narrower scope.
         *
         * @param scope the scope to narrow; must not be null
         * @return the scope one step narrower, or {@code WORKER_LOCAL} when {@code scope} is
         *         already {@code WORKER_LOCAL}
         */
        public static Scope downgrade(Scope scope) {
            final Scope narrower;
            switch (scope) {
                case EVERYTHING:
                    narrower = RACK_LOCAL;
                    break;
                case RACK_LOCAL:
                    narrower = HOST_LOCAL;
                    break;
                default:
                    // HOST_LOCAL and WORKER_LOCAL both end up at the narrowest level.
                    narrower = WORKER_LOCAL;
                    break;
            }
            return narrower;
        }

        /**
         * Returns the next wider scope.
         *
         * @param scope the scope to widen; must not be null
         * @return the scope one step wider, or {@code EVERYTHING} when {@code scope} is already
         *         {@code EVERYTHING}
         */
        public static Scope upgrade(Scope scope) {
            final Scope wider;
            switch (scope) {
                case WORKER_LOCAL:
                    wider = HOST_LOCAL;
                    break;
                case HOST_LOCAL:
                    wider = RACK_LOCAL;
                    break;
                default:
                    // RACK_LOCAL and EVERYTHING both end up at the widest level.
                    wider = EVERYTHING;
                    break;
            }
            return wider;
        }
    }

    /**
     * Initializes this grouping for the current worker and the given target tasks.
     *
     * <p>Records this worker's host and port, reads the locality bounds and the network
     * topography plugin class from the topology configuration, creates one single-element
     * result list per target task, and finally builds the first ring of choices without any
     * load information.
     *
     * @param workerTopologyContext supplies this worker's host and port, the task-to-node
     *                              assignment and the topology configuration
     * @param globalStreamId        not used by this implementation
     * @param list                  ids of the target tasks
     */
    @Override // org.apache.storm.grouping.CustomStreamGrouping
    public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> list) {
        random = new Random();
        sourceNodeInfo = new NodeInfo(
                workerTopologyContext.getThisWorkerHost(),
                Sets.newHashSet(Long.valueOf(workerTopologyContext.getThisWorkerPort().intValue())));
        taskToNodePort = workerTopologyContext.getTaskToNodePort();
        targetTasks = list;

        final int targetCount = list.size();
        // A single target needs just one slot; otherwise use at least 1000 slots, five per target.
        if (targetCount == 1) {
            capacity = 1;
        } else {
            capacity = Math.max(1000, targetCount * 5);
        }

        conf = workerTopologyContext.getConf();
        final String mappingPlugin = (String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN);
        dnsToSwitchMapping = (DNSToSwitchMapping) ReflectionUtils.newInstance(mappingPlugin);
        localityGroup = new HashMap<>();
        currentScope = Scope.WORKER_LOCAL;
        higherBound = ObjectReader.getDouble(conf.get(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND)).doubleValue();
        lowerBound = ObjectReader.getDouble(conf.get(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND)).doubleValue();

        // Pre-build the lists chooseTasks() returns and remember each task's position in rets.
        rets = new List[targetCount];
        int position = 0;
        for (int task : list) {
            rets[position] = Arrays.asList(task);
            orig.put(task, new IndexAndWeights(position));
            position++;
        }

        choices = new int[capacity];
        current = new AtomicInteger(0);
        prepareChoices = new int[capacity];
        updateRing(null);
    }

    /**
     * Returns the target for the next tuple by advancing the cursor over the ring of choices.
     *
     * @param i    not used by this implementation
     * @param list not used by this implementation
     * @return the pre-built single-element list for the chosen target task
     */
    @Override // org.apache.storm.grouping.CustomStreamGrouping
    public List<Integer> chooseTasks(int i, List<Object> list) {
        while (true) {
            final int slot = current.incrementAndGet();
            if (slot < capacity) {
                return rets[choices[slot]];
            }
            if (slot == capacity) {
                // This call reached the end of the ring: wrap the cursor and use slot 0.
                current.set(0);
                return rets[choices[0]];
            }
            // The cursor is already past the end of the ring; take another number.
        }
    }

    /**
     * Rebuilds the ring of choices using the supplied load information.
     *
     * @param loadMapping the load reported per task; passed straight to {@code updateRing}
     */
    @Override // org.apache.storm.grouping.LoadAwareCustomStreamGrouping
    public void refreshLoad(LoadMapping loadMapping) {
        this.updateRing(loadMapping);
    }

    /**
     * Re-buckets every target task by the scope computed for it from the current task-to-node
     * assignment. Existing buckets are emptied first and then refilled.
     */
    private void refreshLocalityGroup() {
        final Map<Integer, NodeInfo> assignment = taskToNodePort.get();
        final Map<String, String> hostToRack = getHostToRackMapping(assignment);

        for (List<Integer> bucket : localityGroup.values()) {
            bucket.clear();
        }

        for (int task : targetTasks) {
            final Scope scope = calculateScope(assignment, hostToRack, task);
            localityGroup.computeIfAbsent(scope, unused -> new ArrayList<>()).add(task);
        }
    }

    /**
     * Collects the target tasks bucketed under the given scope and under every narrower scope.
     *
     * @param scope the widest scope to include
     * @return a new list holding the tasks of {@code scope} first, followed by those of each
     *         successively narrower scope
     */
    private List<Integer> getTargetsInScope(Scope scope) {
        final List<Integer> targets = new ArrayList<>();
        Scope level = scope;
        while (true) {
            final List<Integer> members = localityGroup.get(level);
            if (members != null) {
                targets.addAll(members);
            }
            final Scope narrower = Scope.downgrade(level);
            if (narrower == level) {
                // downgrade() no longer changes the scope: the narrowest level has been handled.
                return targets;
            }
            level = narrower;
        }
    }

    /**
     * Decides which scope to use next.
     *
     * <p>While the current scope contains no targets, {@code currentScope} itself is widened.
     * Without load information the (possibly widened) current scope is returned. Otherwise the
     * average load of the targets in the current scope is compared against the bounds: below
     * {@code lowerBound} the next narrower scope is chosen if it has any targets, above
     * {@code higherBound} the next wider scope is chosen, and in between the scope is kept.
     *
     * @param loadMapping load per task, or {@code null} when no load information is available
     * @return the scope to use next
     * @throws RuntimeException if even the widest scope has no target tasks
     */
    private Scope transition(LoadMapping loadMapping) {
        List<Integer> candidates = getTargetsInScope(currentScope);
        while (candidates.isEmpty()) {
            final Scope wider = Scope.upgrade(currentScope);
            if (wider == currentScope) {
                throw new RuntimeException("The current scope " + currentScope + " has no target tasks.");
            }
            currentScope = wider;
            candidates = getTargetsInScope(currentScope);
        }

        if (loadMapping == null) {
            return currentScope;
        }

        final double averageLoad = candidates.stream()
                .mapToDouble(task -> loadMapping.get(task.intValue()))
                .average()
                .getAsDouble();

        if (averageLoad < lowerBound) {
            final Scope narrower = Scope.downgrade(currentScope);
            // Only narrow when the narrower scope actually has somewhere to send tuples.
            return getTargetsInScope(narrower).isEmpty() ? currentScope : narrower;
        }
        if (averageLoad > higherBound) {
            return Scope.upgrade(currentScope);
        }
        return currentScope;
    }

    /**
     * Rebuilds the ring of choices from the current locality grouping and load, then publishes it.
     *
     * <p>Steps:
     * <ol>
     *   <li>Refresh the locality buckets and pick the scope to use; when the scope changes, all
     *       weights are reset.</li>
     *   <li>For every target in scope, raise its weight by 1 (at most 100, the value of
     *       {@code MAX_WEIGHT}) when its load is within 0.05 of the minimum load in scope,
     *       otherwise lower it by 10 (at least 0).</li>
     *   <li>Give every target a share of the ring proportional to its weight, pad any remaining
     *       slots with random picks from the slots already filled, and shuffle.</li>
     *   <li>Swap the freshly built ring with the active one and reset the cursor.</li>
     * </ol>
     *
     * @param loadMapping load per task, or {@code null} to treat every task as having load 0
     */
    private synchronized void updateRing(LoadMapping loadMapping) {
        refreshLocalityGroup();
        Scope previousScope = this.currentScope;
        this.currentScope = transition(loadMapping);
        if (this.currentScope != previousScope) {
            // A different set of targets is in play now, so start every weight from scratch.
            for (IndexAndWeights entry : this.orig.values()) {
                entry.resetWeight();
            }
        }

        List<Integer> targetsInScope = getTargetsInScope(this.currentScope);
        double minLoad = loadMapping == null ? 0.0d : targetsInScope.stream()
                .mapToDouble(task -> loadMapping.get(task.intValue()))
                .min()
                .getAsDouble();

        for (int task : targetsInScope) {
            IndexAndWeights entry = this.orig.get(task);
            double load = loadMapping == null ? 0.0d : loadMapping.get(task);
            if (load <= minLoad + 0.05d) {
                entry.weight = Math.min(100, entry.weight + 1);
            } else {
                entry.weight = Math.max(0, entry.weight - 10);
            }
        }

        long weightSum = targetsInScope.stream().mapToLong(task -> this.orig.get(task).weight).sum();

        int filled = 0;
        if (weightSum > 0) {
            for (int task : targetsInScope) {
                IndexAndWeights entry = this.orig.get(task);
                // Divide in floating point: weight / weightSum as integers truncates to 0 for
                // every target that does not hold the entire weight sum, which would leave the
                // ring empty and always trigger the round-robin fallback below.
                int share = (int) ((entry.weight / (double) weightSum) * this.capacity);
                for (int n = 0; n < share && filled < this.capacity; n++) {
                    this.prepareChoices[filled] = entry.index;
                    filled++;
                }
            }
            if (filled > 0) {
                // Rounding down can leave a few slots unassigned; pad them with random picks
                // from the slots assigned so far.
                while (filled < this.capacity) {
                    this.prepareChoices[filled] = this.prepareChoices[this.random.nextInt(filled)];
                    filled++;
                }
            }
        }
        if (filled == 0) {
            // Nothing was assigned (all weights are 0): spread the slots evenly over every
            // entry of rets.
            while (filled < this.capacity) {
                this.prepareChoices[filled] = filled % this.rets.length;
                filled++;
            }
        }

        shuffleArray(this.prepareChoices);

        // Publish the new ring and keep the old array as the scratch buffer for the next rebuild.
        int[] retired = this.choices;
        this.choices = this.prepareChoices;
        this.prepareChoices = retired;
        // chooseTasks() increments before reading, so -1 makes the next pick use slot 0.
        this.current.set(-1);
    }

    /**
     * Shuffles the array in place, working from the last position down to the second and
     * swapping each position with a randomly chosen position at or before it.
     *
     * @param iArr the array to shuffle
     */
    private void shuffleArray(int[] iArr) {
        int unshuffled = iArr.length;
        while (unshuffled > 1) {
            swap(iArr, unshuffled - 1, random.nextInt(unshuffled));
            unshuffled--;
        }
    }

    /**
     * Exchanges two elements of an array in place.
     *
     * @param iArr the array to modify
     * @param i    position of the first element
     * @param i2   position of the second element
     */
    private void swap(int[] iArr, int i, int i2) {
        final int displaced = iArr[i2];
        iArr[i2] = iArr[i];
        iArr[i] = displaced;
    }

    /**
     * Classifies a target task by how close it runs to this worker.
     *
     * @param map  task id to node/port assignment
     * @param map2 host name to rack mapping
     * @param i    id of the target task
     * @return {@code EVERYTHING} when the task has no assignment, when either rack is unknown or
     *         when the racks differ; {@code RACK_LOCAL} when only the rack matches;
     *         {@code HOST_LOCAL} when the node also matches; {@code WORKER_LOCAL} when the port
     *         matches as well
     */
    private Scope calculateScope(Map<Integer, NodeInfo> map, Map<String, String> map2, int i) {
        final NodeInfo targetNodeInfo = map.get(i);
        if (targetNodeInfo == null) {
            return Scope.EVERYTHING;
        }

        final String sourceRack = map2.get(sourceNodeInfo.get_node());
        final String targetRack = map2.get(targetNodeInfo.get_node());
        if (sourceRack == null || targetRack == null || !sourceRack.equals(targetRack)) {
            return Scope.EVERYTHING;
        }
        if (!sourceNodeInfo.get_node().equals(targetNodeInfo.get_node())) {
            return Scope.RACK_LOCAL;
        }
        if (!sourceNodeInfo.get_port().equals(targetNodeInfo.get_port())) {
            return Scope.HOST_LOCAL;
        }
        return Scope.WORKER_LOCAL;
    }

    /**
     * Resolves the rack of every node involved in this grouping.
     *
     * <p>Gathers the nodes of all target tasks that have an assignment (logging an error for each
     * task that has none), adds this worker's own node, and passes the distinct set to the
     * configured {@code DNSToSwitchMapping}.
     *
     * @param map task id to node/port assignment
     * @return the mapping returned by {@code dnsToSwitchMapping.resolve} for the gathered nodes
     */
    private Map<String, String> getHostToRackMapping(Map<Integer, NodeInfo> map) {
        final HashSet<String> hosts = new HashSet<>();
        for (int task : targetTasks) {
            if (!map.containsKey(task)) {
                LOG.error("Could not find task NodeInfo from local cache.");
                continue;
            }
            hosts.add(map.get(task).get_node());
        }
        hosts.add(sourceNodeInfo.get_node());
        return dnsToSwitchMapping.resolve(new ArrayList<>(hosts));
    }

    /**
     * Returns the number of slots in the ring of choices.
     *
     * @return the capacity computed in {@code prepare}
     */
    public int getCapacity() {
        return capacity;
    }
}
